

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Apache Airflow REST API 사용
<a name="access-mwaa-apache-airflow-rest-api"></a>

Amazon Managed Workflows for Apache Airflow(Amazon MWAA)는 Apache Airflow v2.4.3 이상을 실행하는 환경에 Apache Airflow REST API를 사용하여 Apache Airflow 환경과 직접 상호 작용할 수 있도록 지원합니다. 이를 통해 Amazon MWAA 환경에 프로그래밍 방식으로 액세스하고 이 환경을 관리할 수 있으며, 데이터 오케스트레이션 워크플로를 간접적으로 호출하고, DAG를 관리하고, 메타데이터 데이터베이스, 트리거러, 스케줄러 등의 다양한 Apache Airflow 구성 요소의 상태를 모니터링하는 표준화된 방법도 확보할 수 있습니다.

Apache Airflow REST API를 사용하는 동안 확장성을 지원하기 위해 Amazon MWAA는 REST API 요청, 명령줄 인터페이스(CLI) 사용 또는 더 많은 동시 Apache Airflow 사용자 인터페이스(UI) 사용자 등 수요 증가를 처리하도록 웹 서버 용량의 수평적 스케일링 옵션을 제공합니다. Amazon MWAA가 웹 서버 규모를 조정하는 방법에 대한 자세한 내용은 [Amazon MWAA 웹 서버 오토 스케일링 구성](mwaa-web-server-autoscaling.md) 섹션을 참조하세요.

Apache Airflow REST API를 사용하여 환경에 다음과 같은 사용 사례를 구현할 수 있습니다.
+ **프로그래밍 방식 액세스** - 이제 Apache Airflow UI 또는 CLI에 의존하지 않고도 Apache Airflow DAG 실행을 시작하고, 데이터세트를 관리하고, 메타데이터 데이터베이스, 트리거러, 스케줄러 등 다양한 구성 요소의 상태를 검색할 수 있습니다.
+ **외부 애플리케이션 및 마이크로서비스와 통합** - Amazon MWAA 환경을 다른 시스템과 통합하는 사용자 지정 솔루션을 구축하는 데 사용할 수 있는 REST API 지원. 예를 들어 완료된 데이터베이스 작업 또는 새 사용자 등록과 같은 외부 시스템의 이벤트에 대한 응답으로 워크플로를 시작할 수 있습니다.
+ **중앙 집중식 모니터링** - 여러 Amazon MWAA 환경에서 DAG의 상태를 집계하는 모니터링 대시보드를 빌드하여 중앙 집중식 모니터링 및 관리를 활성화할 수 있습니다.

Apache Airflow REST API에 대한 자세한 내용은 [Apache Airflow REST API 참조](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html)를 참조하세요.

를 사용하면 AWS 자격 증명을 사용하여 Apache Airflow REST API에 액세스할 `InvokeRestApi`수 있습니다. 또는 웹 서버 액세스 토큰을 얻은 다음, 이 토큰을 사용해서 직접적으로 호출하여 액세스할 수도 있습니다.

`InvokeRestApi` 작업을 사용하는 동안 `Update your environment to use InvokeRestApi` 메시지의 오류가 발생하는 것은 Amazon MWAA 환경을 업데이트해야 함을 나타냅니다. 이 오류는 Amazon MWAA 환경이 `InvokeRestApi` 기능과 관련된 최신 변경 사항과 호환되지 않을 때 발생합니다. 이 문제를 해결하려면 `InvokeRestApi` 기능에 필요한 변경 사항을 통합하도록 Amazon MWAA 환경을 업데이트하세요.

`InvokeRestApi` 작업의 기본 제한 시간은 10초입니다. 작업이 이 10초 기간 내에 완료되지 않으면 작업이 자동 종료되고 오류가 발생합니다. 오류가 발생하지 않도록 REST API 호출이 이 제한 시간 내에 완료되도록 설계되었는지 확인합니다.

Apache Airflow REST API를 사용하는 동안 확장성을 지원하기 위해 Amazon MWAA는 REST API 요청, 명령줄 인터페이스(CLI) 사용 또는 더 많은 동시 Apache Airflow 사용자 인터페이스(UI) 사용자 등 수요 증가를 처리하도록 웹 서버 용량의 수평적 스케일링 옵션을 제공합니다. Amazon MWAA가 웹 서버 규모를 조정하는 방법에 대한 자세한 내용은 [Amazon MWAA 웹 서버 오토 스케일링 구성](mwaa-web-server-autoscaling.md) 섹션을 참조하세요.

Apache Airflow REST API를 사용하여 환경에 다음과 같은 사용 사례를 구현할 수 있습니다.
+ **프로그래밍 방식 액세스** - 이제 Apache Airflow UI 또는 CLI에 의존하지 않고도 Apache Airflow DAG 실행을 시작하고, 데이터세트를 관리하고, 메타데이터 데이터베이스, 트리거러, 스케줄러 등 다양한 구성 요소의 상태를 검색할 수 있습니다.
+ **외부 애플리케이션 및 마이크로서비스와 통합** - Amazon MWAA 환경을 다른 시스템과 통합하는 사용자 지정 솔루션을 구축하는 데 사용할 수 있는 REST API 지원. 예를 들어 완료된 데이터베이스 작업 또는 새 사용자 등록과 같은 외부 시스템의 이벤트에 대한 응답으로 워크플로를 시작할 수 있습니다.
+ **중앙 집중식 모니터링** - 여러 Amazon MWAA 환경에서 DAG의 상태를 집계하는 모니터링 대시보드를 빌드하여 중앙 집중식 모니터링 및 관리를 활성화할 수 있습니다.

Apache Airflow REST API에 대한 자세한 내용은 [Apache Airflow REST API 참조](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html)를 참조하세요.

를 사용하면 AWS 자격 증명을 사용하여 Apache Airflow REST API에 액세스할 `InvokeRestApi`수 있습니다. 또는 웹 서버 액세스 토큰을 얻은 다음, 이 토큰을 사용해서 직접적으로 호출하여 액세스할 수도 있습니다.
+ `InvokeRestApi` 작업을 사용하는 동안 `Update your environment to use InvokeRestApi` 메시지의 오류가 발생하는 것은 Amazon MWAA 환경을 업데이트해야 함을 나타냅니다. 이 오류는 Amazon MWAA 환경이 `InvokeRestApi` 기능과 관련된 최신 변경 사항과 호환되지 않을 때 발생합니다. 이 문제를 해결하려면 `InvokeRestApi` 기능에 필요한 변경 사항을 통합하도록 Amazon MWAA 환경을 업데이트하세요.
+ `InvokeRestApi` 작업의 기본 제한 시간은 10초입니다. 작업이 이 10초 기간 내에 완료되지 않으면 작업이 자동 종료되고 오류가 발생합니다. 오류가 발생하지 않도록 REST API 호출이 이 제한 시간 내에 완료되도록 설계되었는지 확인합니다.

**중요**  
응답 페이로드 크기는 6MB 이하여야 합니다. 이 제한을 초과하면 `RestApi`가 실패합니다.

다음 예제를 사용하여 Apache Airflow REST API에 대한 API 직접 호출을 수행하고 새로운 DAG 실행을 시작합니다.

**Topics**
+ [Apache Airflow REST API에 대한 액세스 권한 부여: `airflow:InvokeRestApi`](#granting-access-MWAA-Enhanced-REST-API)
+ [Apache Airflow REST API 직접 호출](#listing-DAGs-creating-variables-using-restapi-script)
+ [웹 서버 세션 토큰 생성 및 Apache Airflow REST API 직접 호출](#create-web-server-session-token)

## Apache Airflow REST API에 대한 액세스 권한 부여: `airflow:InvokeRestApi`
<a name="granting-access-MWAA-Enhanced-REST-API"></a>

 AWS 자격 증명을 사용하여 Apache Airflow REST API에 액세스하려면 IAM 정책에서 `airflow:InvokeRestApi` 권한을 부여해야 합니다. 다음 정책 샘플에서, `{airflow-role}`에서 `Admin`, `Op`, `User`, `Viewer` 또는 `Public` 역할을 지정하여 사용자 액세스 수준을 지정합니다. 자세한 내용은 *Apache Airflow 참조 가이드*의 [기본 역할](https://airflow.apache.org/docs/apache-airflow/1.10.6/security.html?highlight=ldap#default-roles)을 참조하세요.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "AllowMwaaRestApiAccess",
            "Effect": "Allow",
            "Action": "airflow:InvokeRestApi",
            "Resource": [
            "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}"
            ]
        }
    ]
}
```

------

**참고**  
프라이빗 웹 서버를 구성하는 동안에는 가상 프라이빗 클라우드(VPC) 외부에서 `InvokeRestApi` 작업을 간접적으로 호출할 수 없습니다. `aws:SourceVpc` 키를 사용하여 이 작업에 대해 더 세분화된 액세스 제어를 적용할 수 있습니다. 자세한 내용은 [aws:SourceVpc](https://docs.aws.amazon.com//IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcevpc)를 참조하세요.

## Apache Airflow REST API 직접 호출
<a name="listing-DAGs-creating-variables-using-restapi-script"></a>

다음 샘플 스크립트에서는 Apache Airflow REST API를 사용하여 환경에서 사용 가능한 DAG를 나열하는 방법과 Apache Airflow 변수를 생성하는 방법을 다룹니다.

```
import boto3
			
  env_name = "MyAirflowEnvironment"
			
  def list_dags(client):
    request_params = {
      "Name": env_name,
      "Path": "/dags",
      "Method": "GET",
      "QueryParameters": {
        "paused": False
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  def create_variable(client):
    request_params = {
      "Name": env_name,
      "Path": "/variables",
      "Method": "POST",
      "Body": {
        "key": "test-restapi-key",
        "value": "test-restapi-value",
        "description": "Test variable created by MWAA InvokeRestApi API",
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  if __name__ == "__main__":
    client = boto3.client("mwaa")
    list_dags(client)
    create_variable(client)
```

## 웹 서버 세션 토큰 생성 및 Apache Airflow REST API 직접 호출
<a name="create-web-server-session-token"></a>

웹 서버 액세스 토큰을 생성하려면 다음 Python 함수를 사용합니다. 이 함수는 먼저 Amazon MWAA API를 직접 호출하여 웹 로그인 토큰을 가져옵니다. 60초 후에 만료되는 웹 로그인 토큰은 웹 *세션* 토큰으로 교환되며, 이를 통해 웹 서버에 액세스하고 Apache Airflow REST API를 사용할 수 있습니다. 초당 10개가 넘는 트랜잭션(TPS) 스로틀링 용량이 필요한 경우 이 방법을 사용하여 Apache Airflow REST API에 액세스할 수 있습니다.

세션 토큰은 12시간 후에 만료됩니다.

**작은 정보**  
다음 코드 샘플에서 Apache Airflow v2에서 v3로의 주요 변경 사항은 다음과 같습니다.  
REST API 경로가 `/api/v1`에서 `/api/v2`로 변경됨
로그인 경로가 `/aws_maa/login`에서 `/pluginsv2/aws_mwaa/login`으로 변경됨
로그인 `response.cookies["_token"]`의 응답에는 후속 API 호출에 사용해야 하는 토큰 정보가 포함됩니다.
REST API 호출의 경우 헤더의 `jwt_token` 정보를 다음과 같이 전달해야 합니다.  

  ```
  headers = {
    "Authorization": f"Bearer {jwt_token}",
    "Content-Type": "application/json"
  }
  ```

------
#### [ Apache Airflow v3 ]

```
def get_token_info(region, env_name):
  logging.basicConfig(level=logging.INFO)
						
  try:
    # Initialize MWAA client and request a web login token
    mwaa = boto3.client('mwaa', region_name=region)
    response = mwaa.create_web_login_token(Name=env_name)
						
    # Extract the web server hostname and login token
    web_server_host_name = response["WebServerHostname"]
    web_token = response["WebToken"]
						
    # Construct the URL needed for authentication 
    login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login"
    login_payload = {"token": web_token}
						
    # Make a POST request to the MWAA login url using the login payload
    response = requests.post(
      login_url,
      data=login_payload,
      timeout=10
    )
						
    # Check if login was successful
    if response.status_code == 200:
						
    # Return the hostname and the session cookie 
    return (
      web_server_host_name,
      response.cookies['_token']
    )
    else:
      # Log an error
      logging.error("Failed to log in: HTTP %d", response.status_code)
      return None
      except requests.RequestException as e:
      
      # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
      except Exception as e:
      
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------
#### [ Apache Airflow v2 ]

```
def get_session_info(region, env_name):
  logging.basicConfig(level=logging.INFO)

  try:
      # Initialize MWAA client and request a web login token
      mwaa = boto3.client('mwaa', region_name=region)
      response = mwaa.create_web_login_token(Name=env_name)
      
      # Extract the web server hostname and login token
      web_server_host_name = response["WebServerHostname"]
      web_token = response["WebToken"]
      
      # Construct the URL needed for authentication 
      login_url = f"https://{web_server_host_name}/aws_mwaa/login"
      login_payload = {"token": web_token}

      # Make a POST request to the MWAA login url using the login payload
      response = requests.post(
          login_url,
          data=login_payload,
          timeout=10
      )

      # Check if login was succesfull 
      if response.status_code == 200:
      
          # Return the hostname and the session cookie 
          return (
              web_server_host_name,
              response.cookies["session"]
          )
      else:
          # Log an error
          logging.error("Failed to log in: HTTP %d", response.status_code)
          return None
  except requests.RequestException as e:
       # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
  except Exception as e:
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------

인증이 완료된 후 제공되는 자격 증명으로 API 엔드포인트로 요청을 전송하기 시작할 수 있습니다. 다음 섹션의 예제에서는 엔드포인트 `dags/{dag_name}/dagRuns`를 사용합니다.

------
#### [ Apache Airflow v3 ]

```
def trigger_dag(region, env_name, dag_id):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_id (str): ID of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and token for authentication
						try:
						web_server_host_name, jwt_token = get_token_info(region, env_name)
						if not jwt_token:
						logging.error("Authentication failed, no jwt token retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving token info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						request_headers = {
						"Authorization": f"Bearer {jwt_token}",
						"Content-Type": "application/json" # Good practice to include, even for GET
						}
						
						# sample request body input
						json_body = {"logical_date": "2025-09-17T14:15:00Z"}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, headers=request_headers, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_id = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_id)
```

------
#### [ Apache Airflow v2 ]

```
def trigger_dag(region, env_name, dag_name):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_name (str): Name of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and session cookie for authentication
						try:
						web_server_host_name, session_cookie = get_session_info(region, env_name)
						if not session_cookie:
						logging.error("Authentication failed, no session cookie retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving session info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						cookies = {"session": session_cookie}
						json_body = {"conf": {}}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, cookies=cookies, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_name = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_name)
```

------