

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Amazon SageMaker 특성 저장소 Spark를 통한 일괄 수집
<a name="batch-ingestion-spark-connector-setup"></a>

Amazon SageMaker 특성 저장소 Spark는 Spark 라이브러리를 특성 저장소에 연결하는 Spark 커넥터입니다. 특성 저장소 Spark는 Spark `DataFrame`에서 특성 그룹으로의 데이터 수집을 단순화합니다. 특성 저장소는 Amazon EMR, GIS, AWS Glue 작업, Amazon SageMaker Processing 작업 또는 SageMaker 노트북에서 기존 ETL 파이프라인을 사용하여 Spark로 배치 데이터 수집을 지원합니다.

Python 및 Scala 개발자를 위해 일괄 데이터 수집을 설치, 구현하는 방법이 제공됩니다. Python 개발자는 [Amazon SageMaker 특성 저장소 Spark GitHub 리포지토리](https://github.com/aws/sagemaker-feature-store-spark)의 지침에 따라 로컬 개발, Amazon EMR 설치 및 Jupyter notebook에 대한 오픈 소스 `sagemaker-feature-store-pyspark`Python 라이브러리를 사용할 수 있습니다. Scala 개발자는 [Amazon SageMaker 특성 저장소 Spark SDK Maven Central 리포지토리](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk)에서 제공되는 특성 저장소 Spark 커넥터를 사용할 수 있습니다.

온라인 저장소, 오프라인 저장소 또는 둘 다 활성화되었는지 여부에 따라 Spark 커넥터를 사용하여 다음과 같은 방법으로 데이터를 수집할 수 있습니다.

1. 기본 수집 - 온라인 저장소가 활성화된 경우 Spark 커넥터는 먼저 [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) API를 사용하여 데이터프레임을 온라인 저장소에 수집합니다. 이벤트 시간이 가장 긴 레코드만 온라인 저장소에 남습니다. 오프라인 저장소가 활성화되면 15분 이내에 특성 저장소가 데이터프레임을 오프라인 저장소에 수집합니다. 온라인 및 오프라인 저장소의 작동 방식에 대한 자세한 내용은 [특성 저장소 개념](feature-store-concepts.md)섹션을 참조하세요.

   `.ingest_data(...)` 메서드에서 `target_stores`를 지정하지 않고 이 작업을 수행할 수 있습니다.

1. 오프라인 저장소 직접 수집 - 오프라인 저장소가 활성화된 경우 Spark 커넥터 배치는 데이터프레임을 오프라인 저장소에 직접 수집합니다. 데이터프레임을 오프라인 저장소에 직접 수집해도 온라인 저장소는 업데이트되지 않습니다.

   `.ingest_data(...)` 메서드에서 `target_stores=["OfflineStore"]`를 설정하여 이 작업을 수행할 수 있습니다.

1. 온라인 저장소 전용 - 온라인 저장소가 활성화된 경우 Spark 커넥터는 [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) API를 사용하여 데이터 프레임을 온라인 저장소로 수집합니다. 데이터프레임을 온라인 저장소에 직접 수집해도 오프라인 저장소는 업데이트되지 않습니다.

   `.ingest_data(...)` 메서드에서 `target_stores=["OnlineStore"]`를 설정하여 이 작업을 수행할 수 있습니다.

다양한 수집 방법 사용에 대한 자세한 내용은 [예제 구현](#batch-ingestion-spark-connector-example-implementations)섹션을 참조하세요.

**Topics**
+ [특성 저장소 Spark 설치](#batch-ingestion-spark-connector-installation)
+ [특성 저장소 Spark JAR 검색](#retrieve-jar-spark-connector)
+ [예제 구현](#batch-ingestion-spark-connector-example-implementations)

## 특성 저장소 Spark 설치
<a name="batch-ingestion-spark-connector-installation"></a>

 **Scala 사용자** 

특성 저장소 Spark SDK는 Scala 사용자를 위한 [Amazon SageMaker 특성 저장소 Spark SDK Maven Central 리포지토리](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk)에서 사용할 수 있습니다.

 ****요구 사항**** 
+ Spark >= 3.0.0 및 <= 3.3.0
+ `iceberg-spark-runtime` >= 0.14.0
+ Scala >= 2.12.x  
+  Amazon EMR >= 6.1.0 (Amazon EMR을 사용하는 경우에만 해당) 

 **POM.xml 파일에서의 종속성 선언** 

특성 저장소 Spark 커넥터는 `iceberg-spark-runtime`라이브러리에 종속되어 있습니다. 따라서 Iceberg 테이블 형식으로 자동 생성한 특성 그룹에 데이터를 수집하려면 해당 버전의 `iceberg-spark-runtime`라이브러리를 종속 항목에 추가해야 합니다. 예를 들어 Spark 3.1을 사용하는 경우 프로젝트의 `POM.xml`에서 다음을 선언해야 합니다.

```
 <dependency>
 <groupId>software.amazon.sagemaker.featurestore</groupId>
 <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId>
 <version>1.0.0</version>
 </dependency>
 
 <dependency>
   <groupId>org.apache.iceberg</groupId>
   <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId>
   <version>0.14.0</version>
</dependency>
```

 **Python 사용자** 

특성 저장소 Spark SDK는 오픈 소스 [Amazon SageMaker 특성 저장소 Spark GitHub 리포지토리](https://github.com/aws/sagemaker-feature-store-spark)에서 사용할 수 있습니다.

 ****요구 사항**** 
+ Spark >= 3.0.0 및 <= 3.3.0
+ Amazon EMR >= 6.1.0 (Amazon EMR을 사용하는 경우에만 해당) 
+ 커널 = `conda_python3`

Spark가 설치된 디렉터리로 `$SPARK_HOME`을 설정하는 것이 좋습니다. 설치 중에 특성 저장소는 필수 JAR을 `SPARK_HOME`에 업로드하여 종속 항목이 자동으로 로드되도록 합니다. 이 PySpark 라이브러리가 작동하려면 Spark에서 JVM을 시작해야 합니다.

 **로컬 설치** 

설치에 대한 자세한 정보를 찾으려면 `--verbose`를 다음 설치 명령을 추가하여 상세 내용 표시 모드를 활성화하세요.

```
pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

 **Amazon EMR에 설치** 

릴리스 버전 6.1.0 이상을 사용하여 Amazon EMR 클러스터를 생성합니다. SSH를 활성화하면 문제를 해결하는 데 도움이 됩니다.

다음을 수행하여 라이브러리를 설치할 수 있습니다.
+ Amazon EMR 내에서 사용자 지정 단계를 생성합니다.
+ SSH를 사용하여 클러스터에 연결하고 그곳에 라이브러리를 설치합니다.

**참고**  
다음 정보는 Spark 버전 3.1을 사용하지만 요구 사항을 충족하는 모든 버전을 지정할 수 있습니다.

```
export SPARK_HOME=/usr/lib/spark
sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

**참고**  
종속 JAR을 SPARK\$1HOME에 자동으로 설치하려면 부트스트랩 단계를 사용하지 마세요.

 **SageMaker 노트북 인스턴스에 설치** 

다음 명령을 사용하여 Spark 커넥터와 호환되는 PySpark 버전을 설치합니다.

```
!pip3 install pyspark==3.1.1 
!pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

오프라인 저장소에 일괄 수집을 수행하는 경우 종속성은 노트북 인스턴스 환경 내에 있지 않습니다.

```
from pyspark.sql import SparkSession
import feature_store_pyspark

extra_jars = ",".join(feature_store_pyspark.classpath_jars())

spark = SparkSession.builder \
    .config("spark.jars", extra_jars) \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \
    .getOrCreate()
```

 **GIS를 사용하는 노트북에 설치** 

**중요**  
 AWS Glue 버전 2.0 이상을 사용해야 합니다.

다음 정보를 사용하면 AWS Glue 대화형 세션(GIS)에 PySpark 커넥터를 설치하는 데 도움이 됩니다.

Amazon SageMaker 특성 저장소 Spark를 사용하려면 세션을 초기화하는 동안 특정 Spark 커넥터 JAR을 Amazon S3 버킷에 업로드해야 합니다. S3 버킷에 필수 JAR을 업로드하는 방법에 대한 자세한 내용은 [특성 저장소 Spark JAR 검색](#retrieve-jar-spark-connector)섹션을 참조하세요.

JAR을 업로드한 후에는 다음 명령을 사용하여 JAR과 함께 GIS 세션을 제공해야 합니다.

```
%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar
```

 AWS Glue 런타임에 특성 저장소 Spark를 설치하려면 GIS notebook. AWS Glue runs 내의 `%additional_python_modules` 매직 명령을 사용하여에서 지정한 `pip` 모듈을 실행합니다`%additional_python_modules`.

```
%additional_python_modules sagemaker-feature-store-pyspark-3.1
```

 AWS Glue 세션을 시작하기 전에 위의 매직 명령을 모두 사용해야 합니다.

 ** AWS Glue 작업에 설치** 

**중요**  
 AWS Glue 버전 2.0 이상을 사용해야 합니다.

 AWS Glue 작업에 Spark 커넥터를 설치하려면 `--extra-jars` 인수를 사용하여 필요한 JARs을 제공하고 다음 예제와 같이 작업을 생성할 때 Spark 커넥터를 AWS Glue 작업 파라미터로 `--additional-python-modules` 설치합니다. S3 버킷에 필수 JAR을 업로드하는 방법에 대한 자세한 내용은 [특성 저장소 Spark JAR 검색](#retrieve-jar-spark-connector)섹션을 참조하세요.

```
glue_client = boto3.client('glue', region_name=region)
response = glue_client.create_job(
    Name=pipeline_id,
    Description='Feature Store Compute Job',
    Role=glue_role_arn,
    ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run},
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location_uri,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--TempDir': temp_dir_location_uri,
        '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1',
        '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar",
        ...
    },
    MaxRetries=3,
    NumberOfWorkers=149,
    Timeout=2880,
    GlueVersion='3.0',
    WorkerType='G.2X'
)
```

 **Amazon SageMaker Processing 작업에 설치** 

Amazon SageMaker Processing 작업에서 특성 저장소 Spark를 사용하려면 사용자 고유 이미지를 가져오세요. 이미지 가져오기에 대한 자세한 내용은 [Amazon SageMaker Studio Classic의 사용자 지정 이미지](studio-byoi.md)섹션을 참조하세요. Dockerfile에 설치 단계를 추가합니다. 도커 이미지를 Amazon ECR 리포지토리로 푸시한 후 PySpark 프로세서를 사용하여 처리 작업을 생성할 수 있습니다. Pyspark 프로세서를 사용한 처리 작업 생성에 대한 자세한 내용은 [Apache Spark를 사용한 처리 작업 실행](use-spark-processing-container.md)섹션을 참조하세요.

다음은 Dockerfile에 설치 단계를 추가하는 예제입니다.

```
FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0

RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

## 특성 저장소 Spark JAR 검색
<a name="retrieve-jar-spark-connector"></a>

특성 저장소 Spark 종속성 JAR을 검색하려면 네트워크 액세스가 가능한 Python 환경에서 `pip`를 사용하여 Python Package Index (PyPI) 리포지토리에서 Spark 커넥터를 설치해야 합니다. SageMaker Jupyter notebook은 네트워크 액세스가 가능한 Python 환경의 한 예입니다.

다음 명령은 Spark 커넥터를 설치합니다.

```
!pip install sagemaker-feature-store-pyspark-3.1      
```

특성 저장소 Spark를 설치한 후 JAR 위치를 검색하고 JAR을 Amazon S3에 업로드할 수 있습니다.

이 `feature-store-pyspark-dependency-jars`명령은 특성 저장소 Spark가 추가한 필수 종속성 JAR의 위치를 제공합니다. 명령을 사용하여 JAR을 검색하고 이를 Amazon S3에 업로드할 수 있습니다.

```
jar_location = !feature-store-pyspark-dependency-jars
jar_location = jar_location[0]

s3_client = boto3.client("s3")
s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")
```

## 예제 구현
<a name="batch-ingestion-spark-connector-example-implementations"></a>

------
#### [ Example Python script ]

 *FeatureStoreBatchIngestion.py* 

```
from pyspark.sql import SparkSession
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
import feature_store_pyspark

spark = SparkSession.builder \
                    .getOrCreate()

# Construct test DataFrame
columns = ["RecordIdentifier", "EventTime"]
data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")]

df = spark.createDataFrame(data).toDF(*columns)

# Initialize FeatureStoreManager with a role arn if your feature group is created by another account
feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
 
# Load the feature definitions from input schema. The feature definitions can be used to create a feature group
feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)

feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"

# Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn)

# To select the target stores for ingestion, you can specify the target store as the paramter
# If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"])

# If only OfflineStore is selected, the connector will batch write the data to offline store directly
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

# To retrieve the records failed to be ingested by spark connector
failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()
```

 **예제 Python 스크립트가 포함된 Spark 작업 제출** 

PySpark 버전에서는 추가 종속 JAR을 가져와야 하므로 Spark 애플리케이션을 실행하려면 추가 단계가 필요합니다.

설치 중 `SPARK_HOME`을 지정하지 않은 경우 `spark-submit`실행 시 필요한 JAR을 JVM에 로드해야 합니다. `feature-store-pyspark-dependency-jars`는 모든 JAR의 경로를 자동으로 가져오기 위해 Spark 라이브러리에서 설치한 Python 스크립트입니다.

```
spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py
```

Amazon EMR에서 이 애플리케이션을 실행하는 경우 종속 JAR을 다른 태스크 노드에 배포할 필요가 없도록 클라이언트 모드에서 애플리케이션을 실행하는 것이 좋습니다. Amazon EMR 클러스터에 다음과 비슷한 Spark 인수를 사용하여 단계를 하나 더 추가합니다.

```
spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
```

------
#### [ Example Scala script ]

 *FeatureStoreBatchIngestion.scala* 

```
import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object TestSparkApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().getOrCreate()

    // Construct test DataFrame
    val data = List(
      Row("1", "2021-07-01T12:20:12Z"),
      Row("2", "2021-07-02T12:20:13Z"),
      Row("3", "2021-07-03T12:20:14Z")
    )
    
    val schema = StructType(
      List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType))
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    
    // Initialize FeatureStoreManager with a role arn if your feature group is created by another account
    val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
    
    // Load the feature definitions from input schema. The feature definitions can be used to create a feature group
    val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df)

    val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"
   
    // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
    // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
    featureStoreManager.ingestData(df, featureGroupArn)
    
    // To select the target stores for ingestion, you can specify the target store as the paramter
    // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
    featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore"))
    
    // If only OfflineStore is selected, the connector will batch write the data to offline store directly
    featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"])
    
    // To retrieve the records failed to be ingested by spark connector
    val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame()
  }
}
```

 **Spark 작업 제출** 

 **Scala** 

특성 저장소 Spark를 일반 종속 항목으로 사용할 수 있어야 합니다. 모든 플랫폼에서 애플리케이션을 실행하는 데 추가 지침은 필요하지 않습니다.

------