

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon EMR クラスターのインスタンス容量不足のイベントに対応する
<a name="emr-events-response-insuff-capacity"></a>

## 概要:
<a name="emr-events-response-insuff-capacity-overview"></a>

Amazon EMR クラスターは、選択されたアベイラビリティーゾーンにクラスターの起動またはサイズ変更のリクエストに対応するのに十分な容量がない場合にイベントコード `EC2 provisioning - Insufficient Instance Capacity` を返します。Amazon EMR で容量不足の例外が繰り返し発生し、クラスターの起動またはクラスターのサイズ変更操作のプロビジョニングリクエストに対応できない場合、インスタンスグループとインスタンスフリートの両方でイベントが定期的に発生します。

このページでは、EMR クラスターでこのイベントタイプが発生した場合に最も適切に対応する方法について説明します。

## 容量不足イベントへの推奨対応
<a name="emr-events-response-insuff-capacity-rec"></a>

容量不足のイベントには、次のいずれかの方法で対応することをお勧めします。
+ 容量が回復するまで待ちます。容量は頻繁に変化するため、容量不足の例外は自然に解消される可能性があります。Amazon EC2 の容量が利用可能になり次第、クラスターが起動またはサイズ変更が完了します。
+ または、クラスターを終了して、インスタンスタイプの設定を変更し、更新されたクラスター設定のリクエストを使用して新しいクラスターを作成します。詳細については、「[Amazon EMR クラスターのアベイラビリティーゾーンの柔軟性](emr-flexibility.md)」を参照してください。

次のセクションで説明するように、容量不足のイベントに対するルールや自動応答を設定することも可能です。

## 容量不足のイベントからの自動回復
<a name="emr-events-response-insuff-capacity-ex"></a>

`EC2 provisioning - Insufficient Instance Capacity` などのイベントコードを含む Amazon EMR イベントに対応する自動化機能を構築できます。たとえば、次の AWS Lambda 関数は、オンデマンドインスタンスを使用するインスタンスグループを使用して EMR クラスターを終了し、元のリクエストとは異なるインスタンスタイプを含むインスタンスグループを使用して新しい EMR クラスターを作成します。

以下の条件によって自動化プロセスが開始されます。
+ プライマリノードまたはコアノードで容量不足のイベントが 20 分を超えて発生しています。
+ クラスターは **[準備完了]** または **[待機中]** 状態ではありません。EMR クラスターの状態の詳細については、「[クラスターライフサイクルについて](emr-overview.md#emr-overview-cluster-lifecycle)」を参照してください。

**注記**  
容量不足の例外に対する自動化プロセスを構築するときは、容量不足のイベントは回復可能である点を考慮してください。容量は頻繁に変化し、Amazon EC2 の容量が利用可能になり次第、クラスターはサイズ変更を再開したり、操作を開始したりします。

**Example 容量不足のイベントに対応する関数**  

```
// Lambda code with Python 3.10 and handler is lambda_function.lambda_handler
// Note: related IAM role requires permission to use Amazon EMR

import json
import boto3
import datetime
from datetime import timezone

INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE = "EMR Instance Group Provisioning"
INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE = (
    "EC2 provisioning - Insufficient Instance Capacity"
)
ALLOWED_INSTANCE_TYPES_TO_USE = [
    "m5.xlarge",
    "c5.xlarge",
    "m5.4xlarge",
    "m5.2xlarge",
    "t3.xlarge",
]
CLUSTER_START_ACCEPTABLE_STATES = ["WAITING", "RUNNING"]
CLUSTER_START_SLA = 20

CLIENT = boto3.client("emr", region_name="us-east-1")

# checks if the incoming event is 'EMR Instance Fleet Provisioning' with eventCode 'EC2 provisioning - Insufficient Instance Capacity'
def is_insufficient_capacity_event(event):
    if not event["detail"]:
        return False
    else:
        return (
            event["detail-type"] == INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE
            and event["detail"]["eventCode"]
            == INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE
        )


# checks if the cluster is eligible for termination
def is_cluster_eligible_for_termination(event, describeClusterResponse):
    # instanceGroupType could be CORE, MASTER OR TASK
    instanceGroupType = event["detail"]["instanceGroupType"]
    clusterCreationTime = describeClusterResponse["Cluster"]["Status"]["Timeline"][
        "CreationDateTime"
    ]
    clusterState = describeClusterResponse["Cluster"]["Status"]["State"]

    now = datetime.datetime.now()
    now = now.replace(tzinfo=timezone.utc)
    isClusterStartSlaBreached = clusterCreationTime < now - datetime.timedelta(
        minutes=CLUSTER_START_SLA
    )

    # Check if instance group receiving Insufficient capacity exception is CORE or PRIMARY (MASTER),
    # and it's been more than 20 minutes since cluster was created but the cluster state and the cluster state is not updated to RUNNING or WAITING
    if (
        (instanceGroupType == "CORE" or instanceGroupType == "MASTER")
        and isClusterStartSlaBreached
        and clusterState not in CLUSTER_START_ACCEPTABLE_STATES
    ):
        return True
    else:
        return False


# Choose item from the list except the exempt value
def choice_excluding(exempt):
    for i in ALLOWED_INSTANCE_TYPES_TO_USE:
        if i != exempt:
            return i


# Create a new cluster by choosing different InstanceType.
def create_cluster(event):
    # instanceGroupType cloud be CORE, MASTER OR TASK
    instanceGroupType = event["detail"]["instanceGroupType"]

    # Following two lines assumes that the customer that created the cluster already knows which instance types they use in original request
    instanceTypesFromOriginalRequestMaster = "m5.xlarge"
    instanceTypesFromOriginalRequestCore = "m5.xlarge"

    # Select new instance types to include in the new createCluster request
    instanceTypeForMaster = (
        instanceTypesFromOriginalRequestMaster
        if instanceGroupType != "MASTER"
        else choice_excluding(instanceTypesFromOriginalRequestMaster)
    )
    instanceTypeForCore = (
        instanceTypesFromOriginalRequestCore
        if instanceGroupType != "CORE"
        else choice_excluding(instanceTypesFromOriginalRequestCore)
    )

    print("Starting to create cluster...")
    instances = {
        "InstanceGroups": [
            {
                "InstanceRole": "MASTER",
                "InstanceCount": 1,
                "InstanceType": instanceTypeForMaster,
                "Market": "ON_DEMAND",
                "Name": "Master",
            },
            {
                "InstanceRole": "CORE",
                "InstanceCount": 1,
                "InstanceType": instanceTypeForCore,
                "Market": "ON_DEMAND",
                "Name": "Core",
            },
        ]
    }
    response = CLIENT.run_job_flow(
        Name="Test Cluster",
        Instances=instances,
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        ReleaseLabel="emr-6.10.0",
    )

    return response["JobFlowId"]


# Terminated the cluster using clusterId received in an event
def terminate_cluster(event):
    print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"])
    response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]])
    print(f"Terminate cluster response: {response}")


def describe_cluster(event):
    response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"])
    return response


def lambda_handler(event, context):
    if is_insufficient_capacity_event(event):
        print(
            "Received insufficient capacity event for instanceGroup, clusterId: "
            + event["detail"]["clusterId"]
        )

        describeClusterResponse = describe_cluster(event)

        shouldTerminateCluster = is_cluster_eligible_for_termination(
            event, describeClusterResponse
        )
        if shouldTerminateCluster:
            terminate_cluster(event)

            clusterId = create_cluster(event)
            print("Created a new cluster, clusterId: " + clusterId)
        else:
            print(
                "Cluster is not eligible for termination, clusterId: "
                + event["detail"]["clusterId"]
            )

    else:
        print("Received event is not insufficient capacity event, skipping")
```