

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 从 Amazon Kinesis Data Streams 加载流数据
<a name="integrations-kinesis"></a>

您可以将流数据从 Kinesis Data Streams 加载 OpenSearch 到服务。到达此数据流的新数据将向 Lambda 触发事件通知，这将运行自定义代码以执行索引编制。此节包括一些简单的 Python 示例代码。

## 先决条件
<a name="integrations-kinesis-lambda-prereq"></a>

继续操作之前，必须具有以下资源。


| 先决条件 | 说明 | 
| --- | --- | 
| Amazon Kinesis Data Stream | Lambda 函数的事件源。要了解更多信息，请参阅 [Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)。 | 
| OpenSearch 服务域 | Lambda 函数处理数据之后数据的目的地。有关更多信息，请参阅 [创建 OpenSearch 服务域](createupdatedomains.md#createdomains) | 
| IAM 角色 |  此角色必须具有基本的 OpenSearch 服务、Kinesis 和 Lambda 权限，例如：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "es:ESHttpPost",
        "es:ESHttpPut",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
      ],
      "Resource": "*"
    }
  ]
}
```     角色必须拥有以下信任关系：   JSON   

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```     要了解更多信息，请参阅 *IAM 用户手册*中的[创建 IAM 角色](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.html)。  | 

# 创建 Lambda 函数
<a name="integrations-kinesis-lambda"></a>

按照[创建 Lambda 部署程序包](integrations-s3-lambda.md#integrations-s3-lambda-deployment-package)中的说明操作，但创建一个名为 `kinesis-to-opensearch` 的目录并对 `sample.py` 使用以下代码：

```
import base64
import boto3
import json
import requests
from requests_aws4auth import AWS4Auth

region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-kine-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'

headers = { "Content-Type": "application/json" }

def handler(event, context):
    count = 0
    for record in event['Records']:
        id = record['eventID']
        timestamp = record['kinesis']['approximateArrivalTimestamp']

        # Kinesis data is base64-encoded, so decode here
        message = base64.b64decode(record['kinesis']['data'])

        # Create the JSON document
        document = { "id": id, "timestamp": timestamp, "message": message }
        # Index the document
        r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
        count += 1
    return 'Processed ' + str(count) + ' items.'
```

编辑 `region` 和 `host` 的变量。

[安装 pip](https://pip.pypa.io/en/stable/installation/)——如果您尚未安装，则使用以下命令安装依赖项：

```
cd kinesis-to-opensearch

pip install --target ./package requests
pip install --target ./package requests_aws4auth
```

然后按照[创建 Lambda 函数](integrations-s3-lambda.md#integrations-s3-lambda-create)中的说明操作，但指定[先决条件](integrations-kinesis.md#integrations-kinesis-lambda-prereq)中的 IAM 角色和以下触发器设置：
+ **Kinesis stream**：您的 Kinesis stream
+ **批处理大小**：100
+ **起始位置**：时间范围

有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/working-with-kinesis.html)。

此时，您已拥有一整套资源：Kinesis 数据流、在流接收新数据并索引该数据之后运行的函数，以及用于搜索和可视化的 OpenSearch 服务域。

# 测试 Lambda 函数
<a name="integrations-kinesis-testing"></a>

创建此函数后，可以通过使用 AWS CLI将新记录添加到数据流来测试它：

```
aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1
```

然后使用 OpenSearch 服务控制台或 OpenSearch 仪表板验证是否`lambda-kine-index`包含文档。还可使用以下请求：

```
GET https://domain-name/lambda-kine-index/_search
{
  "hits" : [
    {
      "_index": "lambda-kine-index",
      "_type": "_doc",
      "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
      "_score": 1,
      "_source": {
        "timestamp": 1523648740.051,
        "message": "My test data.",
        "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
      }
    }
  ]
}
```