

# DynamoDB Streams 和生存时间
<a name="time-to-live-ttl-streams"></a>

您可以通过在表中启用 Amazon DynamoDB Streams 并处理已过期项目的流记录来备份或者处理按[生存时间](TTL.md)（TTL）删除的项目。有关更多信息，请参阅 [读取和处理流](Streams.md#Streams.Processing)。

流记录包含用户身份字段`Records[<index>].userIdentity`。

在过期后被生存时间过程删除的项目包含以下字段：
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**注意**  
在全局表中使用 TTL 时，执行 TTL 的区域将设置 `userIdentity` 字段。复制删除操作时，不会在其它区域设置此字段。

以下 JSON 显示单个流记录的相关部分。

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## 使用 DynamoDB Streams 和 Lambda 归档已删除 TTL 的项目
<a name="streams-archive-ttl-deleted-items"></a>

结合使用 [DynamoDB 生存时间（TTL）](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html)、[DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) 和 [AWS Lambda](https://aws.amazon.com/lambda/) 有助于简化数据归档、降低 DynamoDB 存储成本并降低代码复杂性。使用 Lambda 作为流使用者提供了许多优势，最明显的是与 Kinesis Client Library (KCL) 等其他使用者相比，降低了成本。当通过 Lambda 来使用事件时，对 DynamoDB 流的 `GetRecords` API 调用不向您收费，并且 Lambda 可以通过识别流事件中的 JSON 模式来提供事件筛选。借助事件模式内容筛选，您可以定义多达五个不同的筛选条件来控制将哪些事件发送到 Lambda 进行处理。这有助于减少对 Lambda 函数的调用、简化代码并降低总体成本。

尽管 DynamoDB Streams 包含所有数据修改，例如 `Create`、`Modify` 和 `Remove` 操作，但这可能导致不必要地调用归档 Lambda 函数。例如，假设一个每小时有 200 万项数据修改的表流入流中，但其中不到 5％ 的数据修改是将在 TTL 流程中过期而需要归档的项目删除。使用 [Lambda 事件源筛选条件](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)，Lambda 函数每小时只调用 100,000 次。事件筛选的结果是，您只需为所需的调用付费。在没有事件筛选的情况下，您需要为获得的 200 万次调用付费。

事件筛选应用于 [Lambda 事件源映射](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html)，它是一个从选定事件（DynamoDB 流）读取并调用 Lambda 函数的资源。在下图中，您可以看到 Lambda 函数如何通过流和事件筛选条件使用已删除生存时间的项目。

![\[通过 TTL 流程删除的项目会启动使用流和事件筛选条件的 Lambda 函数。\]](http://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### DynamoDB 生存时间事件筛选条件模式
<a name="ttl-event-filter-pattern"></a>

将以下 JSON 添加到源映射[筛选标准](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html)仅允许对已删除 TTL 的项目调用 Lambda 函数：

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### 创建 AWS Lambda 事件源映射
<a name="create-event-source-mapping"></a>

使用以下代码段创建筛选的事件源映射，您可以将其连接到表的 DynamoDB 流。每个代码块都包括事件筛选条件模式。

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------