

# Responding to CloudWatch events from Amazon EMR
<a name="emr-events-response"></a>

This section describes ways that you can respond to actionable events that Amazon EMR emits as [CloudWatch event messages](emr-manage-cloudwatch-events.md), such as by creating rules or setting alarms. The sections that follow include links to procedures and recommended responses to common events.

**Topics**
+ [Creating rules for Amazon EMR events with CloudWatch](emr-events-cloudwatch-console.md)
+ [Setting alarms on CloudWatch metrics from Amazon EMR](UsingEMR_ViewingMetrics_Alarm.md)
+ [Responding to Amazon EMR cluster insufficient instance capacity events](emr-events-response-insuff-capacity.md)
+ [Responding to Amazon EMR cluster instance fleet resize timeout events](emr-events-response-timeout-events.md)

# Creating rules for Amazon EMR events with CloudWatch
<a name="emr-events-cloudwatch-console"></a>

Amazon EMR automatically sends events to a CloudWatch event stream. You can create rules that match events according to a specified pattern, and route the events to targets to take action, such as sending an email notification. Patterns are matched against the event JSON object. For more information about Amazon EMR event details, see [Amazon EMR events](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/EventTypes.html#emr_event_type) in the *Amazon CloudWatch Events User Guide*.
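Conceptually, an event pattern lists the event fields to check and, for each field, the values that are allowed. Nested objects such as `detail` are matched field by field. The following sketch is a simplified local matcher that only illustrates this idea. It covers exact-value matching and ignores the content filters (prefix, numeric, and so on) that CloudWatch supports, so it is not a substitute for the service's own matching. The pattern reuses the detail type and event code from the insufficient capacity example later on this page, and the sample event is trimmed and hypothetical.

```python
def matches(pattern: dict, event: dict) -> bool:
    """Return True if `event` satisfies `pattern` (exact-value subset only)."""
    for key, allowed in pattern.items():
        if key not in event:
            return False  # every field named in the pattern must be present
        actual = event[key]
        if isinstance(allowed, dict):
            # Nested pattern: recurse into the corresponding event object.
            if not isinstance(actual, dict) or not matches(allowed, actual):
                return False
        else:
            # Leaf: a scalar matches if it is one of the allowed values; an
            # array in the event matches if any element is an allowed value.
            values = actual if isinstance(actual, list) else [actual]
            if not any(value in allowed for value in values):
                return False
    return True


# Example pattern: insufficient capacity events for instance groups.
INSUFFICIENT_CAPACITY_PATTERN = {
    "source": ["aws.emr"],
    "detail-type": ["EMR Instance Group Provisioning"],
    "detail": {"eventCode": ["EC2 provisioning - Insufficient Instance Capacity"]},
}

# Hypothetical, trimmed event used to exercise the matcher.
sample_event = {
    "source": "aws.emr",
    "detail-type": "EMR Instance Group Provisioning",
    "detail": {
        "eventCode": "EC2 provisioning - Insufficient Instance Capacity",
        "clusterId": "j-EXAMPLE",
    },
}

print(matches(INSUFFICIENT_CAPACITY_PATTERN, sample_event))  # True
```

Fields that appear in the event but not in the pattern (such as `clusterId` here) are ignored, which is why a short pattern can select a whole class of events.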

For information about setting up CloudWatch event rules, see [Creating a CloudWatch rule that triggers on an event](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/Create-CloudWatch-Events-Rule.html).
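As a sketch of the same setup done programmatically, the following function creates a rule with an event pattern and attaches one target by using the `put_rule` and `put_targets` operations of the boto3 `events` client. The rule name, target ARN, target ID, and pattern are placeholders. The client is passed in as a parameter (assumed to be `boto3.client("events")`) so that the function can be exercised without AWS credentials.

```python
import json


def create_emr_event_rule(events_client, rule_name, event_pattern, target_arn):
    """Create or update a rule that routes matching events to a single target.

    events_client is assumed to be a boto3 "events" client.
    Returns the ARN of the rule.
    """
    rule = events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(event_pattern),  # the API expects a JSON string
        State="ENABLED",
    )
    result = events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "emr-event-target", "Arn": target_arn}],
    )
    # put_targets reports per-target failures in its response instead of raising.
    if result.get("FailedEntryCount", 0):
        raise RuntimeError(f"Failed to add target: {result.get('FailedEntries')}")
    return rule["RuleArn"]
```

For example, `create_emr_event_rule(boto3.client("events"), "emr-all-events", {"source": ["aws.emr"]}, topic_arn)` would route every Amazon EMR event to an SNS topic (`topic_arn` is a placeholder). If the target is a Lambda function, the function also needs a resource-based policy that allows `events.amazonaws.com` to invoke it, which you can add with the Lambda `add_permission` operation.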

# Setting alarms on CloudWatch metrics from Amazon EMR
<a name="UsingEMR_ViewingMetrics_Alarm"></a>

Amazon EMR pushes metrics to Amazon CloudWatch, and you can use CloudWatch to set alarms on those metrics. For example, you can configure an alarm in CloudWatch to send you an email any time HDFS utilization rises above 80%. For detailed instructions, see [Create or edit a CloudWatch alarm](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/ConsoleAlarms.html) in the *Amazon CloudWatch User Guide*.
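The following sketch creates the HDFS example alarm with the boto3 CloudWatch `put_metric_alarm` operation. It assumes the `HDFSUtilization` metric in the `AWS/ElasticMapReduce` namespace with the `JobFlowId` (cluster ID) dimension, a 5-minute period, and an existing Amazon SNS topic with an email subscription as the alarm action. The client parameter (assumed to be `boto3.client("cloudwatch")`), the topic ARN, and the alarm naming scheme are placeholders.

```python
def create_hdfs_utilization_alarm(cloudwatch_client, cluster_id, sns_topic_arn,
                                  threshold_percent=80.0):
    """Alarm when the cluster's average HDFSUtilization exceeds the threshold.

    Returns the alarm name that was used.
    """
    alarm_name = f"{cluster_id}-hdfs-utilization-above-{threshold_percent:g}"
    cloudwatch_client.put_metric_alarm(
        AlarmName=alarm_name,
        Namespace="AWS/ElasticMapReduce",
        MetricName="HDFSUtilization",
        Dimensions=[{"Name": "JobFlowId", "Value": cluster_id}],
        Statistic="Average",
        Period=300,              # seconds per evaluation window
        EvaluationPeriods=1,     # alarm after one window above the threshold
        Threshold=threshold_percent,
        ComparisonOperator="GreaterThanThreshold",
        AlarmActions=[sns_topic_arn],  # SNS topic that emails subscribers
    )
    return alarm_name
```

Raising `EvaluationPeriods` makes the alarm ignore short spikes at the cost of reacting later.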

# Responding to Amazon EMR cluster insufficient instance capacity events
<a name="emr-events-response-insuff-capacity"></a>

## Overview
<a name="emr-events-response-insuff-capacity-overview"></a>

Amazon EMR clusters return the event code `EC2 provisioning - Insufficient Instance Capacity` when the selected Availability Zone doesn't have enough capacity to fulfill your cluster start or resize request. If Amazon EMR repeatedly encounters insufficient capacity exceptions and can't fulfill the provisioning request for a cluster start or resize operation, it emits this event periodically, for both instance groups and instance fleets.
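Because the event can arrive for either instance groups or instance fleets, a handler that should react to both needs to accept both detail types. The following sketch does that. The detail type `EMR Instance Group Provisioning` is the one checked by the example function later on this page; `EMR Instance Fleet Provisioning` is an assumption taken from a comment in that function, so verify both against the Amazon EMR events reference before you rely on them.

```python
INSUFFICIENT_CAPACITY_EVENT_CODE = "EC2 provisioning - Insufficient Instance Capacity"

# Assumed detail types for instance groups and instance fleets.
PROVISIONING_DETAIL_TYPES = {
    "EMR Instance Group Provisioning",
    "EMR Instance Fleet Provisioning",
}


def is_insufficient_capacity_event(event: dict) -> bool:
    """Return True for insufficient capacity events from groups or fleets."""
    detail = event.get("detail") or {}  # tolerate a missing or empty detail
    return (
        event.get("detail-type") in PROVISIONING_DETAIL_TYPES
        and detail.get("eventCode") == INSUFFICIENT_CAPACITY_EVENT_CODE
    )
```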

This page describes how you can best respond to this event type when it occurs for your EMR cluster.

## Recommended response to an insufficient capacity event
<a name="emr-events-response-insuff-capacity-rec"></a>

We recommend that you respond to an insufficient-capacity event in one of the following ways:
+ Wait for capacity to recover. Capacity shifts frequently, so an insufficient capacity exception can recover on its own. Your clusters will start or finish resizing as soon as Amazon EC2 capacity becomes available.
+ Alternatively, you can terminate your cluster, modify your instance type configurations, and create a new cluster with the updated cluster configuration request. For more information, see [Availability Zone flexibility for an Amazon EMR cluster](emr-flexibility.md).
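If you choose to wait, a script or workflow may still need to block until the cluster is usable. The following sketch polls the `DescribeCluster` operation through an EMR client (assumed to be `boto3.client("emr")`) until the cluster reaches `RUNNING` or `WAITING`, enters a terminating state, or a time limit passes. It covers the cluster start case only, because a cluster that is already running keeps its `RUNNING` or `WAITING` state while an instance group resizes. The polling interval and time limit are arbitrary defaults.

```python
import time

READY_STATES = {"RUNNING", "WAITING"}
FAILED_STATES = {"TERMINATING", "TERMINATED", "TERMINATED_WITH_ERRORS"}


def wait_for_cluster_start(emr_client, cluster_id, poll_seconds=60,
                           timeout_seconds=3600, sleep=time.sleep):
    """Poll DescribeCluster until the cluster is ready, stops, or time runs out.

    Returns the last observed cluster state so the caller can decide what to do.
    """
    waited = 0
    while True:
        cluster = emr_client.describe_cluster(ClusterId=cluster_id)["Cluster"]
        state = cluster["Status"]["State"]
        if state in READY_STATES or state in FAILED_STATES:
            return state
        if waited >= timeout_seconds:
            return state  # still starting: capacity has not recovered yet
        sleep(poll_seconds)
        waited += poll_seconds
```

A return value such as `STARTING` means the time limit passed first, at which point you might fall back to the second option and relaunch with different instance types. The boto3 EMR client also offers a `cluster_running` waiter; the explicit loop here just makes the states and the time limit visible.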

You can also set up rules or automated responses to an insufficient capacity event, as described in the next section.

## Automated recovery from an insufficient capacity event
<a name="emr-events-response-insuff-capacity-ex"></a>

You can build automation that responds to Amazon EMR events, such as those with the event code `EC2 provisioning - Insufficient Instance Capacity`. For example, the following AWS Lambda function terminates an EMR cluster whose instance groups use On-Demand Instances, and then creates a new EMR cluster whose instance groups use different instance types than the original request.

The automated process runs when both of the following conditions are met:
+ The insufficient capacity event is emitted for the primary or core nodes, and more than 20 minutes have passed since the cluster was created.
+ The cluster is not in a **RUNNING** or **WAITING** state. For more information about EMR cluster states, see [Understanding the cluster lifecycle](emr-overview.md#emr-overview-cluster-lifecycle).
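The two conditions can be expressed as a small predicate. As in the example function on this page, the sketch measures the 20-minute limit from the cluster's creation time. The function name and parameters are illustrative, and `MASTER` is the API value that identifies the primary node type.

```python
from datetime import datetime, timedelta, timezone

CLUSTER_START_SLA = timedelta(minutes=20)
READY_STATES = {"RUNNING", "WAITING"}


def should_replace_cluster(node_type, created_at, state, now=None):
    """Return True when both conditions for automated replacement hold.

    node_type:  instanceGroupType from the event ("MASTER", "CORE", or "TASK")
    created_at: timezone-aware cluster creation time from DescribeCluster
    state:      current cluster state from DescribeCluster
    """
    if now is None:
        now = datetime.now(timezone.utc)
    is_primary_or_core = node_type in ("MASTER", "CORE")
    sla_breached = now - created_at > CLUSTER_START_SLA
    return is_primary_or_core and sla_breached and state not in READY_STATES
```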

**Note**  
When you build an automated process for an insufficient capacity exception, keep in mind that the insufficient capacity event is recoverable. Capacity often shifts, and your clusters resume the resize or start operation as soon as Amazon EC2 capacity becomes available.

**Example function to respond to insufficient capacity event**  

```
# Lambda code with Python 3.10; the handler is lambda_function.lambda_handler
# Note: the function's execution role needs permission to call Amazon EMR

import boto3
import datetime
from datetime import timezone

INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE = "EMR Instance Group Provisioning"
INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE = (
    "EC2 provisioning - Insufficient Instance Capacity"
)
ALLOWED_INSTANCE_TYPES_TO_USE = [
    "m5.xlarge",
    "c5.xlarge",
    "m5.4xlarge",
    "m5.2xlarge",
    "t3.xlarge",
]
CLUSTER_START_ACCEPTABLE_STATES = ["WAITING", "RUNNING"]
CLUSTER_START_SLA = 20

CLIENT = boto3.client("emr", region_name="us-east-1")

# Checks whether the incoming event is 'EMR Instance Group Provisioning' with eventCode 'EC2 provisioning - Insufficient Instance Capacity'
def is_insufficient_capacity_event(event):
    detail = event.get("detail")
    if not detail:
        return False
    return (
        event.get("detail-type") == INSUFFICIENT_CAPACITY_EXCEPTION_DETAIL_TYPE
        and detail.get("eventCode") == INSUFFICIENT_CAPACITY_EXCEPTION_EVENT_CODE
    )


# checks if the cluster is eligible for termination
def is_cluster_eligible_for_termination(event, describeClusterResponse):
    # instanceGroupType could be CORE, MASTER OR TASK
    instanceGroupType = event["detail"]["instanceGroupType"]
    clusterCreationTime = describeClusterResponse["Cluster"]["Status"]["Timeline"][
        "CreationDateTime"
    ]
    clusterState = describeClusterResponse["Cluster"]["Status"]["State"]

    # Use an aware UTC timestamp so it compares correctly with CreationDateTime
    now = datetime.datetime.now(timezone.utc)
    isClusterStartSlaBreached = clusterCreationTime < now - datetime.timedelta(
        minutes=CLUSTER_START_SLA
    )

    # Check whether the instance group receiving the insufficient capacity exception is CORE or PRIMARY (MASTER),
    # more than 20 minutes have passed since the cluster was created, and the cluster state is still not RUNNING or WAITING
    if (
        (instanceGroupType == "CORE" or instanceGroupType == "MASTER")
        and isClusterStartSlaBreached
        and clusterState not in CLUSTER_START_ACCEPTABLE_STATES
    ):
        return True
    else:
        return False


# Return the first allowed instance type that differs from the exempt value
def choice_excluding(exempt):
    for i in ALLOWED_INSTANCE_TYPES_TO_USE:
        if i != exempt:
            return i


# Create a new cluster that uses a different instance type for the affected node type
def create_cluster(event):
    # instanceGroupType could be CORE, MASTER, or TASK
    instanceGroupType = event["detail"]["instanceGroupType"]

    # The following two lines assume that you already know which instance types the original request used
    instanceTypesFromOriginalRequestMaster = "m5.xlarge"
    instanceTypesFromOriginalRequestCore = "m5.xlarge"

    # Select new instance types to include in the new createCluster request
    instanceTypeForMaster = (
        instanceTypesFromOriginalRequestMaster
        if instanceGroupType != "MASTER"
        else choice_excluding(instanceTypesFromOriginalRequestMaster)
    )
    instanceTypeForCore = (
        instanceTypesFromOriginalRequestCore
        if instanceGroupType != "CORE"
        else choice_excluding(instanceTypesFromOriginalRequestCore)
    )

    print("Starting to create cluster...")
    instances = {
        "InstanceGroups": [
            {
                "InstanceRole": "MASTER",
                "InstanceCount": 1,
                "InstanceType": instanceTypeForMaster,
                "Market": "ON_DEMAND",
                "Name": "Master",
            },
            {
                "InstanceRole": "CORE",
                "InstanceCount": 1,
                "InstanceType": instanceTypeForCore,
                "Market": "ON_DEMAND",
                "Name": "Core",
            },
        ]
    }
    response = CLIENT.run_job_flow(
        Name="Test Cluster",
        Instances=instances,
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        ReleaseLabel="emr-6.10.0",
    )

    return response["JobFlowId"]


# Terminate the cluster using the clusterId received in the event
def terminate_cluster(event):
    print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"])
    response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]])
    print(f"Terminate cluster response: {response}")


def describe_cluster(event):
    response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"])
    return response


def lambda_handler(event, context):
    if is_insufficient_capacity_event(event):
        print(
            "Received insufficient capacity event for instanceGroup, clusterId: "
            + event["detail"]["clusterId"]
        )

        describeClusterResponse = describe_cluster(event)

        shouldTerminateCluster = is_cluster_eligible_for_termination(
            event, describeClusterResponse
        )
        if shouldTerminateCluster:
            terminate_cluster(event)

            clusterId = create_cluster(event)
            print("Created a new cluster, clusterId: " + clusterId)
        else:
            print(
                "Cluster is not eligible for termination, clusterId: "
                + event["detail"]["clusterId"]
            )

    else:
        print("Received event is not insufficient capacity event, skipping")
```

# Responding to Amazon EMR cluster instance fleet resize timeout events
<a name="emr-events-response-timeout-events"></a>

## Overview
<a name="emr-events-response-timeout-events-overview"></a>

Amazon EMR clusters emit [events](emr-manage-cloudwatch-events.md#emr-cloudwatch-instance-fleet-resize-events) while they execute resize operations for instance fleets. Amazon EMR emits a provisioning timeout event when it stops provisioning Spot or On-Demand capacity for a fleet because the timeout has expired. You can configure the timeout duration in the [resize specifications](https://docs.aws.amazon.com/emr/latest/APIReference/API_InstanceFleetResizingSpecifications.html) for the instance fleets. When the same instance fleet has consecutive resize operations, Amazon EMR emits the `Spot provisioning timeout - continuing resize` or `On-Demand provisioning timeout - continuing resize` event when the timeout for the current resize operation expires. It then starts provisioning capacity for the fleet's next resize operation.
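For example, you can set the timeouts when you request a resize. The following sketch calls the boto3 EMR `modify_instance_fleet` operation with a `ResizeSpecifications` object that sets separate Spot and On-Demand timeouts, using the same `SpotResizeSpecification` member as the example function on this page. The client parameter (assumed to be `boto3.client("emr")`), the fleet ID, and the 20-minute defaults are placeholders; check the resize specifications reference for the allowed timeout range.

```python
def resize_instance_fleet(emr_client, cluster_id, instance_fleet_id,
                          target_spot_capacity, target_on_demand_capacity=None,
                          spot_timeout_minutes=20, on_demand_timeout_minutes=20):
    """Request a new target capacity for one fleet with explicit timeouts."""
    fleet = {
        "InstanceFleetId": instance_fleet_id,
        "TargetSpotCapacity": target_spot_capacity,
        "ResizeSpecifications": {
            # Stop provisioning Spot capacity after this many minutes.
            "SpotResizeSpecification": {"TimeoutDurationMinutes": spot_timeout_minutes},
            # Stop provisioning On-Demand capacity after this many minutes.
            "OnDemandResizeSpecification": {
                "TimeoutDurationMinutes": on_demand_timeout_minutes
            },
        },
    }
    # Only change On-Demand capacity when the caller asks for it.
    if target_on_demand_capacity is not None:
        fleet["TargetOnDemandCapacity"] = target_on_demand_capacity
    emr_client.modify_instance_fleet(ClusterId=cluster_id, InstanceFleet=fleet)
```

Shorter timeouts make Amazon EMR give up sooner, which suits jobs with strict SLAs; longer timeouts give Amazon EC2 more time to find capacity.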

## Responding to instance fleet resize timeout events
<a name="emr-events-response-timeout-events-rec"></a>

We recommend that you respond to a provisioning timeout event in one of the following ways:
+ Revisit the [resize specifications](https://docs.aws.amazon.com/emr/latest/APIReference/API_InstanceFleetResizingSpecifications.html) and retry the resize operation. Because capacity shifts frequently, your clusters resize successfully as soon as Amazon EC2 capacity becomes available. For jobs that require stricter SLAs, we recommend that you configure a shorter timeout duration.
+ Alternatively, you can either:
  + Launch a new cluster with diversified instance types based on the [best practices for instance and Availability Zone flexibility](emr-flexibility.md#emr-flexibility-types) or
  + Launch a cluster with On-Demand capacity
+ For the `provisioning timeout - continuing resize` events, you can also wait for the queued resize operations to be processed. Amazon EMR continues to process the resize operations triggered for the fleet sequentially, respecting the configured resize specifications.
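If you route all provisioning timeout events to one handler, you can branch on the event code to pick among the responses above. This sketch returns the applicable options as plain strings; the option names are hypothetical labels, not Amazon EMR values. It compares event codes case-insensitively because this page spells the Spot code both as `Spot provisioning timeout` and `Spot Provisioning timeout`.

```python
def recommended_actions(event_code: str) -> list:
    """Map a provisioning timeout event code to the applicable responses."""
    code = event_code.strip().lower()
    if "provisioning timeout" not in code:
        return []  # not a provisioning timeout event
    # Retrying with revised resize specifications, or relaunching with
    # diversified instance types or On-Demand capacity, applies to every timeout.
    actions = ["retry_resize", "relaunch"]
    if code.endswith("continuing resize"):
        # Amazon EMR moves on to the fleet's next resize, so waiting is also an option.
        actions.insert(0, "wait")
    return actions
```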

You can also set up rules or automated responses to this event as described in the next section.

## Automated recovery from a provisioning timeout event
<a name="emr-events-response-timeout-events-ex"></a>

You can build automation in response to Amazon EMR events with the `Spot Provisioning timeout` event code. For example, the following AWS Lambda function shuts down an EMR cluster whose task instance fleet uses Spot Instances, and then creates a new EMR cluster whose task instance fleet contains more diversified instance types than the original request. In this example, a `Spot Provisioning timeout` event emitted for task nodes triggers the Lambda function.
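You can also narrow the rule that invokes the function so that it matches only task fleet events, instead of filtering inside the function. The following sketch builds such an event pattern. The field names `eventCode` and `instanceFleetType` and the event code string are taken from the example function below. Because event pattern values are matched case-sensitively, confirm the exact event code spelling against the [instance fleet resize events](emr-manage-cloudwatch-events.md#emr-cloudwatch-instance-fleet-resize-events) reference.

```python
import json


def spot_timeout_pattern(fleet_types=("TASK",)):
    """Build an event pattern for Spot provisioning timeouts on given fleet types."""
    return {
        "source": ["aws.emr"],
        "detail-type": ["EMR Instance Fleet Resize"],
        "detail": {
            "eventCode": ["Spot Provisioning timeout"],
            # Only invoke the target for these fleet types.
            "instanceFleetType": list(fleet_types),
        },
    }


print(json.dumps(spot_timeout_pattern(), indent=2))
```

You could pass the resulting dictionary, serialized as JSON, as the event pattern when you create the rule, with the Lambda function as its target.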

**Example function to respond to `Spot Provisioning timeout` event**  

```
# Lambda code with Python 3.10; the handler is lambda_function.lambda_handler
# Note: the function's execution role needs permission to call Amazon EMR

import boto3
 
SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE = "EMR Instance Fleet Resize"
SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE = (
    "Spot Provisioning timeout"
)
 
CLIENT = boto3.client("emr", region_name="us-east-1")
 
# Checks whether the incoming event is 'EMR Instance Fleet Resize' with eventCode 'Spot Provisioning timeout'
def is_spot_provisioning_timeout_event(event):
    detail = event.get("detail")
    if not detail:
        return False
    return (
        event.get("detail-type") == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_DETAIL_TYPE
        and detail.get("eventCode") == SPOT_PROVISIONING_TIMEOUT_EXCEPTION_EVENT_CODE
    )
 
 
# Checks whether the cluster is eligible for termination
def is_cluster_eligible_for_termination(event, describeClusterResponse):
    # instanceFleetType could be CORE, MASTER, or TASK
    instanceFleetType = event["detail"]["instanceFleetType"]

    # Only replace the cluster when the fleet that timed out is the TASK fleet
    return instanceFleetType == "TASK"
 
 
# Create a new cluster with a more diversified set of task instance types
def create_cluster(event):
    # instanceFleetType could be CORE, MASTER, or TASK
    instanceFleetType = event["detail"]["instanceFleetType"]

    # The following two lines assume that you already know which instance types the original request used
    instanceTypesFromOriginalRequestMaster = "m5.xlarge"
    instanceTypesFromOriginalRequestCore = "m5.xlarge"

    # Select new instance types to include in the new createCluster request
    instanceTypesForTask = [
        "m5.xlarge",
        "m5.2xlarge",
        "m5.4xlarge",
        "m5.8xlarge",
        "m5.12xlarge"
    ]
    
    print("Starting to create cluster...")
    instances = {
        "InstanceFleets": [
            {
                "InstanceFleetType":"MASTER",
                "TargetOnDemandCapacity":1,
                "TargetSpotCapacity":0,
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesFromOriginalRequestMaster,
                        "WeightedCapacity":1,
                    }
                ]
            },
            {
                "InstanceFleetType":"CORE",
                "TargetOnDemandCapacity":1,
                "TargetSpotCapacity":0,
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesFromOriginalRequestCore,
                        "WeightedCapacity":1,
                    }
                ]
            },
            {
                "InstanceFleetType":"TASK",
                "TargetOnDemandCapacity":0,
                "TargetSpotCapacity":100,
                "LaunchSpecifications":{},
                "InstanceTypeConfigs":[
                    {
                        'InstanceType': instanceTypesForTask[0],
                        "WeightedCapacity":1,
                    },
                    {
                        'InstanceType': instanceTypesForTask[1],
                        "WeightedCapacity":2,
                    },
                    {
                        'InstanceType': instanceTypesForTask[2],
                        "WeightedCapacity":4,
                    },
                    {
                        'InstanceType': instanceTypesForTask[3],
                        "WeightedCapacity":8,
                    },
                    {
                        'InstanceType': instanceTypesForTask[4],
                        "WeightedCapacity":12,
                    }
                ],
                "ResizeSpecifications": {
                    "SpotResizeSpecification": {
                        "TimeoutDurationMinutes": 30
                    }
                }
            }
        ]
    }
    response = CLIENT.run_job_flow(
        Name="Test Cluster",
        Instances=instances,
        VisibleToAllUsers=True,
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
        ReleaseLabel="emr-6.10.0",
    )
 
    return response["JobFlowId"]
 
 
# Terminate the cluster using the clusterId received in the event
def terminate_cluster(event):
    print("Trying to terminate cluster, clusterId: " + event["detail"]["clusterId"])
    response = CLIENT.terminate_job_flows(JobFlowIds=[event["detail"]["clusterId"]])
    print(f"Terminate cluster response: {response}")
 
 
def describe_cluster(event):
    response = CLIENT.describe_cluster(ClusterId=event["detail"]["clusterId"])
    return response
 
 
def lambda_handler(event, context):
    if is_spot_provisioning_timeout_event(event):
        print(
            "Received spot provisioning timeout event for instanceFleet, clusterId: "
            + event["detail"]["clusterId"]
        )
 
        describeClusterResponse = describe_cluster(event)
 
        shouldTerminateCluster = is_cluster_eligible_for_termination(
            event, describeClusterResponse
        )
        if shouldTerminateCluster:
            terminate_cluster(event)
 
            clusterId = create_cluster(event)
            print("Created a new cluster, clusterId: " + clusterId)
        else:
            print(
                "Cluster is not eligible for termination, clusterId: "
                + event["detail"]["clusterId"]
            )
 
    else:
        print("Received event is not spot provisioning timeout event, skipping")
```