

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建并运行适用于 Apache Flink 的托管服务应用程序
<a name="gs-table-create"></a>

在本练习中，您将创建 Managed Service for Apache Flink 应用程序，并将数据流作为源和接收器。

**Topics**
+ [

## 创建相关资源
](#gs-table-resources)
+ [

## 设置本地开发环境
](#gs-table-2)
+ [

## 下载并检查 Apache Flink 流式处理 Java 代码
](#gs-table-5)
+ [

## 在本地运行应用程序
](#gs-table-run-locally)
+ [

## 观察将数据写入 S3 存储桶的应用程序
](#gs-table-input-output)
+ [

## 停止在本地运行的应用程序
](#gs-table-stop)
+ [

## 编译并打包您的应用程序代码
](#gs-table-5.5)
+ [

## 上传应用程序代码 JAR 文件
](#gs-table-6)
+ [

## 创建并配置 Managed Service for Apache Flink 应用程序
](#gs-table-7)

## 创建相关资源
<a name="gs-table-resources"></a>

在本练习中创建 Managed Service for Apache Flink之前，您需要创建以下从属资源：
+ 存储应用程序代码和写入应用程序输出的 Amazon S3 存储桶。
**注意**  
本教程假设您在 us-east-1 区域中部署应用程序。如果您使用其他区域，则必须相应地调整所有步骤。

### 创建 Amazon S3 存储桶
<a name="gs-table-resources-s3"></a>

您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明，请参阅以下主题：
+ *《Amazon Simple Storage Service 用户指南》*中的[如何创建 S3 存储桶？](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)。附加您的登录名，以便为 Amazon S3 存储桶指定全局唯一的名称。
**注意**  
请确保在用于本教程的区域中创建存储桶。教程的默认区域为 us-east-1。

### 其他 资源
<a name="gs-table-resources-cw"></a>

在您创建应用程序时，适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源（如果这些资源尚不存在）：
+ 名为 `/AWS/KinesisAnalytics-java/<my-application>` 的日志组。
+ 名为 `kinesis-analytics-log-stream` 的日志流。

## 设置本地开发环境
<a name="gs-table-2"></a>

对于开发和调试，您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。使用 Maven 作为普通的 Java 依赖项处理任何 Apache Flink 依赖项。

**注意**  
在开发计算机上，必须安装 Java JDK 11、Maven 和 Git。我们建议您使用开发环境（如 [Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) 或 [IntelliJ IDEA](https://www.jetbrains.com/idea/)）。要验证您是否满足所有先决条件，请参阅 [满足完成练习的先决条件](getting-started.md#setting-up-prerequisites)。您**无**需在计算机上安装 Apache Flink 集群。

### 对您的 AWS 会话进行身份验证
<a name="get-started-exercise-table-authenticate"></a>

该应用程序使用 Kinesis 数据流来发布数据。在本地运行时，您必须拥有有效的 AWS 经过身份验证的会话，并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证：

1. 如果您没有配置带有有效凭据 AWS CLI 的命名配置文件，请参阅[设置 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

1. 如果您的 IDE 有要集成的插件 AWS，则可以使用该插件将凭据传递给 IDE 中运行的应用程序。有关更多信息，请参阅[适用于 IntelliJ IDEA 的AWS Toolkit](https://aws.amazon.com/intellij/) 和[用于编译应用程序或运行 Eclipse 的AWS Toolkit](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)。

## 下载并检查 Apache Flink 流式处理 Java 代码
<a name="gs-table-5"></a>

此示例的应用程序代码可从中获得 GitHub。

**下载 Java 应用程序代码**

1. 使用以下命令克隆远程存储库：

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. 导航到 `./java/GettingStartedTable` 目录。

### 审核应用程序组件
<a name="get-started-exercise-table-components"></a>

该应用程序完全在 `com.amazonaws.services.msf.BasicTableJob` 类中实施。`main()` 方法定义源、转换和接收器。由此方法末尾的执行语句启动执行。

**注意**  
为获得最佳的开发人员体验，该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行，以便在您的 IDE 中进行开发。
+ 要读取运行时配置，使其在 Amazon Managed Service for Apache Flink 和 IDE 中能够正常运行，应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下，应用程序加载运行时配置的方式会有所不同：

  1. 当应用程序检测到其在 IDE 中以独立模式运行时，会形成包含在项目 **resources** 文件夹中的 `application_properties.json` 文件。该文件的内容如下所示。

  1. 当应用程序在 Amazon Managed Service for Apache Flink 中运行时，默认行为会根据您将在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅[创建并配置 Managed Service for Apache Flink 应用程序](get-started-exercise.md#get-started-exercise-7)。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` 方法定义应用程序数据流程并运行它。
  + 初始化默认的流环境。在此示例中，我们展示了如何创建`StreamExecutionEnvironment`要用于 DataStream API 的，以及`StreamTableEnvironment`要用于 SQL 和表 API 的。这两个环境对象是对同一个运行时环境的两个单独引用，用法不同 APIs。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    ```
  + 加载应用程序配置参数。这将自动从正确的位置加载这些参数，具体取决于应用程序的运行位置：

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + 当 Flink 完成[检查点](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/stateful-stream-processing/#checkpointing)时，应用程序用于将结果写入 Amazon S3 输出文件的[FileSystem 接收器连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink)。必须启用检查点，才能将文件写入目标。当应用程序在 Amazon Managed Service for Apache Flink 中运行时，应用程序配置会控制检查点并默认启用该检查点。相反，在本地运行时，默认情况下检查点处于禁用状态。该应用程序检测到它在本地运行，并且每 5,000 毫秒配置一次检查点。

    ```
     if (env instanceof LocalStreamEnvironment) {
        env.enableCheckpointing(5000);
     }
    ```
  + 此应用程序不接收来自实际外部源的数据。它生成随机数据以通过[DataGen 连接](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)器进行处理。此连接器可用于 DataStream API、SQL 和表 API。为了演示两者之间的集成 APIs，应用程序使用 DataStram API 版本，因为它提供了更大的灵活性。在此案例中，每条记录都是由称为 `StockPriceGeneratorFunction` 的生成器函数生成的，您可以在该函数中放置自定义逻辑。

    ```
    DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordPerSecond),
            TypeInformation.of(StockPrice.class));
    ```
  + 在 DataStream API 中，记录可以有自定义类。这些类必须遵循特定的规则，这样 Flink 才能将其用作记录。有关更多信息，请参阅[支持的数据类型](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#supported-data-types)。在此示例中，`StockPrice` 类是一个 [POJO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)。
  + 然后将源代码附加到执行环境中，并且生成 `StockPrice` 的 `DataStream`。此应用程序不使用[事件时间语义](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/#notions-of-time-event-time-and-processing-time)，也不会生成水印。以 1 的并行度运行 DataGenerator 源代码，与应用程序其余部分的并行度无关。

    ```
    DataStream<StockPrice> stockPrices = env.fromSource(
            source, 
            WatermarkStrategy.noWatermarks(),
            "data-generator"
        ).setParallelism(1);
    ```
  + 数据处理流程中的后续内容是使用表 API 和 SQL 定义的。为此，我们将 o DataStream f StockPrices 转换为表格。表的架构从 `StockPrice` 类中自动推断。

    ```
    Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    ```
  + 以下代码片段展示如何使用编程的表 API 定义视图和查询：

    ```
    Table filteredStockPricesTable = stockPricesTable.
            select(
                    $("eventTime").as("event_time"),
                    $("ticker"),
                    $("price"),
                    dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("eventTime"), "HH").as("hr")
            ).where($("price").isGreater(50));
            
    tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    ```
  + 定义接收器表以将结果作为 JSON 文件写入 Amazon S3 存储桶。为说明与编程方式定义视图的区别，在表 API 中，使用 SQL 定义接收器表。

    ```
    tableEnv.executeSql("CREATE TABLE s3_sink (" +
                "eventTime TIMESTAMP(3)," +
                "ticker STRING," +
                "price DOUBLE," +
                "dt STRING," +
                "hr STRING" +
            ") PARTITIONED BY ( dt, hr ) WITH (" +
            "'connector' = 'filesystem'," +
            "'fmat' = 'json'," +
            "'path' = 's3a://"  + s3Path + "'" +
            ")");
    ```
  + 最后一步是 `executeInsert()`，其中将筛选后的股票价格视图插入接收器表中。此方法启动我们到目前为止定义的数据流程的执行。

    ```
    filteredStockPricesTable.executeInsert("s3_sink");
    ```

### 使用 pom.xml 文件
<a name="get-started-exercise-5-2"></a>

pom.xml 文件定义应用程序所需的所有依赖项，并且设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。
+ 有些依赖项具有 `provided` 范围。当应用程序在 Amazon Managed Service for Apache Flink 中运行时，这些依赖项自动可用。它们是应用程序或 IDE 中本地应用程序所必需的条件。有关更多信息，请参阅（表 API 的更新）[在本地运行应用程序](get-started-exercise.md#get-started-exercise-5-run)。请确保使用的 Flink 版本与 Amazon Managed Service for Apache Flink 中使用的运行时版本相同。要使用表 API 和 SQL，必须包括 `flink-table-planner-loader` 和 `flink-table-runtime-dependencies`（两者都具有 `provided` 范围）。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ 必须使用默认范围向 pom 添加其他 Apache Flink 依赖项。例如，[DataGen 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)、[FileSystem SQL 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/)和 [JSON 格式](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/)。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
  </dependency>
  ```
+ 为了在本地运行时写入 Amazon S3，也要包含 S3 Hadoop 文件系统并采用 `provided` 范围。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-s3-fs-hadoop</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Maven Java 编译器插件确保根据 Java 11 编译代码，Java 11 是 Apache Flink 目前支持的 JDK 版本。
+ Maven Shade 插件打包 fat-jar，但不包括运行时提供的一些库。它还指定两个转换器：`ServicesResourceTransformer` 和 `ManifestResourceTransformer`。后者配置包含启动应用程序的 `main` 方法的类。如果重命名主类，请不要忘记更新此转换器。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## 在本地运行应用程序
<a name="gs-table-run-locally"></a>

您可以在 IDE 中本地运行和调试 Flink 应用程序。

**注意**  
在继续之前，请确认输入和输出流是否可用。请参阅[创建两个 Amazon Kinesis 数据流](get-started-exercise.md#get-started-exercise-1)。此外，请确认您是否具有从两个流中读取和写入的权限。请参阅[对您的 AWS 会话进行身份验证](get-started-exercise.md#get-started-exercise-2-5)。  
设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE 来进行 Java 开发。确认您满足所需的先决条件。请参阅[满足完成练习的先决条件](getting-started.md#setting-up-prerequisites)。

### 将 Java 项目导入您的 IDE
<a name="gs-table-import"></a>

要开始在 IDE 中使用该应用程序，必须将其作为 Java 项目导入。

您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中，请将 `./jave/GettingStartedTable` 子目录中的内容导入 IDE。

使用 Maven 将代码作为现有 Java 项目插入。

**注意**  
导入新 Java 项目的确切过程因所使用的 IDE 而异。

### 修改本地应用程序配置
<a name="gs-table-modify-2"></a>

在本地运行时，应用程序使用 `./src/main/resources` 下项目资源文件夹中的 `application_properties.json` 文件内的配置。对于本教程应用程序，配置参数是存储桶的名称和写入数据的路径。

编辑配置并修改 Amazon S3 存储桶名称，使其与您在本教程开头创建的存储桶相匹配。

```
[
 {
 "PropertyGroupId": "bucket",
 "PropertyMap": {
 "name": "<bucket-name>",
 "path": "output"
 }
 }
]
```

**注意**  
配置属性 `name` 必须仅包含存储桶名称，例如 `my-bucket-name`。请勿包含任何前缀（例如 `s3://`）或尾部斜杠。  
如果修改路径，请省略所有前导或尾部斜杠。

### 设置 IDE 运行配置
<a name="gs-table-setupIDE"></a>

您可以像运行任何 Java 应用程序一样，通过运行主类 `com.amazonaws.services.msf.BasicTableJob` 直接从 IDE 运行和调试 Flink 应用程序。运行应用程序前，必须设置“运行”配置。设置取决于您使用的 IDE。例如，请参阅 IntelliJ IDEA 文档中的[运行/调试配置](https://www.jetbrains.com/help/idea/run-debug-configuration.html)。具体而言，您必须设置以下内容：

1. **将 `provided` 依赖项添加到类路径中**。这是确保在本地运行时将具有 `provided` 范围的依赖项传递给应用程序所必需的条件。如果不进行此设置，应用程序会立即显示 `class not found` 错误。

1. **将访问 Kinesis 直播的 AWS 凭证传递给应用程序。**最快的方法是使用[适用于 IntelliJ IDEA 的AWS Toolkit](https://aws.amazon.com/intellij/)。在 “运行” 配置中使用此 IDE 插件，可以选择特定的 AWS 配置文件。 AWS 使用此配置文件进行身份验证。您无需直接传递 AWS 凭证。

1. 验证 IDE 是否使用 **JDK 11** 运行应用程序。

### 在 IDE 中运行应用程序
<a name="gs-table-runIDE"></a>

设置 `BasicTableJob` 的“运行”配置后，您可以像常规 Java 应用程序一样运行或调试它。

**注意**  
不能从命令行使用 `java -jar ...` 直接运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。

当应用程序成功启动时，它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。

```
21:28:34,982 INFO  com.amazonaws.services.msf.BasicTableJob                     
                [] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO  com.amazonaws.services.msf.BasicTableJob                     
[] - s3Path is ExampleBucket/my-output-bucket
...
```

初始化完成后，应用程序不会再发出任何日志条目。**当数据流动时，不会发出任何日志。**

要验证应用程序是否正确处理数据，您可以检查输出存储桶的内容，如下一节所述。

**注意**  
 Flink 应用程序的正常行为是不发出有关流动数据的日志。在每条记录上发出日志可能便于调试，但在生产环境中运行时可能会增加大量开销。

## 观察将数据写入 S3 存储桶的应用程序
<a name="gs-table-input-output"></a>

此示例应用程序在内部生成随机数据，并将这些数据写入您配置的目标 S3 存储桶。除非您修改默认配置路径，否则数据将以 `./output/<yyyy-MM-dd>/<HH>` 格式写入 `output` 路径，后跟日期和小时分区。

[ FileSystem 接收器连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink)在 Flink 检查点上创建新文件。在本地运行时，应用程序每 5 秒（5,000 毫秒）运行一次检查点，如代码中所述。

```
 if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(5000);
 }
```

**浏览 S3 存储桶并观察应用程序写入的文件**

1. 

   1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择您之前创建的存储桶。

1. 导航到 `output` 路径，然后导航到与 UTC 时区当前时间相对应的日期和小时文件夹。

1. 定期刷新以观察每 5 秒钟出现的新文件。

1. 选择并下载一个文件以观察其内容。
**注意**  
默认情况下，这些文件没有扩展名。该内容的格式设置为 JSON。您可以使用任何文本编辑器打开文件来检查内容。

## 停止在本地运行的应用程序
<a name="gs-table-stop"></a>

停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于 IDE。

## 编译并打包您的应用程序代码
<a name="gs-table-5.5"></a>

在本节中，您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 编译和打包代码。

**要使用 Maven 命令行进行编译和打包，请执行以下操作**：

移至包含 Jave GettingStarted 项目的目录并运行以下命令：

```
$ mvn package
```

**使用 IDE 进行编译和打包**

从 IDE Maven 集成中运行 `mvn package`。

在这两种情况下，都会创建 JAR 文件 `target/amazon-msf-java-table-app-1.0.jar`。

**注意**  
从 IDE 运行*构建项目*可能不会创建 JAR 文件。

## 上传应用程序代码 JAR 文件
<a name="gs-table-6"></a>

在本节中，您将在上一节中创建的 JAR 文件上传到在教程开始时创建的 Amazon S3 存储桶。如果已经执行此操作，请完成 [创建 Amazon S3 存储桶](#gs-table-resources-s3)。

**上传应用程序代码**

1. 打开 Amazon S3 控制台，网址为 [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)。

1. 选择您之前为应用程序代码创建的存储桶。

1. 选择**上传**字段。

1. 选择**添加文件**。

1. 导航到上一节中生成的 JAR 文件：`target/amazon-msf-java-table-app-1.0.jar`。

1. 在不更改任何其他设置的情况下选择**上传**。
**警告**  
 确保在 `<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar` 中选择正确的 JAR 文件。  
目标目录还包含您无需上传的其他 JAR 文件。

## 创建并配置 Managed Service for Apache Flink 应用程序
<a name="gs-table-7"></a>

您可以使用控制台或 AWS CLI创建和配置 Managed Service for Apache Flink 应用程序。在本教程中，您将使用控制台。

**注意**  
当您使用控制台创建应用程序时，系统会为您创建您的 AWS Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。当您使用 AWS CLI创建应用程序时，您必须单独创建这些资源。

### 创建应用程序
<a name="gs-table-7-console-create"></a>

1. 登录并通过 /f AWS 管理控制台 link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com

1. 确认选择正确的区域：美国东部（弗吉尼亚州北部）us-east-1。

1. 在右侧菜单上，选择 **Apache Flink 应用程序**，然后选择**创建流应用程序**。或者，在初始页面的**入门**部分中选择**创建流应用程序**。

1. 在**创建流应用程序**页面上，完成以下操作：
   + 在**选择设置流处理应用程序的方法**中，选择**从头开始创建**。
   + 对于 **Apache Flink 配置，应用程序 Flink 版本**，请选择 **Apache Flink 1.19**。
   + 在**应用程序配置**部分中，完成以下步骤：
     + 对于 **应用程序名称 **，输入 **MyApplication**。
     + 对于**描述**，输入 **My Java Table API test app**。
     + **要访问应用程序资源**，请选择使用所需策略**创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1**。
   + 在**应用程序设置模板**中，完成以下操作：
     + 对于**模板**，请选择**开发**。

1. 选择**创建流应用程序**。

**注意**  
在使用控制台创建应用程序的 Managed Service for Apache Flink时，您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的，如下所示：  
策略：`kinesis-analytics-service-MyApplication-us-east-1`
角色：`kinesisanalytics-MyApplication-us-east-1`

### 编辑 IAM 策略
<a name="gs-table-7-console-iam"></a>

编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。

**编辑 IAM policy 以添加 S3 存储桶权限**

1. 使用 [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) 打开 IAM 控制台。

1. 选择**策略**。选择控制台在上一部分中为您创建的 **`kinesis-analytics-service-MyApplication-us-east-1`** 策略。

1. 选择**编辑**，然后选择 **JSON** 选项卡。

1. 将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (*012345678901*) 替换为您的账户 ID 和*<bucket-name>*您创建的 S3 存储桶的名称。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::amzn-s3-demo-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:123456789012:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:123456789012:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           }, 
           {
               "Sid": "WriteOutputBucket", 
               "Effect": "Allow", 
               "Action": "s3:*", 
               "Resource": [
                   "arn:aws:s3:::amzn-s3-demo-bucket2"
               ]
          }
       ]
   }
   ```

------

1.  选择**下一步**，然后选择**保存更改**。

### 配置应用程序
<a name="gs-table-7-console-configure"></a>

编辑应用程序以设置应用程序代码构件。

**配置应用程序**

1. 在**MyApplication**页面上，选择**配置**。

1. 在**应用程序代码位置**部分，选择**配置**。
   + 对于 **Amazon S3 存储桶**，请选择之前为应用程序代码创建的存储桶。选择**浏览**并选择正确的存储桶，然后选择**选择**。请勿单击存储桶名称。
   + 在 **Amazon S3 对象的路径**中，输入 **amazon-msf-java-table-app-1.0.jar**。

1. 对于**访问权限**，请选择 **创建/更新 IAM 角色 `kinesis-analytics-MyApplication-us-east-1`**。

1. 在**运行时属性**部分中，添加以下属性。

1. 选择**添加新项目**并添加以下每个参数：    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/managed-flink/latest/java/gs-table-create.html)

1. 请勿修改任何其他设置。

1. 选择**保存更改**。

**注意**  
当您选择启用 Amazon CloudWatch 日志时，适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示：  
日志组：`/aws/kinesis-analytics/MyApplication`
日志流：`kinesis-analytics-log-stream`

### 运行应用程序
<a name="gs-table-7-console-run"></a>

应用程序现已完成配置，准备好运行。

**运行应用程序**

1. 返回适用于 Apache Flink 的亚马逊托管服务中的控制台页面并选择。**MyApplication**

1. 选择**运行**以启动应用程序。

1. 在**应用程序还原配置中**，选择**使用最新快照运行**。

1. 选择**运行**。

1. **应用程序详细信息**中的**状态**会从 `Ready` 转换到 `Starting`，然后在应用程序启动后转换到 `Running`。

当应用程序处于 `Running` 状态时，您可以打开 Flink 控制面板。

**打开控制面板并查看作业**

1. 选择**打开 Apache Flink 控制面板**。控制面板将在新页面中打开。

1. 在**正在运行的作业**列表中，选择您可以看到的单个作业。
**注意**  
如果您错误设置运行时属性或编辑 IAM 策略，则应用程序状态可能会更改为 `Running`，但是 Flink 控制面板显示任务正在持续重新启动。在应用程序配置错误或缺乏访问外部资源的权限时，通常会出现这种故障场景。  
发生这种情况时，请检查 Flink 控制面板中的**异常**选项卡以调查问题的原因。

### 观察运行中应用程序的指标
<a name="gs-observe-metrics"></a>

在该**MyApplication**页面的 **Amazon CloudWatch 指标**部分，您可以看到正在运行的应用程序中的一些基本指标。

**查看指标**

1. 在**刷新**按钮旁边，从下拉列表中选择 **10 秒**。

1. 当应用程序运行且运行状况良好时，您可以看到**正常运行时间**指标不断增加。

1. **完全重新启动**指标应为零。如果该指标增加，则配置可能存在问题。查看 Flink 控制面板上的**异常**选项卡以调查问题。

1. 在运行状况良好的应用程序中，**失败的检查点数**指标应为零。
**注意**  
此控制面板显示一组固定的指标，且粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。

### 观察将数据写入目标存储桶的应用程序
<a name="gs-observe-output"></a>

现在，您可以观察在 Amazon Managed Service for Apache Flink 中运行的应用程序，该应用程序将文件写入 Amazon S3。

要观察这些文件，请按照应用程序在本地运行时检查写入中文件的相同过程进行操作。请参阅 [观察将数据写入 S3 存储桶的应用程序](#gs-table-input-output)。

请记住，应用程序会在 Flink 检查点上写入新文件。在 Amazon Managed Service for Apache Flink 上运行时，检查点默认处于启用状态，每 60 秒运行一次。该应用程序大约每 1 分钟创建一次新文件。

### 停止应用程序
<a name="gs-table-7-console-stop"></a>

要停止应用程序，请转至名为 `MyApplication` 的 Managed Service for Apache Flink 应用程序的控制台页面。

**停止应用程序**

1. 从**操作**下拉列表中，选择**停止**。

1. **应用程序详细信息**中的**状态**会从 `Running` 转换到 `Stopping`，然后在应用程序完全停止时转换到 `Ready`。
**注意**  
请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。