

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 從 Airflow 提交 EMR Serverless 任務
<a name="using-airflow"></a>

Apache Airflow 中的 Amazon Provider 提供 EMR Serverless 運算子。如需運算子的詳細資訊，請參閱 Apache Airflow 文件中的 [Amazon EMR Serverless Operators](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr_serverless.html)。

您可以使用 來`EmrServerlessCreateApplicationOperator`建立 Spark 或 Hive 應用程式。您也可以使用 `EmrServerlessStartJobOperator` 來啟動一或多個使用新應用程式的任務。

若要搭配 Amazon Managed Workflows for Apache Airflow (MWAA) 搭配 Airflow 2.2.2 使用 運算子，請將以下行新增至您的 `requirements.txt` 檔案，並更新您的 MWAA 環境以使用新的 檔案。

```
apache-airflow-providers-amazon==6.0.0
boto3>=1.23.9
```

請注意，EMR Serverless 支援已新增至 Amazon 供應商的 5.0.0 版。6.0.0 版是與 Airflow 2.2.2 相容的最新版本。您可以在 MWAA 上使用 Airflow 2.4.3 的更新版本。

下列縮寫範例示範如何建立應用程式、執行多個 Spark 任務，然後停止應用程式。[EMR Serverless Samples](https://github.com/aws-samples/emr-serverless-samples/tree/main/airflow) GitHub 儲存庫提供完整範例。如需`sparkSubmit`組態的其他詳細資訊，請參閱 [執行 EMR Serverless 任務時使用 Spark 組態](jobs-spark.md)。

```
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "amzn-s3-demo-bucket"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"}
    },
}

with DAG(
    dag_id="example_endtoend_emr_serverless_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        job_type="SPARK",
        release_label="emr-6.7.0",
        config={"name": "airflow-test"},
    )

    application_id = create_app.output

    job1 = EmrServerlessStartJobOperator(
        task_id="start_job_1",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    job2 = EmrServerlessStartJobOperator(
        task_id="start_job_2",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
                "entryPointArguments": ["1000"]
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    (create_app >> [job1, job2] >> delete_app)
```