

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Envío de trabajos de EMR sin servidor desde Airflow
<a name="using-airflow"></a>

El proveedor de Amazon en Apache Airflow proporciona operadores EMR sin servidor. Para obtener más información sobre los operadores, consulte [Amazon EMR Serverless Operators](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr_serverless.html) en la documentación de Apache Airflow.

Puede utilizar `EmrServerlessCreateApplicationOperator` para crear una aplicación Spark o Hive. También puede utilizar `EmrServerlessStartJobOperator` para iniciar uno o más trabajos con su nueva aplicación. 

Para usar el operador con Amazon Managed Workflows for Apache Airflow (MWAA) con Airflow 2.2.2, añada la siguiente línea al archivo `requirements.txt` y actualice su entorno de MWAA para usar el nuevo archivo. 

```
apache-airflow-providers-amazon==6.0.0
boto3>=1.23.9
```

Tenga en cuenta que la compatibilidad con EMR sin servidor se agregó a la versión 5.0.0 del proveedor Amazon. La versión 6.0.0 es la última versión compatible con Airflow 2.2.2. Puede utilizar versiones posteriores con Airflow 2.4.3 en MWAA.

El siguiente ejemplo abreviado muestra cómo crear una aplicación, ejecutar varios trabajos de Spark y, a continuación, detener la aplicación. Hay un ejemplo completo disponible en el repositorio de muestras [sin servidor de EMR](https://github.com/aws-samples/emr-serverless-samples/tree/main/airflow). GitHub Para obtener información adicional de la configuración `sparkSubmit`, consulte [Uso de configuraciones de Spark al ejecutar trabajos de EMR sin servidor](jobs-spark.md).

```
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "amzn-s3-demo-bucket"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"}
    },
}

with DAG(
    dag_id="example_endtoend_emr_serverless_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        job_type="SPARK",
        release_label="emr-6.7.0",
        config={"name": "airflow-test"},
    )

    application_id = create_app.output

    job1 = EmrServerlessStartJobOperator(
        task_id="start_job_1",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    job2 = EmrServerlessStartJobOperator(
        task_id="start_job_2",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
                "entryPointArguments": ["1000"]
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    (create_app >> [job1, job2] >> delete_app)
```