

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# 継続的にストリーミングされたデータを処理するためのストリーミングジョブ


EMR Serverless のストリーミングジョブは、ストリーミングデータをほぼリアルタイムで分析、処理できるジョブモードです。これらの長時間実行されるジョブは、ストリーミングデータをポーリングし、データが到着すると継続的に結果を処理します。ストリーミングジョブは、ほぼリアルタイムの分析、不正検出、レコメンデーションエンジンなど、リアルタイムのデータ処理を必要とするタスクに最適です。EMR Serverless ストリーミングジョブは、組み込みジョブの耐障害性、リアルタイムモニタリング、拡張ログ管理、ストリーミングコネクタとの統合などの最適化を提供します。

ストリーミングジョブのユースケースを次に示します。
+ **ほぼリアルタイムの分析** – Amazon EMR Serverless のストリーミングジョブを使用すると、ストリーミングデータをほぼリアルタイムで処理できるため、ログデータ、センサーデータ、クリックストリームデータなどの継続的なデータストリームに対してリアルタイム分析を実行してインサイトを取得し、最新の情報に基づいてタイムリーな意思決定を行うことができます。
+ **不正検出** – データストリームを分析し、疑わしいパターンや異常が発生したときに特定することにより、ストリーミングジョブを使用して、金融取引、クレジットカードオペレーション、オンラインアクティビティでほぼリアルタイムの不正検出を実行します。
+ **レコメンデーションエンジン** – ストリーミングジョブは、ユーザーアクティビティデータを処理してレコメンデーションモデルを更新できます。そうすることで、行動や好みに基づいてパーソナライズされたリアルタイムのレコメンデーションの可能性が広がります。
+ **ソーシャルメディア分析** – ストリーミングジョブは、ツイート、コメント、投稿などのソーシャルメディアデータを処理できるため、組織はトレンドの監視、感情分析、ブランドの評判の管理をほぼリアルタイムで行うことができます。
+ **モノのインターネット (IoT) 分析** – ストリーミングジョブは、IoT デバイス、センサー、接続された機械からの高速ストリームを処理、分析できるので、異常検出、予知保全、その他の IoT 分析のユースケースを実行します。
+ **クリックストリーム分析** – ストリーミングジョブは、ウェブサイトやモバイルアプリケーションからのクリックストリームデータを処理して分析できます。このようなデータを使用する企業は、分析を実行してユーザーの行動を詳細に把握し、ユーザーエクスペリエンスをパーソナライズし、マーケティングキャンペーンを最適化できます。
+ **ログの監視と分析** – ストリーミングジョブは、サーバー、アプリケーション、ネットワークデバイスからのログデータを処理することもできます。これにより、異常検出、トラブルシューティング、システムの正常性とパフォーマンスが得られます。

**主な利点**

EMR Serverless でのストリーミングジョブは、*ジョブの耐障害性*を自動的に提供します。これは、以下の要素の組み合わせです。
+ **自動再試行** – EMR Serverless は、失敗したジョブをユーザーから手動で入力することなく自動的に再試行します。
+ **アベイラビリティーゾーン (AZ) の耐障害性** – EMR Serverless は、元の AZ で問題が発生するとストリーミングジョブを正常な AZ に自動的に切り替えます。
+ **ログ管理:**
  + **ログローテーション** – ディスクストレージ管理をより効率的にするために、EMR Serverless は長時間のストリーミングジョブのログを定期的にローテーションします。これにより、すべてのディスク領域を消費する可能性のあるログの蓄積を防ぐことができます。
  + **ログの圧縮** — マネージド永続化でログファイルを効率的に管理および最適化できます。圧縮により、マネージド Spark 履歴サーバーを使用する場合のデバッグのエクスペリエンスも向上します。

**サポートされているデータソースとデータシンク**

EMR Serverless は、多数の入力データソースと出力データシンクで動作します。
+ サポートされている入力データソース – Amazon Kinesis Data Streams、Amazon Managed Streaming for Apache Kafka、セルフマネージド Apache Kafka クラスター。デフォルトでは、Amazon EMR リリース 7.1.0 以降には [Amazon Kinesis Data Streams コネクタ](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-structured-streaming-kinesis.html)が含まれているため、追加のパッケージを構築またはダウンロードする必要はありません。
+ サポートされている出力データシンク – AWS Glue データカタログテーブル、Amazon S3、Amazon Redshift、MySQL、PostgreSQL Oracle、Oracle、Microsoft SQL、Apache Iceberg、Delta Lake、Apache Hudi。

## 考慮事項と制限事項


ストリーミングジョブを使用するときは、以下の考慮事項と制限事項に留意してください。
+ ストリーミングジョブは、[Amazon EMR リリース 7.1.0 以降](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-710-release.html)でサポートされています。
+ EMR Serverless ではストリーミングジョブが長時間実行されることが予想されるため、ジョブの実行時間を制限する実行タイムアウトを設定できません。
+ ストリーミングジョブは、[構造化ストリーミングフレームワーク](https://spark.apache.org/streaming/)に構築された Spark エンジンとのみ互換性があります。
+ EMR Serverless はストリーミングジョブを無期限に再試行します。最大試行回数はカスタマイズできません。失敗した試行回数が 1 時間ごとの時間枠で設定されたしきい値を超えた場合、スラッシュ防止が自動的に組み込まれてジョブの再試行が停止します。デフォルトのしきい値は、1 時間あたり 5 回の試行の失敗です。このしきい値は、1～10 回試行するように設定できます。詳細については、「[Job resiliency](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/SECTION-jobs-resiliency.xml.html)」を参照してください。
+ ストリーミングジョブにはランタイムの状態と進行状況を保存するチェックポイントがあるため、EMR Serverless は最新のチェックポイントからストリーミングジョブを再開できます。詳細については、Apache Spark ドキュメントの「[Recovering from failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)」を参照してください。

# ストリーミングジョブの使用開始
開始方法

ストリーミングジョブを使用開始する方法については、以下の手順を参照してください。

1. [「Amazon EMR Serverless の使用を開始する」に従ってアプリケーションを作成します。](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/getting-started.html)アプリケーションは [Amazon EMR リリース 7.1.0](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-710-release.html) 以降を実行する必要があります。

1. アプリケーションの準備ができたら、 `mode`パラメータを に設定`STREAMING`して、次の AWS CLI 例のようにストリーミングジョブを送信します。

   ```
   aws emr-serverless start-job-run \
   --application-id <APPPLICATION_ID> \
   --execution-role-arn <JOB_EXECUTION_ROLE> \
   --mode 'STREAMING' \
   --job-driver '{
       "sparkSubmit": {
           "entryPoint": "s3://<streaming script>",
           "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
           "sparkSubmitParameters": "--conf spark.executor.cores=4
               --conf spark.executor.memory=16g 
               --conf spark.driver.cores=4
               --conf spark.driver.memory=16g 
               --conf spark.executor.instances=3"
       }
   }'
   ```

# サポートされているストリーミングコネクタ
ストリーミングコネクタ

ストリーミングコネクタは、ストリーミングソースからのデータの読み取りを容易にし、ストリーミングシンクにデータを書き込むこともできます。

サポートされているストリーミングコネクタは次のとおりです。

**Amazon Kinesis Data Streams コネクタ**

Apache Spark 用の [Amazon Kinesis Data Streams コネクタ](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-structured-streaming-kinesis.html)を使用すると、Amazon Kinesis Data Streams との間でデータを消費し、データを書き込むストリーミングアプリケーションとパイプラインを構築できます。コネクタは、シャードあたり最大 2 MB/秒の専用の読み取りスループットレートで、拡張ファンアウト消費をサポートします。デフォルトでは、Amazon EMR Serverless 7.1.0 以降にはコネクタが組み込まれているため、追加のパッケージを構築またはダウンロードする必要はありません。コネクタの詳細については、[GitHub の spark-sql-kinesis-connector ページ](https://github.com/awslabs/spark-sql-kinesis-connector/)を参照してください。

以下は、Kinesis Data Streams コネクタの依存関係を使用してジョブ実行を開始する方法の例です。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kinesis-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --jars /usr/share/aws/kinesis/spark-sql-kinesis/lib/spark-streaming-sql-kinesis-connector.jar"
    }
}'
```

Kinesis Data Streams に接続するには、VPC アクセスを使用して EMR Serverless アプリケーションを設定し、VPC エンドポイントを使用してプライベートアクセスを許可するか、NAT Gateway を使用してパブリックアクセスを取得します。詳細については、「[Configuring VPC access](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)」を参照してください。また、ジョブのランタイムロールに、必要とするデータストリームにアクセスするために必要な読み取り権限と書き込み権限があることを確認する必要があります。ジョブランタイムロールの設定方法の詳細については、「[Job runtime roles for Amazon EMR Serverless](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/security-iam-runtime-role.html)」を参照してください。必要なすべてのアクセス許可の完全なリストについては、[GitHub の spark-sql-kinesis-connector ページ](https://github.com/awslabs/spark-sql-kinesis-connector/?tab=readme-ov-file#how-to-use-it)を参照してください。

**Apache Kafka コネクタ**

Spark 構造化ストリーミング用の Apache Kafka コネクタは Spark コミュニティのオープンソースコネクタであり、Maven リポジトリで利用できます。このコネクタにより、Spark 構造化ストリーミングアプリケーションや、セルフマネージド Apache Kafka と Amazon Managed Streaming for Apache Kafka との間でデータを読み書きするのがが容易になります。コネクタの詳細については、Apache Spark ドキュメントの「[Structured Streaming \$1 Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)」を参照してください。

次の例は、ジョブ実行リクエストに Kafka コネクタを含める方法を示しています。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>"
    }
}'
```

Apache Kafka コネクタのバージョンは、EMR Serverless リリースバージョンと対応する Spark バージョンによって異なります。正しい Kafka バージョンを確認するには、「[Structured Streaming \$1 Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)」を参照してください。

IAM 認証で Amazon Managed Streaming for Apache Kafka を使用するには、Kafka コネクタが IAM で Amazon MSK に接続できるように別の依存関係を含めます。詳細については、[GitHub の aws-msk-iam-auth リポジトリ](https://github.com/aws/aws-msk-iam-auth)を参照してください。また、ジョブのランタイムロールに必要な IAM アクセス許可があることを確認する必要があります。次の例は、IAM 認証でコネクタを使用する方法を示しています。

```
aws emr-serverless start-job-run \
--application-id <APPLICATION_ID> \
--execution-role-arn <JOB_EXECUTION_ROLE> \
--mode 'STREAMING' \
--job-driver '{
    "sparkSubmit": {
        "entryPoint": "s3://<Kafka-streaming-script>",
        "entryPointArguments": ["s3://<DOC-EXAMPLE-BUCKET-OUTPUT>/output"],
        "sparkSubmitParameters": "--conf spark.executor.cores=4
                --conf spark.executor.memory=16g 
                --conf spark.driver.cores=4
                --conf spark.driver.memory=16g 
                --conf spark.executor.instances=3
                --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<KAFKA_CONNECTOR_VERSION>,software.amazon.msk:aws-msk-iam-auth:<MSK_IAM_LIB_VERSION>"
    }
}'
```

Amazon MSK から Kafka コネクタと IAM 認証ライブラリを使用するには、VPC アクセスを使用して EMR Serverless アプリケーションを設定します。サブネットにはインターネットアクセスが必要で、Maven の依存関係にアクセスするには NAT Gateway を使用する必要があります。詳細については、「[Configuring VPC access](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/vpc-access.html)」を参照してください。Kafka クラスターにアクセスするには、サブネットにネットワーク接続が必要です。これは、Kafka クラスターがセルフマネージド型かどうか、Amazon Managed Streaming for Apache Kafka を使用しているかどうか、に関係なく当てはまります。

# ストリーミングジョブのログ管理
ログ管理

ストリーミングジョブは、Spark アプリケーションログとイベントログのログローテーション、および Spark イベントログのログ圧縮をサポートしています。これにより、リソースを効果的に管理できます。

**ログローテーション**

ストリーミングジョブは、Spark アプリケーションログとイベントログのログローテーションをサポートします。ログローテーションにより、長時間のストリーミングジョブで大きなログファイルが生成して使用可能なディスク領域をすべて使ってしまうのを防ぎます。ログローテーションはディスクストレージを節約し、ディスク容量が少ないためにジョブが失敗するのを防ぐのに役立ちます。詳細については、「[Rotating logs](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/rotating-logs.html)」を参照してください。

**ログ圧縮**

ストリーミングジョブは、マネージドログ記録が利用可能な場合は必ず Spark イベントログのログ圧縮もサポートします。マネージドログ記録の詳細については、「[Logging with managed storage](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/logging.html#jobs-log-storage-managed-storage)」を参照してください。ストリーミングジョブは長時間実行できるため、イベントデータが時間の経過とともに蓄積されて、ログファイルのサイズが著しく増加する可能性があります。Spark History Server は、これらのイベントを読み取り、Spark アプリケーション UI のメモリにロードします。このプロセスは、特に Amazon S3 に保存されているイベントログが非常に大きい場合、レイテンシーが大きくなりコストが高くなる可能性があります。

ログ圧縮によりイベントログのサイズが小さくなるため、Spark History Server が常に 1 GB を超えるイベントログをロードする必要はなくなります。詳細については、Apache Spark ドキュメントの「[Monitoring and Instrumentation](https://spark.apache.org/docs/latest/monitoring.html)」を参照してください。