

# Tutorial: Analyze real-time stock data using Amazon Managed Service for Apache Flink
<a name="tutorial-stock-data"></a>

The scenario for this tutorial involves ingesting stock trades into a data stream and writing a simple [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html) application that performs calculations on the stream. You will learn how to send a stream of records to Kinesis Data Streams and implement an application that consumes and processes the records in near real time.

With Amazon Managed Service for Apache Flink, you can use Java or Scala to process and analyze streaming data. The service lets you author and run Java or Scala code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics.

You can build Flink applications in Managed Service for Apache Flink using open-source libraries based on [Apache Flink](https://flink.apache.org/). Apache Flink is a popular framework and engine for processing data streams. 

**Important**  
After you create two data streams and an application, your account incurs nominal charges for Kinesis Data Streams and Managed Service for Apache Flink usage because they are not eligible for the AWS Free Tier. When you are finished with this application, delete your AWS resources to stop incurring charges. 

The code does not access actual stock market data; instead, it simulates a stream of stock trades by using a random stock trade generator. If you have access to a real-time stream of stock trades, you might be interested in deriving useful, timely statistics from that stream. For example, you might want to perform a sliding window analysis to determine the most popular stock purchased in the last 5 minutes. Or you might want a notification whenever there is a sell order that is too large (that is, one with too many shares). You can extend the code in this series to provide such functionality.

The examples shown use the US West (Oregon) Region, but they work on any of the [AWS Regions that support Managed Service for Apache Flink](https://docs.aws.amazon.com/general/latest/gr/rande.html#ka_region).

**Topics**
+ [Prerequisites for completing the exercises](#setting-up-prerequisites)
+ [Set up an AWS account and create an administrator user](setting-up.md)
+ [Set up the AWS Command Line Interface (AWS CLI)](setup-awscli.md)
+ [Create and run a Managed Service for Apache Flink application](get-started-exercise.md)

## Prerequisites for completing the exercises
<a name="setting-up-prerequisites"></a>

To complete the steps in this guide, you must have the following:
+ [Java Development Kit](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) (JDK) version 8. Set the `JAVA_HOME` environment variable to point to your JDK install location.
+ We recommend that you use a development environment (such as [Eclipse Java Neon](http://www.eclipse.org/downloads/packages/release/neon/3) or [IntelliJ Idea](https://www.jetbrains.com/idea/)) to develop and compile your application.
+ [Git client](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). Install the Git client if you haven't already.
+ [Apache Maven Compiler Plugin](https://maven.apache.org/plugins/maven-compiler-plugin/). Maven must be in your working path. To test your Apache Maven installation, enter the following:

  ```
  $ mvn -version
  ```

To get started, go to [Set up an AWS account and create an administrator user](setting-up.md).

# Set up an AWS account and create an administrator user
<a name="setting-up"></a>

Before you use Amazon Managed Service for Apache Flink for the first time, complete the following tasks:

1. [Sign up for AWS](#setting-up-signup)

1. [Create an IAM user](#setting-up-iam)

## Sign up for AWS
<a name="setting-up-signup"></a>

When you sign up for Amazon Web Services (AWS), your AWS account is automatically signed up for all services in AWS, including Amazon Managed Service for Apache Flink. You are charged only for the services that you use.

With Managed Service for Apache Flink, you pay only for the resources that you use. If you are a new AWS customer, you can get started with Managed Service for Apache Flink for free. For more information, see [AWS Free Tier](https://aws.amazon.com/free/).

If you already have an AWS account, skip to the next task. If you don't have an AWS account, follow these steps to create one.

**To create an AWS account**

1. Open [https://portal.aws.amazon.com/billing/signup](https://portal.aws.amazon.com/billing/signup).

1. Follow the online instructions.

   Part of the sign-up procedure involves receiving a phone call or text message and entering a verification code on the phone keypad.

   When you sign up for an AWS account, an *AWS account root user* is created. The root user has access to all AWS services and resources in the account. As a security best practice, assign administrative access to a user, and use only the root user to perform [tasks that require root user access](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_root-user.html#root-user-tasks).

Note your AWS account ID because you'll need it for the next task.

## Create an IAM user
<a name="setting-up-iam"></a>

Services in AWS, such as Amazon Managed Service for Apache Flink, require that you provide credentials when you access them. This is so that the service can determine whether you have permissions to access the resources that are owned by that service. The AWS Management Console requires that you enter your password. 

You can create access keys for your AWS account to access the AWS Command Line Interface (AWS CLI) or API. However, we don't recommend that you access AWS using the credentials for your AWS account. Instead, we recommend that you use AWS Identity and Access Management (IAM): create an IAM user and add it to an IAM group with administrative permissions, which grants those permissions to the user. You can then access AWS using a special URL and that IAM user's credentials.

If you signed up for AWS, but you haven't created an IAM user for yourself, you can create one using the IAM console.

The getting started exercises in this guide assume that you have a user (`adminuser`) with administrator permissions. Follow the procedure to create `adminuser` in your account.

**To create a group for administrators**

1. Sign in to the AWS Management Console and open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. In the navigation pane, choose **Groups**, and then choose **Create New Group**.

1. For **Group Name**, enter a name for your group, such as **Administrators**, and then choose **Next Step**.

1. In the list of policies, select the check box next to the **AdministratorAccess** policy. You can use the **Filter** menu and the **Search** box to filter the list of policies.

1. Choose **Next Step**, and then choose **Create Group**.

Your new group is listed under **Group Name**.

**To create an IAM user for yourself, add it to the Administrators group, and create a password**

1. In the navigation pane, choose **Users**, and then choose **Add user**.

1. In the **User name** box, enter a user name.

1. Choose both **Programmatic access** and **AWS Management Console access**.

1. Choose **Next: Permissions**.

1. Select the check box next to the **Administrators** group. Then choose **Next: Review**.

1. Choose **Create user**.

**To sign in as the new IAM user**

1. Sign out of the AWS Management Console.

1. Use the following URL format to sign in to the console:

   `https://aws_account_number.signin.aws.amazon.com/console/`

   The *aws_account_number* is your AWS account ID without any hyphens. For example, if your AWS account ID is 1234-5678-9012, replace *aws_account_number* with **123456789012**. For information about how to find your account number, see [Your AWS Account ID and Its Alias](https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html) in the *IAM User Guide*.

1. Enter the IAM user name and password that you just created. When you're signed in, the navigation bar displays *your_user_name* @ *your_aws_account_id*.
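The account ID normalization above can be sketched in Python. `signin_url` is an illustrative helper, not an AWS API:

```python
# Illustrative helper (not part of the tutorial): build the IAM user sign-in
# URL from an AWS account ID, stripping any hyphens first.
def signin_url(account_id: str) -> str:
    return f"https://{account_id.replace('-', '')}.signin.aws.amazon.com/console/"

print(signin_url("1234-5678-9012"))  # https://123456789012.signin.aws.amazon.com/console/
```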

**Note**  
If you don't want the URL for your sign-in page to contain your AWS account ID, you can create an account alias.

**To create or remove an account alias**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. On the navigation pane, choose **Dashboard**.

1. Find the IAM users sign-in link.

1. To create the alias, choose **Customize**. Enter the name you want to use for your alias, and then choose **Yes, Create**.

1. To remove the alias, choose **Customize**, and then choose **Yes, Delete**. The sign-in URL reverts to using your AWS account ID.

To sign in after you create an account alias, use the following URL:

`https://your_account_alias.signin.aws.amazon.com/console/`

To verify the sign-in link for IAM users for your account, open the IAM console and check under **IAM users sign-in link** on the dashboard.

For more information about IAM, see the following:
+ [AWS Identity and Access Management (IAM)](https://aws.amazon.com/iam/)
+ [Getting started with IAM](https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started.html)
+ [IAM User Guide](https://docs.aws.amazon.com/IAM/latest/UserGuide/)

## Next step
<a name="setting-up-next-step-2"></a>

[Set up the AWS Command Line Interface (AWS CLI)](setup-awscli.md)

# Set up the AWS Command Line Interface (AWS CLI)
<a name="setup-awscli"></a>

In this step, you download and configure the AWS CLI to use with Amazon Managed Service for Apache Flink.

**Note**  
The getting started exercises in this guide assume that you are using administrator credentials (`adminuser`) in your account to perform the operations.

**Note**  
If you already have the AWS CLI installed, you might need to upgrade to get the latest functionality. For more information, see [Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/installing.html) in the *AWS Command Line Interface User Guide*. To check the version of the AWS CLI, run the following command:  

```
aws --version
```
The exercises in this tutorial require the following AWS CLI version or later:  

```
aws-cli/1.16.63
```
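If you script your setup, you can check the installed version against this minimum by parsing the `aws-cli/x.y.z` prefix that `aws --version` prints. The helper below is an illustrative sketch, not part of the AWS CLI:

```python
# Illustrative sketch: parse the "aws-cli/x.y.z" prefix from `aws --version`
# output and compare it as a tuple of integers.
def cli_version(version_output: str) -> tuple:
    prefix = version_output.split()[0]    # e.g. "aws-cli/1.16.63"
    number = prefix.split("/", 1)[1]      # "1.16.63"
    return tuple(int(part) for part in number.split("."))

MINIMUM = (1, 16, 63)
print(cli_version("aws-cli/1.16.63 Python/3.7.4 Darwin/18.7.0") >= MINIMUM)  # True
```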

**To set up the AWS CLI**

1. Download and configure the AWS CLI. For instructions, see the following topics in the *AWS Command Line Interface User Guide*: 
   + [Installing the AWS Command Line Interface](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-set-up.html)
   + [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html)

1. Add a named profile for the administrator user in the AWS CLI config file. You use this profile when executing the AWS CLI commands. For more information about named profiles, see [Named Profiles](https://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html) in the *AWS Command Line Interface User Guide*.

   ```
   [profile adminuser]
   aws_access_key_id = adminuser access key ID
   aws_secret_access_key = adminuser secret access key
   region = aws-region
   ```

   For a list of available AWS Regions, see [AWS Regions and Endpoints](https://docs.aws.amazon.com/general/latest/gr/rande.html) in the *Amazon Web Services General Reference*.

1. Verify the setup by entering the following help command at the command prompt: 

   ```
   aws help
   ```
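The AWS CLI config file uses standard INI syntax, so you can sanity-check a profile entry with Python's `configparser` module. The values below are placeholders, not real credentials:

```python
import configparser

# The AWS CLI config file is INI-formatted; configparser parses the same
# syntax, which makes it easy to verify a named profile entry.
config_text = """
[profile adminuser]
aws_access_key_id = EXAMPLE_KEY_ID
aws_secret_access_key = EXAMPLE_SECRET_KEY
region = us-west-2
"""

config = configparser.ConfigParser()
config.read_string(config_text)
print(config["profile adminuser"]["region"])  # us-west-2
```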

After you set up an AWS account and the AWS CLI, you can try the next exercise, in which you configure a sample application and test the end-to-end setup.

## Next step
<a name="setting-up-next-step-3"></a>

[Create and run a Managed Service for Apache Flink application](get-started-exercise.md)

# Create and run a Managed Service for Apache Flink application
<a name="get-started-exercise"></a>

In this exercise, you create a Managed Service for Apache Flink application with data streams as a source and a sink.

**Topics**
+ [Create two Amazon Kinesis data streams](#get-started-exercise-1)
+ [Write sample records to the input stream](#get-started-exercise-2)
+ [Download and examine the Apache Flink streaming Java code](#get-started-exercise-5)
+ [Compile the application code](#get-started-exercise-5.5)
+ [Upload the Apache Flink streaming Java code](#get-started-exercise-6)
+ [Create and run the Managed Service for Apache Flink application](#get-started-exercise-7)

## Create two Amazon Kinesis data streams
<a name="get-started-exercise-1"></a>

Before you create an Amazon Managed Service for Apache Flink application for this exercise, create two Kinesis data streams (`ExampleInputStream` and `ExampleOutputStream`). The application uses these streams as its source and destination streams.

You can create these streams using either the Amazon Kinesis console or the following AWS CLI command. For console instructions, see [Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html). 

**To create the data streams (AWS CLI)**

1. To create the first stream (`ExampleInputStream`), use the following Amazon Kinesis `create-stream` AWS CLI command.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```

1. To create the second stream that the application uses to write output, run the same command, changing the stream name to `ExampleOutputStream`.

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-west-2 \
   --profile adminuser
   ```
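A newly created stream is not usable until its status is `ACTIVE`. If you prefer scripting over the CLI, the following sketch (not part of the tutorial; the helper name is illustrative) polls any client that exposes the Kinesis `describe_stream` API, such as `boto3.client("kinesis")`:

```python
import time

# Poll a Kinesis client until the named stream reports ACTIVE status, or give
# up after max_attempts polls. Works with boto3.client("kinesis") or any
# object exposing a compatible describe_stream method.
def wait_for_stream_active(kinesis_client, stream_name, delay=1.0, max_attempts=18):
    for _ in range(max_attempts):
        description = kinesis_client.describe_stream(StreamName=stream_name)
        if description["StreamDescription"]["StreamStatus"] == "ACTIVE":
            return True
        time.sleep(delay)
    return False
```

With boto3 you would create the client from the `adminuser` profile, for example `boto3.Session(profile_name="adminuser", region_name="us-west-2").client("kinesis")`, and then call `wait_for_stream_active(client, "ExampleInputStream")`.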

## Write sample records to the input stream
<a name="get-started-exercise-2"></a>

In this section, you use a Python script to write sample records to the stream for the application to process.

**Note**  
This section requires the [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Create a file named `stock.py` with the following contents:

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

1. Later in the tutorial, you run the `stock.py` script to send data to the application. 

   ```
   $ python stock.py
   ```
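As a variation on `stock.py` (not part of the tutorial), you can buffer records and send them with the Kinesis `put_records` API, which accepts up to 500 records per call and reduces the number of API round trips. The `make_record` and `send_batch` names are illustrative:

```python
import datetime
import json
import random

# Build one Kinesis record entry in the shape put_records expects:
# a Data payload plus a PartitionKey.
def make_record():
    return {
        "Data": json.dumps({
            "EVENT_TIME": datetime.datetime.now().isoformat(),
            "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
            "PRICE": round(random.random() * 100, 2),
        }),
        "PartitionKey": "partitionkey",
    }

# Send a batch of simulated trades in a single put_records call.
def send_batch(kinesis_client, stream_name, batch_size=100):
    records = [make_record() for _ in range(batch_size)]
    return kinesis_client.put_records(StreamName=stream_name, Records=records)
```

You would call `send_batch(boto3.client("kinesis"), "ExampleInputStream")` in a loop in place of the one-record-at-a-time `generate` function.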

## Download and examine the Apache Flink streaming Java code
<a name="get-started-exercise-5"></a>

The Java application code for this example is available from GitHub. To download the application code, do the following:

1. Clone the remote repository with the following command:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
   ```

1. Navigate to the `GettingStarted` directory.

The application code is located in the `CustomSinkStreamingJob.java` and `CloudWatchLogSink.java` files. Note the following about the application code:
+ The application uses a Kinesis source to read from the source stream. The following snippet creates the Kinesis source:

  ```
  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                  new SimpleStringSchema(), inputProperties));
  ```

## Compile the application code
<a name="get-started-exercise-5.5"></a>

In this section, you use the Apache Maven compiler to create the Java code for the application. For information about installing Apache Maven and the Java Development Kit (JDK), see [Prerequisites for completing the exercises](tutorial-stock-data.md#setting-up-prerequisites).

Your Java application requires the following components:
+ A [Project Object Model (pom.xml)](https://maven.apache.org/guides/introduction/introduction-to-the-pom.html) file. This file contains information about the application's configuration and dependencies, including the Amazon Managed Service for Apache Flink libraries.
+ A `main` method that contains the application's logic.

**Note**  
**To use the Kinesis connector for the following application, you must download the source code for the connector and build it as described in the [Apache Flink documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/connectors/kinesis.html).**

**To create and compile the application code**

1. Create a Java/Maven application in your development environment. For information about creating an application, see the documentation for your development environment:
   + [Creating your first Java project (Eclipse Java Neon)](https://help.eclipse.org/neon/index.jsp?topic=%2Forg.eclipse.jdt.doc.user%2FgettingStarted%2Fqs-3.htm)
   + [Creating, Running and Packaging Your First Java Application (IntelliJ Idea)](https://www.jetbrains.com/help/idea/creating-and-running-your-first-java-application.html)

1. Use the following code for a file named `StreamingJob.java`. 

   ```
    
   package com.amazonaws.services.kinesisanalytics;
   
   import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
   import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
   import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
   
   import java.io.IOException;
   import java.util.Map;
   import java.util.Properties;
   
   public class StreamingJob {
   
       private static final String region = "us-west-2";
       private static final String inputStreamName = "ExampleInputStream";
       private static final String outputStreamName = "ExampleOutputStream";
   
       private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
           Properties inputProperties = new Properties();
           inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
   
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
       }
   
       private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
               throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                   applicationProperties.get("ConsumerConfigProperties")));
       }
   
       private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
           Properties outputProperties = new Properties();
           outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
           outputProperties.setProperty("AggregationEnabled", "false");
   
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
           Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
           FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(),
                   applicationProperties.get("ProducerConfigProperties"));
   
           sink.setDefaultStream(outputStreamName);
           sink.setDefaultPartition("0");
           return sink;
       }
   
       public static void main(String[] args) throws Exception {
           // set up the streaming execution environment
           final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * DataStream<String> input = createSourceFromApplicationProperties(env);
            */
   
           DataStream<String> input = createSourceFromStaticConfig(env);
   
           /*
            * if you would like to use runtime configuration properties, uncomment the
            * lines below
            * input.addSink(createSinkFromApplicationProperties())
            */
   
           input.addSink(createSinkFromStaticConfig());
   
           env.execute("Flink Streaming Java API Skeleton");
       }
   }
   ```

   Note the following about the preceding code example:
   + This file contains the `main` method that defines the application's functionality.
   + Your application creates source and sink connectors to access external resources using a `StreamExecutionEnvironment` object. 
   + The application creates source and sink connectors using static properties. To use dynamic application properties, use the `createSourceFromApplicationProperties` and `createSinkFromApplicationProperties` methods to create the connectors. These methods read the application's properties to configure the connectors.

1. To use your application code, you compile and package it into a JAR file. You can compile and package your code in one of two ways:
   + Use the command line Maven tool. Create your JAR file by running the following command in the directory that contains the `pom.xml` file:

     ```
     mvn package
     ```
   + Use your development environment. See your development environment documentation for details.

   You can either upload your package as a JAR file, or you can compress your package and upload it as a ZIP file. If you create your application using the AWS CLI, you specify your code content type (JAR or ZIP).

1. If there are errors while compiling, verify that your `JAVA_HOME` environment variable is correctly set.

If the application compiles successfully, the following file is created:

`target/java-getting-started-1.0.jar`

## Upload the Apache Flink streaming Java code
<a name="get-started-exercise-6"></a>

In this section, you create an Amazon Simple Storage Service (Amazon S3) bucket and upload your application code.

**To upload the application code**

1. Open the Amazon S3 console at [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Choose **Create bucket**.

1. Enter **ka-app-code-*<username>*** in the **Bucket name** field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose **Next**.

1. In the **Configure options** step, keep the settings as they are, and choose **Next**.

1. In the **Set permissions** step, keep the settings as they are, and choose **Next**.

1. Choose **Create bucket**.

1. In the Amazon S3 console, choose the **ka-app-code-*<username>*** bucket, and choose **Upload**.

1. In the **Select files** step, choose **Add files**. Navigate to the `java-getting-started-1.0.jar` file that you created in the previous step. Choose **Next**.

1. In the **Set permissions** step, keep the settings as they are. Choose **Next**.

1. In the **Set properties** step, keep the settings as they are. Choose **Upload**.

Your application code is now stored in an Amazon S3 bucket where your application can access it.

## Create and run the Managed Service for Apache Flink application
<a name="get-started-exercise-7"></a>

You can create and run a Managed Service for Apache Flink application using either the console or the AWS CLI.

**Note**  
When you create the application using the console, your AWS Identity and Access Management (IAM) and Amazon CloudWatch Logs resources are created for you. When you create the application using the AWS CLI, you create these resources separately.

**Topics**
+ [Create and run the application (Console)](#get-started-exercise-7-console)
+ [Create and run the application (AWS CLI)](#get-started-exercise-7-cli)

### Create and run the application (Console)
<a name="get-started-exercise-7-console"></a>

Follow these steps to create, configure, update, and run the application using the console.

#### Create the application
<a name="get-started-exercise-7-console-create"></a>

1. Open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. On the Amazon Kinesis dashboard, choose **Create analytics application**.

1. On the **Kinesis Analytics - Create application** page, provide the application details as follows:
   + For **Application name**, enter **MyApplication**.
   + For **Description**, enter **My java test app**.
   + For **Runtime**, choose **Apache Flink 1.6**.

1. For **Access permissions**, choose **Create / update IAM role `kinesis-analytics-MyApplication-us-west-2`**.

1. Choose **Create application**.

**Note**  
When you create an Amazon Managed Service for Apache Flink application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:  
Policy: `kinesis-analytics-service-MyApplication-us-west-2`
Role: `kinesis-analytics-MyApplication-us-west-2`

#### Edit the IAM policy
<a name="get-started-exercise-7-console-iam"></a>

Edit the IAM policy to add permissions to access the Kinesis data streams.

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. Choose **Policies**. Choose the **`kinesis-analytics-service-MyApplication-us-west-2`** policy that the console created for you in the previous section. 

1. On the **Summary** page, choose **Edit policy**. Choose the **JSON** tab.

1. Add the `ReadInputStream` and `WriteOutputStream` statements of the following policy example to the policy (the console created the other statements for you). Replace the sample account IDs (*012345678901*) with your account ID.

------
#### [ JSON ]

   ```
   {
       "Version": "2012-10-17",
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

#### Configure the application
<a name="get-started-exercise-7-console-configure"></a>

1. On the **MyApplication** page, choose **Configure**.

1. On the **Configure application** page, provide the **Code location**:
   + For **Amazon S3 bucket**, enter **ka-app-code-*<username>***.
   + For **Path to Amazon S3 object**, enter **java-getting-started-1.0.jar**.

1. Under **Access to application resources**, for **Access permissions**, choose **Create / update IAM role `kinesis-analytics-MyApplication-us-west-2`**.

1. Under **Properties**, for **Group ID**, enter **ProducerConfigProperties**.

1. Enter the following application properties and values:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/streams/latest/dev/get-started-exercise.html)

1. Under **Monitoring**, ensure that the **Monitoring metrics level** is set to **Application**.

1. For **CloudWatch logging**, select the **Enable** check box.

1. Choose **Update**.

**Note**  
When you choose to enable CloudWatch logging, Managed Service for Apache Flink creates a log group and log stream for you. The names of these resources are as follows:   
Log group: `/aws/kinesis-analytics/MyApplication`
Log stream: `kinesis-analytics-log-stream`

#### Run the application
<a name="get-started-exercise-7-console-run"></a>

1. On the **MyApplication** page, choose **Run**. Confirm the action.

1. When the application is running, refresh the page. The console shows the **Application graph**.

#### Stop the application
<a name="get-started-exercise-7-console-stop"></a>

On the **MyApplication** page, choose **Stop**. Confirm the action.

#### Update the application
<a name="get-started-exercise-7-console-update"></a>

Using the console, you can update application settings such as application properties, monitoring settings, and the location or file name of the application JAR. You can also reload the application JAR from the Amazon S3 bucket if you need to update the application code.

On the **MyApplication** page, choose **Configure**. Update the application settings and choose **Update**.

### Create and run the application (AWS CLI)
<a name="get-started-exercise-7-cli"></a>

In this section, you use the AWS CLI to create and run the Managed Service for Apache Flink application. You use the `kinesisanalyticsv2` AWS CLI command to create and interact with Managed Service for Apache Flink applications.

#### Create a Permissions Policy
<a name="get-started-exercise-7-cli-policy"></a>

First, you create a permissions policy with two statements: one that grants permissions for read actions on the source stream, and another that grants permissions for write actions on the sink stream. You then attach the policy to an IAM role (which you create in the next section). When Managed Service for Apache Flink assumes the role, the service has the necessary permissions to read from the source stream and write to the sink stream.

Use the following code to create the `KAReadSourceStreamWriteSinkStream` permissions policy. Replace `username` with the user name that you used to create the Amazon S3 bucket to store the application code. Replace the account ID in the Amazon Resource Names (ARNs) (`012345678901`) with your account ID.


```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": ["arn:aws:s3:::ka-app-code-username",
                "arn:aws:s3:::ka-app-code-username/*"
            ]
        },
        {
            "Sid": "ReadInputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
        },
        {
            "Sid": "WriteOutputStream",
            "Effect": "Allow",
            "Action": "kinesis:*",
            "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
        }
    ]
}
```


For step-by-step instructions to create a permissions policy, see [Tutorial: Create and Attach Your First Customer Managed Policy](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_managed-policies.html#part-two-create-policy) in the *IAM User Guide*.
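Before attaching the policy, you can sanity-check locally that its `Resource` patterns actually cover the ARNs your application uses. The following Python sketch uses only the standard library and inlines the policy above for illustration; it is a convenience check, not part of the tutorial's application code:

```python
import fnmatch
import json

# The policy from above, inlined for illustration; in practice, load the
# policy file you saved instead.
policy = json.loads("""
{
  "Version": "2012-10-17",
  "Statement": [
    {"Sid": "S3", "Effect": "Allow",
     "Action": ["s3:GetObject", "s3:GetObjectVersion"],
     "Resource": ["arn:aws:s3:::ka-app-code-username",
                  "arn:aws:s3:::ka-app-code-username/*"]},
    {"Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*",
     "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"},
    {"Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*",
     "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"}
  ]
}
""")

def is_allowed(policy, resource_arn):
    """Return True if any Allow statement's Resource pattern matches the ARN."""
    for stmt in policy["Statement"]:
        if stmt["Effect"] != "Allow":
            continue
        resources = stmt["Resource"]
        if isinstance(resources, str):
            resources = [resources]
        if any(fnmatch.fnmatch(resource_arn, pattern) for pattern in resources):
            return True
    return False

print(is_allowed(policy, "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"))  # prints True
```

A mistyped account ID or stream name is one of the most common reasons the application later fails with an access-denied error, so a check like this can save a debugging round trip.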

**Note**  
To access other AWS services, you can use the AWS SDK for Java. Managed Service for Apache Flink automatically sets the credentials required by the SDK to those of the service execution IAM role that is associated with your application. No additional steps are needed.

#### Create an IAM role
<a name="get-started-exercise-7-cli-role"></a>

In this section, you create an IAM role that Managed Service for Apache Flink can assume to read a source stream and write to the sink stream.

Managed Service for Apache Flink cannot access your stream without permissions. You grant these permissions via an IAM role. Each IAM role has two policies attached. The trust policy grants Managed Service for Apache Flink permission to assume the role, and the permissions policy determines what Managed Service for Apache Flink can do after assuming the role.

You attach the permissions policy that you created in the preceding section to this role.
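The console generates the trust policy for you in the procedure below, but it is worth knowing what it contains. A trust policy for this role would be similar to the following (shown for reference only; the console-generated version may differ slightly):

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```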

**To create an IAM role**

1. Open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. In the navigation pane, choose **Roles**, **Create Role**.

1. Under **Select type of trusted identity**, choose **AWS Service**. Under **Choose the service that will use this role**, choose **Kinesis**. Under **Select your use case**, choose **Kinesis Analytics**.

   Choose **Next: Permissions**.

1. On the **Attach permissions policies** page, choose **Next: Review**. You attach permissions policies after you create the role.

1. On the **Create role** page, enter **KA-stream-rw-role** for the **Role name**. Choose **Create role**.

   Now you have created a new IAM role called `KA-stream-rw-role`. Next, you update the trust and permissions policies for the role.

1. Attach the permissions policy to the role.
**Note**  
For this exercise, Managed Service for Apache Flink assumes this role for both reading data from a Kinesis data stream (source) and writing output to another Kinesis data stream. So you attach the policy that you created in the previous step, [Create a Permissions Policy](#get-started-exercise-7-cli-policy).

   1. On the **Summary** page, choose the **Permissions** tab.

   1. Choose **Attach Policies**.

   1. In the search box, enter **KAReadSourceStreamWriteSinkStream** (the policy that you created in the previous section).

   1. Choose the **KAReadSourceStreamWriteSinkStream** policy, and choose **Attach policy**.

You have now created the service execution role that your application uses to access resources. Make a note of the ARN of the new role.

For step-by-step instructions for creating a role, see [Creating an IAM Role (Console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user.html#roles-creatingrole-user-console) in the *IAM User Guide*.

#### Create the Managed Service for Apache Flink application
<a name="get-started-exercise-7-cli-create"></a>

1. Save the following JSON code to a file named `create_request.json`. Replace the sample role ARN with the ARN for the role that you created previously. Replace the bucket ARN suffix (`username`) with the suffix that you chose in the previous section. Replace the sample account ID (`012345678901`) in the service execution role with your account ID.

   ```
   {
       "ApplicationName": "test",
       "ApplicationDescription": "my java test app",
       "RuntimeEnvironment": "FLINK-1_6",
       "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role",
       "ApplicationConfiguration": {
           "ApplicationCodeConfiguration": {
               "CodeContent": {
                   "S3ContentLocation": {
                       "BucketARN": "arn:aws:s3:::ka-app-code-username",
                       "FileKey": "java-getting-started-1.0.jar"
                   }
               },
               "CodeContentType": "ZIPFILE"
           },
           "EnvironmentProperties": {
               "PropertyGroups": [
                   {
                       "PropertyGroupId": "ProducerConfigProperties",
                       "PropertyMap": {
                           "flink.stream.initpos": "LATEST",
                           "aws.region": "us-west-2",
                           "AggregationEnabled": "false"
                       }
                   },
                   {
                       "PropertyGroupId": "ConsumerConfigProperties",
                       "PropertyMap": {
                           "aws.region": "us-west-2"
                       }
                   }
               ]
           }
       }
   }
   ```

1. Execute the [CreateApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_CreateApplication.html) action with the preceding request to create the application:

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
   ```

The application is now created. You start the application in the next step.
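If you create more than one application, hand-editing the account ID and bucket suffix in `create_request.json` becomes error-prone. As an alternative, you could generate the file with a short script. The following Python sketch builds the same request shown above; `account_id` and `bucket_suffix` are placeholders for your own values:

```python
import json

# Placeholder values; replace with your account ID and the bucket suffix
# you chose earlier.
account_id = "012345678901"
bucket_suffix = "username"

request = {
    "ApplicationName": "test",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_6",
    "ServiceExecutionRole": f"arn:aws:iam::{account_id}:role/KA-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": f"arn:aws:s3:::ka-app-code-{bucket_suffix}",
                    "FileKey": "java-getting-started-1.0.jar",
                }
            },
            "CodeContentType": "ZIPFILE",
        },
        "EnvironmentProperties": {
            "PropertyGroups": [
                {
                    "PropertyGroupId": "ProducerConfigProperties",
                    "PropertyMap": {
                        "flink.stream.initpos": "LATEST",
                        "aws.region": "us-west-2",
                        "AggregationEnabled": "false",
                    },
                },
                {
                    "PropertyGroupId": "ConsumerConfigProperties",
                    "PropertyMap": {"aws.region": "us-west-2"},
                },
            ]
        },
    },
}

# Write the request file that the create-application command consumes.
with open("create_request.json", "w") as f:
    json.dump(request, f, indent=4)
```

Generating the file also guarantees valid JSON, which rules out one common cause of `create-application` failures.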

#### Start the application
<a name="get-started-exercise-7-cli-start"></a>

In this section, you use the [StartApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) action to start the application.

**To start the application**

1. Save the following JSON code to a file named `start_request.json`.

   ```
   {
       "ApplicationName": "test",
       "RunConfiguration": {
           "ApplicationRestoreConfiguration": { 
            "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
            }
       }
   }
   ```

1. Execute the [StartApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StartApplication.html) action with the preceding request to start the application:

   ```
   aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
   ```

The application is now running. You can check the Managed Service for Apache Flink metrics on the Amazon CloudWatch console to verify that the application is working.
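Note that `start-application` is asynchronous: the CLI returns as soon as the request is accepted, and the application then moves through the `STARTING` status before it reaches `RUNNING`. Instead of polling by hand, you could script the wait with a small helper like the following Python sketch. The commented-out boto3 wiring is an assumption about how you might connect it to the `DescribeApplication` API; the helper itself works with any callable that returns a status string:

```python
import time

def wait_for_status(get_status, target="RUNNING", timeout=300, interval=10):
    """Poll get_status() until it returns target; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"application did not reach {target}; last status: {status}")
        time.sleep(interval)

# With the AWS SDK for Python (boto3), you could wire it up like this:
#   client = boto3.client("kinesisanalyticsv2")
#   get_status = lambda: client.describe_application(
#       ApplicationName="test")["ApplicationDetail"]["ApplicationStatus"]
#   wait_for_status(get_status)
```

The same helper works for the stop step in the next section by passing `target="READY"` (the status a stopped application returns to).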

#### Stop the application
<a name="get-started-exercise-7-cli-stop"></a>

In this section, you use the [StopApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) action to stop the application.

**To stop the application**

1. Save the following JSON code to a file named `stop_request.json`.

   ```
   {
       "ApplicationName": "test"
   }
   ```

1. Execute the [StopApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_StopApplication.html) action with the preceding request to stop the application:

   ```
   aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
   ```

The application is now stopped.