

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Amazon SageMaker 功能儲存 Spark 進行 Batch 擷取
<a name="batch-ingestion-spark-connector-setup"></a>

Amazon SageMaker 功能儲存 Spark 是 Spark 資料庫連接到功能儲存的 Spark 連接器。特徵商店 Spark 簡化了從 Spark `DataFrame` 到特徵群組的資料擷取作業。Feature Store 使用您現有的 ETL 管道、Amazon EMR、GIS、 AWS Glue 任務、Amazon SageMaker Processing 任務或 SageMaker 筆記本支援 Spark 的批次資料擷取。

為 Python 和 Scala 開發人員提供了用於安裝和實施批次資料擷取的方法。Python 開發人員可以使用開放原始碼 `sagemaker-feature-store-pyspark` Python 程式庫進行本機開發、安裝在Amazon EMR 上，以及 Jupyter 筆記本，方法是遵循 [Amazon SageMaker Feature Store Spark GitHub 儲存](https://github.com/aws/sagemaker-feature-store-spark)庫中的指示。斯卡拉開發人員可以使用功能儲存Spark連接器可在 [Amazon SageMaker Feature Store Spark SDK Maven central 儲存庫](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk)。

您可以使用 Spark 連接器以下列方式擷取資料，視線上儲存、離線儲存或兩者是否啟用而定。

1. 預設情況下擷取 — 如果啟用了線上儲存，Spark 連接器會先使用 [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) API 將您的資料框擷取至線上儲存。只有活動時間最長的記錄保留在線儲存中。如果已啟用離線存放區，則功能儲存會在 15 分鐘內將您的資料框擷取至離線存放區。如需線上和離線儲存運作方式的詳細資訊，請參閱[功能儲存概念](feature-store-concepts.md)。

   您可以通過不在`.ingest_data(...)`方法中指定`target_stores`來完成此操作。

1. 離線儲存區直接擷取 — 如果啟用離線存放區，Spark 連接器批次會將您的資料框直接擷取至離線存放區。將資料框直接導入離線儲存並不會更新線上儲存。

   您可以通過在`.ingest_data(...)`方法中設置 `target_stores=["OfflineStore"]` 來完成此操作。

1. 僅限線上儲存 — 如果啟用了線上儲存，Spark 連接器會使用 [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) API 將您的資料框擷取至線上儲存。將資料框直接導入線上儲存並不會更新離線儲存。

   您可以通過在`.ingest_data(...)`方法中設置 `target_stores=["OnlineStore"]` 來完成此操作。

如需不同擷取方法的詳細資訊，請參閱[實作範例](#batch-ingestion-spark-connector-example-implementations)。

**Topics**
+ [功能儲存 Spark 安裝](#batch-ingestion-spark-connector-installation)
+ [檢索功能儲存 Spark 的 JAR](#retrieve-jar-spark-connector)
+ [實作範例](#batch-ingestion-spark-connector-example-implementations)

## 功能儲存 Spark 安裝
<a name="batch-ingestion-spark-connector-installation"></a>

 **Scala 使用者** 

特徵商店 Spark SDK 開放提供 Scala 使用者運用，請前往所在位置 [Amazon SageMaker Feature Store Spark SDK Maven central 儲存庫](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk)。

 ****需求**** 
+ Spark >= 3.0.0 和 <= 3.3.0
+ `iceberg-spark-runtime` >= 0.14.0
+ Scala >= 2.12.x  
+  Amazon EMR > = 6.1.0 (僅當您使用的是 Amazon EMR) 

 **在 POM.xml 中聲明相依性關係** 

特徵商店 Spark 連接器具有 `iceberg-spark-runtime` 程式庫的相依性。因此，如果您要將資料擷取到已使用 Iceberg 資料表格式自動建立的功能群組中，則必須將相應版本的 `iceberg-spark-runtime`程式庫新增至相依性。例如，如果您使用的是 Spark 3.1，則必須在您的項目中聲明以下內容`POM.xml`：

```
 <dependency>
 <groupId>software.amazon.sagemaker.featurestore</groupId>
 <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId>
 <version>1.0.0</version>
 </dependency>
 
 <dependency>
   <groupId>org.apache.iceberg</groupId>
   <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId>
   <version>0.14.0</version>
</dependency>
```

 **Python 使用者** 

特徵商店 Spark SDK 可在開放原始碼的 [Amazon SageMaker Feature Store Spark GitHub 儲存庫](https://github.com/aws/sagemaker-feature-store-spark)中使用。

 ****需求**** 
+ Spark >= 3.0.0 和 <= 3.3.0
+ Amazon EMR > = 6.1.0 (僅當您使用的是 Amazon EMR) 
+ 核心 = `conda_python3`

我們建議將 `$SPARK_HOME` 設定為 Spark 安裝目錄。在安裝期間，特徵商店會將所需的 JAR 上傳至 `SPARK_HOME`，以便自動載入相依性。Spark啟動一個 JVM 需要使這個 PySpark 庫的工作。

 **本機安裝** 

若要尋找有關安裝的詳細資訊，請在以下安裝中新增 `--verbose` 以啟用詳細模式。

```
pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

 **在 Amazon EMR 上安裝** 

使用 6.1.0 或更新版本建立 Amazon EMR 叢集。啟用 SSH 以協助您疑難排解任何問題。

您可以執行下列操作之一來安裝該資料庫：
+ 在 Amazon EMR 中建立自訂步驟。
+ 使用 SSH Connect 至叢集，然後從該處安裝資料庫。

**注意**  
下列資訊使用 Spark 3.1 版，但您可以指定符合需求的任何版本。

```
export SPARK_HOME=/usr/lib/spark
sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

**注意**  
如果您想要將相依 JAR 自動安裝到 SPARK\$1HOME，請勿使用啟動程序步驟。

 **在 SageMaker 筆記本執行個體上安裝** 

使用下列指令安裝與 Spark 連接器相容的 PySpark 版本：

```
!pip3 install pyspark==3.1.1 
!pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

如果您要對離線存放區執行批次擷取，則相依性不在筆記本執行個體環境中。

```
from pyspark.sql import SparkSession
import feature_store_pyspark

extra_jars = ",".join(feature_store_pyspark.classpath_jars())

spark = SparkSession.builder \
    .config("spark.jars", extra_jars) \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \
    .getOrCreate()
```

 **使用 GIS 在筆記本上安裝** 

**重要**  
您必須使用 2 AWS Glue .0 版或更新版本。

使用下列資訊以協助您在 AWS Glue 互動式工作階段 (GIS) 中安裝 PySpark 連接器。

Amazon SageMaker 功能存放區 Spark 需要在工作階段初始化期間使用特定的 Spark 連接器 JAR，才能上傳到您的 Amazon S3 儲存貯體。如需將影片上傳至 S3 來源儲存貯體的詳細資訊，請參閱[檢索功能儲存 Spark 的 JAR](#retrieve-jar-spark-connector)。

上傳 JAR 之後，您必須使用以下命令提供 JAR 的 GIS 工作階段。

```
%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar
```

若要在 AWS Glue 執行時間安裝 Feature Store Spark，請使用 GIS 筆記本中的`%additional_python_modules`魔術命令。 會 AWS Glue 執行`pip`至您在 下指定的模組`%additional_python_modules`。

```
%additional_python_modules sagemaker-feature-store-pyspark-3.1
```

開始 AWS Glue 工作階段之前，您必須使用上述兩個魔術命令。

 **在 AWS Glue 任務上安裝** 

**重要**  
您必須使用 2 AWS Glue .0 版或更新版本。

若要在 AWS Glue 任務上安裝 Spark 連接器，請使用 `--extra-jars`引數提供必要的 JARs`--additional-python-modules`，並在建立任務時將 Spark 連接器安裝為 AWS Glue 任務參數，如下列範例所示。如需將影片上傳至 S3 來源儲存貯體的詳細資訊，請參閱[檢索功能儲存 Spark 的 JAR](#retrieve-jar-spark-connector)。

```
glue_client = boto3.client('glue', region_name=region)
response = glue_client.create_job(
    Name=pipeline_id,
    Description='Feature Store Compute Job',
    Role=glue_role_arn,
    ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run},
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location_uri,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--TempDir': temp_dir_location_uri,
        '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1',
        '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar",
        ...
    },
    MaxRetries=3,
    NumberOfWorkers=149,
    Timeout=2880,
    GlueVersion='3.0',
    WorkerType='G.2X'
)
```

 **Amazon SageMaker Processing 作業上的安裝** 

若要將功能儲存 Spark 與 Amazon SageMaker 處理任務搭配使用，請攜帶您自己的影像。有關使用映像的更多資訊，請參閱[Amazon SageMaker Studio Classic 中的自訂映像](studio-byoi.md)。將安裝步驟新增到 Docker 文件中。將泊塢視窗映像推送至 Amazon ECR 儲存庫之後，您可以使用 PySpark 處理器來建立處理任務。如需使用 PySpark 處理器建立處理任務的詳細資訊，請參閱[使用 Apache Spark 執行處理任務](use-spark-processing-container.md)。

以下是將安裝步驟新增至 Dockerfile 的範例。

```
FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0

RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

## 檢索功能儲存 Spark 的 JAR
<a name="retrieve-jar-spark-connector"></a>

要檢索特徵商店 Spark 相依性 JAR，您必須使用`pip`在任何 Python 環境與網路存取的 Python 套件索引 (PyPI) 儲存庫安裝 Spark 連接器。SageMaker Jupyter 筆記本是具有網路存取權的 Python 環境範例。

下列指令會安裝 Spark 連接器。

```
!pip install sagemaker-feature-store-pyspark-3.1      
```

安裝特徵商店 Spark 之後，您可以擷取 JAR 位置，並將 JAR 上傳到 Amazon S3。

該`feature-store-pyspark-dependency-jars`命令提供了特徵商店 Spark 新增的必要相依性 JAR 的位置。您可以使用命令擷取 JAR，然後將它上傳至 Amazon S3。

```
jar_location = !feature-store-pyspark-dependency-jars
jar_location = jar_location[0]

s3_client = boto3.client("s3")
s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")
```

## 實作範例
<a name="batch-ingestion-spark-connector-example-implementations"></a>

------
#### [ Example Python script ]

 *FeatureStoreBatchIngestion.py* 

```
from pyspark.sql import SparkSession
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
import feature_store_pyspark

spark = SparkSession.builder \
                    .getOrCreate()

# Construct test DataFrame
columns = ["RecordIdentifier", "EventTime"]
data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")]

df = spark.createDataFrame(data).toDF(*columns)

# Initialize FeatureStoreManager with a role arn if your feature group is created by another account
feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
 
# Load the feature definitions from input schema. The feature definitions can be used to create a feature group
feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)

feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"

# Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn)

# To select the target stores for ingestion, you can specify the target store as the paramter
# If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"])

# If only OfflineStore is selected, the connector will batch write the data to offline store directly
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

# To retrieve the records failed to be ingested by spark connector
failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()
```

 **使用 Python 指令碼範例提交 Spark 工作** 

PySpark 版本需要一個額外的依賴 JAR 進行匯入，因此需要額外的步驟來執行 Spark 應用程式。

如果您沒有在安裝過程中指定`SPARK_HOME`，則在執行 `spark-submit` 時必須在 JVM 中加載所需的 JAR。`feature-store-pyspark-dependency-jars` 是由 Spark 程式庫安裝的 Python 指令碼，以自動為您擷取所有 JAR 的路徑。

```
spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py
```

如果您在 Amazon EMR 上執行此應用程式，建議您以用戶端模式執行應用程式，這樣您就不需要將相依 JAR 散佈到其他任務節點。在Amazon EMR 群集中新增一個步驟，使用類似於以下的 Spark 引數：

```
spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
```

------
#### [ Example Scala script ]

 *功能庫批處理. 斯卡拉* 

```
import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object TestSparkApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().getOrCreate()

    // Construct test DataFrame
    val data = List(
      Row("1", "2021-07-01T12:20:12Z"),
      Row("2", "2021-07-02T12:20:13Z"),
      Row("3", "2021-07-03T12:20:14Z")
    )
    
    val schema = StructType(
      List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType))
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    
    // Initialize FeatureStoreManager with a role arn if your feature group is created by another account
    val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
    
    // Load the feature definitions from input schema. The feature definitions can be used to create a feature group
    val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df)

    val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"
   
    // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
    // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
    featureStoreManager.ingestData(df, featureGroupArn)
    
    // To select the target stores for ingestion, you can specify the target store as the paramter
    // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
    featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore"))
    
    // If only OfflineStore is selected, the connector will batch write the data to offline store directly
    featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"])
    
    // To retrieve the records failed to be ingested by spark connector
    val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame()
  }
}
```

 **提交一個 Spark 工作** 

 **Scala** 

您應該能夠使用功能儲存 Spark 作為正常依賴關係。在所有平台上執行應用程式不需要額外的指令。

------