

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Feature Store 特征处理器 SDK
<a name="feature-store-feature-processor-sdk"></a>

通过使用 `@feature_processor` 装饰器装饰您的转换函数来声明 Feature Store 特征处理器定义。适用于 Python 的 SageMaker AI SDK (Boto3) SDK 会自动从已配置的输入数据源加载数据，应用经过修饰的转换函数，然后将转换后的数据摄取到目标功能组。经过修饰的转换函数必须符合 `@feature_processor` 装饰器的预期签名。有关`@feature_processor`装饰器的更多信息，请参阅 Amazon Feature St [ore 中的 @feature\$1processor Dec](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) or SageMaker ator 阅读文档。

使用`@feature_processor`装饰器，您的转换函数在 Spark 运行时环境中运行，在该环境中，提供给您的函数的输入参数及其返回值都是 Spark DataFrames。转换函数中的输入参数数量必须与 `@feature_processor` 装饰器中配置的输入数量相匹配。

有关 `@feature_processor` 装饰器的更多信息，请参阅[特征处理器 Feature Store SDK for Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor)。

以下代码是有关如何使用 `@feature_processor` 装饰器的基本示例。如需更具体的示例使用案例，请参阅[常见使用案例的特征处理代码示例](feature-store-feature-processor-examples.md)。

可以使用以下命令从 SageMaker Python SDK 及其附加组件安装功能处理器 SDK。

```
pip install sagemaker[feature-processor]
```

在以下示例中，`us-east-1` 是资源所在的区域，`111122223333` 是资源拥有者账户 ID，`your-feature-group-name` 是特征组名称。

以下是基本的特征处理器定义，其中 `@feature_processor` 装饰器将配置来自 Amazon S3 的 CSV 输入以加载并提供给您的转换函数（例如 `transform`），为摄取到特征组做好准备。最后一行运行该函数。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`@feature_processor` 参数包括：
+ `inputs` (List[str])：Feature Store 特征处理器中使用的数据源列表。如果数据源是特征组或存储在 Amazon S3 中，您可以将 Feature Store 提供的数据源定义用于特征处理器。有关功能商店提供的数据源定义的完整列表，请参阅 Amazon Feature Store 中的[ SageMaker 功能处理器数据源](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source)阅读文档。
+ `output` (str)：用于摄取经修饰函数的输出的特征组 ARN。
+ `target_stores` (Optional[List[str]])：要摄取到输出中的存储（例如 `OnlineStore` 或 `OfflineStore`）列表。如果未指定该参数，则数据将摄取到输出特征组的所有已启用存储。
+ `parameters` (Dict[str, Any])：要提供给转换函数的字典。
+ `enable_ingestion` (bool)：一个标志，用于指示转换函数的输出是否摄取到输出特征组。此标志在开发阶段很有用。如果未指定该标志，则会启用摄取。

可选的封装函数参数（如果在函数签名中提供，则作为参数提供）包括：
+ `params` (Dict[str, Any])：`@feature_processor` 参数中定义的字典。它还包含可使用键 `system` 引用的系统配置参数，例如 `scheduled_time` 参数。
+ `spark`(SparkSession)：对为 Spark 应用程序初始化的 SparkSession 实例的引用。

以下代码是使用 `params` 和 `spark` 参数的示例。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

`scheduled_time` 系统参数（在函数的 `params` 参数中提供）是支持重试每次执行的重要值。该值可帮助唯一标识特征处理器的执行情况，并可用作基于日期范围的输入（例如，仅加载最近 24 小时的数据）的参考点，以保证输入范围独立于代码的实际执行时间。如果特征处理器按计划运行（请参阅[特征处理器管道的计划执行和基于事件的执行](feature-store-feature-processor-schedule-pipeline.md)），则其值固定为计划运行的时间。在同步执行期间，可以使用 SDK 的执行 API 覆盖该参数，以支持数据回填或重新运行错过的执行等使用案例。如果特征处理器以任何其他方式运行，则其值为当前时间。

有关编写 Spark 代码的信息，请参阅 [Spark SQL 编程指南](https://spark.apache.org/docs/latest/sql-programming-guide.html)。

有关常见使用案例的更多代码示例，请参阅[常见使用案例的特征处理代码示例](feature-store-feature-processor-examples.md)。

请注意，用 `@feature_processor` 进行修饰的转换函数不会返回值。要以编程方式对函数进行测试，可以删除 `@feature_processor` 装饰器或为其打上猴子补丁，使其充当封装函数的直通函数。有关`@feature_processor`装饰器的更多详细信息，请参阅[亚马逊 SageMaker 功能商店 Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html)。