

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 特征处理
<a name="feature-store-feature-processing"></a>

Amazon F SageMaker eature Store 功能处理是一种功能，您可以使用该功能将原始数据转换为机器学习 (ML) 功能。它为您提供特征处理器 SDK，您可以使用该 SDK 将批处理数据源中的数据转换并摄取到特征组中。有了这项功能，特征存放区就可以负责基础的基础设施，包括预配置计算环境以及创建和维护 Pipelines 以加载和摄取数据。这样，您就可以专注于特征处理器定义，其中包括一个转换函数（例如，产品浏览次数、交易额平均值）、多个来源（在哪里应用此转换）和多个接收器（将计算出的特征值写入到哪里）。

特征处理器管道是一个 Pipelines 管道。作为流水线，您还可以在控制台中跟踪带有 SageMaker AI 血统的预定功能处理器管道。有关 SageMaker AI Lineage 的更多信息，请参阅[亚马逊 SageMaker ML Lineage 追踪](lineage-tracking.md)这包括跟踪计划执行、可视化世系以追踪要素的数据源，以及在单一环境中查看共享功能处理器。有关在管理控制台中使用特征存放区的信息，请参阅 [从管理控制台查看管道执行情况](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio)。

**Topics**
+ [

# Feature Store 特征处理器 SDK
](feature-store-feature-processor-sdk.md)
+ [

# 远程运行 Feature Store 特征处理器
](feature-store-feature-processor-execute-remotely.md)
+ [

# 创建和运行 Feature Store 特征处理器管道
](feature-store-feature-processor-create-execute-pipeline.md)
+ [

# 特征处理器管道的计划执行和基于事件的执行
](feature-store-feature-processor-schedule-pipeline.md)
+ [

# 监控 Amazon SageMaker 功能商店功能处理器管道
](feature-store-feature-processor-monitor-pipeline.md)
+ [

# IAM 权限和执行角色
](feature-store-feature-processor-iam-permissions.md)
+ [

# 特征处理器约束、限制和配额
](feature-store-feature-processor-quotas.md)
+ [

# 数据来源
](feature-store-feature-processor-data-sources.md)
+ [

# 常见使用案例的特征处理代码示例
](feature-store-feature-processor-examples.md)

# Feature Store 特征处理器 SDK
<a name="feature-store-feature-processor-sdk"></a>

通过使用 `@feature_processor` 装饰器装饰您的转换函数来声明 Feature Store 特征处理器定义。适用于 Python 的 SageMaker AI SDK (Boto3) SDK 会自动从已配置的输入数据源加载数据，应用经过修饰的转换函数，然后将转换后的数据摄取到目标功能组。经过修饰的转换函数必须符合 `@feature_processor` 装饰器的预期签名。有关`@feature_processor`装饰器的更多信息，请参阅 Amazon Feature St [ore 中的 @feature\$1processor Dec](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) or SageMaker ator 阅读文档。

使用`@feature_processor`装饰器，您的转换函数在 Spark 运行时环境中运行，在该环境中，提供给您的函数的输入参数及其返回值都是 Spark DataFrames。转换函数中的输入参数数量必须与 `@feature_processor` 装饰器中配置的输入数量相匹配。

有关 `@feature_processor` 装饰器的更多信息，请参阅[特征处理器 Feature Store SDK for Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor)。

以下代码是有关如何使用 `@feature_processor` 装饰器的基本示例。如需更具体的示例使用案例，请参阅[常见使用案例的特征处理代码示例](feature-store-feature-processor-examples.md)。

可以使用以下命令从 SageMaker Python SDK 及其附加组件安装功能处理器 SDK。

```
pip install sagemaker[feature-processor]
```

在以下示例中，`us-east-1` 是资源所在的区域，`111122223333` 是资源拥有者账户 ID，`your-feature-group-name` 是特征组名称。

以下是基本的特征处理器定义，其中 `@feature_processor` 装饰器将配置来自 Amazon S3 的 CSV 输入以加载并提供给您的转换函数（例如 `transform`），为摄取到特征组做好准备。最后一行运行该函数。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`@feature_processor` 参数包括：
+ `inputs` (List[str])：Feature Store 特征处理器中使用的数据源列表。如果数据源是特征组或存储在 Amazon S3 中，您可以将 Feature Store 提供的数据源定义用于特征处理器。有关功能商店提供的数据源定义的完整列表，请参阅 Amazon Feature Store 中的[ SageMaker 功能处理器数据源](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source)阅读文档。
+ `output` (str)：用于摄取经修饰函数的输出的特征组 ARN。
+ `target_stores` (Optional[List[str]])：要摄取到输出中的存储（例如 `OnlineStore` 或 `OfflineStore`）列表。如果未指定该参数，则数据将摄取到输出特征组的所有已启用存储。
+ `parameters` (Dict[str, Any])：要提供给转换函数的字典。
+ `enable_ingestion` (bool)：一个标志，用于指示转换函数的输出是否摄取到输出特征组。此标志在开发阶段很有用。如果未指定该标志，则会启用摄取。

可选的封装函数参数（如果在函数签名中提供，则作为参数提供）包括：
+ `params` (Dict[str, Any])：`@feature_processor` 参数中定义的字典。它还包含可使用键 `system` 引用的系统配置参数，例如 `scheduled_time` 参数。
+ `spark`(SparkSession)：对为 Spark 应用程序初始化的 SparkSession 实例的引用。

以下代码是使用 `params` 和 `spark` 参数的示例。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

`scheduled_time` 系统参数（在函数的 `params` 参数中提供）是支持重试每次执行的重要值。该值可帮助唯一标识特征处理器的执行情况，并可用作基于日期范围的输入（例如，仅加载最近 24 小时的数据）的参考点，以保证输入范围独立于代码的实际执行时间。如果特征处理器按计划运行（请参阅[特征处理器管道的计划执行和基于事件的执行](feature-store-feature-processor-schedule-pipeline.md)），则其值固定为计划运行的时间。在同步执行期间，可以使用 SDK 的执行 API 覆盖该参数，以支持数据回填或重新运行错过的执行等使用案例。如果特征处理器以任何其他方式运行，则其值为当前时间。

有关编写 Spark 代码的信息，请参阅 [Spark SQL 编程指南](https://spark.apache.org/docs/latest/sql-programming-guide.html)。

有关常见使用案例的更多代码示例，请参阅[常见使用案例的特征处理代码示例](feature-store-feature-processor-examples.md)。

请注意，用 `@feature_processor` 进行修饰的转换函数不会返回值。要以编程方式对函数进行测试，可以删除 `@feature_processor` 装饰器或为其打上猴子补丁，使其充当封装函数的直通函数。有关`@feature_processor`装饰器的更多详细信息，请参阅[亚马逊 SageMaker 功能商店 Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html)。

# 远程运行 Feature Store 特征处理器
<a name="feature-store-feature-processor-execute-remotely"></a>

要在需要比本地可用硬件更强大的硬件的大型数据集上运行特征处理器，您可以使用装饰`@remote`器装饰代码，将本地 Python 代码作为单节点或多节点分布式 SageMaker 训练作业运行。有关将代码作为 SageMaker 训练作业运行的更多信息，请参阅[将你的本地代码当作 SageMaker 训练作业来运行](train-remote-decorator.md)。

以下是 `@remote` 装饰器和 `@feature_processor` 装饰器的用法示例。

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`spark_config` 参数表示远程作业作为 Spark 应用程序运行。该`SparkConfig`实例可用于配置 Spark 配置并为 Spark 应用程序提供其他依赖项 JARs，例如 Python 文件和文件。

为了在开发特征处理代码时更快地进行迭代，可以在 `@remote` 装饰器中指定 `keep_alive_period_in_seconds` 参数，将配置的资源保留在暖池中，以供后续训练作业使用。有关暖池的更多信息，请参阅《API 参考指南》中的 `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)`。

以下代码是本地 `requirements.txt:` 的示例

```
sagemaker>=2.167.0
```

这将在远程作业中安装相应的 SageMaker SDK 版本，这是执行注释的方法所`@feature-processor`必需的。

# 创建和运行 Feature Store 特征处理器管道
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

功能处理器 SDK 可 APIs 将您的特征处理器定义提升为完全托管的 SageMaker AI 管道。有关 Pipelines 的更多信息，请参阅 [管道概述](pipelines-overview.md)。要将您的功能处理器定义转换为 SageMaker AI Pipeline，请将 `to_pipeline` API 与您的特征处理器定义一起使用。您可以计划功能处理器定义的执行，通过 CloudWatch 指标对其进行操作监控，并将其与之集成 EventBridge 以充当事件源或订阅者。有关监控使用 Pipelines 创建的管道的更多信息，请参阅 [监控 Amazon SageMaker 功能商店功能处理器管道](feature-store-feature-processor-monitor-pipeline.md)。

要查看特征处理器管道，请参阅[从管理控制台查看管道执行情况](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio)。

如果您的函数也用 `@remote` 装饰器进行修饰，则它的配置将会传输到特征处理器管道。可以使用 `@remote` 装饰器指定高级配置，例如计算实例类型和数量、运行时依赖项、网络和安全配置。

以下示例使用`to_pipeline`和`execute` APIs。

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

从语义上讲，`to_pipeline` API 是一项更新插入操作。如果管道已经存在，它会对其进行更新；否则，它会创建一个管道。

`to_pipeline`API 可以选择接受引用包含功能处理器定义的文件的 Amazon S3 URI，将其与功能处理器管道相关联，以跟踪转换函数及其在其 SageMaker AI 机器学习谱系中的版本。

要检索账户中包含每个特征处理器管道的列表，可以使用 `list_pipelines` API。随后向 `describe` API 返回与特征处理器管道相关的详细信息，包括但不限于管道和计划详细信息。

以下示例使用`list_pipelines`和`describe` APIs。

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# 特征处理器管道的计划执行和基于事件的执行
<a name="feature-store-feature-processor-schedule-pipeline"></a>

可以将 Amazon F SageMaker eature Store 功能处理管道执行配置为根据预先配置的计划自动和异步启动，也可以根据其他 AWS 服务事件的结果自动和异步启动。例如，您可以计划在每个月的第一天执行特征处理管道，或者将两个管道链接在一起，以便在源管道执行完成后自动执行目标管道。

**Topics**
+ [

## 基于计划的执行
](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [

## 基于事件的执行
](#feature-store-feature-processor-schedule-pipeline-event-based)

## 基于计划的执行
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

功能处理器 SDK 提供了一[https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)个 API，用于通过 Amazon S EventBridge cheduler 集成，定期运行功能处理器管道。可以使用[https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)参数使用、或`cron`表达式来指定计划 `at``rate`，其表达式与 Amazon 支持的表达式相同 EventBridge。从语义上讲，计划 API 是一项更新插入操作，即如果已经存在计划，它会更新计划；否则，它会创建计划。有关 EventBridge 表达式和示例的更多信息，请参阅《[日程 EventBridge 安排 EventBridge 器用户指南》中的日程安排类型](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html)。

以下示例使用特征处理器 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API，该 API 使用 `at`、`rate` 和 `cron` 表达式。

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

`schedule` API 中日期和时间输入的默认时区采用 UTC。有关 EventBridge 调度程序计划表达式的更多信息，请参阅 EventBridge 调度器 API 参考文档[https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)中的。

计划的特征处理器管道执行为您的转换函数提供了计划执行时间，可用作基于日期范围的输入的幂等性令牌或固定参考点。要禁用（即暂停）或重新启用计划，请分别使用带有 `‘DISABLED’` 或 `‘ENABLED’` 的 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API 的 `state` 参数。

有关特征处理器的信息，请参阅[特征处理器 SDK 数据源](feature-store-feature-processor-data-sources-sdk.md)。

## 基于事件的执行
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

可以将特征处理管道配置为在 AWS 事件发生时自动执行。特征处理 SDK 提供了一个接受源事件列表和目标管道的 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) 函数。源事件必须是 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent)（用于指定管道和[执行状态](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus)事件）的实例。

该`put_trigger`函数配置 Amazon EventBridge 规则和目标以路由事件，并允许您指定 EventBridge 事件模式以响应任何 AWS 事件。有关这些概念的信息，请参阅 Amazon EventBridge [规则](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)、[目标](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)和[事件模式](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)。

可以启用或禁用触发器。 EventBridge 将使用 `put_trigger` API `role_arn` 参数中提供的角色启动目标管道执行。如果在 Amazon SageMaker Studio Classic 或笔记本环境中使用软件开发工具包，则默认使用执行角色。有关如何获取执行角色的信息，请参阅[获取执行角色](sagemaker-roles.md#sagemaker-roles-get-execution-role)。

以下示例将：
+ 使用 `to_pipeline` API 的 SageMaker AI 管道，它接收你的目标管道名称 (`target-pipeline`) 和你的转换函数 (`transform`)。有关您的特征处理器和转换函数的信息，请参阅[特征处理器 SDK 数据源](feature-store-feature-processor-data-sources-sdk.md)。
+ 使用 `put_trigger` API 设置触发器，它接受 `FeatureProcessorPipelineEvent` 用于事件并接受您的目标管道名称 (`target-pipeline`)。

  `FeatureProcessorPipelineEvent` 定义了源管道 (`source-pipeline`) 的状态变为 `Succeeded` 时的触发器。有关特征处理器管道事件函数的信息，请参阅 Feature Store 阅读文档中的 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent)。

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

有关使用基于事件的触发器为特征处理器管道创建连续执行和自动重试的示例，请参阅[使用基于事件的触发器进行连续执行和自动重试](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries)。

有关使用基于事件的触发器创建连续*流式处理* 和使用基于事件的触发器自动重试的示例，请参阅[流式处理自定义数据源示例](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming)。

# 监控 Amazon SageMaker 功能商店功能处理器管道
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS 提供监控工具，用于实时监视您的 SageMaker Amazon AI 资源和应用程序，在出现问题时进行报告，并在适当时自动采取措施。特征存放区特征处理器管道是 Pipelines，因此可以使用标准的监控机制和集成。可以通过 Amazon 指标和 Amazon EventBridge 事件监控诸如执行失败之类的操作 CloudWatch指标。

有关如何监控和运行 Feature Store 特征处理器的更多信息，请参阅以下资源：
+ [监控 Amazon A SageMaker I 中的 AWS 资源](monitoring-overview.md)-有关监控和审计 SageMaker AI 资源活动的一般指南。
+ [SageMaker 管道指标](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines)-管道发出的 CloudWatch 指标。
+ [SageMaker 管道执行状态更改](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline)-为管道和执行发出 EventBridge的事件。
+ [对亚马逊 SageMaker 管道进行故障排除](pipelines-troubleshooting.md)：管道的一般调试和故障排除技巧。

Feature Store 功能处理器执行日志可在 CloudWatch `/aws/sagemaker/TrainingJobs`日志组下的 Amazon Logs 中找到，您可以在其中使用查找约定找到执行日志流。对于通过直接调用 `@feature_processor` 修饰的函数创建的执行，您可以在本地执行环境的控制台中找到日志。对于` @remote`装饰后的执行， CloudWatch 日志流名称包含函数名称和执行时间戳。对于功能处理器管道执行，该步骤的 CloudWatch 日志流包含`feature-processor`字符串和管道执行 ID。

功能商店功能处理器管道和最新执行状态可在 Amazon Studio Classic 中找到 Feat SageMaker ure Studio Classic 中的功能商店用户界面中给定功能组。与特征处理器管道相关的特征组作为输入或输出显示在 UI 中。此外，世系视图可以为上游执行提供上下文（例如生成数据的特征处理器管道和数据源），以便进一步调试。有关使用 Studio Classic 任务流水线视图的更多信息，请参阅 [从管理控制台查看任务流水线](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)。

# IAM 权限和执行角色
<a name="feature-store-feature-processor-iam-permissions"></a>

要使用 Amaz SageMaker on Python 软件开发工具包，需要与之交互的权限 AWS 服务。要实现特征处理器的全部功能，需要执行以下策略。您可以将[AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html)和[AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS 托管策略附加到您的 IAM 角色。有关将策略附加到 IAM 角色的信息，请参阅[向您的 IAM 角色添加策略](feature-store-adding-policies.md)。详见以下示例。

此政策适用的角色的信任策略必须允许“scheduler.amazonaws.com”、“sagemaker.amazonaws.com”和“glue.amazonaws.com”原则。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# 特征处理器约束、限制和配额
<a name="feature-store-feature-processor-quotas"></a>

Amazon Feature Store SageMaker 功能处理依赖于 SageMaker AI 机器学习 (ML) 谱系跟踪。Feature Store 特征处理器使用世系上下文来表示和跟踪特征处理管道和管道版本。每个 Feature Store 特征处理器都至少使用两个世系上下文（一个用于特征处理管道，另一个用于版本）。如果特征处理管道的输入或输出数据源发生变化，则会创建额外的世系上下文。您可以通过联系 AWS 支持提高上限来更新 SageMaker AI ML 血统限制。Feature Store 特征处理器所用资源的默认限制如下。有关 SageMaker AI ML 血统跟踪的信息，请参阅[亚马逊 SageMaker ML Lineage 追踪](lineage-tracking.md)。

有关 SageMaker AI 配额的更多信息，请参阅 A [mazon A SageMaker I 终端节点和配额](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html)。

每个区域的世系限制
+ 上下文 - 500（软限制）
+ 构件 - 6000（软限制）
+ 关联 - 6000（软限制）

每个区域的训练限制
+ 训练作业的最长运行时间 - 432000 秒
+ 每个训练作业的实例数 - 20
+ 在当前区域内的此账户中，您每秒可以发出的 `CreateTrainingJob` 请求最大数 - 1 TPS
+ 便于集群重复使用的保持活动时间 - 3600 秒

每个区域的最大管道数和并发管道执行数
+ 每个账户允许的最大管道数 - 500
+ 每个账户允许的最大并发管道执行数 - 20
+ 管道执行超时时间 - 672 小时

# 数据来源
<a name="feature-store-feature-processor-data-sources"></a>

Amazon Feature Store SageMaker 功能处理支持多个数据源。特征处理器 SDK for Python (Boto3) 提供了从存储在 Amazon S3 中的特征组或对象加载数据的构造。此外，您可以创作自定义数据源以加载来自其他数据源的数据。有关 Feature Store 提供的数据源的信息，请参阅[特征处理器数据源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

**Topics**
+ [

# 特征处理器 SDK 数据源
](feature-store-feature-processor-data-sources-sdk.md)
+ [

# 自定义数据源
](feature-store-feature-processor-data-sources-custom.md)
+ [

# 自定义数据源示例
](feature-store-feature-processor-data-sources-custom-examples.md)

# 特征处理器 SDK 数据源
<a name="feature-store-feature-processor-data-sources-sdk"></a>

适用于 Python 的亚马逊 SageMaker 功能商店功能处理器 SDK (Boto3) 提供了从存储在 Amazon S3 中的功能组或对象加载数据的构造。有关 Feature Store 提供的数据源定义的完整列表，请参阅[特征处理器数据源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

有关如何使用 Feature Store Python SDK 数据源定义的示例，请参阅[常见使用案例的特征处理代码示例](feature-store-feature-processor-examples.md)。

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` 用于将特征组指定为特征处理器的输入数据源。可以从离线存储特征组加载数据。尝试从在线存储特征组加载数据将会导致验证错误。您可以指定开始偏移和结束偏移，将加载的数据限制在特定时间范围内。例如，可以指定一个“14 天”的开始偏移，以便仅加载最近两周的数据；还可以指定一个“7 天”的结束偏移，以便将输入限制为前一周的数据。

## Feature Store 提供的数据源定义
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

Feature Store Python SDK 包含数据源定义，可用于为特征处理器指定各种输入数据源。其中包括 CSV、Parquet 和 Iceberg 表源。有关 Feature Store 提供的数据源定义的完整列表，请参阅[特征处理器数据源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

# 自定义数据源
<a name="feature-store-feature-processor-data-sources-custom"></a>

在本页上，我们将介绍如何创建自定义数据源类并展示一些用法示例。对于自定义数据源，您可以使用提供的适用于 Python 的 SageMaker AI SDK (Boto3)，就像使用亚马逊 SageMaker 功能商店提供的 APIs 数据源一样。

要使用自定义数据源通过特征处理将数据转换并摄取到特征组中，需要使用以下类成员和函数来扩展 `PySparkDataSource` 类。
+ `data_source_name` (str)：数据源的任意名称。例如，Amazon Redshift、Snowflake 或 Glue Catalog ARN。
+ `data_source_unique_id` (str)：表示正在访问的特定资源的唯一标识符。例如，表名、DDB 表 ARN、Amazon S3 前缀。自定义数据源中所有使用相同 `data_source_unique_id` 的情况都将关联到世系视图中的同一数据源。世系包括有关特征处理工作流的执行代码、使用的数据源以及将它们摄取到特征组或特征的方式的信息。有关在 **Studio** 中查看特征组任务流水线的信息，请参阅 [从管理控制台查看任务流水线](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)。
+ `read_data` (func)：一种用于连接特征处理器的方法。返回一个 Spark 数据框。有关示例，请参阅[自定义数据源示例](feature-store-feature-processor-data-sources-custom-examples.md)。

`data_source_name` 和 `data_source_unique_id` 都用于唯一标识您的世系实体。以下是名为 `CustomDataSource` 的自定义数据源类的示例。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

本节提供特征处理器的自定义数据源实施的示例。有关自定义数据源的更多信息，请参阅[自定义数据源](feature-store-feature-processor-data-sources-custom.md)。

安全是我们 AWS 与客户的共同责任。 AWS 负责保护在中运行服务的基础架构 AWS 云。客户则对其所有必要的安全配置和管理任务负责。例如，不应在自定义数据源中对诸如数据存储访问凭证之类的密钥进行硬编码。您可以使用 AWS Secrets Manager 来管理这些证书。有关 Secrets Manager 的信息，请参阅[什么是 AWS Secrets Manager？](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 在 AWS Secrets Manager 用户指南中。以下示例将对您的凭证使用 Secrets Manager。

**Topics**
+ [

## Amazon Redshift 集群 (JDBC) 自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [

## Snowflake 自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [

## Databricks (JDBC) 自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [

## 流式处理自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Amazon Redshift 集群 (JDBC) 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift 提供了 JDBC 驱动程序，可用于通过 Spark 读取数据。有关如何下载 Amazon Redshift JDBC 驱动程序的信息，请参阅[下载 Amazon Redshift JDBC 驱动程序版本 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html)。

要创建自定义 Amazon Redshift 数据源类，您将需要覆盖 [自定义数据源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

要连接 Amazon Redshift 集群，您需要：
+ Amazon Redshift JDBC URL (`jdbc-url`)

  有关获取 Amazon Redshift JDBC URL 的信息，请参阅《Amazon Redshift 数据库开发人员指南》中的[获取 JDBC URL](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html)。
+ Amazon Redshift 用户名 (`redshift-user`) 和密码 (`redshift-password`)

  有关如何使用 Amazon Redshift SQL 命令创建和管理数据库用户的信息，请参阅《Amazon Redshift 数据库开发人员指南》中的[用户](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html)。
+ Amazon Redshift 表名 (`redshift-table-name`)

  有关如何创建表的信息和一些示例，请参阅《Amazon Redshift 数据库开发人员指南》中的[创建表](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html)。
+ （可选）如果使用 Secrets Manager，则需要密钥名称 (`secret-redshift-account-info`)，用于在 Secrets Manager 上存储您的 Amazon Redshift 访问用户名和密码。

  有关 Secrets Manager 的信息，请参阅 AWS Secrets Manager 用户指南[AWS Secrets Manager中的查找密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下示例演示如何从 Secrets Manager 中检索 JDBC URL 和个人访问令牌，并覆盖自定义数据源类 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

以下示例显示了如何将 `RedshiftDataSource` 连接到您的 `feature_processor` 装饰器。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

要远程运行特征处理器作业，您需要通过定义 `SparkConfig` 来提供 JDBC 驱动程序并将其传递给 `@remote` 装饰器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Snowflake 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake 提供了一个 Spark 连接器，可用于您的 `feature_processor` 装饰器。有关适用于 Spark 的 Snowflake 连接器的信息，请参阅 Snowflake 文档中的[适用于 Spark 的 Snowflake 连接器](https://docs.snowflake.com/en/user-guide/spark-connector)。

要创建自定义 Snowflake 数据源类，您需要覆盖 [自定义数据源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法，并将 Spark 连接器包添加到 Spark 类路径中。

要连接 Snowflake 数据源，您需要：
+ Snowflake URL (`sf-url`)

   URLs 有关如何访问 Snowflake 网页界面的信息，请参阅 Snowflake [文档中的账户标识符](https://docs.snowflake.com/en/user-guide/admin-account-identifier)。
+ Snowflake 数据库 (`sf-database`) 

  有关使用 Snowflake 获取数据库名称的信息，请参阅 Snowflake 文档中的 [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database)。
+ Snowflake 数据库架构 (`sf-schema`) 

  有关使用 Snowflake 获取架构名称的信息，请参阅 Snowflake 文档中的 [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema)。
+ Snowflake 仓库 (`sf-warehouse`)

  有关使用 Snowflake 获取仓库名称的信息，请参阅 Snowflake 文档中的 [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse)。
+ Snowflake 表名 (`sf-table-name`)
+ （可选）如果使用 Secrets Manager，则需要密钥名称 (`secret-snowflake-account-info`)，用于在 Secrets Manager 上存储 Snowflake 访问用户名和密码。

  有关 Secrets Manager 的信息，请参阅 AWS Secrets Manager 用户指南[AWS Secrets Manager中的查找密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下示例演示如何从 Secrets Manager 中检索 Snowflake 用户名和密码，并覆盖自定义数据源类 `SnowflakeDataSource` 的 `read_data` 函数。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

以下示例显示了如何将 `SnowflakeDataSource` 连接到您的 `feature_processor` 装饰器。

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

要远程运行特征处理器作业，您需要通过定义 `SparkConfig` 提供软件包并将其传递给 `@remote` 装饰器。以下示例中的 Spark 包是这样的：`spark-snowflake_2.12` 是特征处理器 Scala 版本，`2.12.0` 是您要使用的 Snowflake 版本，`spark_3.3` 是特征处理器 Spark 版本。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Databricks (JDBC) 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark 可以使用 Databricks JDBC 驱动程序从 Databricks 读取数据。有关 Databricks JDBC 驱动程序的信息，请参阅 Databricks 文档中的[配置 Databricks ODBC 和 JDBC 驱动程序](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers)。

**注意**  
通过在 Spark 类路径中包含相应的 JDBC 驱动程序，可以从任何其他数据库读取数据。有关更多信息，请参阅《Spark SQL 指南》中的 [JDBC 转换到其他数据库](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)。

要创建自定义 Databricks 数据源类，您需要覆盖[自定义数据源](feature-store-feature-processor-data-sources-custom.md)中的 `read_data` 方法，并将 JDBC jar 添加到 Spark 类路径中。

要连接 Databricks 数据源，您需要：
+ Databricks URL (`databricks-url`)

  有关 Databricks URL 的信息，请参阅 Databricks 文档中的[构建 Databricks 驱动程序的连接 URL](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver)。
+ Databricks 个人访问令牌 (`personal-access-token`)

  有关 Databricks 访问令牌的信息，请参阅 Databricks 文档中的 [Databricks 个人访问令牌身份验证](https://docs.databricks.com/en/dev-tools/auth.html#pat)。
+ 数据目录名称 (`db-catalog`) 

  有关 Databricks 目录名称的信息，请参阅 Databricks 文档中的[目录名称](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name)。
+ 架构名称 (`db-schema`)

  有关 Databricks 架构名称的信息，请参阅 Databricks 文档中的[架构名称](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)。
+ 表名 (`db-table-name`)

  有关 Databricks 表名的信息，请参阅 Databricks 文档中的[表名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name)。
+ （可选）如果使用 Secrets Manager，则需要密钥名称 (`secret-databricks-account-info`)，用于在 Secrets Manager 上存储 Databricks 访问用户名和密码。

  有关 Secrets Manager 的信息，请参阅 AWS Secrets Manager 用户指南[AWS Secrets Manager中的查找密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下示例演示如何从 Secrets Manager 中检索 JDBC URL 和个人访问令牌，并覆盖自定义数据源类 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

以下示例说明了如何将 JDBC 驱动程序 jar (`jdbc-jar-file-name.jar`) 上传到 Amazon S3，以便将其添加到 Spark 类路径中。有关从 Databricks 下载 Spark JDBC 驱动程序 (`jdbc-jar-file-name.jar`) 的信息，请参阅 Databricks 网站中的[下载 JDBC 驱动程序](https://www.databricks.com/spark/jdbc-drivers-download)。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

要远程运行特征处理器作业，您需要通过定义 `SparkConfig` 来提供 jar 并将其传递给 `@remote` 装饰器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## 流式处理自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

您可以连接到 Amazon Kinesis 等流数据源，并使用 Spark Structured Streaming 创作转换以从流数据源读取数据。有关 Kinesis 连接器的信息，请参阅中的 [Spark 结构化直播的 Kinesis 连接器](https://github.com/roncemer/spark-sql-kinesis)。 GitHub有关 Amazon Kinesis 的信息，请参阅《Amazon Kinesis 开发人员指南》中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)。

要创建自定义 Amazon Kinesis 数据源类，您需要扩展 `BaseDataSource` 类并覆盖 [自定义数据源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

要连接到 Amazon Kinesis 数据流，您需要：
+ Kinesis ARN (`kinesis-resource-arn`) 

  有关 Kinesis 数据流的信息 ARNs，请参阅《[亚马逊 Kinesis 开发者指南》中的 Kinesis 数据流的亚马逊资源名称 (ARNs)](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format)。
+ Kinesis 数据流名称 (`kinesis-stream-name`)
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

以下示例演示如何将 `KinesisDataSource` 连接到 `feature_processor` 装饰器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

在以上示例代码中，我们在将微批处理流式传输到您的特征组时使用了一些 Spark Structured Streaming 选项。有关选项的完整列表，请参阅 Apache Spark 文档中的 [Structured Streaming 编程指南](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)。
+ `foreachBatch` 接收器模式是一项特征，允许您对流查询的每个微批处理的输出数据应用操作和写入逻辑。

  有关信息`foreachBatch`，请参阅《[使用 Foreach》和 ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch)《Apache Spark 结构化流媒体编程指南》。
+ `checkpointLocation` 选项会定期保存流应用程序的状态。流日志保存在检查点位置 `s3a://checkpoint-path`。

  有关 `checkpointLocation` 选项的信息，请参阅《Apache Spark Structured Streaming 编程指南》中的[使用检查点操作从故障中恢复](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)。
+ `trigger` 设置定义了在流应用程序中触发微批处理的频率。示例中使用处理时间触发器类型，微批处理间隔为一分钟，由 `trigger(processingTime="1 minute")` 指定。要从流式传输源进行回填，您可以使用由 `trigger(availableNow=True)` 指定的 available-now 触发器类型。

  有关 `trigger` 类型的完整列表，请参阅《Apache Spark Structured Streaming 编程指南》中的[触发器](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers)。

**使用基于事件的触发器进行持续流式处理和自动重试**

功能处理器使用 SageMaker 训练作为计算基础架构，其最大运行时间限制为 28 天。您可以使用基于事件的触发器来延长连续流式处理时间，并从暂时性故障中恢复。有关计划执行和基于事件的执行的更多信息，请参阅[特征处理器管道的计划执行和基于事件的执行](feature-store-feature-processor-schedule-pipeline.md)。

下面提供了一个示例，介绍如何设置基于事件的触发器以保持流特征处理器管道持续运行。这使用在上一个示例中定义的流转换函数。可以将目标管道配置为在源管道执行发生 `STOPPED` 或 `FAILED` 事件时触发。请注意，源和目标使用的是同一管道，因此可以连续运行。

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# 常见使用案例的特征处理代码示例
<a name="feature-store-feature-processor-examples"></a>

以下各示例为常见使用案例提供了特征处理代码示例。有关展示特定用例的更详细的示例笔记本，请参阅 [Amazon Feature Store SageMaker 功能处理笔记本](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb)。

在以下示例中，`us-east-1` 是资源所在的区域，`111122223333` 是资源拥有者账户 ID，`your-feature-group-name` 是特征组名称。

下面各示例中使用的 `transactions` 数据集具有以下架构：

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [

## 联接多个数据源中的数据
](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [

## 滑动窗口聚合
](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [

## 滚动窗口聚合
](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [

## 从离线存储提升到在线存储
](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [

## 使用 Pandas 库进行转换
](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [

## 使用基于事件的触发器进行连续执行和自动重试
](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## 联接多个数据源中的数据
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## 滑动窗口聚合
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## 滚动窗口聚合
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## 从离线存储提升到在线存储
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## 使用 Pandas 库进行转换
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**使用 Pandas 库进行转换**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## 使用基于事件的触发器进行连续执行和自动重试
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```