

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 將環境中繼資料匯出至 Amazon S3 上的 CSV 檔案
<a name="samples-dag-run-info-to-csv"></a>

使用下列程式碼範例建立定向無環圖 (DAG)，查詢資料庫以取得一系列 DAG 執行資訊，並將資料寫入 Amazon S3 上存放`.csv`的檔案。

您可能想要從環境的 Aurora PostgreSQL 資料庫匯出資訊，以在本機檢查資料、將其封存在物件儲存中，或將其與 [Amazon S3 等工具結合到 Amazon Redshift 運算子](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html)和[資料庫清除](samples-database-cleanup.md)，以將 Amazon MWAA 中繼資料移出環境，但保留以供未來分析。

您可以查詢資料庫，尋找 [Apache Airflow 模型](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models)中列出的任何物件。此程式碼範例使用三種模型 `DagRun`、 `TaskFail`和 `TaskInstance`，可提供與 DAG 執行相關的資訊。

**Topics**
+ [版本](#samples-dag-run-info-to-csv-version)
+ [先決條件](#samples-dag-run-info-to-csv-prereqs)
+ [許可](#samples-dag-run-info-to-csv-permissions)
+ [要求](#samples-dag-run-info-to-csv-dependencies)
+ [範例程式碼](#samples-dag-run-info-to-csv-code)

## 版本
<a name="samples-dag-run-info-to-csv-version"></a>

您可以使用此頁面上的程式碼範例搭配 Python 3.10 中的 **Apache Airflow v2** 和 Python 3.11 中的 **Apache Airflow v**3。 [https://peps.python.org/pep-0619/](https://peps.python.org/pep-0619/) [https://peps.python.org/pep-0664/](https://peps.python.org/pep-0664/)

## 先決條件
<a name="samples-dag-run-info-to-csv-prereqs"></a>

若要使用此頁面上的範例程式碼，您需要下列項目：
+ [Amazon MWAA 環境](get-started.md)。
+ 您要匯出中繼資料資訊[的新 Amazon S3 儲存貯](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html)體。

## 許可
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA 需要 Amazon S3 動作的許可`s3:PutObject`，才能將查詢的中繼資料資訊寫入 Amazon S3。將下列政策陳述式新增至您環境的執行角色。

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

此政策僅限制對 *amzn-s3-demo-bucket* 的寫入存取。

## 要求
<a name="samples-dag-run-info-to-csv-dependencies"></a>

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例，不需要額外的相依性。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 來安裝 Apache Airflow。

## 範例程式碼
<a name="samples-dag-run-info-to-csv-code"></a>

下列步驟說明如何建立查詢 Aurora PostgreSQL 的 DAG，並將結果寫入新的 Amazon S3 儲存貯體。

1. 在終端機中，導覽至存放 DAG 程式碼的目錄。例如：

   ```
   cd dags
   ```

1. 複製下列程式碼範例的內容，並將其儲存為 `metadata_to_csv.py`。您可以變更指派給 的值`MAX_AGE_IN_DAYS`，以控制 DAG 從中繼資料資料庫查詢的最舊記錄的存留期。

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  執行下列 AWS CLI 命令，將 DAG 複製到您環境的儲存貯體，然後使用 Apache Airflow UI 觸發 DAG。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. 如果成功，您將在任務的任務日誌中輸出類似以下內容`export_db`：

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   您現在可以在 的新 Amazon S3 儲存貯體中存取和下載匯出`.csv`的檔案`/files/export/`。