

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Exportieren von Umgebungsmetadaten in CSV-Dateien auf Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Verwenden Sie das folgende Codebeispiel, um einen gerichteten azyklischen Graph (DAG) zu erstellen, der die Datenbank nach einer Reihe von DAG-Ausführungsinformationen abfragt und die Daten in `.csv` Dateien schreibt, die auf Amazon S3 gespeichert sind.

Möglicherweise möchten Sie Informationen aus der Aurora PostgreSQL-Datenbank Ihrer Umgebung exportieren, um die Daten lokal zu untersuchen, sie im Objektspeicher zu archivieren oder sie mit Tools wie dem [Amazon S3 S3-to-Amazon-Redshift-Operator und der [Datenbankbereinigung](samples-database-cleanup.md) zu](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) kombinieren, um Amazon MWAA-Metadaten aus der Umgebung zu verschieben, sie aber für future Analysen aufzubewahren.

Sie können die Datenbank nach allen Objekten abfragen, die in [Apache Airflow Airflow-Modellen](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models) aufgeführt sind. In diesem Codebeispiel werden drei Modelle,, und`DagRun`, verwendet`TaskFail`, die Informationen bereitstellen`TaskInstance`, die für DAG-Läufe relevant sind.

**Topics**
+ [Version](#samples-dag-run-info-to-csv-version)
+ [Voraussetzungen](#samples-dag-run-info-to-csv-prereqs)
+ [Berechtigungen](#samples-dag-run-info-to-csv-permissions)
+ [Voraussetzungen](#samples-dag-run-info-to-csv-dependencies)
+ [Codebeispiel](#samples-dag-run-info-to-csv-code)

## Version
<a name="samples-dag-run-info-to-csv-version"></a>

[Sie können das Codebeispiel auf dieser Seite mit **Apache Airflow v2** in [Python 3.10](https://peps.python.org/pep-0619/) und **Apache Airflow v3** in Python 3.11 verwenden.](https://peps.python.org/pep-0664/)

## Voraussetzungen
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
+ Eine [Amazon MWAA-Umgebung](get-started.md).
+ Ein [neuer Amazon S3 S3-Bucket](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html), in den Sie Ihre Metadateninformationen exportieren möchten.

## Berechtigungen
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA benötigt für die Amazon S3-Aktion die Erlaubnis, die abgefragten Metadateninformationen in Amazon S3 `s3:PutObject` zu schreiben. Fügen Sie der Ausführungsrolle Ihrer Umgebung die folgende Richtlinienerklärung hinzu.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Diese Richtlinie beschränkt den Schreibzugriff nur auf*amzn-s3-demo-bucket*.

## Voraussetzungen
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images), um Apache Airflow zu installieren.

## Codebeispiel
<a name="samples-dag-run-info-to-csv-code"></a>

In den folgenden Schritten wird beschrieben, wie Sie eine DAG erstellen können, die Aurora PostgreSQL abfragt und das Ergebnis in Ihren neuen Amazon S3 S3-Bucket schreibt.

1. Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:

   ```
   cd dags
   ```

1. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter`metadata_to_csv.py`. Sie können den zugewiesenen Wert ändern, `MAX_AGE_IN_DAYS` um das Alter der ältesten Datensätze zu steuern, die Ihre DAG aus der Metadatendatenbank abfragt.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Wenn der Vorgang erfolgreich ist, geben Sie in den Aufgabenprotokollen für die Aufgabe eine Ausgabe ähnlich der `export_db` folgenden aus:

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Sie können jetzt auf die exportierten `.csv` Dateien in Ihrem neuen Amazon S3 S3-Bucket in zugreifen und sie herunterladen`/files/export/`.