

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Step Functions 创建和管理 Amazon EMR 集群
<a name="connect-emr"></a>

了解如何使用提供的亚马逊 EMR 服务集成AWS Step Functions与亚马逊 EMR 集成。 APIs服务集成与相应 APIs 的 Amazon EMR 类似 APIs，但传递的字段和返回的响应有所不同。

要了解如何在 Step Functions 中与AWS服务集成，请参阅[集成 服务](integrate-services.md)和[在 Step Functions 中将参数传递给服务 API](connect-parameters.md)。

**经优化的 Amazon EMR 集成的主要功能**  
经过优化的亚马逊 EMR 服务集成有一组自定义的，用于封装底层 Amazon EMR APIs APIs，如下所述。因此，它与 Amazon EMR AWS SDK 服务集成有很大不同。
支持[运行作业 (.sync)](connect-to-resource.md#connect-sync) 集成模式。

如果执行停止，Step Functions 不会自动终止 Amazon EMR 集群。如果您的状态机在 Amazon EMR 集群终止之前停止，则您的集群可能会无限期地继续运行，并且可能会产生额外费用。为避免这种情况，请确保您创建的任何 Amazon EMR 集群都已正确终止。有关更多信息，请参阅:
+ 在 Amazon EMR 用户指南中[控制集群终止](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-termination.html)。
+ 服务集成模式[运行作业 (.sync)](connect-to-resource.md#connect-sync) 部分。

**注意**  
自 `emr-5.28.0` 起，您可以在创建集群时指定参数 `StepConcurrencyLevel`，以允许在单个集群上并行运行多个步骤。您可以使用 Step Functions `Map` 和 `Parallel` 状态将工作并行提交到集群。

亚马逊 EMR 服务集成的可用性取决于亚马逊 EMR 的可用性。 APIs请查看 [Amazon EMR](https://docs.aws.amazon.com//govcloud-us/latest/UserGuide/govcloud-emr.html) 文档，了解特殊区域的限制。

**注意**  
为了与 Amazon EMR 集成，Step Functions 在前 10 分钟具有硬编码的 60 秒作业轮询频率，10 分钟后为 300 秒作业轮询频率。

## 优化了亚马逊 EMR APIs
<a name="connect-emr-api"></a>

下表描述了每个亚马逊 EMR 服务集成 API 与相应的亚马逊 EMR 之间的区别。 APIs


| Amazon EMR 服务集成 API | 相应的 EMR API | 差异 | 
| --- | --- | --- | 
| CreateCluster 创建并开始运行集群（作业流程）。 Amazon EMR 与一种独特类型的 IAM 角色（称为服务相关角色）直接关联。要使 `createCluster` 和 `createCluster.sync` 起作用，您必须配置必要的权限以创建与服务关联的角色 `AWSServiceRoleForEMRCleanup`。有关此问题的更多信息，包括您可以添加到 IAM 权限策略的语句，请参阅[使用 Amazon EMR 的服务关联角色](https://docs.aws.amazon.com/emr/latest/ManagementGuide/using-service-linked-roles.html)。 | [runJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html) | createCluster使用与相同的请求语法 [runJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html)，但以下语法除外：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/step-functions/latest/dg/connect-emr.html)响应如下：<pre>{<br />  "ClusterId": "string"<br />}</pre> Amazon EMR 使用以下信息：<pre>{<br />  "JobFlowId": "string"<br />}</pre>  | 
| createCluster.sync 创建并开始运行集群（作业流程）。 | [runJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html) | 与 createCluster 相同，但等待集群达到 WAITING 状态。 | 
| setClusterTermination保护 锁定集群（任务流），这样集群中的 EC2 实例就不会因为用户干预、API 调用或任务流错误而终止。 | [setTerminationProtection](https://docs.aws.amazon.com/emr/latest/APIReference/API_SetTerminationProtection.html) | 请求使用：<pre>{<br />  "ClusterId": "string"<br />}</pre> Amazon EMR 使用以下信息：<pre>{<br />  "JobFlowIds": ["string"]<br />}</pre>  | 
| terminateCluster 关闭集群（作业流程）。  | [terminateJobFlows](https://docs.aws.amazon.com/emr/latest/APIReference/API_TerminateJobFlows.html) | 请求使用：<pre>{<br />  "ClusterId": "string"<br />}</pre> Amazon EMR 使用以下信息：<pre>{<br />  "JobFlowIds": ["string"]<br />}</pre> | 
| terminateCluster.sync关闭集群（作业流程）。 | [terminateJobFlows](https://docs.aws.amazon.com/emr/latest/APIReference/API_TerminateJobFlows.html) | 与 terminateCluster 相同，但等待集群终止。 | 
| addStep 向正在运行的集群添加新步骤。 另外，使用此 API 时，还能指定 `[ExecutionRoleArn](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#EMR-AddJobFlowSteps-request-ExecutionRoleArn)` 参数。 | [addJobFlow步骤](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html) | 请求使用密钥 "ClusterId"。Amazon EMR 使用 "JobFlowId"。请求使用单一步骤。<pre>{<br />  "Step": <"StepConfig object"><br />}</pre> Amazon EMR 使用以下信息：<pre>{<br />  "Steps": [<StepConfig objects>]<br />}</pre> 响应如下：<pre>{<br />  "StepId": "string"<br />}</pre> Amazon EMR 返回以下内容：<pre>{<br />  "StepIds": [<strings>]<br />}</pre>  | 
| addStep.sync 向正在运行的集群添加新步骤。 另外，使用此 API 时，还能指定 `[ExecutionRoleArn](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html#EMR-AddJobFlowSteps-request-ExecutionRoleArn)` 参数。 | [addJobFlow步骤](https://docs.aws.amazon.com/emr/latest/APIReference/API_AddJobFlowSteps.html) | 与 addStep 相同，但等待步骤完成。 | 
| cancelStep 取消正在运行的集群中的一个待处理步骤。 | [cancelSteps](https://docs.aws.amazon.com/emr/latest/APIReference/API_CancelSteps.html) |  请求使用：<pre>{<br />  "StepId": "string"<br />}</pre> Amazon EMR 使用以下信息：<pre>{<br />  "StepIds": [<strings>]<br />}</pre> 响应如下：<pre>{<br />  "CancelStepsInfo": <CancelStepsInfo object><br />}</pre> Amazon EMR 使用以下信息：<pre>{<br />  "CancelStepsInfoList": [<CancelStepsInfo objects>]<br />}</pre>  | 
| modifyInstanceFleetByName 使用指定的 `InstanceFleetName` 修改实例队列的目标按需容量和目标 Spot 容量。 | [modifyInstanceFleet](https://docs.aws.amazon.com/emr/latest/APIReference/API_ModifyInstanceFleet.html) | 请求与 modifyInstanceFleet 相同，但以下情况除外：[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/step-functions/latest/dg/connect-emr.html)  | 
| modifyInstanceGroupByName 修改实例组的节点数和配置设置。 | [modifyInstanceGroups](https://docs.aws.amazon.com/emr/latest/APIReference/API_ModifyInstanceGroups.html) | 请求如下：<pre>{<br />  "ClusterId": "string",<br />  "InstanceGroup": <InstanceGroupModifyConfig object><br />}</pre> Amazon EMR 使用以下列表：<pre>{<br />  "ClusterId": ["string"],<br />  "InstanceGroups": [<InstanceGroupModifyConfig objects>]<br />}</pre> 在 `InstanceGroupModifyConfig` 对象中，不允许填写 `InstanceGroupId` 字段。 已添加一个新字段 `InstanceGroupName`。在运行时，通过调用 `ListInstanceGroups` 并解析结果，服务集成会自动确定 `InstanceGroupId`。  | 

## 工作流程示例
<a name="connect-emr-api-examples"></a>

以下内容包含一个创建集群的 `Task` 状态。

```
"Create_Cluster": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
    "Arguments": {
        "Name": "MyWorkflowCluster",
        "VisibleToAllUsers": true,
        "ReleaseLabel": "emr-5.28.0",
        "Applications": [
            {
                "Name": "Hive"
            }
        ],
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "LogUri": "s3n://aws-logs-account-id-us-east-1/elasticmapreduce/",
        "Instances": {
            "KeepJobFlowAliveWhenNoSteps": true,
            "InstanceFleets": [
                {
                    "InstanceFleetType": "MASTER",
                    "Name": "MASTER",   
                    "TargetOnDemandCapacity": 1,
                    "InstanceTypeConfigs": [
                        {
                            "InstanceType": "m4.xlarge"
                        }
                    ]
                },
                {
                    "InstanceFleetType": "CORE",
                    "Name": "CORE",
                    "TargetOnDemandCapacity": 1,
                    "InstanceTypeConfigs": [
                        {
                            "InstanceType": "m4.xlarge"
                        }
                    ]
                }
            ]
        }
    },
    "End": true
}
```

以下内容包括启用终止保护的 `Task` 状态。

```
"Enable_Termination_Protection": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection",
    "Arguments": {
        "ClusterId": "{% $ClusterId %}",
        "TerminationProtected": true
    },
    "End": true
}
```

以下内容包括向集群提交步骤的 `Task` 状态。

```
"Step_One": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
    "Arguments": {
        "ClusterId": "{% $ClusterId %}",
        "ExecutionRoleArn": "arn:aws:iam::account-id:role/myEMR-execution-role",
        "Step": {
            "Name": "The first step",
            "ActionOnFailure": "TERMINATE_CLUSTER",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "hive-script",
                    "--run-hive-script",
                    "--args",
                    "-f",
                    "s3://region.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q",
                    "-d",
                    "INPUT=s3://region.elasticmapreduce.samples",
                    "-d",
                    "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/"
                ]
            }
        }
    },
    "End": true
}
```

以下内容包括取消步骤的 `Task` 状态。

```
"Cancel_Step_One": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:cancelStep",
    "Arguments": {
        "ClusterId": "{% $ClusterId %}",
        "StepId": "{% $AddStepsResult.StepId %}"
    },
    "End": true
}
```

以下内容包括终止集群的 `Task` 状态。

```
"Terminate_Cluster": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync",
    "Arguments": {
        "ClusterId": "{% $ClusterId %}",
    },
    "End": true
}
```

以下内容包括为实例组向上或向下扩展集群的 `Task` 状态。

```
"ModifyInstanceGroupByName": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName",
    "Arguments": {
        "ClusterId": "j-account-id3",
        "InstanceGroupName": "MyCoreGroup",
        "InstanceGroup": {
            "InstanceCount": 8
        }
    },
    "End": true
}
```

以下内容包括为实例队列向上或向下扩展集群的 `Task` 状态。

```
"ModifyInstanceFleetByName": {
    "Type": "Task",
    "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName",
    "Arguments": {
        "ClusterId": "j-account-id3",
        "InstanceFleetName": "MyCoreFleet",
        "InstanceFleet": {
            "TargetOnDemandCapacity": 8,
            "TargetSpotCapacity": 0
        }
    },
    "End": true
}
```

## 用于调用 Amazon EMR 的 IAM 策略
<a name="emr-iam"></a>

以下示例模板展示了如何根据状态机定义中的资源AWS Step Functions生成 IAM 策略。有关更多信息，请参阅[Step Functions 如何为集成服务生成 IAM 策略](service-integration-iam-templates.md)和[探索 Step Functions 中的服务集成模式](connect-to-resource.md)。

### `addStep`
<a name="emr-iam-addstep"></a>

*静态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:AddJobFlowSteps",
                "elasticmapreduce:DescribeStep",
                "elasticmapreduce:CancelSteps"
            ],
            "Resource": [
                "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/clusterId"
            ]
        }
    ]
}
```

*动态资源*

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:AddJobFlowSteps",
        "elasticmapreduce:DescribeStep",
        "elasticmapreduce:CancelSteps"
      ],
      "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
    }
  ]
}
```

### `cancelStep`
<a name="emr-iam-cancelstep"></a>

*静态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "elasticmapreduce:CancelSteps",
            "Resource": [
                "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id"
            ]
        }
    ]
}
```

*动态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "elasticmapreduce:CancelSteps",
            "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
        }
    ]
}
```

### `createCluster`
<a name="emr-iam-createcluster"></a>

*静态资源*

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:RunJobFlow",
        "elasticmapreduce:DescribeCluster",
        "elasticmapreduce:TerminateJobFlows"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": [
        "arn:aws:iam::123456789012:role/myRoleName"
      ]
    }
  ]
}
```

### `setClusterTerminationProtection`
<a name="emr-iam-clusterterminationprotection"></a>

*静态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "elasticmapreduce:SetTerminationProtection",
            "Resource": [
                "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id"
            ]
        }
    ]
}
```

*动态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "elasticmapreduce:SetTerminationProtection",
            "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
        }
    ]
}
```

### `modifyInstanceFleetByName`
<a name="emr-iam-modifyinstancefleetbyname"></a>

*静态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:ModifyInstanceFleet",
                "elasticmapreduce:ListInstanceFleets"
            ],
            "Resource": [
                "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id"
            ]
        }
    ]
}
```

*动态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:ModifyInstanceFleet",
                "elasticmapreduce:ListInstanceFleets"
            ],
            "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
        }
    ]
}
```

### `modifyInstanceGroupByName`
<a name="emr-iam-modifyinstancegroupbyname"></a>

*静态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:ModifyInstanceGroups",
                "elasticmapreduce:ListInstanceGroups"
            ],
            "Resource": [
                "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id"
            ]
        }
    ]
}
```

*动态资源*

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:ModifyInstanceGroups",
                "elasticmapreduce:ListInstanceGroups"
            ],
            "Resource": "*"
        }
    ]
}
```

### `terminateCluster`
<a name="emr-iam-terminatecluster"></a>

*静态资源*

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:TerminateJobFlows",
        "elasticmapreduce:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:elasticmapreduce:us-east-1:123456789012:cluster/myCluster-id"
      ]
    }
  ]
}
```

*动态资源*

****  

```
{
  "Version":"2012-10-17",		 	 	 
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "elasticmapreduce:TerminateJobFlows",
        "elasticmapreduce:DescribeCluster"
      ],
      "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*"
    }
  ]
}
```