

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Ingestión por lotes con Amazon SageMaker Feature Store Spark
<a name="batch-ingestion-spark-connector-setup"></a>

Amazon SageMaker Feature Store Spark es un conector de Spark que conecta la biblioteca de Spark con Feature Store. Spark con el almacén de características simplifica la ingesta de datos de los `DataFrame` de Spark a los grupos de características. Feature Store admite la ingesta de datos por lotes con Spark, utilizando tu canalización de ETL existente, en Amazon EMR, GIS, un trabajo, AWS Glue un trabajo de procesamiento de SageMaker Amazon o SageMaker un cuaderno.

Se proporcionan métodos para instalar e implementar la ingesta de datos por lotes para los desarrolladores de Python y Scala. Los desarrolladores de Python pueden usar la biblioteca `sagemaker-feature-store-pyspark` Python de código abierto para el desarrollo local, la instalación en Amazon EMR y para los Jupyter Notebooks siguiendo las instrucciones del repositorio Spark de [Amazon SageMaker ](https://github.com/aws/sagemaker-feature-store-spark) Feature Store. GitHub Los desarrolladores de Scala pueden usar el conector Spark de Feature Store, disponible en el repositorio [central de Maven del SDK de Spark de Amazon SageMaker Feature Store](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk).

Puede utilizar el conector Spark para ingerir datos de las siguientes maneras, en función de si están habilitados el almacenamiento en línea, el almacenamiento sin conexión o ambos.

1. Introducir de forma predeterminada: si la tienda online está habilitada, el conector Spark primero introduce el marco de datos en la tienda online mediante la API. [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) Solo queda en el almacenamiento en línea el registro con la hora del evento más grande. Si está habilitado el almacenamiento sin conexión, el almacén de características incorpora su marco de datos al almacenamiento sin conexión dentro de 15 minutos. Para obtener más información sobre cómo funcionan el almacenamiento sin conexión y el almacenamiento en línea, consulte [Conceptos del almacén de características](feature-store-concepts.md).

   Puede lograr esto si no especifica `target_stores` en el método `.ingest_data(...)`. 

1. Ingesta directa al almacenamiento sin conexión: si está habilitado el almacenamiento sin conexión, el conector Spark incorpora por lotes el marco de datos directamente al almacenamiento sin conexión. La ingesta directa del marco de datos al almacenamiento sin conexión no actualiza el almacenamiento en línea.

   Puede lograr esto si especifica `target_stores=["OfflineStore"]` en el método `.ingest_data(...)`.

1. Solo en la tienda en línea: si la tienda en línea está habilitada, el conector Spark introduce tu marco de datos en la tienda en línea mediante la API. [PutRecord](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html) La ingesta directa del marco de datos al almacenamiento en línea no actualiza el almacenamiento sin conexión. 

   Puede lograr esto si especifica `target_stores=["OnlineStore"]` en el método `.ingest_data(...)`.

Para obtener información acerca de los distintos métodos de ingestión, consulte [Implementaciones de ejemplo](#batch-ingestion-spark-connector-example-implementations).

**Topics**
+ [Instalación de Spark en el almacén de características](#batch-ingestion-spark-connector-installation)
+ [Recuperación del JAR para el Spark del almacén de características](#retrieve-jar-spark-connector)
+ [Implementaciones de ejemplo](#batch-ingestion-spark-connector-example-implementations)

## Instalación de Spark en el almacén de características
<a name="batch-ingestion-spark-connector-installation"></a>

 **Usuarios de Scala** 

El SDK de Spark de Feature Store está disponible en el [repositorio central de Maven del SDK de Spark de Amazon SageMaker Feature Store](https://mvnrepository.com/artifact/software.amazon.sagemaker.featurestore/sagemaker-feature-store-spark-sdk) para los usuarios de Scala.

 ****Requisitos**** 
+ Spark >= 3.0.0 y <= 3.3.0
+ `iceberg-spark-runtime` >= 0.14.0
+ Scala >= 2.12.x  
+  Amazon EMR >= 6.1.0 (solo si usa Amazon EMR) 

 **Declare la dependencia en POM.xml** 

El conector Spark del almacén de características depende de la biblioteca `iceberg-spark-runtime`. Por lo tanto, debe agregar la versión correspondiente de la biblioteca `iceberg-spark-runtime` a la dependencia si va a ingerir datos en un grupo de características que ha creado automáticamente con el formato de tabla de Iceberg. Por ejemplo, si utiliza Spark 3.1, debe declarar lo siguiente en el `POM.xml` del proyecto: 

```
 <dependency>
 <groupId>software.amazon.sagemaker.featurestore</groupId>
 <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId>
 <version>1.0.0</version>
 </dependency>
 
 <dependency>
   <groupId>org.apache.iceberg</groupId>
   <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId>
   <version>0.14.0</version>
</dependency>
```

 **Usuarios de Python** 

El SDK de Spark de Feature Store está disponible en el [ GitHubrepositorio de código abierto de Amazon SageMaker Feature Store Spark](https://github.com/aws/sagemaker-feature-store-spark).

 ****Requisitos**** 
+ Spark >= 3.0.0 y <= 3.3.0
+ Amazon EMR >= 6.1.0 (solo si usa Amazon EMR) 
+ Kernel = `conda_python3`

Se recomienda configurar `$SPARK_HOME` en el directorio en el que tenga instalado Spark. Durante la instalación, el almacén de características carga el JAR necesario para `SPARK_HOME`, de modo que las dependencias se carguen automáticamente. Para que esta PySpark biblioteca funcione, es necesario que Spark inicie una JVM.

 **Instalación local** 

Para obtener más información sobre la instalación, habilite el modo detallado anexando `--verbose` al siguiente comando de instalación. 

```
pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

 **Instalación en Amazon EMR** 

Cree un clúster de Amazon EMR con la versión de lanzamiento 6.1.0 o posterior. Habilite SSH para ayudarle a solucionar cualquier problema.

Puede hacer una de estas cosas para instalar la biblioteca:
+ Crear un paso personalizado en Amazon EMR.
+ Conectarse a su clúster mediante SSH e instalar la biblioteca desde allí.

**nota**  
La siguiente información usa la versión 3.1 de Spark, pero puede especificar cualquier versión que cumpla los requisitos.

```
export SPARK_HOME=/usr/lib/spark
sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

**nota**  
Si quiere instalar el dependiente JARs automáticamente en SPARK\$1HOME, no utilice el paso de arranque.

 **Instalación en una instancia de portátil SageMaker ** 

Instala una versión PySpark que sea compatible con el conector Spark mediante los siguientes comandos:

```
!pip3 install pyspark==3.1.1 
!pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:
```

Si va a realizar una ingestión por lotes en el almacenamiento sin conexión, las dependencias no se encuentran en el entorno de instancias del cuaderno.

```
from pyspark.sql import SparkSession
import feature_store_pyspark

extra_jars = ",".join(feature_store_pyspark.classpath_jars())

spark = SparkSession.builder \
    .config("spark.jars", extra_jars) \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \
    .getOrCreate()
```

 **Instalación en cuadernos con GIS** 

**importante**  
Debes usar la AWS Glue versión 2.0 o posterior.

Utilice la siguiente información como ayuda para instalar el PySpark conector en una sesión AWS Glue interactiva (SIG).

Amazon SageMaker Feature Store Spark requiere un JAR de conector Spark específico durante la inicialización de la sesión para cargarlo en tu bucket de Amazon S3. Para obtener más información sobre cómo cargar el JAR requerido en su bucket de S3, consulte [Recuperación del JAR para el Spark del almacén de características](#retrieve-jar-spark-connector).

Una vez cargado el JAR, debe proporcionar el JAR a las sesiones de GIS mediante el siguiente comando. 

```
%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar
```

Para instalar Feature Store Spark en AWS Glue tiempo de ejecución, utilice el comando `%additional_python_modules` mágico del bloc de notas GIS. AWS Glue se ejecuta `pip` en los módulos que especificó en`%additional_python_modules`.

```
%additional_python_modules sagemaker-feature-store-pyspark-3.1
```

Antes de iniciar la AWS Glue sesión, debe utilizar los dos comandos mágicos anteriores.

 **Instalación en un AWS Glue trabajo** 

**importante**  
Debe utilizar la AWS Glue versión 2.0 o posterior.

Para instalar el conector Spark en un AWS Glue trabajo, utilice el `--extra-jars` argumento para proporcionar lo necesario JARs e `--additional-python-modules` instalar el Spark Connector como parámetros del trabajo al crear el AWS Glue trabajo, como se muestra en el siguiente ejemplo. Para obtener más información sobre cómo cargar el JAR requerido en su bucket de S3, consulte [Recuperación del JAR para el Spark del almacén de características](#retrieve-jar-spark-connector).

```
glue_client = boto3.client('glue', region_name=region)
response = glue_client.create_job(
    Name=pipeline_id,
    Description='Feature Store Compute Job',
    Role=glue_role_arn,
    ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run},
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_location_uri,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--TempDir': temp_dir_location_uri,
        '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1',
        '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar",
        ...
    },
    MaxRetries=3,
    NumberOfWorkers=149,
    Timeout=2880,
    GlueVersion='3.0',
    WorkerType='G.2X'
)
```

 **Instalación en un trabajo de Amazon SageMaker Processing** 

Para usar Feature Store Spark con los trabajos SageMaker de Amazon Processing, trae tu propia imagen. Para obtener más información acerca de cómo traer su propia imagen, consulte [Imágenes personalizadas en Amazon SageMaker Studio Classic](studio-byoi.md). Agregue el paso de instalación a un Dockerfile. Después de insertar la imagen de Docker en un repositorio de Amazon ECR, puede utilizarla PySparkProcessor para crear el trabajo de procesamiento. Para obtener más información sobre la creación de un trabajo de procesamiento con el PySpark procesador, consulte. [Ejecución de un trabajo de procesamiento con Apache Spark](use-spark-processing-container.md)

A continuación, se muestra un ejemplo de cómo agregar un paso de instalación al Dockerfile.

```
FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0

RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
```

## Recuperación del JAR para el Spark del almacén de características
<a name="retrieve-jar-spark-connector"></a>

Para recuperar el JAR de dependencias de Spark del almacén de características, debe instalar el conector Spark desde el repositorio Índice de paquetes de Python (PyPI) mediante `pip` en cualquier entorno Python con acceso a la red. Un SageMaker Jupyter Notebook es un ejemplo de un entorno Python con acceso a la red.

El siguiente comando instala el conector Spark.

```
!pip install sagemaker-feature-store-pyspark-3.1      
```

Tras instalar el Spark del almacén de características, puede recuperar la ubicación del JAR y cargar el JAR a Amazon S3.

El comando `feature-store-pyspark-dependency-jars` proporciona la ubicación del JAR de dependencia necesario que agregó el Spark del almacén de características. Puede utilizar el comando para recuperar el JAR y cargarlo en Amazon S3.

```
jar_location = !feature-store-pyspark-dependency-jars
jar_location = jar_location[0]

s3_client = boto3.client("s3")
s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")
```

## Implementaciones de ejemplo
<a name="batch-ingestion-spark-connector-example-implementations"></a>

------
#### [ Example Python script ]

 *FeatureStoreBatchIngestion.py* 

```
from pyspark.sql import SparkSession
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
import feature_store_pyspark

spark = SparkSession.builder \
                    .getOrCreate()

# Construct test DataFrame
columns = ["RecordIdentifier", "EventTime"]
data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")]

df = spark.createDataFrame(data).toDF(*columns)

# Initialize FeatureStoreManager with a role arn if your feature group is created by another account
feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
 
# Load the feature definitions from input schema. The feature definitions can be used to create a feature group
feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)

feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"

# Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn)

# To select the target stores for ingestion, you can specify the target store as the paramter
# If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"])

# If only OfflineStore is selected, the connector will batch write the data to offline store directly
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])

# To retrieve the records failed to be ingested by spark connector
failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()
```

 **Enviar un trabajo de Spark con ejemplo de script de Python** 

La PySpark versión requiere la importación de un JAR dependiente adicional, por lo que se necesitan pasos adicionales para ejecutar la aplicación Spark. 

Si no lo especificaste `SPARK_HOME` durante la instalación, tendrás que cargar lo necesario JARs en la JVM durante la ejecución`spark-submit`. `feature-store-pyspark-dependency-jars`es un script de Python instalado por la biblioteca Spark para buscar automáticamente la ruta a todo JARs por ti. 

```
spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py
```

Si ejecuta esta aplicación en Amazon EMR, le recomendamos que ejecute la aplicación en modo cliente, de modo que no necesite distribuir lo dependiente JARs a otros nodos de tareas. Agregue un paso más en el clúster de Amazon EMR con un argumento de Spark similar al siguiente:

```
spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
```

------
#### [ Example Scala script ]

 *FeatureStoreBatchIngestion.scala* 

```
import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object TestSparkApp {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().getOrCreate()

    // Construct test DataFrame
    val data = List(
      Row("1", "2021-07-01T12:20:12Z"),
      Row("2", "2021-07-02T12:20:13Z"),
      Row("3", "2021-07-03T12:20:14Z")
    )
    
    val schema = StructType(
      List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType))
    )

    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    
    // Initialize FeatureStoreManager with a role arn if your feature group is created by another account
    val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
    
    // Load the feature definitions from input schema. The feature definitions can be used to create a feature group
    val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df)

    val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>"
   
    // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
    // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
    featureStoreManager.ingestData(df, featureGroupArn)
    
    // To select the target stores for ingestion, you can specify the target store as the paramter
    // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
    featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore"))
    
    // If only OfflineStore is selected, the connector will batch write the data to offline store directly
    featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"])
    
    // To retrieve the records failed to be ingested by spark connector
    val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame()
  }
}
```

 **Enviar un trabajo de Spark** 

 **Scala** 

Debería poder usar el Spark del almacén de características como una dependencia normal. No se necesitan instrucciones adicionales para ejecutar la aplicación en todas las plataformas. 

------