

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Flink オートスケーラー
<a name="flink-autoscaler"></a>

## 概要:
<a name="flink-autoscaler-overview"></a>

Amazon EMR リリース 6.15.0 以降では、Flink オートスケーラーがサポートされています。ジョブオートスケーラー機能は、実行中の Flink ストリーミングジョブからメトリクスを収集し、個々のジョブ頂点を自動的にスケーリングします。これにより、バックプレッシャーが軽減され、設定した使用率目標を達成できます。

詳細については、「Apache Flink Kubernetes Operator ドキュメント」の「[オートスケーラー](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/)」セクションを参照してください。

## 考慮事項
<a name="flink-autoscaler-considerations"></a>
+ Flink オートスケーラーは、Amazon EMR 6.15.0 以降でサポートされています。
+ Flink Autoscaler は、ストリーミングジョブでのみサポートされています。
+ アダプティブスケジューラーのみがサポートされます。デフォルトのスケジューラーはサポートされていません。
+ 動的なリソースプロビジョニングを可能にするために、クラスタースケーリングを有効化することを推奨します。メトリクスの評価は 5～10 秒ごとに行われるため、Amazon EMR Managed Scaling が推奨されます。この間隔では、クラスターは必要なクラスターリソースの変化により簡単に適応できます。

## オートスケーラーを有効にする
<a name="flink-autoscaler-start"></a>

EC2 クラスターで Amazon EMR を作成するときに、次のステップを使用して Flink オートスケーラーを有効にします。

1. Amazon EMR コンソールで、EMR クラスターを新規作成します。

   1. Amazon EMR リリース `emr-6.15.0` 以降を選択します。**[Flink]** アプリケーションバンドルを選択し、クラスターに含めるその他のアプリケーションを選択します。  
![\[Application bundle options for Amazon EMRクラスター, with Flink highlighted and selected.\]](http://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/images/emr-flink-cluster-create.png)

   1. **[クラスターのスケーリングとプロビジョニング]** で **[EMR マネージドスケーリングを使用]** を選択します。  
![\[クラスター scaling options: manual, EMR-managed (selected), or custom automatic scaling.\]](http://docs.aws.amazon.com/ja_jp/emr/latest/ReleaseGuide/images/emr-flink-cluster-managedscaling.png)

1. **[ソフトウェア設定]** セクションで、次の設定を入力して Flink オートスケーラーを有効にします。テストシナリオでは、検証を容易にするために、決定間隔、メトリクスウィンドウ間隔、および安定化間隔を低い値に設定し、ジョブがすぐにスケーリングを決定できるようにします。

   ```
   [
     {
       "Classification": "flink-conf",
       "Properties": {
         "job.autoscaler.enabled": "true",
         "jobmanager.scheduler": "adaptive",
         "job.autoscaler.stabilization.interval": "60s",
         "job.autoscaler.metrics.window": "60s",
         "job.autoscaler.decision.interval": "10s",
         "job.autoscaler.debug.logs.interval": "60s"
       }
     }
   ]
   ```

1. 必要に応じて他の設定を選択または構成し、Flink オートスケーラー対応クラスターを作成します。

## オートスケーラーの設定
<a name="flink-autoscaler-config"></a>

このセクションでは、特定のニーズに基づいて変更できる構成のうちほとんどについて説明します。

**注記**  
`time`、`interval`、`window` 設定のような時間ベースの構成では、単位が指定されていない場合のデフォルト単位はミリ秒です。そのため、サフィックスなしの `30` の値は 30 ミリ秒になります。他の時間単位には、秒には `s`、分には `m`、時間には `h` という適切なサフィックスを含めてください。

**Topics**
+ [ループ設定](#flink-autoscaler-config-loop)
+ [メトリクスと履歴の設定](#flink-autoscaler-config-metrics)
+ [頂点の設定](#flink-autoscaler-config-vertex)
+ [バックログ設定](#flink-autoscaler-config-backlog)
+ [スケール操作設定](#flink-autoscaler-config-scale)

### オートスケーラーのループ設定
<a name="flink-autoscaler-config-loop"></a>

オートスケーラーは、設定可能な数回の時間間隔ごとにジョブ頂点レベルのメトリクスを取得してスケールアクションに変換し、新しいジョブ頂点の並列処理を推定して、ジョブスケジューラーに推奨します。メトリクスは、ジョブの再起動時間とクラスターの安定化間隔が過ぎた後にのみ収集されます。


| 設定キー | デフォルトの値 | 説明 | 値の例 | 
| --- | --- | --- | --- | 
| job.autoscaler.enabled | false | Flink クラスターでオートスケーリングを有効にします。 | true, false | 
| job.autoscaler.decision.interval | 60s | オートスケーラーの決定間隔。 | 30 (デフォルト単位はミリ秒)、5m、1h | 
| job.autoscaler.restart.time | 3m | オペレーターが履歴から確実に再起動を判断できるようになるまでの予想される再起動時間。 | 30 (デフォルト単位はミリ秒)、5m、1h | 
| job.autoscaler.stabilization.interval | 300s | 新しいスケーリングが実行されない安定化期間。 | 30 (デフォルト単位はミリ秒)、5m、1h | 
| job.autoscaler.debug.logs.interval | 300s | オートスケーラーのデバッグログの間隔。 | 30 (デフォルト単位はミリ秒)、5m、1h | 

### メトリクスの集計と履歴の設定
<a name="flink-autoscaler-config-metrics"></a>

オートスケーラーはメトリクスを取得し、時間ベースのスライディングウィンドウに沿って集計し、評価してスケーリングを決定します。各ジョブ頂点のスケーリング決定履歴は、新しい並列処理の見積もりに利用されます。これらには、時間ベースの有効期限と履歴サイズ (少なくとも 1) の両方があります。


| 設定キー | デフォルトの値 | 説明 | 値の例 | 
| --- | --- | --- | --- | 
| job.autoscaler.metrics.window | 600s | Scaling metrics aggregation window size. | 30 (デフォルト単位はミリ秒)、5m、1h | 
| job.autoscaler.history.max.count | 3 | 頂点ごとに保持できる過去のスケーリング決定の最大数。 | 1～Integer.MAX\$1VALUE | 
| job.autoscaler.history.max.age | 24h | 頂点ごとに保持する過去のスケーリング決定の最小数。 | 30 (デフォルト単位はミリ秒)、5m、1h | 

### ジョブ頂点レベルの設定
<a name="flink-autoscaler-config-vertex"></a>

各ジョブの頂点の並列処理は、ターゲットの使用率に基づいて変更され、最小/最大並列処理の制限が適用されます。目標使用率を 100% に近い値 (つまり 1) に設定することは推奨されず、使用率境界は中間の負荷変動を処理するバッファーの役割を果たします。


| 設定キー | デフォルトの値 | 説明 | 値の例 | 
| --- | --- | --- | --- | 
| job.autoscaler.target.utilization | 0.7 | 目標とする頂点使用率。 | 0 - 1 | 
| job.autoscaler.target.utilization.boundary | 0.4 | 目標とする頂点利用率境界。現在の処理速度が [target\$1rate / (target\$1utilization - boundary) および (target\$1rate / (target\$1utilization \$1 boundary)] 以内の場合、スケーリングは実行されません。 | 0 - 1 | 
| job.autoscaler.vertex.min-parallelism | 1 | オートスケーラーが使用できる最低限の並列処理。 | 0 - 200 | 
| job.autoscaler.vertex.max-parallelism | 200 | オートスケーラーが使用できる最大並列処理。この制限は、Flink 構成または各オペレーターで直接構成された最大並列処理よりも高い場合、無視されることに注意してください。 | 0 - 200 | 

### バックログ処理設定
<a name="flink-autoscaler-config-backlog"></a>

ジョブ頂点には、スケール操作期間中に蓄積される保留中のイベント、つまりバックログを処理するための追加リソースが必要です。これは `catch-up` 期間とも呼ばれます。バックログの処理時間が設定された `lag -threshold` 値を超えると、ジョブ頂点ターゲットの使用率は最大レベルまで増加します。これにより、バックログの処理中に不要なスケーリング操作が行われるのを防ぐことができます。


| 設定キー | デフォルトの値 | 説明 | 値の例 | 
| --- | --- | --- | --- | 
| job.autoscaler.backlog-processing.lag-threshold | 5m | ラグのしきい値。ラグの原因となる保留中のメッセージを削除しながら、不要なスケーリングを防止します。 | 30 (デフォルト単位はミリ秒)、5m、1h | 
| job.autoscaler.catch-up.duration | 15m | スケーリング操作後にバックログを完全に処理するまでの目標時間。0 に設定すると、バックログベースのスケーリングが無効になります。 | 30 (デフォルト単位はミリ秒)、5m、1h | 

### スケール操作設定
<a name="flink-autoscaler-config-scale"></a>

オートスケーラーは、猶予期間内のスケールアップ操作の直後には、スケールダウン操作を実行しません。これにより、一時的な負荷変動によって発生する、スケールアップ、スケールダウンが繰り返し発生する不要なサイクルを防ぐことができます。

スケールダウン操作比率を利用して並列処理を徐々に減らし、一時的な負荷の急上昇に対応するためにリソースを解放することができます。また、大規模なスケールダウンの後に不要かつマイナーなスケールアップ操作が行われるのを防ぐのにも役立ちます。

過去のジョブ頂点スケーリング決定履歴に基づいて無効なスケールアップ操作を検出し、さらなる並列処理の変更を防ぐことができます。


| 設定キー | デフォルトの値 | 説明 | 値の例 | 
| --- | --- | --- | --- | 
| job.autoscaler.scale-up.grace-period | 1h | 頂点をスケールアップした後に、その頂点をスケールダウンできない期間。 | 30 (デフォルト単位はミリ秒)、5m、1h | 
| job.autoscaler.scale-down.max-factor | 0.6 | 最大スケールダウン係数。1 の値は、スケールダウンに制限がなく、0.6 はジョブを元の並列処理の 60% でのみスケールダウンできることを意味します。 | 0 - 1 | 
| job.autoscaler.scale-up.max-factor | 100000. | 最大スケールアップ率。2.0 の値は、現在の並列処理の 200% でのみジョブをスケールアップできることを意味します。 | 0 - Integer.MAX\$1VALUE | 
| job.autoscaler.scaling.effectiveness.detection.enabled | false | 効果のないスケーリング操作の検出を有効にし、オートスケーラーが今後のスケールアップをブロックできるようにするかどうか。 | true, false | 