

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Managed Service for Apache Flink でカスタムメトリクスを使用する
<a name="monitoring-metrics-custom"></a>

Managed Service for Apache Flinkはリソースの使用量とスループットのメトリックスを含む19のメトリクスをCloudWatch に公開します。さらに、イベントの処理や外部リソースへのアクセスなど、アプリケーション固有のデータを追跡するための独自のメトリクスを作成できます。

**Topics**
+ [仕組み](#monitoring-metrics-custom-howitworks)
+ [マッピングクラスを作成するための例を表示する](#monitoring-metrics-custom-examples)
+ [カスタムメトリクスを表示する](#monitoring-metrics-custom-examples-viewing)

## 仕組み
<a name="monitoring-metrics-custom-howitworks"></a>

Managed Service for Apache Flinkのカスタムメトリクスは Apache Flink メトリックシステムを使用します。Managed Service Flink メトリクスには、以下の属性を持っています。
+ **タイプ:**メトリックのタイプは、データをどのように測定することを説明して報告します。Apache Flink メトリックのタイプには、カウント、ゲージ、ヒストグラム、メーターなどがあります。Apache Flink メトリクスタイプの詳細について、[メトリクスタイプ](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#metric-types)を参照してください。
**注記**  
AWS CloudWatch Metrics は、ヒストグラム Apache Flink メトリクスタイプをサポートしていません。CloudWatch は、カウント、ゲージ、メータータイプの Apache Flink メトリクスのみを表示できます。
+ **スコープ：**メトリクスのスコープは、その識別子と、メトリックスが CloudWatch にどのように報告されるかを示す一連のキーと値のペアで構成されます。メトリクスの ID は次で構成されます。
  + メトリクスが報告されるレベルを示すシステムスコープ (例：オペレーター)。
  + ユーザー変数やメトリックグループ名などの属性を定義するユーザースコープ。これらの属性は[https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-)または[https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-)によって定義されます。

  スコープの詳細については、[スコップ](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#scope)を参照してください。

Apache Flink メトリクスの詳細については、[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)の[メトリック](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html)を参照してください。

Managed Service for Apache Flinkでカスタムメトリクスを作成するには、`RichFunction`を拡張する任意のユーザー関数から[https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--)を呼び出して、Apache Flink メトリックシステムにアクセスできます。このメソッドは、カスタムメトリクスの作成と登録に使用できる[MetricGroup](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/metrics/MetricGroup.html)オブジェクトを返します。Managed Service for Apache Flink　は、グループキー`KinesisAnalytics`で作成されたすべてのメトリクスを CloudWatchにレポートします。定義したカスタムメトリクスには、次の特徴があります。
+ カスタムメトリックスにはメトリクス名とグループ名があります。これらの名前は、[Prometheus の命名規則](https://prometheus.io/docs/instrumenting/writing_exporters/#naming)に従って英数字で構成する必要があります。
+ ユーザースコープで定義した属性 (`KinesisAnalytics`メトリクスグループを除く)はCloudWatch ディメンションとして公開されます。
+ カスタムメトリックスはデフォルトで　`Application`　レベルで公開されます。
+ アプリケーションの監視レベルに基づいて、ディメンション (タスク/オペレーター/並列処理) がメトリックスに追加されます。[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) アクションの [MonitoringConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfiguration.html) パラメータ、または [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) アクションの [MonitoringConfigurationUpdate](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfigurationUpdate.html) パラメータを使用してアプリケーションのモニタリングレベルを設定します。

## マッピングクラスを作成するための例を表示する
<a name="monitoring-metrics-custom-examples"></a>

以下のコード例は、カスタムメトリックを作成および増加させるマッピングクラスを作成する方法と、そのマッピングクラスを `DataStream` オブジェクトに追加してアプリケーションに実装する方法を示しています。

### レコードカウントのカスタムメトリック
<a name="monitoring-metrics-custom-examples-recordcount"></a>

以下のコード例は、データストリーム内のレコードをカウントするメトリクス (`numRecordsIn`メトリクスと同じ機能)を作成するマッピングクラスの作成方法を示しています。

```
    private static class NoOpMapperFunction extends RichMapFunction<String, String> {
        private transient int valueToExpose = 0;
        private final String customMetricName;
 
        public NoOpMapperFunction(final String customMetricName) {
            this.customMetricName = customMetricName;
        }
 
        @Override
        public void open(Configuration config) {
            getRuntimeContext().getMetricGroup()
                    .addGroup("KinesisAnalytics")
                    .addGroup("Program", "RecordCountApplication")
                    .addGroup("NoOpMapperFunction")
                    .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose);
        }
 
        @Override
        public String map(String value) throws Exception {
            valueToExpose++;
            return value;
        }
    }
```

前の例で、アプリケーションが処理するレコードごとに`valueToExpose`変数はインクリメントされます。

マッピングクラスを定義したら、マップを実装するアプリケーション内ストリームを作成します。

```
DataStream<String> noopMapperFunctionAfterFilter =
    kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));
```

このアプリケーションの完全なコードについては、[レコードカウント:カスタムメトリックアプリケーション](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/RecordCount)を参照してください。

### ワードカウントのカスタムメトリック
<a name="monitoring-metrics-custom-examples-wordcount"></a>

次のコード例は、データストリーム内の単語数をカウントするメトリクスを作成するマッピングクラスを作成する方法を示しています。

```
private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
     
            private transient Counter counter;
     
            @Override
            public void open(Configuration config) {
                this.counter = getRuntimeContext().getMetricGroup()
                        .addGroup("KinesisAnalytics")
                        .addGroup("Service", "WordCountApplication")
                        .addGroup("Tokenizer")
                        .counter("TotalWords");
            }
     
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>>out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
     
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        counter.inc();
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
```

前の例では、アプリケーションが処理する単語ごとに`counter`変数はインクリメントされます。

マッピングクラスを定義したら、マップを実装するアプリケーション内ストリームを作成します。

```
// Split up the lines in pairs (2-tuples) containing: (word,1), and
// group by the tuple field "0" and sum up tuple field "1"
DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1);
     
// Serialize the tuple to string format, and publish the output to kinesis sink
wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());
```

このアプリケーションの完全なコードについては、[ワードカウント:カスタムメトリック](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/WordCount)を参照してください。

## カスタムメトリクスを表示する
<a name="monitoring-metrics-custom-examples-viewing"></a>

アプリケーションのカスタムメトリックスは、**AWS/KinesisAnalytics**ダッシュボードの CloudWatch Metrics コンソールの**アプリケーション**メトリクスグループに表示されます。