

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Como exportar metadados do ambiente para arquivos CSV no Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Use o exemplo de código a seguir para criar um gráfico acíclico direcionado (DAG) que consulta o banco de dados em busca de uma variedade de informações de execução do DAG e grava os dados em arquivos `.csv` armazenados no Amazon S3.

Talvez você queira exportar informações do banco de dados Aurora PostgreSQL do seu ambiente para inspecionar os dados localmente, arquivá-los no armazenamento de objetos ou combiná-los com ferramentas como o [operador Amazon S3 para Amazon Redshift](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) e a [limpeza do banco de dados](samples-database-cleanup.md), a fim de mover os metadados do Amazon MWAA do ambiente, mas preservá-los para análise futura.

É possível consultar o banco de dados para qualquer um dos objetos listados nos [Modelos do Apache Airflow](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models). Esse exemplo de código usa três modelos, `DagRun`, `TaskFail` e `TaskInstance`, que fornecem informações relevantes para executar o DAG.

**Topics**
+ [Versão](#samples-dag-run-info-to-csv-version)
+ [Pré-requisitos](#samples-dag-run-info-to-csv-prereqs)
+ [Permissões](#samples-dag-run-info-to-csv-permissions)
+ [Requisitos](#samples-dag-run-info-to-csv-dependencies)
+ [Exemplo de código](#samples-dag-run-info-to-csv-code)

## Versão
<a name="samples-dag-run-info-to-csv-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Para usar o código de amostra nesta página, você precisará do seguinte:
+ Um [ambiente Amazon MWAA](get-started.md).
+ Um [novo bucket do Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) no qual você deseja exportar suas informações de metadados.

## Permissões
<a name="samples-dag-run-info-to-csv-permissions"></a>

O Amazon MWAA precisa de permissão para que a ação `s3:PutObject` do Amazon S3 grave as informações de metadados consultadas no Amazon S3. Adicione a seguinte declaração de política ao perfil de execução do seu ambiente.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::amzn-s3-demo-bucket"
}
```

Essa política limita somente o acesso de gravação *amzn-s3-demo-bucket* a.

## Requisitos
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-dag-run-info-to-csv-code"></a>

As etapas a seguir descrevem como é possível criar um DAG que consulta o Aurora PostgreSQL e grava o resultado em seu novo bucket do Amazon S3.

1. Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como `metadata_to_csv.py`. É possível alterar o valor atribuído para `MAX_AGE_IN_DAYS` para controlar a idade dos registros mais antigos que seu DAG consulta no banco de dados de metadados.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a interface do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se for bem-sucedido, você exibirá uma saída semelhante à seguinte nos logs de tarefas da tarefa `export_db`:

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Agora é possível acessar e baixar os arquivos `.csv` exportados em seu novo bucket do Amazon S3 em `/files/export/`.