

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon SageMaker Feature Store Spark を使用したバッチ取り込み
<a name="batch-ingestion-spark-connector-setup"></a>

Amazon SageMaker Feature Store Spark は、Spark ライブラリを Feature Store に接続する Spark コネクタです。Feature Store Spark は、Spark `DataFrame` から特徴量グループへのデータインジェストを簡素化します。Feature Store は、既存の ETL パイプライン、Amazon EMR、GIS、 AWS Glue ジョブ、Amazon SageMaker Processing ジョブ、または SageMaker ノートブックでの Spark を使用したバッチデータの取り込みをサポートします。

Python と Scala デベロッパー向けに、バッチデータインジェストをインストールして実装するためのメソッドが用意されています。Python デベロッパーは、「[Amazon SageMaker Feature Store Spark GitHub リポジトリ](https://github.com/aws/sagemaker-feature-store-spark)」の指示に従って、ローカル開発、Amazon EMR へのインストール、Jupyter Notebook にオープンソースの `sagemaker-feature-store-pyspark` Python ライブラリを使用できます。Scala デベロッパーは、「[Amazon SageMaker Feature Store Spark SDK Maven 中央リポジトリ](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk)」にある Feature Store Spark コネクタを使用できます。

Spark コネクタを使用して、オンラインストア、オフラインストア、あるいはその両方が有効になっているかどうかに応じて、次の方法でデータを取り込むことができます。

1. デフォルトで取り込む — オンラインストアが有効になっている場合、Spark コネクタは最初に [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) API を使用してオンラインストアにデータフレームを取り込みます。イベント時間が最も長いレコードのみがオンラインストアに残ります。オフラインストアが有効になっている場合、Feature Store は 15 分以内にデータフレームをオフラインストアに取り込みます。オンラインストアとオフラインストアの仕組みの詳細については、「[Feature Store の概念](feature-store-concepts.md)」を参照してください。

   これを実行するには `.ingest_data(...)` メソッドで `target_stores` を指定しないようにします。

1. オフラインストアに直接取り込む — オフラインストアが有効になっている場合、Spark コネクタはデータフレームをオフラインストアに直接バッチ取り込みします。データフレームをオフラインストアに直接取り込んでも、オンラインストアは更新されません。

   これを実行するには `.ingest_data(...)` メソッドで `target_stores=["OfflineStore"]` を指定します。

1. オフラインストアのみに取り込む — オンラインストアが有効になっている場合、Spark コネクタは [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) API を使用してオンラインストアにデータフレームを取り込みます。データフレームをオンラインストアに直接取り込んでも、オフラインストアは更新されません。

   これを実行するには `.ingest_data(...)` メソッドで `target_stores=["OnlineStore"]` を指定します。

さまざまな取り込み方法の詳細については、「[実装例](#batch-ingestion-spark-connector-example-implementations)」を参照してください。

**Topics**
+ [

## Feature Store Spark のインストール
](#batch-ingestion-spark-connector-installation)
+ [

## Feature Store Spark の JAR の取得
](#retrieve-jar-spark-connector)
+ [

## 実装例
](#batch-ingestion-spark-connector-example-implementations)

## Feature Store Spark のインストール
<a name="batch-ingestion-spark-connector-installation"></a>

 **Scala ユーザー** 

「[Amazon SageMaker Feature Store Spark SDK Maven 中央リポジトリ](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk)」にある Feature Store Spark SDK を使用できます。

 ****要件**** 
+ Spark >= 3.0.0 と <= 3.3.0
+ `iceberg-spark-runtime` >= 0.14.0
+ Scala >= 2.12.x  
+  Amazon EMR >= 6.1.0 (Amazon EMR を使用している場合のみ) 

 **POM.xml で依存関係を宣言する** 

Feature Store Spark コネクタは `iceberg-spark-runtime` ライブラリに依存しています。そのため、Iceberg テーブル形式で自動作成した特徴量グループにデータを取り込む場合は、`iceberg-spark-runtime` ライブラリの対応するバージョンを依存関係に追加する必要があります。例えば、Spark 3.1 を使用している場合は、プロジェクトの `POM.xml` で以下を宣言する必要があります。

```
 <dependency>
 <groupId>software.amazon.sagemaker.featurestore</groupId>
 <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId>
 <version>1.0.0</version>
 </dependency>
 
 <dependency>
   <groupId>org.apache.iceberg</groupId>
   <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId>
   <version>0.14.0</version>
</dependency>
```

 **Python ユーザー** 

「[Amazon SageMaker Feature Store Spark GitHub リポジトリ](https://github.com/aws/sagemaker-feature-store-spark)」にある Feature Store Spark SDK を使用できます。

 ****要件**** 
+ Spark >= 3.0.0 と <= 3.3.0
+ Amazon EMR >= 6.1.0 (Amazon EMR を使用している場合のみ) 
+ カーネル = `conda_python3`

Spark がインストールされているディレクトリに `$SPARK_HOME` を設定することをお勧めします。インストール中、Feature Store は必要な JAR を `SPARK_HOME` にアップロードし、依存関係が自動的に読み込まれるようにします。この PySpark ライブラリを動作させるには、JVM を起動する Spark が必要です。

 **ローカルインストール** 

インストールの詳細情報を確認するには、次のインストールコマンドに `--verbose` を追加して詳細モード有効にします。

```
pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

 **Amazon EMR へのインストール** 

リリースバージョン 6.1.0 以降で Amazon EMR クラスターを作成します。SSH を有効にすると、問題のトラブルシューティングに役立ちます。

次のどちらかを実行するとライブラリをインストールできます。
+ Amazon EMR 内でカスタムステップを作成します。
+ SSH を使用してクラスターに接続し、そこからライブラリをインストールします。

**注記**  
以下の情報は Spark バージョン 3.1 を使用していますが、要件を満たす任意のバージョンを指定できます。

```
export SPARK_HOME=/usr/lib/spark
sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

**注記**  
依存する jar を SPARK\$1HOME に自動インストールする場合は、ブートストラップステップを使用しないでください。

 **SageMaker ノートブックインスタンスへのインストール** 

以下のコマンドを使用して、Spark コネクタと互換性のある PySpark のバージョンをインストールします。

```
!pip3 install pyspark==3.1.1 
!pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

オフラインストアへのバッチ取り込みを実行する場合、ノートブックインスタンス環境内には依存関係はありません。

```
from pyspark.sql import SparkSession
import feature_store_pyspark

extra_jars = ",".join(feature_store_pyspark.classpath_jars())

spark = SparkSession.builder \
    .config("spark.jars", extra_jars) \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \
    .getOrCreate()
```

 **GIS を使用したノートブックへのインストール** 

**重要**  
 AWS Glue バージョン 2.0 以降を使用する必要があります。

PySpark コネクタを AWS Glue インタラクティブセッション (GIS) でインストールする際に役立つ次の情報を参考にしてください。

Amazon SageMaker Feature Store Spark では、セッションの初期化中に特定の Spark コネクタの JAR を Amazon S3 バケットにアップロードする必要があります。必要な JAR を S3 バケットにアップロードする方法の詳細については、「[Feature Store Spark の JAR の取得](#retrieve-jar-spark-connector)」を参照してください。

JAR をアップロードしたら、次のコマンドを使用して GIS セッションに JAR を提供する必要があります。

```
%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar
```

Feature Store Spark を AWS Glue ランタイムにインストールするには、GIS notebook. AWS Glue runs 内の`%additional_python_modules`マジックコマンドを使用して、 で指定したモジュール`pip`に実行します`%additional_python_modules`。

```
%additional_python_modules sagemaker-feature-store-pyspark-3.1
```

 AWS Glue セッションを開始する前に、前述の両方のマジックコマンドを使用する必要があります。

 ** AWS Glue ジョブへのインストール** 

**重要**  
 AWS Glue バージョン 2.0 以降を使用する必要があります。

 AWS Glue ジョブに Spark コネクタをインストールするには、次の例に示すように、 `--extra-jars`引数を使用して必要な JARsを指定し、ジョブを作成するときに AWS Glue ジョブパラメータとして Spark Connector `--additional-python-modules`をインストールします。必要な JAR を S3 バケットにアップロードする方法の詳細については、「[Feature Store Spark の JAR の取得](#retrieve-jar-spark-connector)」を参照してください。

```
glue_client = boto3.client('glue', region_name=region)
response = glue_client.create_job(
    Name=pipeline_id,
    Description='Feature Store Compute Job',
    Role=glue_role_arn,
    ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run},
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location_uri,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--TempDir': temp_dir_location_uri,
        '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1',
        '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar",
        ...
    },
    MaxRetries=3,
    NumberOfWorkers=149,
    Timeout=2880,
    GlueVersion='3.0',
    WorkerType='G.2X'
)
```

 **Amazon SageMaker Processing ジョブへのインストール** 

Amazon SageMaker Processing ジョブで Feature Store Spark を使用するには、独自のイメージを持ち込みます。独自のイメージの持ち込みの詳細については、「[Amazon SageMaker Studio Classic でのカスタムイメージ](studio-byoi.md)」を参照してください。Dockerfile インストールステップを追加します。Docker イメージを Amazon ECR リポジトリにプッシュしたら、PySparkProcessor を使用して処理ジョブを作成できます。PySparkProcessor を使用して処理ジョブを作成する方法の詳細は、「[Apache Spark を使用して Processing ジョブを実行する](use-spark-processing-container.md)」を参照してください。

以下は、Dockerfile にインストールステップを追加する例です。

```
FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0

RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

## Feature Store Spark の JAR の取得
<a name="retrieve-jar-spark-connector"></a>

Feature Store Spark の依存関係 JAR を取得するには、ネットワークにアクセスできる任意の Python 環境で `pip` を使用して、Python Package Index (PyPI) リポジトリから Spark コネクタをインストールする必要があります。SageMaker Jupyter Notebook は、ネットワークアクセスが可能な Python 環境の一例です。

以下のコマンドでは Spark コネクタをインストールします。

```
!pip install sagemaker-feature-store-pyspark-3.1      
```

Feature Store Spark をインストールしたら、JAR の場所を取得して JAR を Amazon S3 にアップロードすることができます。

この `feature-store-pyspark-dependency-jars` コマンドは、Feature Store Spark が追加した必要な依存関係 JAR の場所を提供します。コマンドを使用して JAR を取得し、Amazon S3 にアップロードできます。

```
jar_location = !feature-store-pyspark-dependency-jars
jar_location = jar_location[0]

s3_client = boto3.client("s3")
s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")
```

## 実装例
<a name="batch-ingestion-spark-connector-example-implementations"></a>

------
#### [ Example Python script ]

 *FeatureStoreBatchIngestion.py* 

```
from pyspark.sql import SparkSession
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
import feature_store_pyspark

spark = SparkSession.builder \
                    .getOrCreate()

# Construct test DataFrame
columns = ["RecordIdentifier", "EventTime"]
data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")]

df = spark.createDataFrame(data).toDF(*columns)

# Initialize FeatureStoreManager with a role arn if your feature group is created by another account
feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
 
# Load the feature definitions from input schema. The feature definitions can be used to create a feature group
feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)

feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"

# Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn)

# To select the target stores for ingestion, you can specify the target store as the paramter
# If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"])

# If only OfflineStore is selected, the connector will batch write the data to offline store directly
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

# To retrieve the records failed to be ingested by spark connector
failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()
```

 **サンプル Python スクリプトを使用して Spark ジョブを送信する** 

PySpark バージョンでは、追加の依存 JAR をインポートする必要があるため、Spark アプリケーションを実行するには追加の手順が必要です。

インストール時に `SPARK_HOME` を指定しなかった場合は、実行時に必要な jar を JVM にロードする必要があります`spark-submit`。`feature-store-pyspark-dependency-jars` は、すべての jar へのパスを自動的に取得するために Spark ライブラリによってインストールされる Python スクリプトです。

```
spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py
```

Amazon EMR でこのアプリケーションを実行している場合は、依存 jar を他のタスクノードに配布する必要がないように、アプリケーションをクライアントモードで実行することをお勧めします。次のような Spark 引数を使用して Amazon EMR クラスターにステップを 1 つ追加します。

```
spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
```

------
#### [ Example Scala script ]

 *FeatureStoreBatchingestion.scala* 

```
import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object TestSparkApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().getOrCreate()

    // Construct test DataFrame
    val data = List(
      Row("1", "2021-07-01T12:20:12Z"),
      Row("2", "2021-07-02T12:20:13Z"),
      Row("3", "2021-07-03T12:20:14Z")
    )
    
    val schema = StructType(
      List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType))
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    
    // Initialize FeatureStoreManager with a role arn if your feature group is created by another account
    val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
    
    // Load the feature definitions from input schema. The feature definitions can be used to create a feature group
    val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df)

    val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"
   
    // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
    // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
    featureStoreManager.ingestData(df, featureGroupArn)
    
    // To select the target stores for ingestion, you can specify the target store as the paramter
    // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
    featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore"))
    
    // If only OfflineStore is selected, the connector will batch write the data to offline store directly
    featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"])
    
    // To retrieve the records failed to be ingested by spark connector
    val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame()
  }
}
```

 **Spark ジョブの送信** 

 **Scala** 

Feature Store Spark は通常の依存関係として使用できるはずです。すべてのプラットフォームでアプリケーションを実行するために、追加の指示は必要ありません。

------