

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Exportación de metadatos del entorno a archivos CSV en Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Use el siguiente código de muestra para crear un gráfico acíclico dirigido (DAG) que consulte en la base de datos un rango de información de ejecución del DAG y escriba los datos en los archivos `.csv` almacenados en Amazon S3.

Es posible que deba exportar información de la base de datos Aurora PostgreSQL de su entorno para inspeccionar los datos localmente, archivarlos en un almacenamiento de objetos o combinarlos con herramientas como el [operador Amazon S3 a Amazon Redshift](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) y la [limpieza de la base de datos](samples-database-cleanup.md), a fin de sacar los metadatos de Amazon MWAA del entorno y conservarlos para futuros análisis.

Puede consultar en la base de datos cualquiera de los objetos enumerados en los [modelos de Apache Airflow.](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models) En este código de muestra, se usan tres modelos, `DagRun`, `TaskFail` y `TaskInstance`, que proporcionan información relevante para las ejecuciones del DAG.

**Topics**
+ [Versión](#samples-dag-run-info-to-csv-version)
+ [Requisitos previos](#samples-dag-run-info-to-csv-prereqs)
+ [Permisos](#samples-dag-run-info-to-csv-permissions)
+ [Requisitos](#samples-dag-run-info-to-csv-dependencies)
+ [Código de ejemplo](#samples-dag-run-info-to-csv-code)

## Versión
<a name="samples-dag-run-info-to-csv-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).
+ Un [nuevo bucket de Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) a donde desea exportar su información de metadatos.

## Permisos
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA necesita permiso para que la acción `s3:PutObject` de Amazon S3 escriba la información de metadatos consultada en Amazon S3. Añada la siguiente instrucción de política al rol de ejecución de su entorno.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Esta política limita el acceso por escrito únicamente a*amzn-s3-demo-bucket*.

## Requisitos
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)Utilícelo para instalar Apache Airflow.

## Código de ejemplo
<a name="samples-dag-run-info-to-csv-code"></a>

Los siguientes pasos describen cómo puede crear un DAG que consulte Aurora PostgreSQL y escriba el resultado en su nuevo bucket de Amazon S3.

1. En su terminal, vaya hasta el directorio en el que está almacenado el código de DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como `metadata_to_csv.py`. Puede cambiar el valor asignado `MAX_AGE_IN_DAYS` para controlar la antigüedad de los registros más antiguos que el DAG consulta en la base de datos de metadatos.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Ejecute el siguiente AWS CLI comando para copiar el DAG al bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas para la tarea `export_db`:

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Ahora puede acceder a los archivos `.csv` exportados y descargarlos en su nuevo bucket de Amazon S3 en `/files/export/`.