

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 功能處理
<a name="feature-store-feature-processing"></a>

Amazon SageMaker 功能存放區功能處理是一種可讓您將原始資料轉換為機器學習 (ML) 功能的功能。它為您提供了功能處理器 SDK，您可以使用該 SDK 將批次資料來源中的資料轉換並擷取到功能群組中。透過此功能，特徵存放區負責基礎架構，包括佈建運算環境，以及建立和維護 Pipelines 以載入和擷取資料。這樣，您就可以專注於包含轉換函式 (例如，產品檢視計數、交易值平均值)、來源 (套用此轉換的位置) 和接收器 (將計算功能值寫入的位置) 的功能處理器定義。

特徵處理器管道是 Pipelines 管道。作為 Pipelines，您也可以在主控台中使用 SageMaker AI 歷程來追蹤排程的特徵處理器管道。如需 SageMaker AI 歷程的詳細資訊，請參閱[Amazon SageMaker 機器學習 (ML) 歷程追蹤](lineage-tracking.md)。這包含追蹤排程執行、視覺化歷程以追蹤特徵回到其資料來源，以及在單一環境中檢視共用特徵處理器。如需使用 Feature Store 搭配主控台的相關資訊，請參閱[從主控台檢視管道執行](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio)。

**Topics**
+ [功能儲存功能處理器 SDK](feature-store-feature-processor-sdk.md)
+ [遠端執行功能儲存功能處理器](feature-store-feature-processor-execute-remotely.md)
+ [建立和執行功能儲存功能處理器管線](feature-store-feature-processor-create-execute-pipeline.md)
+ [以排程和事件為基礎執行特徵處理器管道](feature-store-feature-processor-schedule-pipeline.md)
+ [監控 Amazon SageMaker Feature Store 特徵處理器管道](feature-store-feature-processor-monitor-pipeline.md)
+ [IAM 許可和執行角色](feature-store-feature-processor-iam-permissions.md)
+ [特徵處理器限制、上限和配額](feature-store-feature-processor-quotas.md)
+ [資料來源](feature-store-feature-processor-data-sources.md)
+ [常見使用案例的特徵處理程式碼範例](feature-store-feature-processor-examples.md)

# 功能儲存功能處理器 SDK
<a name="feature-store-feature-processor-sdk"></a>

透過使用`@feature_processor`裝飾器裝飾轉換函式來宣告特徵商店特徵處理器定義。SageMaker AI SDK for Python (Boto3) 會自動從設定的輸入資料來源載入資料、套用裝飾的轉換函式，然後將轉換後的資料擷取到目標特徵群組。裝飾的轉換函式必須符合`@feature_processor`裝飾器的預期簽名。如需有關 `@feature_processor` 裝飾器的詳細資訊，請參閱 Amazon SageMaker Feature Store中的 [@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator)閱讀文件。

使用 `@feature_processor` 裝飾器，您的轉換函式在 Spark 執行期環境中執行，其中提供給函式及其傳回值的輸入引數是 Spark 數 DataFrames。轉換函式中的輸入參數數目必須與`@feature_processor`裝飾器中配置的輸入數目相符。

如需有關 `@feature_processor` 裝飾器的詳細資訊，請參閱 [Python 的特徵處理器特徵商店 SDK (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor)。

下面的代碼是有關如何使用`@feature_processor`裝飾器的基本範例。如需更具體的使用案例範例，請參閱[常見使用案例的特徵處理程式碼範例](feature-store-feature-processor-examples.md)。

您可以使用下列指令從 SageMaker SDK 及其附加功能安裝功能處理器 SDK。

```
pip install sagemaker[feature-processor]
```

在下列範例中，`us-east-1`是資源的區域、`111122223333`資源擁有者帳戶 ID，`your-feature-group-name`是特徵群組名稱。

以下是基本特徵處理器定義，其中`@feature_processor`裝飾器會將 Amazon S3 的 CSV 輸入設定為載入並提供給轉換函式 (例如`transform`)，並準備擷取至特徵群組。最後一行執行它。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

`@feature_processor`參數包括：
+ `inputs` (List[str])：Feature Store 特徵處理器中使用的資料來源清單。如果您的資料來源是功能群組或存放在 Amazon S3 中，您可以使用功能存放區為功能處理器提供的資料來源定義。如需特徵商店提供的資料來源定義的完整清單，請參閱 [Amazon SageMaker Feature Store 中的特徵處理器資料來源](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source)閱讀文件。
+ `output`(str)：特徵群組的 ARN，以擷取裝飾函式的輸出。
+ `target_stores` (Optinal[List[str]])：要擷取至輸出的儲存清單 (例如，`OnlineStore`或`OfflineStore`)。如果未指定，則會將資料擷取到所有輸出功能群組的已啟用存放區。
+ `parameters` (Dict[str, Any])：要提供給您的轉換函式的字典。
+ `enable_ingestion` (bool)：指出轉換函式的輸出是否已擷取至輸出特徵群組的旗標。此標誌在開發階段非常有用。如果未指定，則會啟用擷取。

可選的包裝函式參數 (如果在函式簽名中提供，則作為引數提供) 包括：
+ `params` (Dict[str, Any])：在`@feature_processor`參數中定義的字典。它還包含系統設定的參數，這些參數可以與鍵 `system` 一起參考，例如`scheduled_time`參數。
+ `spark`(SparkSession)：對 Spark 應用程式初始化的 SparkSession 執行個體的引用。

以下程式碼是使用 `params` 和 `spark` 參數的範例。

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

`scheduled_time`系統參數 (在函式的`params`引數中提供) 是支援重試每次執行的重要值。此值有助於唯一識別特徵處理器的執行，並可作為以日期為基礎的輸入的參考點 (例如，僅載入最近 24 小時的資料)，以確保輸入範圍與程式碼的實際執行時間無關。如果特徵處理器按照排程執行 (請參閱[以排程和事件為基礎執行特徵處理器管道](feature-store-feature-processor-schedule-pipeline.md))，則其值會固定為排定執行的時間。在同步執行期間，可以使用 SDK 的執行 API 覆寫引數，以支援資料回填或重新執行遺漏的過去執行等使用案例。如果功能處理器以任何其他方式執行，則其值是目前的時間。

如需撰寫 Spark 程式碼的相關資訊，請參閱 [Spark SQL 程式設計指南](https://spark.apache.org/docs/latest/sql-programming-guide.html)。

如需常見使用案例的更多程式碼範例，請參閱[常見使用案例的特徵處理程式碼範例](feature-store-feature-processor-examples.md)。

請注意，用裝飾的轉換函式`@feature_processor`不會返回值。要以編程方式測試您的函式，您可以刪除或猴子修補`@feature_processor`裝飾器，使其充當包裝函式的傳遞。如需有關 `@feature_processor` 裝飾器的詳細資訊，請參閱 [Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html)。

# 遠端執行功能儲存功能處理器
<a name="feature-store-feature-processor-execute-remotely"></a>

若要在需要比本機可用硬體更強大的大型資料集上執行特徵處理器，您可以使用`@remote`裝飾器來裝飾程式碼，將本機 Python 程式碼當做單一或多節點分散式 SageMaker 訓練工作執行。如需將程式碼做為 SageMaker 訓練工作執行的詳細資訊，請參閱[以 SageMaker 訓練工作方式執行本機代碼](train-remote-decorator.md)。

以下是 `@remote` 裝飾器和 `@feature_processor` 裝飾器的使用範例。

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

此`spark_config`參數指出遠端工作以 Spark 應用程式的形式執行。該`SparkConfig`執行個體可用於配置 Spark 組態，並提供額外的相依性到 Spark 應用程式，如 Python 文件、JAR 和文件。

為了在開發特徵處理代碼時更快地迭代，您可以在`@remote`裝飾器中指定`keep_alive_period_in_seconds`引數，以將設定的資源保留在暖集區中，以供後續訓練任務使用。如需有關暖集區的更多資訊，請參閱 API 參考指南中的 `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)`。

以下是本機 `requirements.txt:` 範例

```
sagemaker>=2.167.0
```

這將在遠端工作中安裝對應的 SageMaker SDK 版本，這是執行由 `@feature-processor` 註釋的方法所需的。

# 建立和執行功能儲存功能處理器管線
<a name="feature-store-feature-processor-create-execute-pipeline"></a>

特徵處理器 SDK 提供 API，可將您的特徵處理器定義提升為完全受控的 SageMaker AI 管道。如需 Pipelines 的詳細資訊，請參閱 [管道概觀](pipelines-overview.md)。若要將特徵處理器定義轉換為 SageMaker AI 管道，請使用 `to_pipeline` API 搭配您的特徵處理器定義。您可以排程功能處理器定義的執行，並使用 CloudWatch 指標進行操作監控，並將它們與 EventBridge 整合以充當事件來源或訂閱者。如需監控使用 Pipelines 建立的管道的詳細資訊，請參閱[監控 Amazon SageMaker Feature Store 特徵處理器管道](feature-store-feature-processor-monitor-pipeline.md)。

若要檢視特徵處理器管道，請參閱[從主控台檢視管道執行](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio)。

如果您的函式也使用 `@remote` 裝飾器進行裝飾，則其組態將轉移到特徵處理器管道。您可以使用 `@remote` 裝飾器指定進階組態，例如運算執行個體類型和計數、執行期相依性、網路和安全組態。

以下範例使用了 `to_pipeline` 和 `execute` API。

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

`to_pipeline` API 在語意上是更新插入作業。如果管道已存在，則更新管道；如果不存在管道，則建立管道。

`to_pipeline` API 可選擇接受 Amazon S3 URI，該 URI 參考包含特徵處理器定義的檔案，以將其與特徵處理器管道建立關聯，從而追蹤轉換函式及其 SageMaker AI 機器學習歷程中的版本。

若要擷取您的帳戶中每個特徵處理器管道的清單，您可以透過 `list_pipelines` API。對 `describe` API 的後續請求會傳回與特徵處理器管道相關的詳細資訊，包含但不限於 Pipelines 和排程詳細資訊。

以下範例使用了 `list_pipelines` 和 `describe` API。

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# 以排程和事件為基礎執行特徵處理器管道
<a name="feature-store-feature-processor-schedule-pipeline"></a>

Amazon SageMaker Feature Store Feature Processing 管道執行可以設定為根據預先設定的排程或由於另一個 AWS 服務事件以自動和非同步方式啟動。例如，您可以將特徵處理管道排定在每月的第一天執行，或將兩條管道串連在一起，待來源管道執行完成後自動執行目標管道。

**Topics**
+ [基於排程執行](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [事件型執行](#feature-store-feature-processor-schedule-pipeline-event-based)

## 基於排程執行
<a name="feature-store-feature-processor-schedule-pipeline-schedule-based"></a>

特徵處理器 SDK 提供 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API，可透過 Amazon EventBridge 排程器整合定期執行特徵處理器管道。您可以使用 [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression) 參數 (與 Amazon EventBridge 支援的運算式相同)，使用 `at`、`rate`、或 `cron` 運算式來指定排程。排程 API 在語意上是更新或新增作業，如果排程存在，則更新排程；如果排程不存在，則建立排程。如需有關 EventBridge 運算式及範例的更多資訊，請參閱 EventBridge 排程器使用者指南中的 [EventBridge 排程器上的排程類型](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html)。

以下範例使用了特徵處理器 API [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule)，使用 `at`、`rate` 和 `cron` 運算式。

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

`schedule` API 中日期和時間輸入的預設時區為 UTC。如需有關 EventBridge 排程器排程運算式的更多相關資訊，請參閱 EventBridge 排程器 API 參考文件中的 [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression)。

排程的特徵處理器管道執行為您的轉換函式提供排程執行時間，可用作等冪性標記，或基於日期範圍之輸入的固定參考點。若要停用 (即暫停) 或重新啟用排程，請在 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API 的 `state` 參數中分別使用 `‘DISABLED’` 或 `‘ENABLED’`。

如需有關特徵處理器的資訊，請參閱[特徵處理器 SDK 資料來源](feature-store-feature-processor-data-sources-sdk.md)。

## 事件型執行
<a name="feature-store-feature-processor-schedule-pipeline-event-based"></a>

特徵處理管道可以設定為當 AWS 事件發生時自動執行。特徵處理 SDK 提供一個 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) 函式，用於接受來源事件和目標管道的清單。來源事件必須是指定了管道與[執行狀態](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus)事件之 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent) 的執行個體。

`put_trigger` 函數會設定 Amazon EventBridge 規則和目標來路由事件，並可讓您指定 EventBridge 事件模式來回應任何 AWS 事件。如需有關這些概念的資訊，請參閱 Amazon EventBridge [規則](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html)、[目標](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html)和[事件模式](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html)。

可以啟用或停用觸發程式。EventBridge 將使用 `put_trigger` API 的 `role_arn` 參數中提供的角色來啟動目標管道執行。如果在 Amazon SageMaker Studio Classic 或筆記本環境中使用 SDK，則依預設會使用執行角色。如需有關如何取得執行角色的資訊，請參閱[取得您的執行角色](sagemaker-roles.md#sagemaker-roles-get-execution-role)。

下面的範例會設定以下內容：
+ 使用 `to_pipeline` API 的 SageMaker AI 管道，該管道接受您的目標管道名稱 (`target-pipeline`) 和轉換函式 (`transform`)。如需有關特徵處理器和轉換函式的資訊，請參閱[特徵處理器 SDK 資料來源](feature-store-feature-processor-data-sources-sdk.md)。
+ 使用 `put_trigger` API 的觸發程式，該觸發程式接受事件的 `FeatureProcessorPipelineEvent` 和您的目標管道名稱 (`target-pipeline`)。

  `FeatureProcessorPipelineEvent` 定義來源管道 (`source-pipeline`) 狀態變成 `Succeeded` 時的觸發程式。如需有關特徵處理器管道事件函式的資訊，請參閱 Feature Store 閱讀文件中的 [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent)。

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

有關使用事件型觸發程式為特徵處理器管道建立持續執行和自動重試的範例，請參閱[使用事件型觸發程式連續執行和自動重試](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries)。

如需使用事件型觸發程式建立連續*串流*和自動重試的範例，請參閱[串流自訂資料來源範例](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming)。

# 監控 Amazon SageMaker Feature Store 特徵處理器管道
<a name="feature-store-feature-processor-monitor-pipeline"></a>

AWS 提供監控工具來即時監看 Amazon SageMaker AI 資源和應用程式、在發生問題時回報，以及適時採取自動動作。Feature Store 特徵處理器管道是 Pipelines，因此可以使用標準監視機制和整合。您可以透過 Amazon CloudWatch 指標和 Amazon EventBridge 事件監控執行失敗等操作指標。

如需有關如何監控和操作特徵商店特徵處理器的更多資訊，請參閱下列資源：
+ [監控 Amazon SageMaker AI 中的 AWS 資源](monitoring-overview.md) – 監控與稽核 SageMaker AI 資源活動的一般指引。
+ [SageMaker 管道指標](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines) – 由 Pipelines 發出的 CloudWatch 指標。
+ [SageMaker 管道執行狀態變更](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline) – 針對 Pipelines 和執行發出的 EventBridge 事件。
+ [針對 Amazon SageMaker Pipelines 進行疑難排解](pipelines-troubleshooting.md) – Pipelines 的一般偵錯和疑難排解提示。

特徵商店特徵處理器執行日誌可在 `/aws/sagemaker/TrainingJobs` 日誌群組下的 Amazon CloudWatch Logs 中找到，您可以按照查閱慣例找到執行日誌串流。對於透過直接調用 `@feature_processor` 裝飾函式建立的執行，您可以在本地執行環境的主控台中找到日誌。對於 ` @remote` 裝飾的執行，CloudWatch Logs 串流名稱中會包含函式的名稱和執行時間戳記。對於特徵處理器管道執行，該步驟的 CloudWatch Logs 串流包含 `feature-processor` 字串和管道執行 ID。

在 Amazon SageMaker Studio Classic 中，可在 Feature Store UI 中找到特定特徵群組的 Feature Store 特徵處理器管道和最近的執行狀態。與特徵理器管道相關的特徵群組會作為輸入或輸出顯示在使用者介面中。此外，歷程視圖可提供上游執行的上下文，例如產生特徵處理器管道和資料來源的資料，以便進一步偵錯。如需使用 Studio Classic 進行歷程檢視的詳細資訊，請參閱[從主控台檢視歷程](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)。

# IAM 許可和執行角色
<a name="feature-store-feature-processor-iam-permissions"></a>

若要使用 Amazon SageMaker Python SDK，需要與 互動的許可 AWS 服務。完整特徵處理器功能需要下列政策。您可以連接連接至 IAM 角色的 [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html) 和 [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS 受管政策。如需有關連接政策至 IAM 角色的更多資訊，請參閱[將政策新增至您的 IAM 角色](feature-store-adding-policies.md)。請參見以下範例，了解詳細資訊。

套用此政策之角色的信任政策必須允許 “scheduler.amazonaws.com”、“sagemaker.amazonaws.com” 和 “glue.amazonaws.com” 原則。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# 特徵處理器限制、上限和配額
<a name="feature-store-feature-processor-quotas"></a>

Amazon SageMaker Feature Store 特徵處理依賴 SageMaker AI 機器學習 (ML) 歷程追蹤。特徵商店特徵處理器使用歷程內容來表示和追蹤特徵處理管道和管道版本。每個 Feature Store 特徵處理器至少會消耗兩個歷程內容 (一個用於特徵處理管道，另一個用於版本)。如果特徵處理管道的輸入或輸出資料來源發生變更，則會建立其他歷程內容。您可以聯絡 AWS 支援提高限制，以更新 SageMaker AI ML 歷程限制。特徵商店特徵處理器使用的資源的預設上限如下。如需 SageMaker AI ML 歷程追蹤的相關資訊，請參閱 [Amazon SageMaker 機器學習 (ML) 歷程追蹤](lineage-tracking.md)。

如需 SageMaker AI 配額的詳細資訊，請參閱 [Amazon SageMaker AI 端點和配額](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html)。

每個區域的歷程上限
+ 內容 – 500 (軟性限制)
+ 成品 – 6,000 (軟性限制)
+ 關聯 – 6,000 (軟性限制)

每個區域的訓練上限
+ 訓練任務的最長執行期 – 432,000 秒
+ 每個訓練任務的執行個體數量上限 – 20
+ 目前區域中帳戶的每秒可進行 `CreateTrainingJob` 請求的數量上限 – 1 TPS
+ 保持有效期以進行叢集重複使用 – 3,600 秒

每個區域的管道和並行管道執行數目上限
+ 每個帳戶允許的管道數量上限 – 500
+ 每個帳戶允許的並行管道執行數量上限 – 20
+ 管道執行逾時的時間 – 672 小時

# 資料來源
<a name="feature-store-feature-processor-data-sources"></a>

Amazon SageMaker Feature Store 特徵處理支援多個資料來源。適用於 Python 的特徵處理器 SDK (Boto3) 提供可從 Amazon S3 已儲存的特徵群組或物件載入資料的建構模組。此外，您可以編寫自訂資料來源，以便從其他資料來源載入資料。有關 Feature Store 提供的資料來源的資訊，請參閱[特徵處理器資料來源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

**Topics**
+ [特徵處理器 SDK 資料來源](feature-store-feature-processor-data-sources-sdk.md)
+ [自訂資料來源](feature-store-feature-processor-data-sources-custom.md)
+ [自訂資料來源範例](feature-store-feature-processor-data-sources-custom-examples.md)

# 特徵處理器 SDK 資料來源
<a name="feature-store-feature-processor-data-sources-sdk"></a>

適用於 Python 的 Amazon SageMaker Feature Store 特徵處理器 SDK (Boto3) 提供可從 Amazon S3 中儲存的特徵群組或物件載入資料的建構模組。有關特徵商店提供的資料來源定義的完整清單，請參閱[特徵處理器資料來源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

如需有關如何使用特徵商店 Python SDK 資料來源定義的範例，請參閱[常見使用案例的特徵處理程式碼範例](feature-store-feature-processor-examples.md)。

## 特徵群組資料來源
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` 用於將特徵群組指定為特徵處理器的輸入資料來源。可以從離線儲存特徵群組載入資料。嘗試從線上儲存特徵群組載入資料會導致驗證錯誤。您可以指定開始偏移和結束偏移，將載入的資料限制在特定時間範圍內。例如，您可以指定 '14 天' 的開始移位，以僅載入過去兩週的資料，還可以指定結束移為為 '7 天'，將輸入限制為上一週的資料。

## 特徵商店提供的資料來源定義
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

特徵商店 Python SDK 包含可用於為特徵處理器指定各種輸入資料來源的資料來源定義。其中包含 CSV、Parquet 和 Iceberg 表來源。有關 Feature Store 提供的資料來源定義的完整清單，請參閱[特徵處理器資料來源 Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py)。

# 自訂資料來源
<a name="feature-store-feature-processor-data-sources-custom"></a>

在此頁面上，我們將描述如何建立自訂資料來源類別，並顯示一些使用範例。使用自訂資料來源時，您可以使用 SageMaker AI SDK for Python (Boto3) 提供的 API，方式與使用 Amazon SageMaker Feature Store 提供的資料來源相同。

若要使用自訂資料來源來使用特徵處理將資料轉換並擷取至特徵群組，您需要使用以下類別成員和函式來擴充 `PySparkDataSource` 類別。
+ `data_source_name` (字串)：資料來源的任意名稱。例如，Amazon Redshift，Snowflake，或 Glue 目錄 ARN。
+ `data_source_unique_id` (str)：一個唯一識別碼，指被存取的特定資源。例如，資料表名稱，DDB 資料表 ARN，Amazon S3 字首。自訂資料來源中的所有相同 `data_source_unique_id` 的用法都會與歷程檢視中的相同資料來源相關聯。歷程包含有關特徵處理工作流程的執行程式碼、使用的資料來源以及如何將其擷取特徵群組或特徵的資訊。如需在 **Studio** 中檢視特徵群組歷程的資訊，請參閱[從主控台檢視歷程](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)。
+ `read_data` (函式)：用於使用特徵處理器連接的方法。返回 Spark 資料框架。如需範例，請參閱 [自訂資料來源範例](feature-store-feature-processor-data-sources-custom-examples.md)。

`data_source_name` 和 `data_source_unique_id` 都可用來唯一識別碼歷程實體。以下是名為 `CustomDataSource` 的自訂資料來源類別的範例。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

本節提供特徵處理器的自訂資料來源實作範例。如需有關自訂資料來源的更多資訊，請參閱[自訂資料來源](feature-store-feature-processor-data-sources-custom.md)。

安全性是 AWS 與 客戶共同的責任。 AWS 負責保護在 中執行服務的基礎設施 AWS 雲端。客戶必須負責所有必要的安全組態和管理任務。例如，機密 (例如資料存放區的存取登入資料) 不應在您的自訂資料來源中進行硬式編碼。您可以使用 AWS Secrets Manager 來管理這些登入資料。如需 Secrets Manager 的資訊，請參閱 AWS Secrets Manager 《 使用者指南》中的[什麼是 AWS Secrets Manager？](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html)。以下範例會使用 Secrets Manager 做為您的憑證。

**Topics**
+ [Amazon Redshift 叢集 (JDBC) 自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Snowflake 自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Databricks (JDBC) 自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [串流自訂資料來源範例](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Amazon Redshift 叢集 (JDBC) 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift 提供了一個 JDBC 驅動程式，可用於使用 Spark 讀取資料。如需有關如何下載 Amazon Redshift JDBC 驅動程式的資訊，請參閱[下載 Amazon Redshift JDBC 驅動程式 2.1 版](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html)。

若要建立自訂的 Amazon Redshift 資料來源類別，您將需要覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

若要與 Amazon Redshift 叢集連線，您需要：
+ Amazon Redshift JDBC URL (`jdbc-url`)

  如需有關如何取得 Amazon Redshift JDBC URL 的資訊，請參閱 Amazon Redshift 資料庫開發人員指南中的[取得 JDBC URL](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html)。
+ Amazon Redshift 使用者名稱 (`redshift-user`) 和密碼 (`redshift-password`)

  如需有關如何使用 Amazon Redshift SQL 命令建立和管理資料庫使用者的資訊，請參閱 Amazon Redshift 資料庫開發人員指南中的[使用者](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html)。
+ Amazon Redshift 資料表名稱 (`redshift-table-name`)

  如需有關如何使用範例建立資料表的資訊，請參閱 Amazon Redshift 資料庫開發人員指南中的[建立資料表](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html)。
+ (可選) 如果使用 Secrets Manager，則需要機密名稱 (`secret-redshift-account-info`)，您將在其中儲存 Secrets Manager 上的 Amazon Redshift 存取使用者名稱和密碼。

  如需 Secrets Manager 的資訊，請參閱 AWS Secrets Manager 《 使用者指南》中的在 [中尋找秘密 AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 區域 (`your-region`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下範例會示範如何從 Secrets Manager 擷取 JDBC URL 和個人存取權杖，並覆寫自訂資料來源類別 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

以下範例示範如何將 `RedshiftDataSource` 連線至 `feature_processor` 裝飾器。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

若要遠端執行特徵處理器工作，您需要透過定義 `SparkConfig` 並將其傳遞給 `@remote` 裝飾器來提供 jdbc 驅動程式。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Snowflake 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake 提供了一個 Spark 連接器，可用於您的 `feature_processor` 裝飾器。如需有關 Spark 的 Snowflake 連接器的資訊，請參閱 Snowflake 文件中的 [Spark 的 Snowflake 連接器](https://docs.snowflake.com/en/user-guide/spark-connector)。

若要建立自訂 Snowflake 資料來源類別，您必須覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法，並將 Spark 連接器套件新增至 Spark 類別路徑。

若要與 Snowflake 資料來源連線，您需要：
+ Snowflake URL (`sf-url`)

  如需有關如何存取 Snowflake Web 介面之 URL 的資訊，請參閱 Snowflake 文件中的[帳戶識別符](https://docs.snowflake.com/en/user-guide/admin-account-identifier)。
+ Snowflake 資料庫 (`sf-database`) 

  如需有關如何使用 Snowflake 取得資料庫名稱的資訊，請參閱 Snowflake 文件中的 [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database)。
+ Snowflake 資料庫結構描述 (`sf-schema`) 

  如需有關如何使用 Snowflake 取得你的結構描述名稱的資訊，請參閱 Snowflake 文件中的 [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema)。
+ Snowflake 倉儲 (`sf-warehouse`)

  如需有關如何使用 Snowflake 取得倉儲名稱的資訊，請參閱 Snowflake 文件中的 [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse)。
+ Snowflake 資料表名稱 (`sf-table-name`)
+ (可選) 如果使用 Secrets Manager，則需要機密名稱 (`secret-snowflake-account-info`)，您將在其中儲存 Secrets Manager 上的 Snowflake 存取使用者名稱和密碼。

  如需 Secrets Manager 的資訊，請參閱 AWS Secrets Manager 《 使用者指南》中的在 [中尋找秘密 AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 區域 (`your-region`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下範例會示範如何從 Secrets Manager 擷取 Snowflake 使用者名稱和密碼，並覆寫自訂資料來源類別 `SnowflakeDataSource` 的 `read_data` 函式。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

以下範例示範如何將 `SnowflakeDataSource` 連線至 `feature_processor` 裝飾器。

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

若要遠端執行特徵處理器工作，您需要透過定義 `SparkConfig` 並將其傳遞給 `@remote` 裝飾器來提供套件。在下面的範例中的 Spark 套件是這樣的：`spark-snowflake_2.12` 是特徵處理器 Scala 版本，`2.12.0` 是您希望使用的 Snowflake 版本，`spark_3.3` 是特徵處理器 Spark 版本。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Databricks (JDBC) 自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark 可以透過使用 Databricks JDBC 驅動程式從 Databricks 讀取資料。如需有關 Databricks JDBC 驅動程式的更多資訊，請參閱 Databricks 文件中的[設定 Databricks ODBC 和 JDBC 驅動程式](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers)。

**注意**  
您可以透過包括 Spark 類路徑相應的 JDBC 驅動程式從任何其他資料庫讀取資料。如需詳細資訊，請參閱 Spark SQL 指南中的 [JDBC 至其他資料庫](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)。

若要建立自訂 Databricks 資料來源類別，您必須覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法，並將 JDBC jar 新增至 Spark classpath。

若要與 Databricks 資料來源連線，您需要：
+ Databricks URL (`databricks-url`)

  如需有關 Databricks URL 的更多資訊，請參閱 Databricks 文件中的[建置 Databricks 驅動程式的連線 URL](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver)。
+ Databricks 個人存取權杖 (`personal-access-token`)

  如需有關 Databricks 存取權杖的更多資訊，請參閱 Databricks 文件中的 [Databricks 個人存取權杖驗證](https://docs.databricks.com/en/dev-tools/auth.html#pat)。
+ 資料型錄名稱 (`db-catalog`) 

  如需有關 Databricks 目錄名稱的資訊，請參閱 Databricks 文件中的[目錄名稱](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name)。
+ 結構描述名稱 (`db-schema`)

  如需有關 Databricks 結構描述名稱的資訊，請參閱 Databricks 文件中的[結構描述名稱](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)。
+ 資料表名稱 (`db-table-name`)

  如需有關 Databricks 資料表名稱的資訊，請參閱 Databricks 文件中的[資料表名稱](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name)。
+ (可選) 如果使用 Secrets Manager，則需要機密名稱 (`secret-databricks-account-info`)，您將在其中儲存 Secrets Manager 上的 Databricks 存取使用者名稱和密碼。

  如需 Secrets Manager 的資訊，請參閱 AWS Secrets Manager 《 使用者指南》中的在 [中尋找秘密 AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html)。
+ AWS 區域 (`your-region`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

以下範例會示範如何從 Secrets Manager 擷取 JDBC URL 和個人存取權杖，並覆寫自訂資料來源類別 `DatabricksDataSource` 的 `read_data`。

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

下面的範例說明如何上傳 JDBC 驅動程式 jar 檔案`jdbc-jar-file-name.jar` 至 Amazon S3，以將其新增到 Spark classpath。如需有關從 Databricks 下載 Spark JDBC 驅動程式 (`jdbc-jar-file-name.jar`) 的更多資訊，請參閱 Databricks 網站中的[下載 JDBC 驅動程式](https://www.databricks.com/spark/jdbc-drivers-download)。

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

若要遠端執行特徵處理器工作，您需要透過定義 `SparkConfig` 並將其傳遞給 `@remote` 裝飾器來提供 jar。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## 串流自訂資料來源範例
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

您可以連線至 Amazon Kinesis 等串流資料來源，並使用 Spark 結構化串流撰寫轉換，以便從串流資料來源讀取。如需有關 Kinesis 連接器的資訊，請參閱 GitHub 中的[適用於 Spark 結構化串流的 Kinesis 連接器](https://github.com/roncemer/spark-sql-kinesis)。如需有關 Amazon Kinesis 的更多資訊，請參閱 Amazon Kinesis 開發人員指南中的[什麼是 Amazon Kinesis Data Streams？](https://docs.aws.amazon.com/streams/latest/dev/introduction.html)。

若要建立自訂的 Amazon Kinesis 資料來源類別，您將需要擴展 `BaseDataSource` 類別並覆寫 [自訂資料來源](feature-store-feature-processor-data-sources-custom.md) 中的 `read_data` 方法。

若要連接 Amazon Kinesis Data Streams，您需要：
+ Kinesis ARN (`kinesis-resource-arn`) 

  如需有關 Kinesis 資料串流 ARN 的資訊，請參閱 Amazon Kinesis 開發人員指南中的 [Kinesis Data Streams 的 Amazon Resource Name (ARN)](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format)。
+ Kinesis 資料串流名稱 (`kinesis-stream-name`)
+ AWS 區域 (`your-region`)

  如需有關如何使用適用於 Python 的 SDK (Boto3) 取得目前工作階段的區域名稱的資訊，請參閱 Boto3 文件中的[區域名稱](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)。

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

下面的範例示範了如何將 `KinesisDataSource` 連線至 `feature_processor` 裝飾器。

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

在上面範例程式碼中，我們使用一些 Spark 結構化串流選項，同時將微批次串流傳輸到您的特徵群組中。如需選項的完整清單，請參閱 Apache Spark 文件中的[結構化串流程式設計指南](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)。
+ `foreachBatch` 接收器模式是一項功能，可讓您在串流查詢的每個微批次的輸出資料上套用操作並寫入邏輯。

  如需有關 `foreachBatch` 的資訊，請參閱 Apache Spark 結構化串流程式設計指南中的[使用 Foreach 和 ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch)。
+ `checkpointLocation` 選項會定期儲存串流應用程式的狀態。串流日誌會儲存在檢查點位置 `s3a://checkpoint-path`。

  如需有關 `checkpointLocation` 選項的資訊，請參閱 Apache Spark 結構化串流程式設計指南中的[使用檢查點從失敗中復原](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)。
+ `trigger` 設定定義在串流應用程式中觸發微批次處理的頻率。在此範例中，處理時間觸發條件類型會以一分鐘的微批次間隔 (由 `trigger(processingTime="1 minute")` 指定) 使用。若要從串流來源回填，您可以透過 `trigger(availableNow=True)` 指定的現在可用的觸發條件類型。

  如需 `trigger` 類型的完整清單，請參閱 Apache Spark 結構化串流程式設計指南中的[觸發程式](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers)。

**使用事件型觸發程式連續串流和自動重試**

特徵處理器使用 SageMaker 訓練作為運算基礎設施，其執行期上限為 28 天。您可以使用事件型觸發程式，將連續串流延長一段時間，並從暫時性失敗中復原。如需有關排程和事件型執行的更多資訊，請參閱[以排程和事件為基礎執行特徵處理器管道](feature-store-feature-processor-schedule-pipeline.md)。

以下是設定事件型觸發程式以保持串流特徵處理器管道持續執行的範例。這會使用前面範例中定義的串流轉換函式。您可以將目標管道配置為在來源管道執行發生 `STOPPED` 或 `FAILED` 事件時觸發。請注意，使用相同的管道作為來源和目標，以便持續執行。

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# 常見使用案例的特徵處理程式碼範例
<a name="feature-store-feature-processor-examples"></a>

以下的範例提供常見使用案例下的特徵處理程式碼範例。如需展示特定使用案例的更詳細範例筆記本，請參閱 [Amazon SageMaker Feature Store 特徵處理筆記本](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb)。

在以下範例中，`us-east-1` 是資源的區域，`111122223333` 是資源擁有者帳戶 ID，`your-feature-group-name` 是特徵群組名稱。

以下範例中使用的 `transactions` 資料集具有下列結構描述：

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [聯結多個資料來源的資料](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [滑動時段彙總](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [輪轉時段匯總](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [從離線儲存提升到線上儲存](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [使用 Pandas library 程式庫進行轉換](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [使用事件型觸發程式連續執行和自動重試](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## 聯結多個資料來源的資料
<a name="feature-store-feature-processor-examples-joining-multiple-sources"></a>

```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## 滑動時段彙總
<a name="feature-store-feature-processor-examples-sliding-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## 輪轉時段匯總
<a name="feature-store-feature-processor-examples-tumbling-window-aggregates"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## 從離線儲存提升到線上儲存
<a name="feature-store-feature-processor-examples-promotion-offline-to-online-store"></a>

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## 使用 Pandas library 程式庫進行轉換
<a name="feature-store-feature-processor-examples-transforms-with-pandas-library"></a>

**使用 Pandas library 程式庫進行轉換**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## 使用事件型觸發程式連續執行和自動重試
<a name="feature-store-feature-processor-examples-continuous-execution-automatic-retries"></a>

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```