

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 为 Apache Airflow 变量使用 AWS Secrets Manager 中的密钥
<a name="samples-secrets-manager-var"></a>

以下示例调用 AWS Secrets Manager 来获取 Amazon MWAA 上的 Apache Airflow 变量的密钥。它假设您已完成 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 中的步骤。

**Topics**
+ [版本](#samples-secrets-manager-var-version)
+ [先决条件](#samples-secrets-manager-var-prereqs)
+ [权限](#samples-secrets-manager-var-permissions)
+ [要求](#samples-hive-dependencies)
+ [代码示例](#samples-secrets-manager-var-code)
+ [接下来做什么？](#samples-secrets-manager-var-next-up)

## 版本
<a name="samples-secrets-manager-var-version"></a>

您可以在 [Python 3.10](https://peps.python.org/pep-0619/) 中将本页上的代码示例与 **Apache Airflow v2** 一起使用，在 [Python 3.11](https://peps.python.org/pep-0664/) 中与 **Apache Airflow v3** 一起使用。

## 先决条件
<a name="samples-secrets-manager-var-prereqs"></a>

要使用本页上的示例代码，您需要以下内容：
+ 创建 Secrets Manager 后端作为 Apache Airflow 配置选项，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。
+ Secrets Manager 中的 Apache Airflow 变量字符串，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 权限
<a name="samples-secrets-manager-var-permissions"></a>
+ Secrets Manager 权限，如 [使用密钥配置 Apache Airflow 连接 AWS Secrets Manager](connections-secrets-manager.md) 所列。

## 要求
<a name="samples-hive-dependencies"></a>

要在 Apache Airflow v2 和更高版本中使用此代码示例，无需附加依赖项。使用 [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) 安装 Apache Airflow。

## 代码示例
<a name="samples-secrets-manager-var-code"></a>

以下步骤描述了如何创建 DAG 代码，以便调用 Secrets Manager 来获取密钥。

1. 在命令提示符下，导航到存储 DAG 代码的目录。例如：

   ```
   cd dags
   ```

1. 复制以下代码示例的内容，并在本地另存为 `secrets-manager-var.py`。

   ```
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.models import Variable
   from airflow.utils.dates import days_ago
   from datetime import timedelta
   import os
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   DEFAULT_ARGS = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email': ['airflow@example.com'],
       'email_on_failure': False,
       'email_on_retry': False,
   }
   def get_variable_fn(**kwargs):
       my_variable_name = Variable.get("test-variable", default_var="undefined")
       print("my_variable_name: ", my_variable_name)
       return my_variable_name
   with DAG(
       dag_id=DAG_ID,
       default_args=DEFAULT_ARGS,
       dagrun_timeout=timedelta(hours=2),
       start_date=days_ago(1),
       schedule_interval='@once',
       tags=['variable']
   ) as dag:
       get_variable = PythonOperator(
           task_id="get_variable",
           python_callable=get_variable_fn,
           provide_context=True
       )
   ```

## 接下来做什么？
<a name="samples-secrets-manager-var-next-up"></a>
+ 要了解如何将本示例中的 DAG 代码上传到 Amazon S3 存储桶的 `dags` 文件夹，请参阅 [添加或更新 DAG](configuring-dag-folder.md)。