

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將 DynamoDB 與 Amazon Managed Streaming for Apache Kafka 整合
<a name="msk-for-dynamodb"></a>

[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) 可透過全受管、高可用性的 Apache Kafka 服務，輕鬆即時擷取和處理串流資料。

[Apache Kafka](https://kafka.apache.org/) 是一種分散式資料存放區，針對即時擷取和處理串流資料進行最佳化。Kafka 可以處理記錄串流、以記錄產生的順序有效地存放記錄串流，以及發布和訂閱記錄串流。

由於這些功能，Apache Kafka 通常用於建置即時串流資料管道。*資料管道*能夠可靠地處理資料，並將資料從一個系統移動到另一個系統，當透過使用多個支援不同使用案例的資料庫以實現專用資料庫策略時，這可能會是重要的功能。

Amazon DynamoDB 是這些資料管道中的常見目標，可支援使用鍵值或文件資料模型的應用程式，並希望具有幾毫秒一致效能的無限可擴展性。

**Topics**
+ [運作方式](#msk-for-dynamodb-how-it-works)
+ [設定 Amazon MSK 與 DynamoDB 之間的整合](#msk-for-dynamodb-example)
+ [後續步驟](#msk-for-dynamodb-next-steps)

## 運作方式
<a name="msk-for-dynamodb-how-it-works"></a>

Amazon MSK 和 DynamoDB 之間的整合使用 [Lambda 函式](https://docs.aws.amazon.com/lambda/latest/dg/welcome.html)來取用來自 Amazon MSK 的記錄，並將其寫入 DynamoDB。

![本圖表顯示 Amazon MSK 和 DynamoDB 之間的整合，以及 Amazon MSK 如何使用 Lambda 函式來取用記錄並將其寫入 DynamoDB。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-diagram.png)


Lambda 會在內部輪詢 Amazon MSK 中的新訊息，然後同步調用目標 Lambda 函式。Lambda 函式的事件承載包含來自 Amazon MSK 的訊息批次。對於 Amazon MSK 和 DynamoDB 之間的整合，Lambda 函式會將這些訊息寫入 DynamoDB。

## 設定 Amazon MSK 與 DynamoDB 之間的整合
<a name="msk-for-dynamodb-example"></a>

**注意**  
您可以在下列 [GitHub 儲存庫](https://github.com/aws-samples/serverless-streaming-datastore-sample)下載此範例中使用的資源。

下列步驟說明如何設定 Amazon MSK 和 Amazon DynamoDB 之間的範例整合。此範例代表由物聯網 (IoT) 裝置產生並擷取至 Amazon MSK 的資料。當資料導入 Amazon MSK 時，它可以與分析服務或與 Apache Kafka 相容的第三方工具整合，從而實現各種分析使用案例。整合 DynamoDB 也提供個別裝置記錄的索引鍵值查詢。

此範例將示範 Python 指令碼如何將 IoT 感應器資料寫入 Amazon MSK。然後，Lambda 函式會將具有分割區索引鍵「`deviceid`」的項目寫入 DynamoDB。

提供的 CloudFormation 範本將建立下列資源：Amazon S3 儲存貯體、Amazon VPC、Amazon MSK 叢集，以及 AWS CloudShell 用於測試資料操作的 。

若要產生測試資料，請建立 Amazon MSK 主題，然後建立 DynamoDB 資料表。您可以從管理主控台使用 Session Manager 登入 CloudShell 作業系統並執行 Python 指令碼。

執行 CloudFormation 範本後，您可以執行下列操作來完成建置此架構。

1. 執行 CloudFormation 範本 `S3bucket.yaml` 以建立 S3 儲存貯體。對於任何後續指令碼或操作，請在相同區域中執行它們。輸入 `ForMSKTestS3` 做為 CloudFormation 堆疊名稱。  
![本圖顯示 CloudFormation 主控台堆疊建立畫面。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-create-stack.png)

   完成後，記下*輸出*下方的 S3 儲存貯體名稱輸出。您在步驟 3 中將需要此名稱。  
![輸出索引標籤顯示具有 S3 儲存貯體名稱值 for-msk-ddb-sample-466288479681 的 BucketName 金鑰。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-bucket-name.png)

1. 將下載的 ZIP 檔案 `fromMSK.zip` 上傳至您剛建立的 S3 儲存貯體。  
![本圖顯示您可以在 S3 主控台中上傳檔案的位置。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-zip.png)

1. 執行 CloudFormation 範本 `VPC.yaml` 來建立 VPC、Amazon MSK 叢集和 Lambda 函式。在參數輸入畫面上，輸入您在步驟 1 中被要求提供 S3 儲存貯體時建立的 S3 儲存貯體名稱。將 CloudFormation 堆疊名稱設定為 `ForMSKTestVPC`。  
![本圖顯示您在指定 CloudFormation 堆疊詳細資訊時需要填寫的欄位。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-vpc.png)

1. 準備在 CloudShell 中執行 Python 指令碼的環境。您可以在 AWS 管理主控台上使用 CloudShell。如需使用 CloudShell 的詳細資訊，請參閱 [AWS CloudShell入門](https://docs.aws.amazon.com/cloudshell/latest/userguide/getting-started.html)。啟動 CloudShell 後，建立屬於您剛建立的 VPC 的 CloudShell，以連線至 Amazon MSK 叢集。在私有子網路中建立 CloudShell。填寫下列欄位：

   1. **名稱** - 可以設定為任何名稱。例如 **MSK-VPC**

   1. **VPC** - 選取 **MSKTest**

   1. **子網路** - 選取 **MSKTest 私有子網路 (AZ1)**

   1. **SecurityGroup** - 選取 **ForMSKSecurityGroup**  
![CloudShell 介面顯示 ap-southeast-1 環境，並顯示開啟環境選項。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-cshell-1.png)  
![本圖顯示 CloudShell 環境，其中包含您必須指定的欄位。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-cshell-2.png)

   屬於私有子網路的 CloudShell 啟動後，請執行下列命令：

   ```
   pip install boto3 kafka-python aws-msk-iam-sasl-signer-python
   ```

1. 從 S3 儲存貯體下載 Python 指令碼。

   ```
   aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./
   unzip pythonScripts.zip
   ```

1. 檢查管理主控台，並在 Python 指令碼中設定代理程式 URL 和區域值的環境變數。在管理主控台中檢查 Amazon MSK 叢集代理程式端點。  
![叢集 摘要頁面，其中箭頭指向檢視用戶端資訊按鈕。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-view-client-1.png)  
![TODO.](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-view-client-2.png)

1. 在 CloudShell 上設定環境變數。如果您使用的是美國西部 (奧勒岡)：

   ```
   export AWS_REGION="us-west-2"
   export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098"
   ```

1. 執行下列 Python 指令碼。

   建立 Amazon MSK 主題：

   ```
   python ./createTopic.py
   ```

   建立 DynamoDB 資料表：

   ```
   python ./createTable.py
   ```

   將測試資料寫入 Amazon MSK 主題：

   ```
   python ./kafkaDataGen.py
   ```

1. 檢查所建立 Amazon MSK、Lambda 和 DynamoDB 資源的 CloudWatch 指標，並使用 DynamoDB Data Explorer 驗證存放在 `device_status `資料表中的資料，以確保所有程序都正確執行。如果每個程序執行均無錯誤，您可以檢查從 CloudShell 寫入 Amazon MSK 的測試資料是否也會寫入 DynamoDB。  
![本圖顯示 DynamoDB 主控台，以及現在當您執行掃描時項目如何傳回。](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/msk-dynamodb-explore.png)

1. 當您完成此範例時，請刪除本教學課程中建立的資源。刪除兩個 CloudFormation 堆疊：`ForMSKTestS3` 和 `ForMSKTestVPC`。如果堆疊刪除成功完成，則會刪除所有資源。

## 後續步驟
<a name="msk-for-dynamodb-next-steps"></a>

**注意**  
如果您在遵循此範例時建立了資源，請記得將其刪除，以避免任何非預期的費用。

整合已識別連結 Amazon MSK 和 DynamoDB 的架構，以啟用串流資料以支援 OLTP 工作負載。從這裡，可以將 [DynamoDB 與 OpenSearch Service](OpenSearchIngestionForDynamoDB.md) 連結來實現更複雜的搜尋。請考慮與 EventBridge 整合以因應更複雜的事件驅動需求，以及與 [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html) 等擴充功能整合以因應更高輸送量和更低延遲的需求。