

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Apache Flink アプリケーション用 Managed Serviceを作成して実行する
<a name="get-started-exercise"></a>

この演習では、データストリームをソースおよびシンクとして使用して、Managed Service for Apache Flink アプリケーションを作成します。

**Topics**
+ [2 つの Amazon Kinesis Data Streams を作成する](#get-started-exercise-1)
+ [入力ストリームへのサンプルレコードの書き込み](#get-started-exercise-2)
+ [Apache Flink Streaming Java Code のダウンロードと検証](#get-started-exercise-5)
+ [アプリケーションコードのコンパイル](#get-started-exercise-5.5)
+ [Apache Flink Streaming Java Code のアップロードしてください](#get-started-exercise-6)
+ [Managed Service for Apache Flink アプリケーションを作成して実行する](#get-started-exercise-7)

## 2 つの Amazon Kinesis Data Streams を作成する
<a name="get-started-exercise-1"></a>

この演習でAmazon Managed Service for Apache Flink を作成する前に、2 つの Kinesis データストリーム (`ExampleInputStream` と `ExampleOutputStream`) を作成する必要があります。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールを使用した手順については、[データストリームの作成および更新](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)を参照してください。

**データストリームを作成するには (AWS CLI)**

1. 最初のストリーム (`ExampleInputStream`) を作成するには、次の Amazon Kinesis `create-stream` AWS CLI コマンドを使用します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を `ExampleOutputStream` に変更して同じコマンドを実行します。

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

## 入力ストリームへのサンプルレコードの書き込み
<a name="get-started-exercise-2"></a>

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

**注記**  
このセクションでは [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/) が必要です。

1. 次の内容で、`stock.py` という名前のファイルを作成します。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. このチュートリアルの後半では、アプリケーションにデータを送信する `stock.py` スクリプトを実行します。

   ```
   $ python stock.py
   ```

## Apache Flink Streaming Java Code のダウンロードと検証
<a name="get-started-exercise-5"></a>

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. `GettingStarted` ディレクトリに移動します。

アプリケーションコードは `CustomSinkStreamingJob.java` ファイルと `CloudWatchLogSink.java` ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。
+ アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis シンクが作成されます。

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## アプリケーションコードのコンパイル
<a name="get-started-exercise-5.5"></a>

このセクションでは、Apache Maven コンパイラを使用してアプリケーション用の Java コードを作成します。Apache Maven と Java 開発キット (JDK) をインストールする方法については、[演習を完了するための前提条件](tutorial-stock-data.md#setting-up-prerequisites)を参照してください。

Java アプリケーションには、次のコンポーネントが必要です。
+ [プロジェクトオブジェクトモデル (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html) ファイル。ファイルには、Amazon Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。
+ アプリケーションのロジックを含む `main` メソッド。

**注記**  
**次のアプリケーション用の Kinesis コネクタを使用するには、コネクタのソースコードをダウンロードして、[Apache Flink ドキュメント](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html)で説明されているように構築する必要があります。**

**アプリケーションコードを作成してコンパイルするには**

1. Java/Maven アプリケーションを開発環境で作成します。アプリケーションを作成する方法については、開発環境のドキュメントを参照してください。
   + [ 最初の Java プロジェクトの作成 (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [ 最初の Java アプリケーションの作成、実行、およびパッケージング (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. `StreamingJob.java` という名前のファイルに対して次のコードを使用します。

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-east-1";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   前述のコード例については、以下の点に注意してください。
   + このファイルには、アプリケーションの機能を定義する `main` メソッドが含まれています。
   + アプリケーションでは、ソースおよびシンクコネクタを作成し、`StreamExecutionEnvironment` オブジェクトを使用して外部リソースにアクセスします。
   + アプリケーションでは、静的プロパティを使用してソースおよびシンクコネクタを作成します。動的なアプリケーションプロパティを使用するには、`createSourceFromApplicationProperties` および `createSinkFromApplicationProperties` メソッドを使用してコネクタを作成します。これらのメソッドは、アプリケーションのプロパティを読み取ってコネクタを設定します。

1. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。コードのコンパイルとパッケージ化には次の 2 通りの方法があります。
   + Maven コマンドラインツールを使用します。`pom.xml` ファイルが格納されているディレクトリで次のコマンドを実行して JAR ファイルを作成します。

     ```
     mvn package
     ```
   + 開発環境を使用します。詳細については、開発環境のドキュメントを参照してください。

   パッケージは JAR ファイルとしてアップロードすることも、圧縮して ZIP ファイルとしてアップロードすることもできします。を使用してアプリケーションを作成する場合は AWS CLI、コードコンテンツタイプ (JAR または ZIP) を指定します。

1. コンパイル中にエラーが発生した場合は、`JAVA_HOME` 環境変数が正しく設定されていることを確認します。

アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

`target/java-getting-started-1.0.jar`

## Apache Flink Streaming Java Code のアップロードしてください
<a name="get-started-exercise-6"></a>

このセクションでは、Amazon Simple Storage Service (Amazon S3) バケットを作成し、アプリケーションコードをアップロードします。

**アプリケーションコードをアップロードするには**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. **[バケットを作成]** を選択します。

1. [**Bucket name (バケット名)**] フィールドに**ka-app-code-*<username>***と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[**次へ**] を選択します。

1. **設定オプション**のステップでは、設定をそのままにし、[**次へ**] を選択します。

1. **アクセス許可の設定**のステップでは、設定をそのままにし、[**次へ**] を選択します。

1. **[バケットを作成]** を選択します。

1. Amazon S3 コンソールで **ka-app-code-*<username>*** バケットを選択し、[**アップロード**] を選択します。

1. **ファイルの選択**のステップで、[**ファイルを追加**] を選択します。前のステップで作成した `java-getting-started-1.0.jar` ファイルに移動します。[**次へ**] を選択します。

1. **アクセス許可の設定**のステップでは、設定をそのままにします。[**次へ**] を選択します。

1. **プロパティの設定**のステップでは、設定をそのままにします。**アップロード** を選択します。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

## Managed Service for Apache Flink アプリケーションを作成して実行する
<a name="get-started-exercise-7"></a>

コンソールまたは AWS CLIのいずれかを使用してManaged Service for Apache Flink を作成し、実行することができます。

**注記**  
コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) リソースと Amazon CloudWatch Logs リソースが自動的に作成されます。を使用してアプリケーションを作成するときは AWS CLI、これらのリソースを個別に作成します。

**Topics**
+ [アプリケーションを作成して実行する (コンソール)](#get-started-exercise-7-console)
+ [アプリケーションを作成して実行する (AWS CLI)](#get-started-exercise-7-cli)

### アプリケーションを作成して実行する (コンソール)
<a name="get-started-exercise-7-console"></a>

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

#### アプリケーションの作成
<a name="get-started-exercise-7-console-create"></a>

1. Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. Amazon Kinesis ダッシュボードで、[**分析アプリケーションを作成する**] を選択します。

1. [**Kinesis Analytics - アプリケーションの作成**] ページで、次のようにアプリケーションの詳細を指定します。
   + [**アプリケーション名**] には**MyApplication**と入力します。
   + [**Description (説明)**] に**My java test app**と入力します。
   + [**ランタイム**] には、[**Apache Flink 1.6**] を選択します。

1. [**アクセス許可**] には、[**IAM ロールの作成 / 更新`kinesis-analytics-MyApplication-us-west-2`**] を選択します。

1. [**アプリケーションを作成**] を選択します。

**注記**  
コンソールを使用して Amazon Managed Service for Apache Flink を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。  
ポリシー: `kinesis-analytics-service-MyApplication-us-west-2`
ロール: `kinesis-analytics-MyApplication-us-west-2`

#### IAM ポリシーを編集する
<a name="get-started-exercise-7-console-iam"></a>

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[ポリシー]** を選択します。前のセクションでコンソールによって作成された **`kinesis-analytics-service-MyApplication-us-west-2`** ポリシーを選択します。

1. [**概要**] ページで、[**ポリシーの編集**] を選択します。**JSON** タブを選択します。

1. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (*012345678901*) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### アプリケーションを設定する
<a name="get-started-exercise-7-console-configure"></a>

1. [**MyApplication**] ページで、[**Congirue**] を選択します。

1. [**Configure application**] ページで、[**Code location**] を次のように指定します。
   + [**Amazon S3 バケット**] で、**ka-app-code-*<username>***と入力します。
   + [**Amazon S3 オブジェクトへのパス**] で、**java-getting-started-1.0.jar**と入力します。

1. [**Access to application resources**] の [**Access permissions**] では、[**Create / update IAM role`kinesis-analytics-MyApplication-us-west-2`**] を選択します。

1. [**Properties**] の [**Group ID**] には、**ProducerConfigProperties**と入力します。

1. 次のアプリケーションのプロパティと値を入力します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/get-started-exercise.html)

1. [**Monitoring**] の [**Monitoring metrics level**] が [**Application**] に設定されていることを確認します。

1. [**CloudWatch logging**] では、[**Enable**] チェックボックスをオンにします。

1. **[更新]** を選択します。

**注記**  
CloudWatch ログ記録の有効化を選択すると、Managed Service for Apache Flink がユーザーに代わってロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。  
ロググループ: `/aws/kinesis-analytics/MyApplication`
ログストリーム: `kinesis-analytics-log-stream`

#### アプリケーションを実行する
<a name="get-started-exercise-7-console-run"></a>

1. [**MyApplication**] ページで、[**Run**] を選択します。アクションを確認します。

1. アプリケーションが実行されたら、ページを更新します。コンソールには [**Application graph**] が示されます。

#### アプリケーションを停止する
<a name="get-started-exercise-7-console-stop"></a>

[**MyApplication**] ページで、[**Stop**] を選択します。アクションを確認します。

#### アプリケーションの更新
<a name="get-started-exercise-7-console-update"></a>

コンソールを使用して、アプリケーションのプロパティ、モニタリング設定、アプリケーション JAR の場所またはファイル名などのアプリケーション設定を更新できます。アプリケーションコードを更新する必要がある場合は、アプリケーション JAR を Amazon S3 バケットから再ロードすることもできます。

[**MyApplication**] ページで、[**Congirue**] を選択します。アプリケーションの設定を更新し、[**更新**] を選択します。

### アプリケーションを作成して実行する (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

このセクションでは、 AWS CLI を使用して Managed Service for Apache Flink アプリケーションを作成して実行します。Managed Service for Apache Flink は、 `kinesisanalyticsv2` AWS CLI コマンドを使用して Managed Service for Apache Flink アプリケーションを作成して操作します。

#### アクセス許可ポリシーを作成する
<a name="get-started-exercise-7-cli-policy"></a>

まず、2 つのステートメントを含むアクセス許可ポリシーを作成します。1 つは、ソースストリームの `read` アクションに対するアクセス許可を付与し、もう 1 つはシンクストリームの `write` アクションに対するアクセス許可を付与します。次に、IAM ロール (次のセクションで作成) にポリシーをアタッチします。そのため、 Managed Service for Apache Flinkがこのロールを引き受けると、ソースストリームからの読み取りとシンクストリームへの書き込みを行うために必要なアクセス許可がサービスに付与されます。

次のコードを使用して `KAReadSourceStreamWriteSinkStream` アクセス許可ポリシーを作成します。`username` を Amazon S3 バケットの作成に使用したユーザー名に置き換え、アプリケーションコードを保存します。Amazon リソースネーム (ARN) のアカウント ID (`012345678901`) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```

------

許可ポリシーを作成する詳しい手順については、*IAM ユーザーガイド*の[チュートリアル: はじめてのカスタマーマネージドポリシーの作成とアタッチ](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy)を参照してください。

**注記**  
他の AWS サービスにアクセスするには、 を使用できます AWS SDK for Java。Managed Service for Apache Flink は、アプリケーションに関連付けられているサービス実行 IAM ロールに、SDK が必要とする認証情報を自動的に設定します。追加の手順は必要ありません。

#### IAM ロールを作成する
<a name="get-started-exercise-7-cli-role"></a>

このセクションでは、Managed Service for Apache Flink がソースストリームを読み取り、シンクストリームに書き込むために想定できる IAM ロールを作成します。

Apache Flink 用 Managed Service は、許可なしにはストリームにアクセスできません。IAM ロールを介してこれらの許可を付与します。各 IAM ロールには、2 つのポリシーがアタッチされます。信頼ポリシーは、ロールを引き受けるための許可を Managed Service for Apache Flink 付与し、許可ポリシーは、ロールを引き受けた後に Managed Service for Apache Flink が実行できる事柄を決定します。

前のセクションで作成したアクセス許可ポリシーをこのロールにアタッチします。

**IAM ロールを作成するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションペインで [**Roles (ロール)**]、[**Create Role (ロールの作成)**] の順に選択します。

1. [**信頼されるエンティティの種類を選択**] で、[**AWS のサービス**] を選択します。[**このロールを使用するサービスを選択**] で、[**Kinesis Analytics**] を選択します。[**ユースケースの選択**] で、[**Kinesis Analytics **] を選択します。

   **[Next: Permissions]** (次のステップ: 許可) を選択します。

1. [**アクセス権限ポリシーをアタッチする**] ページで、[**Next: Review**] (次: 確認) を選択します。ロールを作成した後に、アクセス許可ポリシーをアタッチします。

1. [**Create role (ロールの作成)**] ページで、**ロールの名前**に **KA-stream-rw-role** を入力します。[**ロールの作成**] を選択してください。

   これで、`KA-stream-rw-role` と呼ばれる新しい IAM ロールが作成されます。次に、ロールの信頼ポリシーとアクセス許可ポリシーを更新します。

1.  アクセス許可ポリシーをロールにアタッチします。
**注記**  
この演習では、Managed Service for Apache Flink が、Kinesis データストリーム (ソース) からのデータの読み取りと、別の Kinesis データストリームへの出力の書き込みの両方を実行するためにこのロールを引き受けます。このため、前のステップで作成したポリシー、[アクセス許可ポリシーを作成する](#get-started-exercise-7-cli-policy) をアタッチします。

   1. [**概要**] ページで、[**アクセス許可**] タブを選択します。

   1. [**Attach Policies (ポリシーのアタッチ)**] を選択します。

   1. 検索ボックスに**KAReadSourceStreamWriteSinkStream**(前のセクションで作成したポリシー) と入力します。

   1. [**KAReadInputStreamWriteOutputStream**] ポリシーを選択し、[**ポリシーのアタッチ**] を選択します。

これで、アプリケーションがリソースにアクセスするために使用するサービスの実行ロールが作成されました。新しいロールの ARN を書き留めておきます。

ロールを作成する手順については、*IAM ユーザーガイド*の [IAM ロールの作成 (コンソール)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console)を参照してください。

#### Apache Flink アプリケーション用 Managed Serviceの作成
<a name="get-started-exercise-7-cli-create"></a>

1. 次の JSON コードを `create_request.json` という名前のファイルに保存します。サンプルロールの ARN を、前に作成したロールの ARN に置き換えます。バケット ARN のサフィックス (`username`) を、前のセクションで選択したサフィックスに置き換えます。サービス実行ロールのサンプルのアカウント ID (`012345678901`) を、自分のアカウント ID に置き換えます。

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties":  { 
            "PropertyGroups": [ 
               { 
                  "PropertyGroupId": "ProducerConfigProperties",
                  "PropertyMap" : {
                       "flink.stream.initpos" : "LATEST",
                       "aws.region" : "us-west-2",
                       "AggregationEnabled" : "false"
                  }
               },
               { 
                  "PropertyGroupId": "ConsumerConfigProperties",
                  "PropertyMap" : {
                       "aws.region" : "us-west-2"
                  }
               }
            ]
         }
       }
   }
   ```

1. 前述のリクエストを指定して [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) アクションを実行し、アプリケーションを作成します。

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

これでアプリケーションが作成されました。次のステップでは、アプリケーションを起動します。

#### アプリケーションの起動
<a name="get-started-exercise-7-cli-start"></a>

このセクションでは、[https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) アクションを使用してアプリケーションを起動します。

**アプリケーションを起動するには**

1. 次の JSON コードを `start_request.json` という名前のファイルに保存します。

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. 前述のリクエストを指定して [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) アクションを実行し、アプリケーションを起動します。

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

アプリケーションが実行されます。Amazon CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。

#### アプリケーションの停止
<a name="get-started-exercise-7-cli-stop"></a>

このセクションでは、[https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) アクションを使用してアプリケーションを停止します。

**アプリケーションを停止するには**

1. 次の JSON コードを `stop_request.json` という名前のファイルに保存します。

   ```
   {"ApplicationName": "test"
   }
   ```

1. 次のリクエストを指定して [https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) アクションを実行し、アプリケーションを停止します。

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

アプリケーションが停止します。