翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Kafka
Apache Kafka (Kafka) アクションは、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、Confluent Cloud
注記
このトピックでは、Apache Kafka プラットフォームおよび関連概念について精通していることを前提としています。Apache Kafka の詳細については、「Apache Kafka
要件
このルールアクションには、以下の要件があります。
-
が 、
ec2:CreateNetworkInterface、ec2:DescribeNetworkInterfaces、、、ec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterface、ec2:DescribeVpcAttribute、およびec2:DescribeSecurityGroupsオペレーションを実行するために引き受け AWS IoT ることができる IAMec2:DescribeSubnetsec2:DescribeVpcsロール。このロールは、Kafka ブローカーに到達するために、Amazon Virtual Private Cloud への伸縮自在なネットワークインターフェイスを作成および管理します。詳細については、「必要なアクセスを AWS IoT ルールに付与する」を参照してください。AWS IoT コンソールで、このルールアクションを実行することを に許可 AWS IoT Core するロールを選択または作成できます。
ネットワークインターフェイスの詳細については、Amazon EC2 ユーザーガイドの「Elastic Network Interface」を参照してください。
指定したロールにアタッチされるポリシーは次の例のようになります。
-
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
-
AWS Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合は、 が
secretsmanager:GetSecretValueおよびsecretsmanager:DescribeSecretオペレーションを実行するために引き受け AWS IoT Core ることができる IAM ロールを作成する必要があります。指定したロールにアタッチされるポリシーは次の例のようになります。
-
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_keytab-*" ] } ] }
-
Amazon Virtual Private Cloud (Amazon VPC) 内で Apache Kafka クラスターを実行できます。Apache Kafka Virtual Private Cloud (VPC) 送信先を作成し、サブネットで NAT ゲートウェイを使用して からパブリック Kafka クラスター AWS IoT にメッセージを転送する必要があります。 AWS IoT ルールエンジンは、送信先に記載されている各サブネットにネットワークインターフェイスを作成し、トラフィックを VPC に直接ルーティングします。送信先を指定すると、 AWS IoT ルールエンジンによって VPC ルールアクションが自動的に作成されます。VPC ルールアクションの詳細については、Apache Kafka Virtual Private Cloud (VPC) の送信先 を参照してください。
-
カスタマーマネージド AWS KMS key (KMS キー) を使用して保管中のデータを暗号化する場合、サービスには発信者に代わって KMS キーを使用するアクセス許可が必要です。詳細については、Amazon Managed Streaming for Apache Kafka デベロッパーガイドの Amazon MSK 暗号化を参照してください。
パラメータ
このアクションで AWS IoT ルールを作成するときは、次の情報を指定する必要があります。
- destinationArn
-
Apache Kafka Virtual Private Cloud (VPC) 送信先の Amazon リソースネーム (ARN)。送信先の作成の詳細については、「」を参照してくださいApache Kafka Virtual Private Cloud (VPC) の送信先。
- トピック
-
Kafka のブローカーに送信されるメッセージの Kafka のトピック。
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- キー (オプション)
-
Kafka のメッセージキー
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- ヘッダー (オプション)
-
指定した Kafka ヘッダーのリスト。各ヘッダーは、Kafka アクションを作成するときに指定できるキーと値のペア (1 つのキーと 1 つの値) です。これらのヘッダーを使用して、メッセージペイロードを変更せずに IoT クライアントからダウンストリーム Kafka クラスターにデータをルーティングできます。
このフィールドは、置換テンプレートを使用して置換できます。インラインルールの関数を Kafka Action のヘッダーで代替テンプレートとして渡す方法については、「例」を参照してください。詳細については、「置換テンプレート」を参照してください。
注記
バイナリ形式のヘッダーはサポートされていません。
- パーティション (オプション)
-
Kafka のメッセージパーティション。
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。
- clientProperties
-
Apache Kafka プロデューサークライアントのプロパティを定義するオブジェクト。
- acks (オプション)
-
リクエストが完了したとみなされる前に、プロデューサーがサーバーに受信することを求める確認応答の数。
値として 0 を指定すると、プロデューサーはサーバーからの確認応答を待機しなくなります。サーバーがメッセージを受信しない場合、プロデューサーはメッセージの送信を再試行しません。
有効な値は、
-1、0、1、allです。デフォルト値は1です。 - bootstrap.servers
-
Kafka クラスターへの初期接続を確立するために使用されるホストとポートのペア (
host1:port1、host2:port2など) のリスト。 - compression.type (optional)
-
プロデューサーによって生成されるすべてのデータの圧縮タイプ。
有効な値:
none、gzip、snappy、lz4、zstd。デフォルト値はnoneです。 - security.protocol
-
Kafka ブローカーにアタッチするために使用されるセキュリティプロトコル。
有効な値:
SSL、SASL_SSL。デフォルト値はSSLです。 - key.serializer
-
ProducerRecordで提供するキーオブジェクトをバイトに変換する方法を指定します。有効な値:
StringSerializer。 - value.serializer
-
ProducerRecordで提供する値オブジェクトをバイトに変換する方法を指定します。有効な値:
ByteBufferSerializer。 - ssl.truststore
-
base64 形式のトラストストアファイル、または AWS Secrets Manager 内のトラストストアファイルの場所。トラストストアが Amazon 認証機関 (CA) によって信頼されている場合は、この値は必須ではありません。
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合、
get_secretSQL 関数を使用してこのフィールドの値を取得できます。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。トラストストアがファイル形式の場合は、SecretBinaryパラメータを使用します。トラストストアが文字列の形式である場合は、SecretStringパラメータを使用します。この値の最大サイズは 65 KB です。
- ssl.truststore.password
-
信頼ストアのパスワード。この値は、トラストストアのパスワードを作成した場合にのみ必要です。
- ssl.keystore
-
キーストアファイル。
security.protocolの値としてSSLを指定する場合、この値は必須です。このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretBinaryパラメータを使用します。 - ssl.keystore.password
-
キーストアファイルのストアパスワード。
ssl.keystoreの値を指定している場合、この値は必須です。このフィールドの値はプレーンテキストにすることができます。このフィールドは、代替テンプレートもサポートします。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretStringパラメータを使用します。 - ssl.key.password
-
キーストアファイル内のプライベートキーのパスワード。
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretStringパラメータを使用します。 - sasl.mechanism
-
Kafka のブローカーに接続するために使用されるセキュリティメカニズム。この値は、
security.protocolのSASL_SSLを指定する場合に必要です。有効な値:
PLAIN、SCRAM-SHA-512、GSSAPI。注記
SCRAM-SHA-512は、cn-north-1、cn-northwest-1、us-gov-east-1、および us-gov-west-1 リージョンでサポートされている唯一のセキュリティメカニズムです。 - sasl.plain.username
-
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのPLAINを指定する場合に必要です。 - sasl.plain.password
-
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのPLAINを指定する場合に必要です。 - sasl.scram.username
-
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのSCRAM-SHA-512を指定する場合に必要です。 - sasl.scram.password
-
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのSCRAM-SHA-512を指定する場合に必要です。 - sasl.kerberos.keytab
-
Secrets Manager の Kerberos 認証用のキータブファイル。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、
get_secret関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secretSQL 関数の詳細については、「get_secret(secretId, secretType, key, roleArn)」を参照してください。SecretBinaryパラメータを使用します。 - sasl.kerberos.service.name
-
Apache Kafka が実行される Kerberos プリンシパル名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。 - sasl.kerberos.krb5.kdc
-
Apache Kafka プロデューサークライアントが接続するキー配布センター (KDC) のホスト名。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。 - sasl.kerberos.krb5.realm
-
Apache Kafka プロデューサークライアントが接続する領域。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。 - sasl.kerberos.principal
-
Kerberos 対応サービスにアクセスするためのチケットを Kerberos が割り当てることができる一意の Kerberos ID。この値は、
security.protocolのSASL_SSLおよびsasl.mechanismのGSSAPIを指定する場合に必要です。
例
次の JSON 例では、 AWS IoT ルールで Apache Kafka アクションを定義します。次の例では、sourceIp () インライン関数を Kafka Action ヘッダーの代替テンプレートとして渡します。
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Kerberos セットアップに関する重要な注意事項
-
キー配布センター (KDC) は、ターゲット VPC 内のプライベートドメインネームシステム (DNS) を介して解決可能である必要があります。考えられる方法の 1 つは、KDC DNS エントリをプライベートホストゾーンに追加することです。このアプローチの詳細については、「プライベートホストゾーンの使用」を参照してください。
-
各 VPC で DNS 解決が有効になっている必要があります。詳細については、「Using DNS with Your VPC」を参照してください。
-
VPC 送信先のネットワークインターフェイスセキュリティグループとインスタンスレベルのセキュリティグループは、次のポートで VPC 内からのトラフィックを許可する必要があります。
-
ブートストラップブローカーのリスナーポート上の TCP トラフィック (通常は 9092 ですが、9000~9100 の範囲内である必要があります)
-
KDC のポート 88 の TCP および UDP トラフィック
-
-
SCRAM-SHA-512は、cn-north-1、cn-northwest-1、us-gov-east-1、および us-gov-west-1 リージョンでサポートされている唯一のセキュリティメカニズムです。