

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配 Amazon Managed Service for Apache Flink 使用自訂指標
<a name="monitoring-metrics-custom"></a>

Managed Service for Apache Flink 向 CloudWatch 公開了 19 個指標，包括資源使用率和輸送量的指標。此外，您可以建立自己的指標來追蹤應用程式特定的資料，例如處理事件或存取外部資源。

**Topics**
+ [運作方式](#monitoring-metrics-custom-howitworks)
+ [檢視建立映射類別的範例](#monitoring-metrics-custom-examples)
+ [檢視自訂指標](#monitoring-metrics-custom-examples-viewing)

## 運作方式
<a name="monitoring-metrics-custom-howitworks"></a>

Managed Service for Apache Flink 中的自訂指標使用 Apache Flink 指標系統。Apache Flink 指標具有下列屬性：
+ **類型：**指標的類型說明衡量和報告資料的方式。可用的 Apache Flink 指標類型包括「計數」、「量計」、「長條圖」和「計量」。如需 Apache Flink 指標類型的詳細資訊，請參閱[指標類型](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#metric-types)。
**注意**  
AWS CloudWatch Metrics 不支援直方圖 Apache Flink 指標類型。CloudWatch 只顯示「計數」、「量計」和「計量」類型的 Apache Flink 指標。
+ **範圍：**指標的範圍包含其識別碼和一組指示如何向 CloudWatch 報告指標的鍵值對。指標的識別碼包含下列項目：
  + 系統範圍，指出報告指標的層級 (例如「運算子」)。
  + 使用者範圍，定義諸如使用者變數或指標群組名稱等屬性。這些屬性使用 [https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-) 或 [https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-) 定義。

  如需指標範圍的詳細資訊，請參閱[範圍](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#scope)。

如需 Apache Flink 指標的詳細資訊，請參閱 [Apache Flink 文件](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[指標](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html)。

若要在 Managed Service for Apache Flink 中建立自訂指標，您可以從任何透過呼叫 [https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--) 來擴充 `RichFunction` 的使用者函數存取 Apache Flink 指標系統。此方法會傳回 [MetricGroup](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/metrics/MetricGroup.html) 物件，您可以用它來建立和註冊自訂指標。Managed Service for Apache Flink 會將使用群組索引鍵 `KinesisAnalytics` 建立的所有指標報告給 CloudWatch。您定義的自訂指標具有下列特性：
+ 您的自訂指標具有指標名稱和群組名稱。這些名稱必須包含根據 [Prometheus 命名規則](https://prometheus.io/docs/instrumenting/writing_exporters/#naming)的英數字元。
+ 您在使用者範圍中定義的屬性 (`KinesisAnalytics` 指標群組除外) 會發佈為 CloudWatch 維度。
+ 依預設，自訂指標會在 `Application` 層級發佈。
+ 維度 (任務/運算子/平行處理層級) 會根據應用程式的監控層級新增至指標。您可以使用 [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) 動作的 [MonitoringConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfiguration.html) 參數或 [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) 動作的 [MonitoringConfigurationUpdate](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfigurationUpdate.html) 參數來設定應用程式的監控層級。

## 檢視建立映射類別的範例
<a name="monitoring-metrics-custom-examples"></a>

下列程式碼範例示範如何建立建立並遞增自訂指標的映射類別，以及如何透過將映射類別新增至`DataStream`物件，在應用程式中實作映射類別。

### 記錄計數自訂指標
<a name="monitoring-metrics-custom-examples-recordcount"></a>

下列程式碼範例示範如何建立映射類別，以建立可計算資料串流中記錄數目的指標 (功能與 `numRecordsIn` 指標相同)：

```
    private static class NoOpMapperFunction extends RichMapFunction<String, String> {
        private transient int valueToExpose = 0;
        private final String customMetricName;
 
        public NoOpMapperFunction(final String customMetricName) {
            this.customMetricName = customMetricName;
        }
 
        @Override
        public void open(Configuration config) {
            getRuntimeContext().getMetricGroup()
                    .addGroup("KinesisAnalytics")
                    .addGroup("Program", "RecordCountApplication")
                    .addGroup("NoOpMapperFunction")
                    .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose);
        }
 
        @Override
        public String map(String value) throws Exception {
            valueToExpose++;
            return value;
        }
    }
```

在上述範例中，`valueToExpose` 變數會針對應用程式處理的每筆記錄遞增。

定義映射類別之後，您可以建立實作對應的應用程式內串流：

```
DataStream<String> noopMapperFunctionAfterFilter =
    kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));
```

如需此應用程式的完整程式碼，請參閱[記錄計數自訂指標應用程式](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/RecordCount)。

### 單字計數自訂指標
<a name="monitoring-metrics-custom-examples-wordcount"></a>

下列程式碼範例示範如何建立映射類別，以建立可計算資料串流中字數的指標：

```
private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
     
            private transient Counter counter;
     
            @Override
            public void open(Configuration config) {
                this.counter = getRuntimeContext().getMetricGroup()
                        .addGroup("KinesisAnalytics")
                        .addGroup("Service", "WordCountApplication")
                        .addGroup("Tokenizer")
                        .counter("TotalWords");
            }
     
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>>out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
     
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        counter.inc();
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
```

在上述範例中，`counter` 變數會針對應用程式處理的每個單字遞增。

定義映射類別之後，您可以建立實作對應的應用程式內串流：

```
// Split up the lines in pairs (2-tuples) containing: (word,1), and
// group by the tuple field "0" and sum up tuple field "1"
DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1);
     
// Serialize the tuple to string format, and publish the output to kinesis sink
wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());
```

如需此應用程式的完整程式碼，請參閱[單字計數自訂指標應用程式](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/WordCount)。

## 檢視自訂指標
<a name="monitoring-metrics-custom-examples-viewing"></a>

應用程式的自訂指標會顯示在 CloudWatch 指標主控台 **AWS/KinesisAnalytics** 儀表板的**應用程式**指標群組下。