

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将自定义指标与 Amazon Managed Service for Apache Flink 结合使用
<a name="monitoring-metrics-custom"></a>

适用于 Apache Flink 的托管服务公开了 19 个指标 CloudWatch，包括资源使用量和吞吐量指标。此外，您可以创建自己的指标来跟踪应用程序特定的数据，例如处理事件或访问外部资源。

**Topics**
+ [工作原理](#monitoring-metrics-custom-howitworks)
+ [查看创建映射类的示例](#monitoring-metrics-custom-examples)
+ [查看自定义指标](#monitoring-metrics-custom-examples-viewing)

## 工作原理
<a name="monitoring-metrics-custom-howitworks"></a>

Managed Service for Apache Flink 中的自定义指标使用 Apache Flink 指标系统。Apache Flink 指标具有以下属性：
+ **类型：**指标的类型描述了它如何衡量和报告数据。可用的 Apache Flink 指标类型包括计数、计量表、直方图和计量器。有关 Apache Flink 指标类型的更多信息，请参阅[指标类型](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#metric-types)。
**注意**  
AWS CloudWatch 指标不支持 Histogram Apache Flink 指标类型。 CloudWatch 只能显示计数、仪表和仪表类型的 Apache Flink 指标。
+ **范围：**指标的范围由其标识符和一组键值对组成，这些键值对表示将如何报告该指标。 CloudWatch指标的标识符由以下内容组成：
  + 系统范围，表示报告指标的级别（例如运算符）。
  + 用户范围，用于定义诸如用户变量或指标组名称之类的属性。这些属性是使用[https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-java.lang.String-)或[https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-](https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/metrics/MetricGroup.html#addGroup-java.lang.String-)定义的。

  有关指标范围的更多信息，请参阅[范围](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html#scope)。

有关 Apache Flink 指标的更多信息，请参阅 [Apache Flink 文档](https://nightlies.apache.org/flink/flink-docs-release-1.15/)中的[指标](https://nightlies.apache.org/flink/flink-docs-release-1.15/monitoring/metrics.html)。

要在 Managed Service for Apache Flink 中创建自定义指标，您可以从任何通过调用 [https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/functions/RuntimeContext.html#getMetricGroup--) 扩展 `RichFunction` 的用户函数访问 Apache Flink 指标系统。此方法返回一个可用于创建和注册自定义指标的[MetricGroup](https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/metrics/MetricGroup.html)对象。适用于 Apache 的托管服务 Flink 报告使用组密钥`KinesisAnalytics`创建的所有指标。 CloudWatch您定义的自定义指标具有以下特征：
+ 您的自定义指标具有指标名称和组名称。根据 [Prometheus 命名规则](https://prometheus.io/docs/instrumenting/writing_exporters/#naming)，这些名称必须由字母数字字符组成。
+ 您在用户范围（`KinesisAnalytics`指标组除外）中定义的属性将作为 CloudWatch 维度发布。
+ 默认情况下，自定义指标是在该`Application`级别发布的。
+ 维度（任务/运算符/并行度）将根据应用程序的监控级别添加到指标中。您可以使用操作的参数或[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)操作的或[MonitoringConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfiguration.html)参数来设置应用程序的[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)监控级别。[MonitoringConfigurationUpdate](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_MonitoringConfigurationUpdate.html)

## 查看创建映射类的示例
<a name="monitoring-metrics-custom-examples"></a>

以下代码示例演示如何创建映射类以创建和增加自定义指标，以及如何通过将映射类添加到 `DataStream` 对象以在应用程序中实现该映射类。

### 记录计数自定义指标
<a name="monitoring-metrics-custom-examples-recordcount"></a>

以下代码示例演示如何创建映射类，该映射类用于创建对数据流中的记录进行计数的指标（功能与`numRecordsIn`指标相同）：

```
    private static class NoOpMapperFunction extends RichMapFunction<String, String> {
        private transient int valueToExpose = 0;
        private final String customMetricName;
 
        public NoOpMapperFunction(final String customMetricName) {
            this.customMetricName = customMetricName;
        }
 
        @Override
        public void open(Configuration config) {
            getRuntimeContext().getMetricGroup()
                    .addGroup("KinesisAnalytics")
                    .addGroup("Program", "RecordCountApplication")
                    .addGroup("NoOpMapperFunction")
                    .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose);
        }
 
        @Override
        public String map(String value) throws Exception {
            valueToExpose++;
            return value;
        }
    }
```

在前面的示例中，应用程序处理的每条记录的`valueToExpose`变量都会递增。

定义映射类后，您将创建一个实现映射的应用程序内部流：

```
DataStream<String> noopMapperFunctionAfterFilter =
    kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));
```

有关此应用程序的完整代码，请参阅[记录计数自定义指标应用程序](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/RecordCount)。

### 字数计数自定义指标
<a name="monitoring-metrics-custom-examples-wordcount"></a>

以下代码示例演示如何创建映射类，该映射类用于创建对数据流中的字数进行计数的指标：

```
private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
     
            private transient Counter counter;
     
            @Override
            public void open(Configuration config) {
                this.counter = getRuntimeContext().getMetricGroup()
                        .addGroup("KinesisAnalytics")
                        .addGroup("Service", "WordCountApplication")
                        .addGroup("Tokenizer")
                        .counter("TotalWords");
            }
     
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>>out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
     
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        counter.inc();
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
```

在前面的示例中，应用程序处理的每个字的`counter`变量都会递增。

定义映射类后，您将创建一个实现映射的应用程序内部流：

```
// Split up the lines in pairs (2-tuples) containing: (word,1), and
// group by the tuple field "0" and sum up tuple field "1"
DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1);
     
// Serialize the tuple to string format, and publish the output to kinesis sink
wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());
```

有关此应用程序的完整代码，请参阅[字数计数自定义指标应用程序](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/CustomMetrics/WordCount)。

## 查看自定义指标
<a name="monitoring-metrics-custom-examples-viewing"></a>

应用程序的自定义指标显示在控制**AWS/KinesisAnalytics**面板的 CloudWatch Metrics 控制台的**应用程序**指标组下。