

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 回應 Amazon EMR 叢集執行個體機群調整大小逾時事件
<a name="emr-events-response-timeout-events"></a>

## 概觀
<a name="emr-events-response-timeout-events-overview"></a>

Amazon EMR 叢集會在執行個體機群叢集的調整大小操作時發出[事件](emr-manage-cloudwatch-events.md#emr-cloudwatch-instance-fleet-resize-events)。當 Amazon EMR 在逾時到期後停止佈建機群的 Spot 或隨需容量時，就會發出佈建逾時事件。逾時持續時間可由使用者設定，作為執行個體機群[調整大小規格](https://docs.aws.amazon.com/emr/latest/APIReference/API_InstanceFleetResizingSpecifications.html)的一部分。對於相同執行個體機群的連續調整大小，當目前調整大小操作的逾時到期時，Amazon EMR 會發出 `Spot provisioning timeout - continuing resize` 或 `On-Demand provisioning timeout - continuing resize` 事件。然後，它開始為機群的下一個調整大小操作佈建容量。

## 回應執行個體機群調整大小逾時事件
<a name="emr-events-response-timeout-events-rec"></a>

建議您使用下列其中一種方法來回應佈建逾時事件：
+ 重新檢視[調整大小規格](https://docs.aws.amazon.com/emr/latest/APIReference/API_InstanceFleetResizingSpecifications.html)，然後重試調整大小操作。隨著容量頻繁變化，一旦 Amazon EC2 容量變為可用，叢集就會成功調整大小。對於要求更嚴格 SLA 的作業的逾時持續時間，建議客戶設定較低的值。
+ 或者，您也可以：
  + 根據[執行個體和可用區域彈性的最佳實務](emr-flexibility.md#emr-flexibility-types)，啟動具有多樣化執行個體類型的新叢集，或者
  + 啟動具有隨需容量的叢集
+ 對於佈建逾時，繼續調整大小事件，您還可以等待處理調整大小操作。Amazon EMR 將繼續依序處理針對機群觸發的調整大小操作，並遵守設定的調整大小規格。

也可以為此事件設定規則或自動回應，如下一節所述。

## 從佈建逾時事件中自動復原
<a name="emr-events-response-timeout-events-ex"></a>

可以建置自動化來回應具有 `Spot Provisioning timeout` 事件代碼的 Amazon EMR 事件。例如，下列 AWS Lambda 函數會終止 EMR 叢集，它具有使用任務節點的 Spot 執行個體的執行個體機群，然後建立新的 EMR 叢集，其執行個體機群包含比原始請求更多樣化的執行個體類型。在此範例中，針對任務節點發出的 `Spot Provisioning timeout` 事件將觸發 Lambda 函數的執行。

**Example 用於回應 `Spot Provisioning timeout` 事件的範例函數**  

```
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler
// Note: related IAM role requires permission to use Amazon EMR
 
import json
import boto3
import datetime
from datetime import timezone
 
SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize"
SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = (
    "Spot Provisioning timeout"
)
 
CLIENT = boto3.client("emr", region_name="us-east-1")
 
# checks if the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot provisioning timeout'
def is_spot_provisioning_timeout_event(event):
    if not event["detail"]:
        return False
    else:
        return (
            event["detail-type"] == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE
            and event["detail"]["eventCode"]
            == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE
        )
 
 
# checks if the cluster is eligible for termination
def is_cluster_eligible_for_termination(event, describeClusterResponse):
    # instanceFleetType could be CORE, MASTER OR TASK
    instanceFleetType = event["detail"]["instanceFleetType"]
 
    # Check if instance fleet receiving Spot provisioning timeout event is TASK
    if (instanceFleetType == "TASK"):
        return True
    else:
        return False
 
 
# create a new cluster by choosing different InstanceType.
def create_cluster(event):
    # instanceFleetType cloud be CORE, MASTER OR TASK
    instanceFleetType = event["detail"]["instanceFleetType"]
 
    # the following two lines assumes that the customer that created the cluster already knows which instance types they use in original request
    instanceTypesFromOriginalRequestMaster = "m5.xlarge"
    instanceTypesFromOriginalRequestCore = "m5.xlarge"
   
    # select new instance types to include in the new createCluster request
    instanceTypesForTask = [
        "m5.xlarge",
        "m5.2xlarge",
        "m5.4xlarge",
        "m5.8xlarge",
        "m5.12xlarge"
    ]
    
    print("Starting to create cluster...")
    instances = {
        "InstanceFleets": [
            {
                "InstanceFleetType":"MASTER",
                "TargetOnDemandCapacity":1,
                "TargetSpotCapacity":0,
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesFromOriginalRequestMaster,
                        "WeightedCapacity":1,
                    }
                ]
            },
            {
                "InstanceFleetType":"CORE",
                "TargetOnDemandCapacity":1,
                "TargetSpotCapacity":0,
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesFromOriginalRequestCore,
                        "WeightedCapacity":1,
                    }
                ]
            },
            {
                "InstanceFleetType":"TASK",
                "TargetOnDemandCapacity":0,
                "TargetSpotCapacity":100,
                "LaunchSpecifications":{},
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesForTask[0],
                        "WeightedCapacity":1,
                    },
                    {
                        'InstanceType': instanceTypesForTask[1],
                        "WeightedCapacity":2,
                    },
                    {
                        'InstanceType': instanceTypesForTask[2],
                        "WeightedCapacity":4,
                    },
                    {
                        'InstanceType': instanceTypesForTask[3],
                        "WeightedCapacity":8,
                    },
                    {
                        'InstanceType': instanceTypesForTask[4],
                        "WeightedCapacity":12,
                    }
                ],
                "ResizeSpecifications": {
                    "SpotResizeSpecification": {
                        "TimeoutDurationMinutes": 30
                    }
                }
            }
        ]
    }
    response = CLIENT.run_job_flow(
        Name="Test Cluster",
        Instances=instances,
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        ReleaseLabel="emr-6.10.0",
    )
 
    return response["JobFlowId"]
 
 
# terminated the cluster using clusterId received in an event
def terminate_cluster(event):
    print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"])
    response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]])
    print(f"Terminate cluster response: {response}")
 
 
def describe_cluster(event):
    response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"])
    return response
 
 
def lambda_handler(event, context):
    if is_spot_provisioning_timeout_event(event):
        print(
            "Received spot provisioning timeout event for instanceFleet, clusterId: "
            + event["detail"]["clusterId"]
        )
 
        describeClusterResponse = describe_cluster(event)
 
        shouldTerminateCluster = is_cluster_eligible_for_termination(
            event, describeClusterResponse
        )
        if shouldTerminateCluster:
            terminate_cluster(event)
 
            clusterId = create_cluster(event)
            print("Created a new cluster, clusterId: " + clusterId)
        else:
            print(
                "Cluster is not eligible for termination, clusterId: "
                + event["detail"]["clusterId"]
            )
 
    else:
        print("Received event is not spot provisioning timeout event, skipping")
```