

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Airflow에서 EMR Serverless 작업 제출
<a name="using-airflow"></a>

Apache Airflow의 Amazon Provider에서는 EMR Serverless 연산자를 제공합니다. 연산자에 대한 자세한 내용은 Apache Airflow 설명서의 [Amazon EMR Serverless Operators](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr_serverless.html)를 참조하세요.

`EmrServerlessCreateApplicationOperator`를 사용하여 Spark 또는 Hive 애플리케이션을 생성할 수 있습니다. `EmrServerlessStartJobOperator`를 사용하여 새 애플리케이션으로 하나 이상의 작업을 시작할 수도 있습니다.

Airflow 2.2.2와 함께 Amazon Managed Workflows for Apache Airflow(MWAA)에서 연산자를 사용하려면 `requirements.txt` 파일에 다음 줄을 추가하고 MWAA 환경을 업데이트하여 새 파일을 사용합니다.

```
apache-airflow-providers-amazon==6.0.0
boto3>=1.23.9
```

Amazon 제공업체의 릴리스 5.0.0에는 EMR Serverless 지원이 추가되었습니다. 릴리스 6.0.0은 Airflow 2.2.2와 호환되는 마지막 버전입니다. MWAA의 Airflow 2.4.3에서 이후 버전을 사용할 수 있습니다.

다음 약식 예제에서는 애플리케이션을 생성하고 여러 Spark 작업을 실행한 다음, 애플리케이션을 중지하는 방법을 보여줍니다. 전체 예제는 [EMR Serverless Samples](https://github.com/aws-samples/emr-serverless-samples/tree/main/airflow) GitHub 리포지토리에서 사용할 수 있습니다. `sparkSubmit` 구성에 대한 자세한 내용은 [EMR Serverless 작업을 실행하는 경우 Spark 구성 사용](jobs-spark.md) 섹션을 참조하세요.

```
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "amzn-s3-demo-bucket"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"}
    },
}

with DAG(
    dag_id="example_endtoend_emr_serverless_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        job_type="SPARK",
        release_label="emr-6.7.0",
        config={"name": "airflow-test"},
    )

    application_id = create_app.output

    job1 = EmrServerlessStartJobOperator(
        task_id="start_job_1",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    job2 = EmrServerlessStartJobOperator(
        task_id="start_job_2",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
                "entryPointArguments": ["1000"]
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    (create_app >> [job1, job2] >> delete_app)
```