

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 从 Airflow 提交 EMR Serverless 作业
<a name="using-airflow"></a>

Apache Airflow 中的 Amazon Provider 提供 EMR Serverless 运算符。有关运算符的更多信息，请参阅 Apache Airflow 文档中的 [Amazon EMR Serverless Operators](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr_serverless.html)。

您可以使用 `EmrServerlessCreateApplicationOperator` 创建 Spark 或 Hive 应用程序。还可以使用 `EmrServerlessStartJobOperator` 通过新的应用程序启动一项或多项作业。

要在 Airflow 2.2.2 版本的 Amazon Managed Workflows for Apache Airflow（MWAA）中使用运算符，请在 `requirements.txt` 文件中添加以下行，并更新 MWAA 环境以使用新文件。

```
apache-airflow-providers-amazon==6.0.0
boto3>=1.23.9
```

请注意，Amazon Provider 5.0.0 版中增加了 EMR Serverless 支持。6.0.0 是与 Airflow 2.2.2 兼容的最新版本。您可以在 MWAA 上使用 Airflow 2.4.3 的后续版本。

下面的简略示例展示了如何创建应用程序、运行多个 Spark 作业，然后停止应用程序。[EMR 无服务器](https://github.com/aws-samples/emr-serverless-samples/tree/main/airflow)示例存储库中提供了完整的示例。 GitHub 有关 `sparkSubmit` 配置的更多详细信息，请参阅 [运行 EMR Serverless 作业时使用 Spark 配置](jobs-spark.md)。

```
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "amzn-s3-demo-bucket"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"}
    },
}

with DAG(
    dag_id="example_endtoend_emr_serverless_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        job_type="SPARK",
        release_label="emr-6.7.0",
        config={"name": "airflow-test"},
    )

    application_id = create_app.output

    job1 = EmrServerlessStartJobOperator(
        task_id="start_job_1",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    job2 = EmrServerlessStartJobOperator(
        task_id="start_job_2",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
                "entryPointArguments": ["1000"]
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    (create_app >> [job1, job2] >> delete_app)
```