

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Origini dati
<a name="feature-store-feature-processor-data-sources"></a>

Amazon SageMaker Feature Store Feature Processing supporta più fonti di dati. L'SDK del Processore di funzionalità per Python (Boto3) fornisce costrutti per caricare dati da gruppi di funzionalità o oggetti archiviati in Amazon S3. Inoltre, puoi creare origini dati personalizzate per caricare dati da altre origini dati. Per informazioni sulle origini dati fornite dall'archivio funzionalità, consulta [Origine dati del Processore di funzionalità dell'archivio funzionalità Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

**Topics**
+ [

# Origini dati per SDK del Processore di funzionalità
](feature-store-feature-processor-data-sources-sdk.md)
+ [

# Origini dati personalizzate
](feature-store-feature-processor-data-sources-custom.md)
+ [

# Esempi di origini dati personalizzate
](feature-store-feature-processor-data-sources-custom-examples.md)

# Origini dati per SDK del Processore di funzionalità
<a name="feature-store-feature-processor-data-sources-sdk"></a>

L'Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) fornisce costrutti per caricare dati da gruppi di funzionalità o oggetti archiviati in Amazon S3. Per un elenco completo delle definizioni delle origini dati fornite dall'archivio funzionalità, consulta [Origine dati del Processore di funzionalità dell'archivio funzionalità Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

Per esempi su come utilizzare le definizioni delle origini dati dell'archivio funzionalità Python SDK, consulta [Esempio di codice di elaborazione della funzionalità per casi d'uso comuni](feature-store-feature-processor-examples.md).

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource` viene utilizzato per specificare un gruppo di funzionalità come origine dati di input per un Processore di funzionalità. I dati possono essere caricati da un gruppo di funzionalità di un archivio offline. Il tentativo di caricare i dati da un gruppo di funzionalità di un archivio online genererà un errore di convalida. È possibile specificare gli offset di inizio e fine per limitare i dati caricati a un intervallo di tempo specifico. Ad esempio, puoi specificare un offset iniziale di '14 giorni' per caricare solo le ultime due settimane di dati e puoi anche specificare un offset finale di '7 giorni' per limitare l'input alla settimana precedente di dati.

## Definizioni delle origini dati fornite dall'archivio funzionalità
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

L'archivio funzionalità Python SDK contiene definizioni di origini dati che possono essere utilizzate per specificare varie origini dati di input per un Processore di funzionalità. Queste includono origini di tabella CSV, Parquet e Iceberg. Per un elenco completo delle definizioni delle origini dati fornite dall'archivio funzionalità, consulta [Origine dati del Processore di funzionalità dell'archivio funzionalità Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

# Origini dati personalizzate
<a name="feature-store-feature-processor-data-sources-custom"></a>

In questa pagina descriveremo come creare una classe di origine dati personalizzata e mostreremo alcuni esempi di utilizzo. Con le sorgenti dati personalizzate, puoi utilizzare l' SageMaker AI SDK for Python ( APIs Boto3) fornito nello stesso modo in cui utilizzi le sorgenti dati fornite da Amazon Feature Store. SageMaker 

Per utilizzare un'origine dati personalizzata per trasformare e inserire dati in un gruppo di funzionalità utilizzando l'elaborazione delle funzionalità, dovrai estendere la classe `PySparkDataSource` con i seguenti membri e funzioni della classe.
+ `data_source_name` (str): un nome arbitrario per l'origine dati. Ad esempio, Amazon Redshift, Snowflake o un ARN di Glue Catalog.
+ `data_source_unique_id`(str): un identificatore univoco che si riferisce alla risorsa specifica a cui si accede. Ad esempio, nome della tabella, ARN della tabella DDB, prefisso Amazon S3. Tutti gli utilizzi dello stesso `data_source_unique_id` nelle origini dati personalizzate verranno associati alla stessa origine dati nella visualizzazione della derivazione. La derivazione include informazioni sul codice di esecuzione di un flusso di lavoro di elaborazione delle funzionalità, sulle origini dati utilizzate e su come vengono inserite nel gruppo di funzionalità o nella funzionalità. Per informazioni sulla visualizzazione del lineage di un gruppo di funzionalità in **Studio**, consulta [Visualizzazione del lineage dalla console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).
+ `read_data`(func): un metodo usato per connettersi con il processore di funzionalità. Restituisce un dataframe Spark. Per alcuni esempi, consulta [Esempi di origini dati personalizzate](feature-store-feature-processor-data-sources-custom-examples.md).

Entrambi `data_source_name` e `data_source_unique_id` vengono utilizzati per identificare in modo univoco l'entità di derivazione. Di seguito è riportato un esempio di una classe di origine dati personalizzata denominata `CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Esempi di origini dati personalizzate
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Questa sezione fornisce esempi di implementazioni di origini dati personalizzate per il Processore di funzionalità. Per ulteriori informazioni sulle origini dati personalizzate, consulta [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md).

La sicurezza è una responsabilità condivisa tra i AWS nostri clienti. AWS è responsabile della protezione dell'infrastruttura che gestisce i servizi di Cloud AWS. I clienti sono responsabili di tutte le attività necessarie di configurazione e gestione della sicurezza. Ad esempio, segreti come le credenziali di accesso agli archivi dati non devono essere codificati nelle origini dati personalizzate. È possibile utilizzare Gestione dei segreti AWS per gestire queste credenziali. Per informazioni su Secrets Manager, vedi [Cos'è Gestione dei segreti AWS?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) nella guida Gestione dei segreti AWS per l'utente. I seguenti esempi utilizzeranno Secrets Manager per le tue credenziali.

**Topics**
+ [

## Esempi di origini dati personalizzate di cluster Amazon Redshift (JDBC)
](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [

## Esempi di origini dati personalizzate Snowflake
](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [

## Esempi di origini dati personalizzate Databricks (JDBC)
](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [

## Esempi di origini dati personalizzate di streaming
](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Esempi di origini dati personalizzate di cluster Amazon Redshift (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift offre un driver JDBC che può essere utilizzato per leggere i dati con Spark. Per informazioni su come scaricare il driver JDBC di Amazon Redshift, consulta [Scarica il driver JDBC di Amazon Redshift, versione 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Per creare la classe di origine dati personalizzata di Amazon Redshift, dovrai sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md). 

Per connetterti a un cluster Amazon Redshift hai bisogno di:
+ URL JDBC di Amazon Redshift (`jdbc-url`)

  Per informazioni su come ottenere l'URL JDBC di Amazon Redshift, consulta la sezione [Ottenimento dell'URL JDBC](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) nella Guida per gli sviluppatori di database di Amazon Redshift.
+ Nome utente (`redshift-user`) e password (`redshift-password`) di Amazon Redshift

  Per informazioni su come creare e gestire gli utenti del database utilizzando i comandi SQL di Amazon Redshift, consulta la sezione [Utenti](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) nella Guida per gli sviluppatori di database di Amazon Redshift.
+ Nome della tabella (`redshift-table-name`) di Amazon Redshift

  Per informazioni su come creare una tabella con alcuni esempi, consulta la sezione [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) nella Guida per gli sviluppatori di database di Amazon Redshift.
+ (Facoltativo) Se utilizzi Secrets Manager, avrai bisogno del nome segreto (`secret-redshift-account-info`) in cui archivi il nome utente e la password di accesso ad Amazon Redshift su Secrets Manager.

  Per informazioni su Secrets Manager, consulta [Find secrets Gestione dei segreti AWS in](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) the Gestione dei segreti AWS User Guide. 
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

L'esempio seguente illustra come recuperare l'URL JDBC e il token di accesso personale da Secrets Manager e sovrascrivere `read_data` per la classe di origine dati personalizzata, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

L'esempio seguente mostra come connettere `RedshiftDataSource` al proprio decoratore `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Per eseguire il processo del processore di funzionalità da remoto, è necessario fornire il driver jdbc definendo `SparkConfig` e passandolo al decoratore `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Esempi di origini dati personalizzate Snowflake
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake fornisce un connettore Spark che può essere utilizzato per il decoratore `feature_processor`. Per informazioni sul connettore Snowflake per Spark, consulta [Connettore Snowflake per Spark](https://docs.snowflake.com/en/user-guide/spark-connector) nella documentazione di Snowflake.

Per creare la classe di origine dati personalizzata Snowflake, dovrai sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md) e aggiungere i pacchetti del connettore Spark al classpath di Spark. 

Per connetterti a un'origine dati Snowflake hai bisogno di:
+ URL Snowflake (`sf-url`)

  Per informazioni sull' URLs accesso alle interfacce web di Snowflake, consulta gli [identificatori degli account](https://docs.snowflake.com/en/user-guide/admin-account-identifier) nella documentazione di Snowflake.
+ Database Snowflake (`sf-database`) 

  Per informazioni su come ottenere il nome del database utilizzando Snowflake, consulta [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) nella documentazione di Snowflake.
+ Schema del database Snowflake (`sf-schema`) 

  Per informazioni su come ottenere il nome dello schema utilizzando Snowflake, consulta [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) nella documentazione di Snowflake.
+ Warehouse Snowflake (`sf-warehouse`)

  Per informazioni su come ottenere il nome del warehouse utilizzando Snowflake, consulta [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) nella documentazione di Snowflake.
+ Nome della tabella Snowflake (`sf-table-name`)
+ (Facoltativo) Se utilizzi Secrets Manager, avrai bisogno del nome segreto (`secret-snowflake-account-info`) in cui archivi il nome utente e la password di accesso a Snowflake su Secrets Manager. 

  Per informazioni su Secrets Manager, consulta [Find secrets Gestione dei segreti AWS in](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) the Gestione dei segreti AWS User Guide. 
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

L'esempio seguente illustra come recuperare il nome utente e la password di Snowflake da Secrets Manager e sovrascrivere la funzione `read_data` per la classe di origine dati personalizzata `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

L'esempio seguente mostra come connettere `SnowflakeDataSource` al proprio decoratore `feature_processor`.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Per eseguire il processo del processore di funzionalità da remoto, è necessario fornire i pacchetti tramite la definizione di `SparkConfig` e passarlo al decoratore `@remote`. I pacchetti Spark nell'esempio seguente sono tali che `spark-snowflake_2.12` è la versione Scala del Processore di funzionalità, `2.12.0` è la versione di Snowflake che desideri utilizzare e `spark_3.3` è la versione Spark del Processore di funzionalità. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Esempi di origini dati personalizzate Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark può leggere i dati da Databricks utilizzando il driver JDBC di Databricks. Per informazioni sul driver JDBC di Databricks, consulta [Configurazione dei driver ODBC e JDBC di Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) nella documentazione di Databricks.

**Nota**  
Puoi leggere i dati da qualsiasi altro database includendo il driver JDBC corrispondente nel classpath di Spark. Per ulteriori informazioni, consulta [Da JDBC ad altri database](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) nella Guida Spark SQL.

Per creare la classe di origine dati personalizzata Databricks, dovrai sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md) e aggiungere il file jar JDBC al classpath di Spark. 

Per connetterti a un'origine dati Databricks hai bisogno di:
+ URL di Databricks (`databricks-url`)

  Per informazioni sull'URL di Databricks, consulta [Creazione dell'URL di connessione per il driver di Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) nella documentazione di Databricks.
+ Token di accesso personale di Databricks (`personal-access-token`)

  Per informazioni sul token di accesso Databricks, consulta [Autenticazione con token di accesso personale di Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) nella documentazione di Databricks.
+ Nome del catalogo dati (`db-catalog`) 

  Per informazioni sul nome del catalogo di Databricks, consulta [Nome del catalogo](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) nella documentazione di Databricks.
+ Nome schema (`db-schema`)

  Per informazioni sul nome dello schema di Databricks, consulta [Nome dello schema](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) nella documentazione di Databricks.
+ Nome della tabella (`db-table-name`)

  Per informazioni sul nome della tabella di Databricks, consulta [Nome della tabella](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) nella documentazione di Databricks.
+ (Facoltativo) Se utilizzi Secrets Manager, avrai bisogno del nome segreto (`secret-databricks-account-info`) in cui archivi il nome utente e la password di accesso a Databricks su Secrets Manager. 

  Per informazioni su Secrets Manager, consulta [Find secrets Gestione dei segreti AWS in](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) the Gestione dei segreti AWS User Guide. 
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

L'esempio seguente illustra come recuperare l'URL JDBC e il token di accesso personale da Secrets Manager e sovrascrivere `read_data` per la classe di origine dati personalizzata, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

L'esempio seguente mostra come caricare il file jar del driver JDBC, `jdbc-jar-file-name.jar`, su Amazon S3 per aggiungerlo al classpath di Spark. Per informazioni sul download del driver JDBC di Spark (`jdbc-jar-file-name.jar`) da Databricks, consulta [Download del driver JDBC](https://www.databricks.com/spark/jdbc-drivers-download) nel sito Web di Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Per eseguire il processo del processore di funzionalità da remoto, è necessario fornire i file jar definendo `SparkConfig` e passandolo al decoratore `@remote`.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Esempi di origini dati personalizzate di streaming
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

È possibile connettersi a origini dati di streaming come Amazon Kinesis e creare trasformazioni con Spark Structured Streaming per leggere da origini dati di streaming. Per informazioni sul connettore Kinesis, consulta Kinesis Connector [for Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis) in. GitHub Per informazioni su Amazon Kinesis, consulta [Cos'è il flusso di dati Amazon Kinesis?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) nella Guida per gli sviluppatori di Amazon Kinesis.

Per creare la classe di origine dati personalizzata di Amazon Kinesis, dovrai estendere la classe `BaseDataSource` e sovrascrivere il metodo `read_data` da [Origini dati personalizzate](feature-store-feature-processor-data-sources-custom.md).

Per connetterti a un flusso di dati Amazon Kinesis, hai bisogno di:
+ ARN di Kinesis (`kinesis-resource-arn`) 

  Per informazioni sul flusso di dati Kinesis ARNs, consulta [Amazon Resource Names (ARNs) for Kinesis Data Streams nella Amazon Kinesis Developer](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) Guide.
+ Nome del flusso di dati Kinesis (`kinesis-stream-name`)
+ Regione AWS (`your-region`)

  Per informazioni su come ottenere il nome della Regione della sessione corrente utilizzando SDK per Python (Boto3), consulta [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) nella documentazione di Boto3.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

L'esempio seguente illustra come connettere `KinesisDataSource` al proprio decoratore `feature_processor`. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

Nel codice di esempio sopra riportato, utilizziamo alcune opzioni di Spark Structured Streaming durante lo streaming di micro-batch nel tuo gruppo di funzionalità. Per un elenco completo delle opzioni, consulta la [Guida di programmazione di Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) nella documentazione di Apache Spark. 
+ La modalità sink `foreachBatch` è una funzionalità che consente di applicare operazioni e scrivere logica sui dati di output di ogni micro-batch di una query di streaming. 

  Per informazioni su`foreachBatch`, consulta [Using Foreach e ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) nella Apache Spark Structured Streaming Programming Guide. 
+ L'opzione `checkpointLocation` salva periodicamente lo stato dell'applicazione di streaming. Il log di streaming viene salvato nella posizione di checkpoint `s3a://checkpoint-path`.

  Per informazioni sull'opzione `checkpointLocation`, consulta [Recupero dagli errori tramite esecuzione di checkpoint](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) nella Guida di programmazione di Apache Spark Structured Streaming. 
+ L'impostazione `trigger` definisce la frequenza con cui viene attivata l'elaborazione in micro-batch in un'applicazione di streaming. Nell'esempio, il tipo di trigger del tempo di elaborazione viene utilizzato con intervalli di micro-batch di un minuto, specificati da `trigger(processingTime="1 minute")`. Per effettuare il backfill da una sorgente di streaming, puoi utilizzare il tipo di trigger available-now, specificato da `trigger(availableNow=True)`.

  Per un elenco completo dei tipi di `trigger`, consulta [Trigger](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) nella Guida di programmazione di Apache Spark Structured Streaming.

**Streaming continuo e tentativi automatici utilizzando trigger basati su eventi**

Il Feature Processor utilizza SageMaker Training come infrastruttura di elaborazione e ha un limite di runtime massimo di 28 giorni. È possibile utilizzare i trigger basati su eventi per estendere lo streaming continuo per un periodo di tempo più lungo e recuperare in caso di errori temporanei. Per ulteriori informazioni sulle esecuzioni basate sulla pianificazione e sugli eventi, consulta [Esecuzioni pianificate e basate su eventi per le pipeline del Processore di funzionalità](feature-store-feature-processor-schedule-pipeline.md).

Di seguito è riportato un esempio di configurazione di un trigger basato su eventi per mantenere in esecuzione continua la pipeline del Processore di funzionalità di streaming. Usa la funzione di trasformazione di streaming definita nell'esempio precedente. Una pipeline di destinazione può essere configurata per essere attivata quando si verifica un evento `STOPPED` o `FAILED` per l'esecuzione di una pipeline di origine. Si noti che la stessa pipeline viene utilizzata come origine e destinazione in modo che sia esecuzione in modo continuo.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```