

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Orígenes de datos
<a name="feature-store-feature-processor-data-sources"></a>

El procesamiento SageMaker de funciones de Amazon Feature Store admite múltiples fuentes de datos. El SDK para Python (Boto3) del procesador de características proporciona constructos para cargar datos de grupos de características u objetos almacenados en Amazon S3. Además, puede crear orígenes de datos personalizados para cargar datos de otros orígenes de datos. Para obtener información sobre los orígenes de datos proporcionadas por el almacén de características, consulte [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

**Topics**
+ [Orígenes de datos del SDK del procesador de características](feature-store-feature-processor-data-sources-sdk.md)
+ [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md)
+ [Ejemplos de orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom-examples.md)

# Orígenes de datos del SDK del procesador de características
<a name="feature-store-feature-processor-data-sources-sdk"></a>

El SDK del procesador de características de Amazon SageMaker Feature Store para Python (Boto3) proporciona construcciones para cargar datos de grupos de características u objetos almacenados en Amazon S3. Para obtener una lista completa de las definiciones de orígenes de datos proporcionadas por el almacén de características, consulte [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

Para ver ejemplos sobre cómo utilizar las definiciones de orígenes de datos del SDK de Python del almacén de características, consulte [Ejemplo de código de procesamiento de características para casos de uso habituales](feature-store-feature-processor-examples.md).

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` se utiliza para especificar un grupo de características como origen de datos de entrada para un procesador de características. Los datos se pueden cargar desde un grupo de características de un almacenamiento sin conexión. Si intenta cargar los datos desde un grupo de características de un almacenamiento en línea, se producirá un error de validación. Puede especificar desplazamientos de inicio y final para limitar los datos que se cargan a un intervalo de tiempo específico. Por ejemplo, puede especificar un desplazamiento de inicio de “14 días” para cargar solo las dos últimas semanas de datos y, además, puede especificar un desplazamiento de finalización de “7 días” para limitar la entrada de datos a la semana anterior.

## Definiciones de orígenes de datos proporcionadas por el almacén de características
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

El SDK de Python del almacén de características contiene definiciones de orígenes de datos que se pueden usar para especificar varios orígenes de datos de entrada para un procesador de características. Entre ellas se incluyen orígenes de tablas CSV, Parquet e Iceberg. Para obtener una lista completa de las definiciones de orígenes de datos proporcionadas por el almacén de características, consulte [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

# Orígenes de datos personalizados
<a name="feature-store-feature-processor-data-sources-custom"></a>

En esta página se describe cómo crear una clase de origen de datos personalizado y se muestran algunos ejemplos de uso. Con las fuentes de datos personalizadas, puedes usar el SDK de SageMaker IA para Python (Boto3) que se proporciona de la misma manera que si APIs utilizaras las fuentes de datos proporcionadas por Amazon SageMaker Feature Store. 

Para utilizar un origen de datos personalizado para transformar e ingerir datos en un grupo de características mediante el procesamiento de características, tendrá que ampliar la clase `PySparkDataSource` con los siguientes miembros y funciones de la clase.
+ `data_source_name` (str): un nombre arbitrario para el origen de datos. Por ejemplo, un ARN de Amazon Redshift, Snowflake o catálogo de Glue.
+ `data_source_unique_id` (str): un identificador único que hace referencia al recurso específico al que se accede. Por ejemplo, nombre de tabla, ARN de tabla de DDB, prefijo de Amazon S3. Todo uso del mismo `data_source_unique_id` en orígenes de datos personalizados se asociará al mismo origen de datos en la vista de linaje. El linaje incluye la información sobre el código de ejecución del flujo de trabajo de procesamiento de características, los orígenes de datos que se utilizaron y la forma en que se incorporaron al grupo de características o a la característica. Para obtener más información sobre cómo ver el linaje de un grupo de características en **Studio**, consulte [Visualización del linaje desde la consola](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).
+ `read_data` (func): método utilizado para conectarse con el procesador de características. Devuelve un marco de datos de Spark. Para ver ejemplos, consulte [Ejemplos de orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom-examples.md).

Tanto `data_source_name` como `data_source_unique_id` se utilizan para identificar de forma exclusiva la entidad del linaje. A continuación, se muestra un ejemplo de una clase de origen de datos personalizado que se denomina `CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Ejemplos de orígenes de datos personalizados
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

En esta sección se proporcionan ejemplos de implementaciones de orígenes de datos personalizados para procesadores de características. Para obtener más información sobre los orígenes de datos personalizados, consulte [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md).

La seguridad es una responsabilidad compartida entre nuestros clientes AWS y nosotros. AWS es responsable de proteger la infraestructura en la que se ejecutan los servicios del Nube de AWS. Los clientes son responsables de todas las tareas necesarias de configuración y administración de la seguridad. Por ejemplo, los secretos, como las credenciales de acceso a los almacenes de datos, no deben estar codificados de forma rígida en los orígenes de datos personalizados. Puede utilizarlas AWS Secrets Manager para administrar estas credenciales. Para obtener información sobre Secrets Manager, consulte [¿Qué es AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) en la guía AWS Secrets Manager del usuario. En los siguientes ejemplos, se utilizará Secrets Manager para las credenciales.

**Topics**
+ [Ejemplos de orígenes de datos personalizados de clústeres de Amazon Redshift (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [Ejemplos de orígenes de datos personalizados de Snowflake](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [Ejemplos de orígenes de datos personalizados de Databricks (JDBC)](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [Ejemplos de orígenes de datos personalizados de transmisión](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Ejemplos de orígenes de datos personalizados de clústeres de Amazon Redshift (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift ofrece un controlador JDBC que puede utilizarse para leer datos con Spark. Para obtener información acerca de cómo descargar el controlador JDBC de Amazon Redshift, consulte [Descargar el controlador JDBC versión 2.1 de Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Para crear la clase de origen de datos personalizado de Amazon Redshift, tendrá que sobrescribir el método `read_data` de los [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md). 

Para conectarse a un clúster de Amazon Redshift, necesita:
+ URL de JDBC de Amazon Redshift (`jdbc-url`).

  Para obtener información sobre cómo obtener la URL de JDBC de Amazon Redshift, consulte [Obtención de la URL de JDBC](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) en la Guía para desarrolladores de bases de datos de Amazon Redshift.
+ Nombre de usuario (`redshift-user`) y contraseña (`redshift-password`) de Amazon Redshift.

  Para obtener información acerca de cómo crear y administrar usuarios de bases de datos mediante comandos SQL de Amazon Redshift, consulte [Usuarios](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) en la Guía para desarrolladores de bases de datos de Amazon Redshift.
+ Nombre de la tablas de Amazon Redshift (`redshift-table-name`).

  Para obtener información acerca de cómo crear una tabla con algunos ejemplos, consulte [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) en la Guía para desarrolladores de bases de datos de Amazon Redshift.
+ De forma opcional, si utiliza Secrets Manager, necesitará el nombre del secreto (`secret-redshift-account-info`) en el que guarda el nombre de usuario y contraseña de acceso a Amazon Redshift en Secrets Manager.

  Para obtener información sobre Secrets Manager, consulte [Buscar secretos AWS Secrets Manager en](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) la Guía del AWS Secrets Manager usuario. 
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

En el siguiente ejemplo se muestra cómo recuperar la URL de JDBC y el token de acceso personal de Secrets Manager y cómo anular `read_data` para su clase de origen de datos personalizado, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

En el siguiente ejemplo se muestra cómo conectar `RedshiftDataSource` con el decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Para ejecutar el trabajo del procesador de características de forma remota, debe proporcionar el controlador JDBC, para lo cual debe definir `SparkConfig` y pasarlo al decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Ejemplos de orígenes de datos personalizados de Snowflake
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake proporciona un conector Spark que puede utilizar para su decorador `feature_processor`. Para obtener información sobre el conector de Snowflake para Spark, consulte [Snowflake Connector for Spark](https://docs.snowflake.com/en/user-guide/spark-connector) en la documentación de Snowflake.

Para crear la clase de origen de datos personalizado de Snowflake, tendrá que anular el método `read_data` de los [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md) y agregar paquetes de conectores Spark a la ruta de clases de Spark. 

Para conectarse a un origen de datos de Snowflake, necesita:
+ URL de Snowflake (`sf-url`).

  Para obtener información sobre cómo URLs acceder a las interfaces web de Snowflake, consulte los [identificadores de cuenta](https://docs.snowflake.com/en/user-guide/admin-account-identifier) en la documentación de Snowflake.
+ Base de datos de Snowflake (`sf-database`). 

  Para obtener información sobre cómo obtener el nombre de la base de datos de Snowflake, consulte [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) en la documentación de Snowflake.
+ Esquema de base de datos de Snowflake (`sf-schema`). 

  Para obtener información sobre cómo obtener el nombre del esquema de Snowflake, consulte [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) en la documentación de Snowflake.
+ Almacén de Snowflake (`sf-warehouse`).

  Para obtener información sobre cómo obtener el nombre del almacén de Snowflake, consulte [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) en la documentación de Snowflake.
+ Nombre de la tabla de Snowflake (`sf-table-name`).
+ De forma opcional, si utiliza Secrets Manager, necesitará el nombre del secreto (`secret-snowflake-account-info`) en el que guarda el nombre de usuario y contraseña de acceso a Snowflake en Secrets Manager. 

  Para obtener información sobre Secrets Manager, consulte [Buscar secretos AWS Secrets Manager en](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) la Guía del AWS Secrets Manager usuario. 
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

En el siguiente ejemplo se muestra cómo recuperar el nombre de usuario y la contraseña de Snowflake de Secrets Manager y cómo anular la función `read_data` para su clase de origen de datos personalizado, `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

En el siguiente ejemplo se muestra cómo conectar `SnowflakeDataSource` con el decorador `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Para ejecutar el trabajo del procesador de características de forma remota, debe proporcionar los paquetes, para lo cual debe definir `SparkConfig` y pasarlo al decorador `@remote`. En los paquetes de Spark en el siguiente ejemplo `spark-snowflake_2.12` es la versión de Scala del procesador de características, `2.12.0` es la versión de Snowflake que desea utilizar y `spark_3.3` es la versión de Spark del procesador de características. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Ejemplos de orígenes de datos personalizados de Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark puede leer datos de Databricks mediante el controlador JDBC de Databricks. Para obtener información sobre el controlador JDBC de Databricks, consulte [Configure the Databricks ODBC and JDBC drivers](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) en la documentación de Databricks.

**nota**  
Puede leer los datos de cualquier otra base de datos si incluye el controlador JDBC correspondiente en la ruta de clases de Spark. Para obtener más información, consulte [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) en la Guía de Spark SQL.

Para crear la clase de origen de datos personalizado de Databricks, tendrá que anular el método `read_data` de los [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md) y agregar paquetes de conectores Spark a la ruta de clases de Spark. 

Para conectarse a un origen de datos de Databricks, necesita:
+ URL de Databricks (`databricks-url`).

  Para obtener información sobre la URL de Databricks, consulte [Building the connection URL for the Databricks driver](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) en la documentación de Databricks.
+ Token de acceso personal de Databricks (`personal-access-token`).

  Para obtener información sobre el token de acceso a Databricks, consulte [Databricks personal access token authentication](https://docs.databricks.com/en/dev-tools/auth.html#pat) en la documentación de Databricks.
+ Nombre del catálogo de datos (`db-catalog`). 

  Para obtener información sobre el nombre del catálogo de Databricks, consulte [Catalog name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) en la documentación de Databricks.
+ Nombre del esquema (`db-schema`).

  Para obtener información sobre el nombre del esquema de Databricks, consulte [Schema name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) en la documentación de Databricks.
+ Nombre de la tabla (`db-table-name`).

  Para obtener información sobre el nombre de la tabla de Databricks, consulte [Table name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) en la documentación de Databricks.
+ De forma opcional, si utiliza Secrets Manager, necesitará el nombre del secreto (`secret-databricks-account-info`) en el que guarda el nombre de usuario y contraseña de acceso a Databricks en Secrets Manager. 

  Para obtener información sobre Secrets Manager, consulte [Buscar secretos AWS Secrets Manager en](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) la Guía del AWS Secrets Manager usuario. 
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

En el siguiente ejemplo se muestra cómo recuperar la URL de JDBC y el token de acceso personal de Secrets Manager y cómo sobrescribir `read_data` para su clase de origen de datos personalizado, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

El siguiente ejemplo muestra cómo cargar el jar del controlador JDBC, `jdbc-jar-file-name.jar`, a Amazon S3 para agregarlo a la ruta de clases de Spark. Para obtener información sobre cómo descargar el controlador JDBC de Spark (`jdbc-jar-file-name.jar`) de Databricks, consulte [Download JDBC Driver](https://www.databricks.com/spark/jdbc-drivers-download) en el sitio web de Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Para ejecutar el trabajo del procesador de características de forma remota, debe proporcionar los jars, para lo cual debe definir `SparkConfig` y pasarlo al decorador `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Ejemplos de orígenes de datos personalizados de transmisión
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Puede conectarse a orígenes de datos de transmisión, como Amazon Kinesis, y crear transformaciones con Spark Structured Streaming para leer orígenes de datos de transmisión. Para obtener información sobre el conector de Kinesis, consulte Kinesis [Connector para Spark Structured](https://github.com/roncemer/spark-sql-kinesis) Streaming en. GitHub Para obtener información sobre Amazon Kinesis, consulte [What Is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) en la Guía para desarrolladores de Amazon Kinesis.

Para crear la clase de origen de datos personalizado de Amazon Kinesis, tendrá que ampliar la clase `BaseDataSource` y anular el método `read_data` de [Orígenes de datos personalizados](feature-store-feature-processor-data-sources-custom.md).

Para conectarse a un flujo de datos de Amazon Kinesis, necesita:
+ ARN de Kinesis (`kinesis-resource-arn`). 

  Para obtener información sobre la transmisión de datos de Kinesis ARNs, consulte [Amazon Resource Names (ARNs) para Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) en la Guía para desarrolladores de Amazon Kinesis.
+ Nombre de flujo de datos de Kinesis (`kinesis-stream-name`).
+ Región de AWS (`your-region`)

  Para obtener información sobre cómo obtener el nombre de la región de la sesión actual mediante el SDK para Python (Boto3), consulte [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) en la documentación de Boto3.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

En el siguiente ejemplo se muestra cómo conectar `KinesisDataSource` con el decorador `feature_processor`. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

En el código de ejemplo anterior, se utilizan algunas opciones de la transmisión estructurada de Spark para transmitir microlotes al grupo de características. Para ver una lista completa de opciones, consulte la [guía de programación de transmisión estructurada](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) en la documentación de Apache Spark. 
+ El modo receptor `foreachBatch` es una característica que permite aplicar operaciones y escribir lógica en los datos de salida de cada microlote de una consulta de transmisión. 

  Para obtener más información`foreachBatch`, consulte el [uso de Foreach y](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) la Guía de programación de ForeachBatch streaming estructurado de Apache Spark. 
+ La opción `checkpointLocation` guarda periódicamente el estado de la aplicación de transmisión. El registro de transmisión se guarda en la ubicación del punto de control `s3a://checkpoint-path`.

  Para obtener información sobre la opción `checkpointLocation`, consulte [Recovering from Failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) en la guía de programación de transmisión estructurada de Apache Spark. 
+ La configuración `trigger` define la frecuencia con la que se desencadena el procesamiento por microlotes en una aplicación de transmisión. En el ejemplo, el tipo de desencadenador de tiempo de procesamiento se utiliza con intervalos de microlotes de un minuto, según lo especificado en `trigger(processingTime="1 minute")`. Para reponer desde un origen de transmisión, puede usar el tipo de desencadenador disponible en este momento, según lo especificado en `trigger(availableNow=True)`.

  Para ver una lista completa de tipos de `trigger`, consulte [Triggers](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) en la guía de programación de transmisión estructurada de Apache Spark.

**Transmisiones continuas y reintentos automáticos mediante desencadenadores basados en eventos**

El procesador de funciones utiliza SageMaker Training como infraestructura de cómputo y tiene un límite máximo de tiempo de ejecución de 28 días. Puede utilizar desencadenadores basados en eventos para ampliar la transmisión continua durante un período de tiempo más prolongado y recuperarse de errores transitorios. Para obtener más información sobre las ejecuciones programadas y basadas en eventos, consulte [Ejecuciones programadas y basadas en eventos para las canalizaciones del procesador de características](feature-store-feature-processor-schedule-pipeline.md).

A continuación, se muestra un ejemplo de cómo configurar un desencadenador basado en eventos para que la canalización del procesador de características de transmisión funcione de forma continua. Para ello se utiliza la función de transformación de transmisión definida en el ejemplo anterior. Es posible configurar una canalización objetivo para que se desencadene cuando se produzca un evento `STOPPED` o `FAILED` durante la ejecución de una canalización de origen. Tenga en cuenta que se utiliza la misma canalización como origen y objetivo para que se ejecute de forma continua.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```