本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
创建并运行适用于 Apache Flink 的托管服务应用程序
在本练习中,您将创建 Managed Service for Apache Flink 应用程序,并将数据流作为源和接收器。
本节包含以下步骤。
创建相关资源
在本练习中创建 Managed Service for Apache Flink之前,您需要创建以下从属资源:
-
存储应用程序代码和写入应用程序输出的 Amazon S3 存储桶。
注意
本教程假设您在 us-east-1 区域中部署应用程序。如果您使用其他区域,则必须相应地调整所有步骤。
创建 Amazon S3 存储桶
您可以使用控制台来创建 Amazon S3 存储桶。有关创建该资源的说明,请参阅以下主题:
-
《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称。
注意
请确保在用于本教程的区域中创建存储桶。教程的默认区域为 us-east-1。
其他 资源
在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):
-
名为
/AWS/KinesisAnalytics-java/<my-application>的日志组。 -
名为
kinesis-analytics-log-stream的日志流。
设置本地开发环境
对于开发和调试,您可以直接从所选的 IDE 在计算机上运行 Apache Flink 应用程序。使用 Maven 作为普通的 Java 依赖项处理任何 Apache Flink 依赖项。
注意
在开发计算机上,必须安装 Java JDK 11、Maven 和 Git。我们建议您使用开发环境(如 Eclipse Java Neon
对您的 AWS 会话进行身份验证
该应用程序使用 Kinesis 数据流来发布数据。在本地运行时,您必须拥有有效的 AWS 经过身份验证的会话,并具有写入 Kinesis 数据流的权限。按照以下步骤对会话进行身份验证:
-
如果您没有配置带有有效凭据 AWS CLI 的命名配置文件,请参阅设置 AWS Command Line Interface (AWS CLI)。
-
如果您的 IDE 有要集成的插件 AWS,则可以使用该插件将凭据传递给 IDE 中运行的应用程序。有关更多信息,请参阅适用于 IntelliJ IDEA 的AWS Toolkit
和用于编译应用程序或运行 Eclipse 的AWS Toolkit。
下载并检查 Apache Flink 流式处理 Java 代码
此示例的应用程序代码可从中获得 GitHub。
下载 Java 应用程序代码
-
使用以下命令克隆远程存储库:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
导航到
./java/GettingStartedTable目录。
审核应用程序组件
该应用程序完全在 com.amazonaws.services.msf.BasicTableJob 类中实施。main() 方法定义源、转换和接收器。由此方法末尾的执行语句启动执行。
注意
为获得最佳的开发人员体验,该应用程序设计为无需更改任何代码即可同时在 Amazon Managed Service for Apache Flink 上和本地运行,以便在您的 IDE 中进行开发。
-
要读取运行时配置,使其在 Amazon Managed Service for Apache Flink 和 IDE 中能够正常运行,应用程序会自动检测它是否在 IDE 中本地独立运行。在这种情况下,应用程序加载运行时配置的方式会有所不同:
-
当应用程序检测到其在 IDE 中以独立模式运行时,会形成包含在项目 resources 文件夹中的
application_properties.json文件。该文件的内容如下所示。 -
当应用程序在 Amazon Managed Service for Apache Flink 中运行时,默认行为会根据您将在 Amazon Managed Service for Apache Flink 应用程序中定义的运行时属性加载应用程序配置。请参阅创建并配置 Managed Service for Apache Flink 应用程序。
private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
-
-
main()方法定义应用程序数据流程并运行它。-
初始化默认的流环境。在此示例中,我们展示了如何创建
StreamExecutionEnvironment要用于 DataStream API 的,以及StreamTableEnvironment要用于 SQL 和表 API 的。这两个环境对象是对同一个运行时环境的两个单独引用,用法不同 APIs。StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build()); -
加载应用程序配置参数。这将自动从正确的位置加载这些参数,具体取决于应用程序的运行位置:
Map<String, Properties> applicationParameters = loadApplicationProperties(env); -
当 Flink 完成检查点
时,应用程序用于将结果写入 Amazon S3 输出文件的FileSystem 接收器连接器 。必须启用检查点,才能将文件写入目标。当应用程序在 Amazon Managed Service for Apache Flink 中运行时,应用程序配置会控制检查点并默认启用该检查点。相反,在本地运行时,默认情况下检查点处于禁用状态。该应用程序检测到它在本地运行,并且每 5,000 毫秒配置一次检查点。 if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); } -
此应用程序不接收来自实际外部源的数据。它生成随机数据以通过DataGen 连接
器进行处理。此连接器可用于 DataStream API、SQL 和表 API。为了演示两者之间的集成 APIs,应用程序使用 DataStram API 版本,因为它提供了更大的灵活性。在此案例中,每条记录都是由称为 StockPriceGeneratorFunction的生成器函数生成的,您可以在该函数中放置自定义逻辑。DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class)); -
在 DataStream API 中,记录可以有自定义类。这些类必须遵循特定的规则,这样 Flink 才能将其用作记录。有关更多信息,请参阅支持的数据类型
。在此示例中, StockPrice类是一个 POJO。 -
然后将源代码附加到执行环境中,并且生成
StockPrice的DataStream。此应用程序不使用事件时间语义,也不会生成水印。以 1 的并行度运行 DataGenerator 源代码,与应用程序其余部分的并行度无关。 DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1); -
数据处理流程中的后续内容是使用表 API 和 SQL 定义的。为此,我们将 o DataStream f StockPrices 转换为表格。表的架构从
StockPrice类中自动推断。Table stockPricesTable = tableEnv.fromDataStream(stockPrices); -
以下代码片段展示如何使用编程的表 API 定义视图和查询:
Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable); -
定义接收器表以将结果作为 JSON 文件写入 Amazon S3 存储桶。为说明与编程方式定义视图的区别,在表 API 中,使用 SQL 定义接收器表。
tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")"); -
最后一步是
executeInsert(),其中将筛选后的股票价格视图插入接收器表中。此方法启动我们到目前为止定义的数据流程的执行。filteredStockPricesTable.executeInsert("s3_sink");
-
使用 pom.xml 文件
pom.xml 文件定义应用程序所需的所有依赖项,并且设置 Maven Shade 插件来构建包含 Flink 所需的所有依赖项的 fat-jar。
-
有些依赖项具有
provided范围。当应用程序在 Amazon Managed Service for Apache Flink 中运行时,这些依赖项自动可用。它们是应用程序或 IDE 中本地应用程序所必需的条件。有关更多信息,请参阅(表 API 的更新)在本地运行应用程序。请确保使用的 Flink 版本与 Amazon Managed Service for Apache Flink 中使用的运行时版本相同。要使用表 API 和 SQL,必须包括flink-table-planner-loader和flink-table-runtime-dependencies(两者都具有provided范围)。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
-
必须使用默认范围向 pom 添加其他 Apache Flink 依赖项。例如,DataGen 连接器
、FileSystem SQL 连接器 和 JSON 格式 。 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
-
为了在本地运行时写入 Amazon S3,也要包含 S3 Hadoop 文件系统并采用
provided范围。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> -
Maven Java 编译器插件确保根据 Java 11 编译代码,Java 11 是 Apache Flink 目前支持的 JDK 版本。
-
Maven Shade 插件打包 fat-jar,但不包括运行时提供的一些库。它还指定两个转换器:
ServicesResourceTransformer和ManifestResourceTransformer。后者配置包含启动应用程序的main方法的类。如果重命名主类,请不要忘记更新此转换器。 -
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>
在本地运行应用程序
您可以在 IDE 中本地运行和调试 Flink 应用程序。
注意
在继续之前,请确认输入和输出流是否可用。请参阅创建两个 Amazon Kinesis 数据流。此外,请确认您是否具有从两个流中读取和写入的权限。请参阅对您的 AWS 会话进行身份验证。
设置本地开发环境需要 Java 11 JDK、Apache Maven 和 IDE 来进行 Java 开发。确认您满足所需的先决条件。请参阅满足完成练习的先决条件。
将 Java 项目导入您的 IDE
要开始在 IDE 中使用该应用程序,必须将其作为 Java 项目导入。
您克隆的存储库包含多个示例。每个示例都是一个单独的项目。在本教程中,请将 ./jave/GettingStartedTable 子目录中的内容导入 IDE。
使用 Maven 将代码作为现有 Java 项目插入。
注意
导入新 Java 项目的确切过程因所使用的 IDE 而异。
修改本地应用程序配置
在本地运行时,应用程序使用 ./src/main/resources 下项目资源文件夹中的 application_properties.json 文件内的配置。对于本教程应用程序,配置参数是存储桶的名称和写入数据的路径。
编辑配置并修改 Amazon S3 存储桶名称,使其与您在本教程开头创建的存储桶相匹配。
[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "<bucket-name>", "path": "output" } } ]
注意
配置属性 name 必须仅包含存储桶名称,例如 my-bucket-name。请勿包含任何前缀(例如 s3://)或尾部斜杠。
如果修改路径,请省略所有前导或尾部斜杠。
设置 IDE 运行配置
您可以像运行任何 Java 应用程序一样,通过运行主类 com.amazonaws.services.msf.BasicTableJob 直接从 IDE 运行和调试 Flink 应用程序。运行应用程序前,必须设置“运行”配置。设置取决于您使用的 IDE。例如,请参阅 IntelliJ IDEA 文档中的运行/调试配置
-
将
provided依赖项添加到类路径中。这是确保在本地运行时将具有provided范围的依赖项传递给应用程序所必需的条件。如果不进行此设置,应用程序会立即显示class not found错误。 -
将访问 Kinesis 直播的 AWS 凭证传递给应用程序。最快的方法是使用适用于 IntelliJ IDEA 的AWS Toolkit
。在 “运行” 配置中使用此 IDE 插件,可以选择特定的 AWS 配置文件。 AWS 使用此配置文件进行身份验证。您无需直接传递 AWS 凭证。 -
验证 IDE 是否使用 JDK 11 运行应用程序。
在 IDE 中运行应用程序
设置 BasicTableJob 的“运行”配置后,您可以像常规 Java 应用程序一样运行或调试它。
注意
不能从命令行使用 java -jar
... 直接运行 Maven 生成的 fat-jar。此 jar 不包含独立运行应用程序所需的 Flink 核心依赖项。
当应用程序成功启动时,它会记录一些有关独立微型集群和连接器初始化的信息。接下来是 Flink 通常在应用程序启动时发出的许多信息和一些警告日志。
21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob [] - s3Path is ExampleBucket/my-output-bucket ...
初始化完成后,应用程序不会再发出任何日志条目。当数据流动时,不会发出任何日志。
要验证应用程序是否正确处理数据,您可以检查输出存储桶的内容,如下一节所述。
注意
Flink 应用程序的正常行为是不发出有关流动数据的日志。在每条记录上发出日志可能便于调试,但在生产环境中运行时可能会增加大量开销。
观察将数据写入 S3 存储桶的应用程序
此示例应用程序在内部生成随机数据,并将这些数据写入您配置的目标 S3 存储桶。除非您修改默认配置路径,否则数据将以 ./output/<yyyy-MM-dd>/<HH> 格式写入 output 路径,后跟日期和小时分区。
FileSystem 接收器连接器
if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
浏览 S3 存储桶并观察应用程序写入的文件
-
打开 Amazon S3 控制台,网址为 https://console.aws.amazon.com/s3/
。
-
选择您之前创建的存储桶。
-
导航到
output路径,然后导航到与 UTC 时区当前时间相对应的日期和小时文件夹。 -
定期刷新以观察每 5 秒钟出现的新文件。
-
选择并下载一个文件以观察其内容。
注意
默认情况下,这些文件没有扩展名。该内容的格式设置为 JSON。您可以使用任何文本编辑器打开文件来检查内容。
停止在本地运行的应用程序
停止在 IDE 中运行的应用程序。IDE 通常会提供“停止”选项。确切的位置和方法取决于 IDE。
编译并打包您的应用程序代码
在本节中,您将使用 Apache Maven 编译 Java 代码并将其打包到 JAR 文件中。您可以使用 Maven 命令行工具或 IDE 编译和打包代码。
要使用 Maven 命令行进行编译和打包,请执行以下操作:
移至包含 Jave GettingStarted 项目的目录并运行以下命令:
$ mvn package
使用 IDE 进行编译和打包
从 IDE Maven 集成中运行 mvn package。
在这两种情况下,都会创建 JAR 文件 target/amazon-msf-java-table-app-1.0.jar。
注意
从 IDE 运行构建项目可能不会创建 JAR 文件。
上传应用程序代码 JAR 文件
在本节中,您将在上一节中创建的 JAR 文件上传到在教程开始时创建的 Amazon S3 存储桶。如果已经执行此操作,请完成 创建 Amazon S3 存储桶。
上传应用程序代码
打开 Amazon S3 控制台,网址为 https://console.aws.amazon.com/s3/
。 -
选择您之前为应用程序代码创建的存储桶。
-
选择上传字段。
-
选择添加文件。
-
导航到上一节中生成的 JAR 文件:
target/amazon-msf-java-table-app-1.0.jar。 -
在不更改任何其他设置的情况下选择上传。
警告
确保在
<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar中选择正确的 JAR 文件。目标目录还包含您无需上传的其他 JAR 文件。
创建并配置 Managed Service for Apache Flink 应用程序
您可以使用控制台或 AWS CLI创建和配置 Managed Service for Apache Flink 应用程序。在本教程中,您将使用控制台。
注意
当您使用控制台创建应用程序时,系统会为您创建您的 AWS Identity and Access Management (IAM) 和 A CloudWatch mazon Logs 资源。当您使用 AWS CLI创建应用程序时,您必须单独创建这些资源。
创建应用程序
登录并通过 /f AWS 管理控制台 link 打开亚马逊 MSF 控制台。 https://console.aws.amazon.com
-
确认选择正确的区域:美国东部(弗吉尼亚州北部)us-east-1。
-
在右侧菜单上,选择 Apache Flink 应用程序,然后选择创建流应用程序。或者,在初始页面的入门部分中选择创建流应用程序。
-
在创建流应用程序页面上,完成以下操作:
-
在选择设置流处理应用程序的方法中,选择从头开始创建。
-
对于 Apache Flink 配置,应用程序 Flink 版本,请选择 Apache Flink 1.19。
-
在应用程序配置部分中,完成以下步骤:
-
对于 应用程序名称 ,输入
MyApplication。 -
对于描述,输入
My Java Table API test app。 -
要访问应用程序资源,请选择使用所需策略创建/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1。
-
-
在应用程序设置模板中,完成以下操作:
-
对于模板,请选择开发。
-
-
-
选择创建流应用程序。
注意
在使用控制台创建应用程序的 Managed Service for Apache Flink时,您可以选择为应用程序创建 IAM 角色和策略。您的应用程序使用此角色和策略访问其从属资源。这些 IAM 资源是使用您的应用程序名称和区域命名的,如下所示:
-
策略:
kinesis-analytics-service-MyApplication-us-east-1 -
角色:
kinesisanalytics-MyApplication-us-east-1
编辑 IAM 策略
编辑 IAM policy 以添加访问 Amazon S3 数据流的权限。
编辑 IAM policy 以添加 S3 存储桶权限
使用 https://console.aws.amazon.com/iam/
打开 IAM 控制台。 -
选择策略。选择控制台在上一部分中为您创建的
kinesis-analytics-service-MyApplication-us-east-1策略。 -
选择编辑,然后选择 JSON 选项卡。
-
将以下策略示例中突出显示的部分添加到策略中。将示例账户 ID (
012345678901) 替换为您的账户 ID 和<bucket-name>您创建的 S3 存储桶的名称。 -
选择下一步,然后选择保存更改。
配置应用程序
编辑应用程序以设置应用程序代码构件。
配置应用程序
-
在MyApplication页面上,选择配置。
-
在应用程序代码位置部分,选择配置。
-
对于 Amazon S3 存储桶,请选择之前为应用程序代码创建的存储桶。选择浏览并选择正确的存储桶,然后选择选择。请勿单击存储桶名称。
-
在 Amazon S3 对象的路径中,输入
amazon-msf-java-table-app-1.0.jar。
-
-
对于访问权限,请选择 创建/更新 IAM 角色
kinesis-analytics-MyApplication-us-east-1。 -
在运行时属性部分中,添加以下属性。
-
选择添加新项目并添加以下每个参数:
组 ID 键 值 bucketnameyour-bucket-namebucketpathoutput -
请勿修改任何其他设置。
-
选择保存更改。
注意
当您选择启用 Amazon CloudWatch 日志时,适用于 Apache Flink 的托管服务会为您创建一个日志组和日志流。这些资源的名称如下所示:
-
日志组:
/aws/kinesis-analytics/MyApplication -
日志流:
kinesis-analytics-log-stream
运行应用程序
应用程序现已完成配置,准备好运行。
运行应用程序
-
返回适用于 Apache Flink 的亚马逊托管服务中的控制台页面并选择。MyApplication
-
选择运行以启动应用程序。
-
在应用程序还原配置中,选择使用最新快照运行。
-
选择运行。
应用程序详细信息中的状态会从
Ready转换到Starting,然后在应用程序启动后转换到Running。
当应用程序处于 Running 状态时,您可以打开 Flink 控制面板。
打开控制面板并查看作业
-
选择打开 Apache Flink 控制面板。控制面板将在新页面中打开。
-
在正在运行的作业列表中,选择您可以看到的单个作业。
注意
如果您错误设置运行时属性或编辑 IAM 策略,则应用程序状态可能会更改为
Running,但是 Flink 控制面板显示任务正在持续重新启动。在应用程序配置错误或缺乏访问外部资源的权限时,通常会出现这种故障场景。发生这种情况时,请检查 Flink 控制面板中的异常选项卡以调查问题的原因。
观察运行中应用程序的指标
在该MyApplication页面的 Amazon CloudWatch 指标部分,您可以看到正在运行的应用程序中的一些基本指标。
查看指标
-
在刷新按钮旁边,从下拉列表中选择 10 秒。
-
当应用程序运行且运行状况良好时,您可以看到正常运行时间指标不断增加。
-
完全重新启动指标应为零。如果该指标增加,则配置可能存在问题。查看 Flink 控制面板上的异常选项卡以调查问题。
-
在运行状况良好的应用程序中,失败的检查点数指标应为零。
注意
此控制面板显示一组固定的指标,且粒度为 5 分钟。您可以使用仪表板中的任何指标创建自定义应用程序 CloudWatch 控制面板。
观察将数据写入目标存储桶的应用程序
现在,您可以观察在 Amazon Managed Service for Apache Flink 中运行的应用程序,该应用程序将文件写入 Amazon S3。
要观察这些文件,请按照应用程序在本地运行时检查写入中文件的相同过程进行操作。请参阅 观察将数据写入 S3 存储桶的应用程序。
请记住,应用程序会在 Flink 检查点上写入新文件。在 Amazon Managed Service for Apache Flink 上运行时,检查点默认处于启用状态,每 60 秒运行一次。该应用程序大约每 1 分钟创建一次新文件。
停止应用程序
要停止应用程序,请转至名为 MyApplication 的 Managed Service for Apache Flink 应用程序的控制台页面。
停止应用程序
-
从操作下拉列表中,选择停止。
-
应用程序详细信息中的状态会从
Running转换到Stopping,然后在应用程序完全停止时转换到Ready。注意
请勿忘记还要停止从 Python 脚本或 Kinesis Data Generator 向输入流发送数据。