

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan Apache Airflow REST API
<a name="access-mwaa-apache-airflow-rest-api"></a>

Alur Kerja Terkelola Amazon untuk Apache Airflow (Amazon MWAA) mendukung interaksi dengan lingkungan Apache Airflow Anda secara langsung menggunakan API REST Apache Airflow untuk lingkungan yang menjalankan Apache Airflow v2.4.3 dan yang lebih baru. Ini memungkinkan Anda mengakses dan mengelola lingkungan Amazon MWAA Anda secara terprogram, menyediakan cara standar untuk menjalankan alur kerja orkestrasi data, mengelola, dan DAGs memantau status berbagai komponen Apache Airflow seperti database metadata, pemicu, dan penjadwal.

Untuk mendukung skalabilitas saat menggunakan Apache Airflow REST API, Amazon MWAA memberi Anda opsi untuk menskalakan kapasitas server web secara horizontal untuk menangani peningkatan permintaan, baik dari permintaan REST API, penggunaan antarmuka baris perintah (CLI), atau pengguna antarmuka pengguna Apache Airflow (UI) lainnya secara bersamaan. Untuk informasi lebih lanjut tentang bagaimana Amazon MWAA menskalakan server web, lihat. [Mengkonfigurasi penskalaan otomatis server web Amazon MWAA](mwaa-web-server-autoscaling.md)

Anda dapat menggunakan Apache Airflow REST API untuk mengimplementasikan kasus penggunaan berikut untuk lingkungan Anda:
+ **Akses terprogram** - Anda sekarang dapat memulai Apache Airflow DAG berjalan, mengelola kumpulan data, dan mengambil status berbagai komponen seperti database metadata, pemicu, dan penjadwal tanpa bergantung pada Apache Airflow UI atau CLI.
+ **Integrasikan dengan aplikasi eksternal dan layanan mikro** — Dukungan REST API yang dapat Anda gunakan untuk membangun solusi khusus yang mengintegrasikan lingkungan Amazon MWAA Anda dengan sistem lain. Misalnya, Anda dapat memulai alur kerja sebagai respons terhadap peristiwa dari sistem eksternal, seperti pekerjaan database yang diselesaikan atau pendaftaran pengguna baru.
+ **Pemantauan terpusat** — Anda dapat membuat dasbor pemantauan yang menggabungkan status Anda di DAGs beberapa lingkungan Amazon MWAA, memungkinkan pemantauan dan pengelolaan terpusat.

Untuk informasi selengkapnya tentang Apache Airflow REST API, lihat Referensi API Apache [Airflow](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html) REST.

Dengan menggunakan`InvokeRestApi`, Anda dapat mengakses Apache Airflow REST API menggunakan kredensil. AWS Atau, Anda juga dapat mengaksesnya dengan mendapatkan token akses server web dan kemudian menggunakan token untuk memanggilnya.

Jika Anda menemukan kesalahan dengan pesan `Update your environment to use InvokeRestApi` saat menggunakan `InvokeRestApi` operasi, ini menunjukkan bahwa Anda perlu memperbarui lingkungan Amazon MWAA Anda. Kesalahan ini terjadi ketika lingkungan Amazon MWAA Anda tidak kompatibel dengan perubahan terbaru yang terkait dengan fitur tersebut`InvokeRestApi`. Untuk mengatasi masalah ini, perbarui lingkungan Amazon MWAA Anda untuk memasukkan perubahan yang diperlukan untuk fitur tersebut`InvokeRestApi`.

`InvokeRestApi`Operasi ini memiliki durasi batas waktu default 10 detik. Jika operasi tidak selesai dalam jangka waktu 10 detik ini, operasi secara otomatis dihentikan, dan kesalahan muncul. Pastikan bahwa panggilan REST API Anda dirancang untuk diselesaikan dalam periode waktu tunggu ini untuk menghindari kesalahan.

Untuk mendukung skalabilitas saat menggunakan Apache Airflow REST API, Amazon MWAA memberi Anda opsi untuk menskalakan kapasitas server web secara horizontal untuk menangani peningkatan permintaan, baik dari permintaan REST API, penggunaan antarmuka baris perintah (CLI), atau pengguna antarmuka pengguna Apache Airflow (UI) yang lebih bersamaan. Untuk informasi selengkapnya tentang bagaimana Amazon MWAA menskalakan server web, lihat. [Mengkonfigurasi penskalaan otomatis server web Amazon MWAA](mwaa-web-server-autoscaling.md)

Anda dapat menggunakan Apache Airflow REST API untuk mengimplementasikan kasus penggunaan berikut untuk lingkungan Anda:
+ **Akses terprogram** - Anda sekarang dapat memulai Apache Airflow DAG berjalan, mengelola kumpulan data, dan mengambil status berbagai komponen seperti database metadata, pemicu, dan penjadwal tanpa bergantung pada Apache Airflow UI atau CLI.
+ **Integrasikan dengan aplikasi eksternal dan layanan mikro** — Dukungan REST API yang dapat Anda gunakan untuk membangun solusi khusus yang mengintegrasikan lingkungan Amazon MWAA Anda dengan sistem lain. Misalnya, Anda dapat memulai alur kerja sebagai respons terhadap peristiwa dari sistem eksternal, seperti pekerjaan database yang diselesaikan atau pendaftaran pengguna baru.
+ **Pemantauan terpusat** — Anda dapat membuat dasbor pemantauan yang menggabungkan status Anda di DAGs beberapa lingkungan Amazon MWAA, memungkinkan pemantauan dan pengelolaan terpusat.

Untuk informasi selengkapnya tentang Apache Airflow REST API, lihat Referensi API [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html) REST.

Dengan menggunakan`InvokeRestApi`, Anda dapat mengakses Apache Airflow REST API menggunakan kredensil. AWS Atau, Anda juga dapat mengaksesnya dengan mendapatkan token akses server web dan kemudian menggunakan token untuk memanggilnya.
+ Jika Anda menemukan kesalahan dengan pesan `Update your environment to use InvokeRestApi` saat menggunakan `InvokeRestApi` operasi, ini menunjukkan bahwa Anda perlu memperbarui lingkungan Amazon MWAA Anda. Kesalahan ini terjadi ketika lingkungan Amazon MWAA Anda tidak kompatibel dengan perubahan terbaru yang terkait dengan fitur tersebut`InvokeRestApi`. Untuk mengatasi masalah ini, perbarui lingkungan Amazon MWAA Anda untuk memasukkan perubahan yang diperlukan untuk fitur tersebut`InvokeRestApi`.
+ `InvokeRestApi`Operasi ini memiliki durasi batas waktu default 10 detik. Jika operasi tidak selesai dalam jangka waktu 10 detik ini, operasi secara otomatis dihentikan, dan kesalahan muncul. Pastikan bahwa panggilan REST API Anda dirancang untuk diselesaikan dalam periode waktu tunggu ini untuk menghindari kesalahan.

**penting**  
Ukuran payload respons tidak boleh melebihi 6 MB. Anda `RestApi` gagal jika batas ini terlampaui.

Gunakan contoh berikut untuk melakukan panggilan API ke Apache Airflow REST API dan memulai DAG run baru:

**Topics**
+ [Memberikan akses ke Apache Airflow REST API: `airflow:InvokeRestApi`](#granting-access-MWAA-Enhanced-REST-API)
+ [Memanggil API REST Apache Airflow](#listing-DAGs-creating-variables-using-restapi-script)
+ [Membuat token sesi server web dan memanggil Apache Airflow REST API](#create-web-server-session-token)

## Memberikan akses ke Apache Airflow REST API: `airflow:InvokeRestApi`
<a name="granting-access-MWAA-Enhanced-REST-API"></a>

Untuk mengakses Apache Airflow REST API AWS menggunakan kredensil, Anda harus memberikan izin `airflow:InvokeRestApi` dalam kebijakan IAM Anda. Dalam contoh kebijakan berikut, tentukan `Public` peran `Admin``Op`, `User``Viewer`,,, atau `{airflow-role}` untuk menyesuaikan tingkat akses pengguna. Untuk informasi selengkapnya, lihat [Peran Default](https://airflow.apache.org/docs/apache-airflow/1.10.6/security.html?highlight=ldap#default-roles) dalam panduan referensi *Apache Airflow*.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "AllowMwaaRestApiAccess",
            "Effect": "Allow",
            "Action": "airflow:InvokeRestApi",
            "Resource": [
            "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}"
            ]
        }
    ]
}
```

------

**catatan**  
Saat mengonfigurasi server web pribadi, `InvokeRestApi` tindakan tidak dapat dipanggil dari luar Virtual Private Cloud (VPC). Anda dapat menggunakan `aws:SourceVpc` kunci untuk menerapkan kontrol akses yang lebih terperinci untuk operasi ini. Untuk informasi lebih lanjut, lihat [aws: SourceVpc](https://docs.aws.amazon.com//IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcevpc).

## Memanggil API REST Apache Airflow
<a name="listing-DAGs-creating-variables-using-restapi-script"></a>

Contoh skrip berikut ini mencakup cara menggunakan Apache Airflow REST API untuk mencantumkan yang DAGs tersedia di lingkungan Anda dan cara membuat variabel Apache Airflow:

```
import boto3
			
  env_name = "MyAirflowEnvironment"
			
  def list_dags(client):
    request_params = {
      "Name": env_name,
      "Path": "/dags",
      "Method": "GET",
      "QueryParameters": {
        "paused": False
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  def create_variable(client):
    request_params = {
      "Name": env_name,
      "Path": "/variables",
      "Method": "POST",
      "Body": {
        "key": "test-restapi-key",
        "value": "test-restapi-value",
        "description": "Test variable created by MWAA InvokeRestApi API",
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  if __name__ == "__main__":
    client = boto3.client("mwaa")
    list_dags(client)
    create_variable(client)
```

## Membuat token sesi server web dan memanggil Apache Airflow REST API
<a name="create-web-server-session-token"></a>

Untuk membuat token akses server web, gunakan fungsi Python berikut. Fungsi ini pertama-tama memanggil Amazon MWAA API untuk mendapatkan token login web. Token login web, yang kedaluwarsa setelah 60 detik, kemudian ditukar dengan token *sesi* web, yang memungkinkan Anda mengakses server web dan menggunakan API REST Apache Airflow. Jika Anda memerlukan lebih dari 10 transaksi per detik (TPS) kapasitas pelambatan, Anda dapat menggunakan metode ini untuk mengakses Apache Airflow REST API.

Token sesi berakhir setelah 12 jam.

**Tip**  
Perubahan utama dalam contoh kode berikut dari Apache Airflow v2 ke v3 adalah:  
Jalur REST API diubah dari `/api/v1` menjadi `/api/v2`
Jalur masuk diubah dari `/aws_maa/login` menjadi `/pluginsv2/aws_mwaa/login`
Respons dari login `response.cookies["_token"]` berisi informasi token yang harus Anda gunakan untuk panggilan API berikutnya
Untuk panggilan REST API, Anda harus meneruskan `jwt_token` informasi di header sebagai:  

  ```
  headers = {
    "Authorization": f"Bearer {jwt_token}",
    "Content-Type": "application/json"
  }
  ```

------
#### [ Apache Airflow v3 ]

```
def get_token_info(region, env_name):
  logging.basicConfig(level=logging.INFO)
						
  try:
    # Initialize MWAA client and request a web login token
    mwaa = boto3.client('mwaa', region_name=region)
    response = mwaa.create_web_login_token(Name=env_name)
						
    # Extract the web server hostname and login token
    web_server_host_name = response["WebServerHostname"]
    web_token = response["WebToken"]
						
    # Construct the URL needed for authentication 
    login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login"
    login_payload = {"token": web_token}
						
    # Make a POST request to the MWAA login url using the login payload
    response = requests.post(
      login_url,
      data=login_payload,
      timeout=10
    )
						
    # Check if login was successful
    if response.status_code == 200:
						
    # Return the hostname and the session cookie 
    return (
      web_server_host_name,
      response.cookies['_token']
    )
    else:
      # Log an error
      logging.error("Failed to log in: HTTP %d", response.status_code)
      return None
      except requests.RequestException as e:
      
      # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
      except Exception as e:
      
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------
#### [ Apache Airflow v2 ]

```
def get_session_info(region, env_name):
  logging.basicConfig(level=logging.INFO)

  try:
      # Initialize MWAA client and request a web login token
      mwaa = boto3.client('mwaa', region_name=region)
      response = mwaa.create_web_login_token(Name=env_name)
      
      # Extract the web server hostname and login token
      web_server_host_name = response["WebServerHostname"]
      web_token = response["WebToken"]
      
      # Construct the URL needed for authentication 
      login_url = f"https://{web_server_host_name}/aws_mwaa/login"
      login_payload = {"token": web_token}

      # Make a POST request to the MWAA login url using the login payload
      response = requests.post(
          login_url,
          data=login_payload,
          timeout=10
      )

      # Check if login was succesfull 
      if response.status_code == 200:
      
          # Return the hostname and the session cookie 
          return (
              web_server_host_name,
              response.cookies["session"]
          )
      else:
          # Log an error
          logging.error("Failed to log in: HTTP %d", response.status_code)
          return None
  except requests.RequestException as e:
       # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
  except Exception as e:
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------

Setelah otentikasi selesai, Anda memiliki kredensional untuk mulai mengirim permintaan ke titik akhir API. Dalam contoh di bagian berikut, gunakan titik akhir`dags/{dag_name}/dagRuns`.

------
#### [ Apache Airflow v3 ]

```
def trigger_dag(region, env_name, dag_id):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_id (str): ID of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and token for authentication
						try:
						web_server_host_name, jwt_token = get_token_info(region, env_name)
						if not jwt_token:
						logging.error("Authentication failed, no jwt token retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving token info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						request_headers = {
						"Authorization": f"Bearer {jwt_token}",
						"Content-Type": "application/json" # Good practice to include, even for GET
						}
						
						# sample request body input
						json_body = {"logical_date": "2025-09-17T14:15:00Z"}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, headers=request_headers, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_id = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_id)
```

------
#### [ Apache Airflow v2 ]

```
def trigger_dag(region, env_name, dag_name):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_name (str): Name of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and session cookie for authentication
						try:
						web_server_host_name, session_cookie = get_session_info(region, env_name)
						if not session_cookie:
						logging.error("Authentication failed, no session cookie retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving session info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						cookies = {"session": session_cookie}
						json_body = {"conf": {}}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, cookies=cookies, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_name = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_name)
```

------