

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 数据来源
<a name="feature-store-feature-processor-data-sources"></a>

Amazon Feature Store SageMaker 功能处理支持多个数据源。特征处理器 SDK for Python (Boto3) 提供了从存储在 Amazon S3 中的特征组或对象加载数据的构造。此外，您可以创作自定义数据源以加载来自其他数据源的数据。有关 Feature Store 提供的数据源的信息，请参阅[特征处理器数据源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

**Topics**
+ [

# 特征处理器 SDK 数据源
](feature-store-feature-processor-data-sources-sdk.md)
+ [

# 自定义数据源
](feature-store-feature-processor-data-sources-custom.md)
+ [

# 自定义数据源示例
](feature-store-feature-processor-data-sources-custom-examples.md)

# 特征处理器 SDK 数据源
<a name="feature-store-feature-processor-data-sources-sdk"></a>

适用于 Python 的亚马逊 SageMaker 功能商店功能处理器 SDK (Boto3) 提供了从存储在 Amazon S3 中的功能组或对象加载数据的构造。有关 Feature Store 提供的数据源定义的完整列表，请参阅[特征处理器数据源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

有关如何使用 Feature Store Python SDK 数据源定义的示例，请参阅[常见使用案例的特征处理代码示例](feature-store-feature-processor-examples.md)。

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` 用于将特征组指定为特征处理器的输入数据源。可以从离线存储特征组加载数据。尝试从在线存储特征组加载数据将会导致验证错误。您可以指定开始偏移和结束偏移，将加载的数据限制在特定时间范围内。例如，可以指定一个“14 天”的开始偏移，以便仅加载最近两周的数据；还可以指定一个“7 天”的结束偏移，以便将输入限制为前一周的数据。

## Feature Store 提供的数据源定义
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

Feature Store Python SDK 包含数据源定义，可用于为特征处理器指定各种输入数据源。其中包括 CSV、Parquet 和 Iceberg 表源。有关 Feature Store 提供的数据源定义的完整列表，请参阅[特征处理器数据源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

# 自定义数据源
<a name="feature-store-feature-processor-data-sources-custom"></a>

在本页上，我们将介绍如何创建自定义数据源类并展示一些用法示例。对于自定义数据源，您可以使用提供的适用于 Python 的 SageMaker AI SDK (Boto3)，就像使用亚马逊 SageMaker 功能商店提供的 APIs 数据源一样。

要使用自定义数据源通过特征处理将数据转换并摄取到特征组中，需要使用以下类成员和函数来扩展 `PySparkDataSource` 类。
+ `data_source_name` (str)：数据源的任意名称。例如，Amazon Redshift、Snowflake 或 Glue Catalog ARN。
+ `data_source_unique_id` (str)：表示正在访问的特定资源的唯一标识符。例如，表名、DDB 表 ARN、Amazon S3 前缀。自定义数据源中所有使用相同 `data_source_unique_id` 的情况都将关联到世系视图中的同一数据源。世系包括有关特征处理工作流的执行代码、使用的数据源以及将它们摄取到特征组或特征的方式的信息。有关在 **Studio** 中查看特征组任务流水线的信息，请参阅 [从管理控制台查看任务流水线](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)。
+ `read_data` (func)：一种用于连接特征处理器的方法。返回一个 Spark 数据框。有关示例，请参阅[自定义数据源示例](feature-store-feature-processor-data-sources-custom-examples.md)。

`data_source_name` 和 `data_source_unique_id` 都用于唯一标识您的世系实体。以下是名为 `CustomDataSource` 的自定义数据源类的示例。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

本节提供特征处理器的自定义数据源实施的示例。有关自定义数据源的更多信息，请参阅[自定义数据源](feature-store-feature-processor-data-sources-custom.md)。

安全是我们 AWS 与客户的共同责任。 AWS 负责保护在中运行服务的基础架构 AWS 云。客户则对其所有必要的安全配置和管理任务负责。例如，不应在自定义数据源中对诸如数据存储访问凭证之类的密钥进行硬编码。您可以使用 AWS Secrets Manager 来管理这些证书。有关 Secrets Manager 的信息，请参阅[什么是 AWS Secrets Manager？](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) 在 AWS Secrets Manager 用户指南中。以下示例将对您的凭证使用 Secrets Manager。

**Topics**
+ [

## Amazon Redshift 集群 (JDBC) 自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [

## Snowflake 自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [

## Databricks (JDBC) 自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [

## 流式处理自定义数据源示例
](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Amazon Redshift 集群 (JDBC) 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift 提供了 JDBC 驱动程序，可用于通过 Spark 读取数据。有关如何下载 Amazon Redshift JDBC 驱动程序的信息，请参阅[下载 Amazon Redshift JDBC 驱动程序版本 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html)。

要创建自定义 Amazon Redshift 数据源类，您将需要覆盖 [自定义数据源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

要连接 Amazon Redshift 集群，您需要：
+ Amazon Redshift JDBC URL (`jdbc-url`)

  有关获取 Amazon Redshift JDBC URL 的信息，请参阅《Amazon Redshift 数据库开发人员指南》中的[获取 JDBC URL](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html)。
+ Amazon Redshift 用户名 (`redshift-user`) 和密码 (`redshift-password`)

  有关如何使用 Amazon Redshift SQL 命令创建和管理数据库用户的信息，请参阅《Amazon Redshift 数据库开发人员指南》中的[用户](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html)。
+ Amazon Redshift 表名 (`redshift-table-name`)

  有关如何创建表的信息和一些示例，请参阅《Amazon Redshift 数据库开发人员指南》中的[创建表](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html)。
+ （可选）如果使用 Secrets Manager，则需要密钥名称 (`secret-redshift-account-info`)，用于在 Secrets Manager 上存储您的 Amazon Redshift 访问用户名和密码。

  有关 Secrets Manager 的信息，请参阅 AWS Secrets Manager 用户指南[AWS Secrets Manager中的查找密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下示例演示如何从 Secrets Manager 中检索 JDBC URL 和个人访问令牌，并覆盖自定义数据源类 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

以下示例显示了如何将 `RedshiftDataSource` 连接到您的 `feature_processor` 装饰器。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

要远程运行特征处理器作业，您需要通过定义 `SparkConfig` 来提供 JDBC 驱动程序并将其传递给 `@remote` 装饰器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Snowflake 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake 提供了一个 Spark 连接器，可用于您的 `feature_processor` 装饰器。有关适用于 Spark 的 Snowflake 连接器的信息，请参阅 Snowflake 文档中的[适用于 Spark 的 Snowflake 连接器](https://docs.snowflake.com/en/user-guide/spark-connector)。

要创建自定义 Snowflake 数据源类，您需要覆盖 [自定义数据源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法，并将 Spark 连接器包添加到 Spark 类路径中。

要连接 Snowflake 数据源，您需要：
+ Snowflake URL (`sf-url`)

   URLs 有关如何访问 Snowflake 网页界面的信息，请参阅 Snowflake [文档中的账户标识符](https://docs.snowflake.com/en/user-guide/admin-account-identifier)。
+ Snowflake 数据库 (`sf-database`) 

  有关使用 Snowflake 获取数据库名称的信息，请参阅 Snowflake 文档中的 [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database)。
+ Snowflake 数据库架构 (`sf-schema`) 

  有关使用 Snowflake 获取架构名称的信息，请参阅 Snowflake 文档中的 [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema)。
+ Snowflake 仓库 (`sf-warehouse`)

  有关使用 Snowflake 获取仓库名称的信息，请参阅 Snowflake 文档中的 [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse)。
+ Snowflake 表名 (`sf-table-name`)
+ （可选）如果使用 Secrets Manager，则需要密钥名称 (`secret-snowflake-account-info`)，用于在 Secrets Manager 上存储 Snowflake 访问用户名和密码。

  有关 Secrets Manager 的信息，请参阅 AWS Secrets Manager 用户指南[AWS Secrets Manager中的查找密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下示例演示如何从 Secrets Manager 中检索 Snowflake 用户名和密码，并覆盖自定义数据源类 `SnowflakeDataSource` 的 `read_data` 函数。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

以下示例显示了如何将 `SnowflakeDataSource` 连接到您的 `feature_processor` 装饰器。

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

要远程运行特征处理器作业，您需要通过定义 `SparkConfig` 提供软件包并将其传递给 `@remote` 装饰器。以下示例中的 Spark 包是这样的：`spark-snowflake_2.12` 是特征处理器 Scala 版本，`2.12.0` 是您要使用的 Snowflake 版本，`spark_3.3` 是特征处理器 Spark 版本。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Databricks (JDBC) 自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark 可以使用 Databricks JDBC 驱动程序从 Databricks 读取数据。有关 Databricks JDBC 驱动程序的信息，请参阅 Databricks 文档中的[配置 Databricks ODBC 和 JDBC 驱动程序](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers)。

**注意**  
通过在 Spark 类路径中包含相应的 JDBC 驱动程序，可以从任何其他数据库读取数据。有关更多信息，请参阅《Spark SQL 指南》中的 [JDBC 转换到其他数据库](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)。

要创建自定义 Databricks 数据源类，您需要覆盖[自定义数据源](feature-store-feature-processor-data-sources-custom.md)中的 `read_data` 方法，并将 JDBC jar 添加到 Spark 类路径中。

要连接 Databricks 数据源，您需要：
+ Databricks URL (`databricks-url`)

  有关 Databricks URL 的信息，请参阅 Databricks 文档中的[构建 Databricks 驱动程序的连接 URL](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver)。
+ Databricks 个人访问令牌 (`personal-access-token`)

  有关 Databricks 访问令牌的信息，请参阅 Databricks 文档中的 [Databricks 个人访问令牌身份验证](https://docs.databricks.com/en/dev-tools/auth.html#pat)。
+ 数据目录名称 (`db-catalog`) 

  有关 Databricks 目录名称的信息，请参阅 Databricks 文档中的[目录名称](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name)。
+ 架构名称 (`db-schema`)

  有关 Databricks 架构名称的信息，请参阅 Databricks 文档中的[架构名称](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)。
+ 表名 (`db-table-name`)

  有关 Databricks 表名的信息，请参阅 Databricks 文档中的[表名](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name)。
+ （可选）如果使用 Secrets Manager，则需要密钥名称 (`secret-databricks-account-info`)，用于在 Secrets Manager 上存储 Databricks 访问用户名和密码。

  有关 Secrets Manager 的信息，请参阅 AWS Secrets Manager 用户指南[AWS Secrets Manager中的查找密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下示例演示如何从 Secrets Manager 中检索 JDBC URL 和个人访问令牌，并覆盖自定义数据源类 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

以下示例说明了如何将 JDBC 驱动程序 jar (`jdbc-jar-file-name.jar`) 上传到 Amazon S3，以便将其添加到 Spark 类路径中。有关从 Databricks 下载 Spark JDBC 驱动程序 (`jdbc-jar-file-name.jar`) 的信息，请参阅 Databricks 网站中的[下载 JDBC 驱动程序](https://www.databricks.com/spark/jdbc-drivers-download)。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

要远程运行特征处理器作业，您需要通过定义 `SparkConfig` 来提供 jar 并将其传递给 `@remote` 装饰器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## 流式处理自定义数据源示例
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

您可以连接到 Amazon Kinesis 等流数据源，并使用 Spark Structured Streaming 创作转换以从流数据源读取数据。有关 Kinesis 连接器的信息，请参阅中的 [Spark 结构化直播的 Kinesis 连接器](https://github.com/roncemer/spark-sql-kinesis)。 GitHub有关 Amazon Kinesis 的信息，请参阅《Amazon Kinesis 开发人员指南》中的[什么是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)。

要创建自定义 Amazon Kinesis 数据源类，您需要扩展 `BaseDataSource` 类并覆盖 [自定义数据源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

要连接到 Amazon Kinesis 数据流，您需要：
+ Kinesis ARN (`kinesis-resource-arn`) 

  有关 Kinesis 数据流的信息 ARNs，请参阅《[亚马逊 Kinesis 开发者指南》中的 Kinesis 数据流的亚马逊资源名称 (ARNs)](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format)。
+ Kinesis 数据流名称 (`kinesis-stream-name`)
+ AWS 区域 (`your-region`)

  有关使用 SDK for Python (Boto3) 获取当前会话的区域名称的信息，请参阅 Boto3 文档中的 [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

以下示例演示如何将 `KinesisDataSource` 连接到 `feature_processor` 装饰器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

在以上示例代码中，我们在将微批处理流式传输到您的特征组时使用了一些 Spark Structured Streaming 选项。有关选项的完整列表，请参阅 Apache Spark 文档中的 [Structured Streaming 编程指南](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)。
+ `foreachBatch` 接收器模式是一项特征，允许您对流查询的每个微批处理的输出数据应用操作和写入逻辑。

  有关信息`foreachBatch`，请参阅《[使用 Foreach》和 ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch)《Apache Spark 结构化流媒体编程指南》。
+ `checkpointLocation` 选项会定期保存流应用程序的状态。流日志保存在检查点位置 `s3a://checkpoint-path`。

  有关 `checkpointLocation` 选项的信息，请参阅《Apache Spark Structured Streaming 编程指南》中的[使用检查点操作从故障中恢复](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)。
+ `trigger` 设置定义了在流应用程序中触发微批处理的频率。示例中使用处理时间触发器类型，微批处理间隔为一分钟，由 `trigger(processingTime="1 minute")` 指定。要从流式传输源进行回填，您可以使用由 `trigger(availableNow=True)` 指定的 available-now 触发器类型。

  有关 `trigger` 类型的完整列表，请参阅《Apache Spark Structured Streaming 编程指南》中的[触发器](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers)。

**使用基于事件的触发器进行持续流式处理和自动重试**

功能处理器使用 SageMaker 训练作为计算基础架构，其最大运行时间限制为 28 天。您可以使用基于事件的触发器来延长连续流式处理时间，并从暂时性故障中恢复。有关计划执行和基于事件的执行的更多信息，请参阅[特征处理器管道的计划执行和基于事件的执行](feature-store-feature-processor-schedule-pipeline.md)。

下面提供了一个示例，介绍如何设置基于事件的触发器以保持流特征处理器管道持续运行。这使用在上一个示例中定义的流转换函数。可以将目标管道配置为在源管道执行发生 `STOPPED` 或 `FAILED` 事件时触发。请注意，源和目标使用的是同一管道，因此可以连续运行。

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```