

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Sumber data
<a name="feature-store-feature-processor-data-sources"></a>

Amazon SageMaker Feature Store Feature Processing mendukung beberapa sumber data. Feature Processor SDK for Python (Boto3) menyediakan konstruksi untuk memuat data dari grup fitur atau objek yang disimpan di Amazon S3. Selain itu, Anda dapat membuat sumber data khusus untuk memuat data dari sumber data lain. Untuk informasi tentang sumber data yang disediakan Toko Fitur, lihat [Sumber data Prosesor Fitur Feature Store Python](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py) SDK. 

**Topics**
+ [

# Fitur Sumber data SDK Prosesor
](feature-store-feature-processor-data-sources-sdk.md)
+ [

# Sumber data kustom
](feature-store-feature-processor-data-sources-custom.md)
+ [

# Contoh sumber data kustom
](feature-store-feature-processor-data-sources-custom-examples.md)

# Fitur Sumber data SDK Prosesor
<a name="feature-store-feature-processor-data-sources-sdk"></a>

Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) menyediakan konstruksi untuk memuat data dari grup fitur atau objek yang disimpan di Amazon S3. Untuk daftar lengkap definisi sumber data yang disediakan Toko Fitur, lihat [Sumber data Prosesor Fitur Feature Store Python](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py) SDK. 

Untuk contoh tentang cara menggunakan definisi sumber data SDK Python Toko Fitur, lihat. [Contoh kode Pemrosesan Fitur untuk kasus penggunaan umum](feature-store-feature-processor-examples.md)

## FeatureGroupDataSource
<a name="feature-store-feature-processor-data-sources-sdk-featuregroup"></a>

`FeatureGroupDataSource`Ini digunakan untuk menentukan grup fitur sebagai sumber data input untuk Prosesor Fitur. Data dapat dimuat dari grup fitur toko offline. Mencoba memuat data Anda dari grup fitur toko online akan menghasilkan kesalahan validasi. Anda dapat menentukan offset awal dan akhir untuk membatasi data yang dimuat ke rentang waktu tertentu. Misalnya, Anda dapat menentukan offset awal '14 hari' untuk memuat hanya dua minggu terakhir data, dan Anda juga dapat menentukan offset akhir '7 hari' untuk membatasi input ke data minggu sebelumnya.

## Fitur Store menyediakan definisi sumber data
<a name="feature-store-feature-processor-data-sources-sdk-provided-sources"></a>

Feature Store Python SDK berisi definisi sumber data yang dapat digunakan untuk menentukan berbagai sumber data input untuk Prosesor Fitur. Ini termasuk sumber tabel CSV, Parket, dan Gunung Es. Untuk daftar lengkap definisi sumber data yang disediakan Toko Fitur, lihat [Sumber data Prosesor Fitur Feature Store Python](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py) SDK. 

# Sumber data kustom
<a name="feature-store-feature-processor-data-sources-custom"></a>

Pada halaman ini kita akan menjelaskan cara membuat kelas sumber data kustom dan menunjukkan beberapa contoh penggunaan. Dengan sumber data khusus, Anda dapat menggunakan SageMaker AI SDK for Python ( APIs Boto3) yang disediakan dengan cara yang sama seperti jika Anda menggunakan sumber data yang disediakan Amazon Feature Store. SageMaker 

Untuk menggunakan sumber data khusus untuk mengubah dan menyerap data ke dalam grup fitur menggunakan Pemrosesan Fitur, Anda perlu memperluas `PySparkDataSource` kelas dengan anggota dan fungsi kelas berikut.
+ `data_source_name`(str): nama arbitrer untuk sumber data. Misalnya, Amazon Redshift, Snowflake, atau Glue Catalog ARN.
+ `data_source_unique_id`(str): pengenal unik yang mengacu pada sumber daya tertentu yang diakses. Misalnya, nama tabel, DDB Tabel ARN, awalan Amazon S3. Semua penggunaan yang sama `data_source_unique_id` dalam sumber data kustom akan dikaitkan dengan sumber data yang sama dalam tampilan garis keturunan. Lineage mencakup informasi tentang kode eksekusi alur kerja pemrosesan fitur, sumber data apa yang digunakan, dan bagaimana mereka dimasukkan ke dalam grup fitur atau fitur. Untuk informasi tentang melihat silsilah grup fitur di **Studio**, lihat. [Lihat silsilah dari konsol](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio)
+ `read_data`(func): metode yang digunakan untuk terhubung dengan prosesor fitur. Mengembalikan frame data Spark. Sebagai contoh, lihat [Contoh sumber data kustom](feature-store-feature-processor-data-sources-custom-examples.md).

Keduanya `data_source_name` dan `data_source_unique_id` digunakan untuk mengidentifikasi entitas garis keturunan Anda secara unik. Berikut ini adalah contoh untuk kelas sumber data kustom bernama`CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Contoh sumber data kustom
<a name="feature-store-feature-processor-data-sources-custom-examples"></a>

Bagian ini memberikan contoh implementasi sumber data kustom untuk Prosesor Fitur. Untuk informasi selengkapnya tentang sumber data kustom, lihat[Sumber data kustom](feature-store-feature-processor-data-sources-custom.md).

Keamanan adalah tanggung jawab bersama antara AWS dan pelanggan kami. AWS bertanggung jawab untuk melindungi infrastruktur yang menjalankan layanan di AWS Cloud. Pelanggan bertanggung jawab atas semua konfigurasi keamanan dan tugas manajemen yang diperlukan. Misalnya, rahasia seperti kredensyal akses ke penyimpanan data tidak boleh dikodekan keras dalam sumber data kustom Anda. Anda dapat menggunakan AWS Secrets Manager untuk mengelola kredensional ini. Untuk informasi tentang Secrets Manager, lihat [Apa itu AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) dalam panduan AWS Secrets Manager pengguna. Contoh berikut akan menggunakan Secrets Manager untuk kredensialmu.

**Topics**
+ [

## Contoh sumber data kustom Amazon Redshift Clusters (JDBC)
](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [

## Contoh sumber data kustom kepingan salju
](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [

## Contoh sumber data kustom Databricks (JDBC)
](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [

## Contoh sumber data khusus streaming
](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Contoh sumber data kustom Amazon Redshift Clusters (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-redshift"></a>

Amazon Redshift menawarkan driver JDBC yang dapat digunakan untuk membaca data dengan Spark. Untuk informasi tentang cara mengunduh driver Amazon Redshift JDBC, lihat Mengunduh driver [Amazon Redshift JDBC, versi 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

Untuk membuat kelas sumber data Amazon Redshift kustom, Anda harus menimpa `read_data` metode dari. [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md) 

Untuk terhubung dengan cluster Amazon Redshift, Anda memerlukan:
+ URL JDBC Amazon Redshift () `jdbc-url`

  Untuk informasi tentang mendapatkan URL Amazon Redshift JDBC, lihat [Mendapatkan URL JDBC di Panduan Pengembang Database](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) Amazon Redshift.
+ Nama pengguna Amazon Redshift (`redshift-user`) dan kata sandi () `redshift-password`

  Untuk informasi tentang cara membuat dan mengelola pengguna database menggunakan perintah Amazon Redshift SQL, lihat [Pengguna](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) di Panduan Pengembang Database Amazon Redshift.
+ Nama tabel Amazon Redshift () `redshift-table-name`

  Untuk informasi tentang cara membuat tabel dengan beberapa contoh, lihat [MEMBUAT TABEL di Panduan](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) Pengembang Database Amazon Redshift.
+ (Opsional) Jika menggunakan Secrets Manager, Anda memerlukan nama rahasia (`secret-redshift-account-info`) tempat menyimpan nama pengguna dan kata sandi akses Amazon Redshift di Secrets Manager.

  Untuk informasi tentang Secrets Manager, lihat [Menemukan rahasia AWS Secrets Manager di](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Panduan AWS Secrets Manager Pengguna. 
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

Contoh berikut menunjukkan cara mengambil URL JDBC dan token akses pribadi dari Secrets Manager dan mengganti `read_data` untuk kelas sumber data kustom Anda,. `DatabricksDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

Contoh berikut menunjukkan bagaimana menghubungkan `RedshiftDataSource` ke `feature_processor` dekorator Anda.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

Untuk menjalankan pekerjaan prosesor fitur dari jarak jauh, Anda perlu menyediakan driver jdbc dengan mendefinisikan `SparkConfig` dan meneruskannya ke dekorator. `@remote`

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Contoh sumber data kustom kepingan salju
<a name="feature-store-feature-processor-data-sources-custom-examples-snowflake"></a>

Snowflake menyediakan konektor Spark yang dapat digunakan untuk dekorator Anda. `feature_processor` Untuk informasi tentang konektor Snowflake untuk Spark, lihat Snowflake [Connector for Spark di dokumentasi Snowflake](https://docs.snowflake.com/en/user-guide/spark-connector).

Untuk membuat kelas sumber data Snowflake kustom, Anda harus mengganti `read_data` metode dari [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md) dan menambahkan paket konektor Spark ke classpath Spark. 

Untuk terhubung dengan sumber data Snowflake yang Anda butuhkan:
+ URL kepingan salju () `sf-url`

  Untuk informasi tentang cara URLs mengakses antarmuka web Snowflake, lihat [Pengenal Akun](https://docs.snowflake.com/en/user-guide/admin-account-identifier) di dokumentasi Snowflake.
+ Database kepingan salju () `sf-database` 

  Untuk informasi tentang mendapatkan nama database Anda menggunakan Snowflake, lihat [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) dalam dokumentasi Snowflake.
+ Skema basis data kepingan salju () `sf-schema` 

  Untuk informasi tentang mendapatkan nama skema Anda menggunakan Snowflake, lihat [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) di dokumentasi Snowflake.
+ Gudang kepingan salju () `sf-warehouse`

  Untuk informasi tentang mendapatkan nama gudang Anda menggunakan Snowflake, lihat [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) di dokumentasi Snowflake.
+ Nama tabel kepingan salju () `sf-table-name`
+ (Opsional) Jika menggunakan Secrets Manager, Anda akan memerlukan nama rahasia (`secret-snowflake-account-info`) tempat Anda menyimpan nama pengguna dan kata sandi akses Snowflake di Secrets Manager. 

  Untuk informasi tentang Secrets Manager, lihat [Menemukan rahasia AWS Secrets Manager di](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Panduan AWS Secrets Manager Pengguna. 
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

Contoh berikut menunjukkan cara mengambil nama pengguna dan kata sandi Snowflake dari Secrets Manager dan mengganti `read_data` fungsi untuk kelas sumber data kustom Anda. `SnowflakeDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

Contoh berikut menunjukkan bagaimana menghubungkan `SnowflakeDataSource` ke `feature_processor` dekorator Anda.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

Untuk menjalankan pekerjaan prosesor fitur dari jarak jauh, Anda perlu menyediakan paket melalui mendefinisikan `SparkConfig` dan meneruskannya ke `@remote` dekorator. Paket Spark dalam contoh berikut sedemikian rupa sehingga `spark-snowflake_2.12` merupakan versi Feature Processor Scala, `2.12.0` adalah versi Snowflake yang ingin Anda gunakan, dan `spark_3.3` merupakan versi Feature Processor Spark. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Contoh sumber data kustom Databricks (JDBC)
<a name="feature-store-feature-processor-data-sources-custom-examples-databricks"></a>

Spark dapat membaca data dari Databricks dengan menggunakan driver Databricks JDBC. Untuk informasi tentang driver JDBC Databricks, lihat [Mengkonfigurasi driver Databricks ODBC dan JDBC dalam dokumentasi Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers).

**catatan**  
Anda dapat membaca data dari database lain dengan memasukkan driver JDBC yang sesuai di Spark classpath. Untuk informasi selengkapnya, lihat [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) di Spark SQL Guide.

Untuk membuat kelas sumber data Databricks kustom, Anda harus mengganti `read_data` metode dari [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md) dan menambahkan jar JDBC ke classpath Spark. 

Untuk terhubung dengan sumber data Databricks yang Anda butuhkan:
+ URL Databricks () `databricks-url`

  Untuk informasi tentang URL Databricks Anda, lihat [Membangun URL koneksi untuk driver Databricks dalam dokumentasi Databricks](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver).
+ Databricks token akses pribadi () `personal-access-token`

  Untuk informasi tentang token akses Databricks Anda, lihat [otentikasi token akses pribadi Databricks](https://docs.databricks.com/en/dev-tools/auth.html#pat) dalam dokumentasi Databricks.
+ Nama katalog data (`db-catalog`) 

  Untuk informasi tentang nama katalog Databricks Anda, lihat [Nama katalog](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) dalam dokumentasi Databricks.
+ Nama skema () `db-schema`

  [Untuk informasi tentang nama skema Databricks Anda, lihat Nama skema dalam dokumentasi Databricks.](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name)
+ Nama tabel (`db-table-name`)

  Untuk informasi tentang nama tabel Databricks Anda, lihat [Nama tabel](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) dalam dokumentasi Databricks.
+ (Opsional) Jika menggunakan Secrets Manager, Anda memerlukan nama rahasia (`secret-databricks-account-info`) tempat menyimpan nama pengguna dan kata sandi akses Databricks di Secrets Manager. 

  Untuk informasi tentang Secrets Manager, lihat [Menemukan rahasia AWS Secrets Manager di](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) Panduan AWS Secrets Manager Pengguna. 
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

Contoh berikut menunjukkan cara mengambil URL JDBC dan token akses pribadi dari Secrets Manager dan menimpa `read_data` untuk kelas sumber data kustom Anda,. `DatabricksDataSource`

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

Contoh berikut menunjukkan cara mengunggah jar driver JDBC,`jdbc-jar-file-name.jar`, ke Amazon S3 untuk menambahkannya ke classpath Spark. Untuk informasi tentang mengunduh driver Spark JDBC (`jdbc-jar-file-name.jar`) dari Databricks, lihat [Mengunduh Driver JDBC](https://www.databricks.com/spark/jdbc-drivers-download) di situs web Databricks.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

Untuk menjalankan pekerjaan prosesor fitur dari jarak jauh, Anda perlu menyediakan stoples dengan mendefinisikan `SparkConfig` dan meneruskannya ke dekorator. `@remote`

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Contoh sumber data khusus streaming
<a name="feature-store-feature-processor-data-sources-custom-examples-streaming"></a>

Anda dapat terhubung ke sumber data streaming seperti Amazon Kinesis, dan penulis mengubah dengan Spark Structured Streaming untuk membaca dari sumber data streaming. Untuk informasi tentang konektor Kinesis, lihat Konektor [Kinesis untuk Streaming Terstruktur Spark](https://github.com/roncemer/spark-sql-kinesis) di. GitHub Untuk informasi tentang Amazon Kinesis, lihat [Apa Itu Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) Data Streams? di Panduan Pengembang Amazon Kinesis.

Untuk membuat kelas sumber data Amazon Kinesis kustom, Anda perlu memperluas `BaseDataSource` kelas dan mengganti metode dari`read_data`. [Sumber data kustom](feature-store-feature-processor-data-sources-custom.md)

Untuk terhubung ke aliran data Amazon Kinesis, Anda memerlukan:
+ Kinesis ARN () `kinesis-resource-arn` 

  Untuk informasi tentang aliran data Kinesis ARNs, lihat [Amazon Resource Names (ARNs) untuk Kinesis Data Streams di Panduan Pengembang Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format).
+ Nama aliran data Kinesis () `kinesis-stream-name`
+ Wilayah AWS (`your-region`)

  [Untuk informasi tentang mendapatkan nama wilayah sesi Anda saat ini menggunakan SDK for Python (Boto3), lihat region\$1name dalam dokumentasi Boto3.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name)

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

Contoh berikut menunjukkan bagaimana menghubungkan `KinesisDataSource` ke `feature_processor` dekorator Anda. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

Dalam contoh kode di atas, kami menggunakan beberapa opsi Streaming Terstruktur Spark saat mengalirkan batch mikro ke grup fitur Anda. Untuk daftar lengkap opsi, lihat [Panduan Pemrograman Streaming Terstruktur](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) di dokumentasi Apache Spark. 
+ Mode `foreachBatch` wastafel adalah fitur yang memungkinkan Anda menerapkan operasi dan menulis logika pada data keluaran setiap batch mikro dari kueri streaming. 

  Untuk informasi tentang`foreachBatch`, lihat [Menggunakan Foreach dan ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) di Panduan Pemrograman Streaming Terstruktur Apache Spark. 
+ `checkpointLocation`Opsi ini secara berkala menyimpan keadaan aplikasi streaming. Log streaming disimpan di lokasi `s3a://checkpoint-path` pos pemeriksaan.

  Untuk informasi tentang `checkpointLocation` opsi, lihat [Memulihkan dari Kegagalan dengan Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) di Panduan Pemrograman Streaming Terstruktur Apache Spark. 
+ `trigger`Pengaturan menentukan seberapa sering pemrosesan batch mikro dipicu dalam aplikasi streaming. Dalam contoh, jenis pemicu waktu pemrosesan digunakan dengan interval batch mikro satu menit, yang ditentukan oleh. `trigger(processingTime="1 minute")` Untuk mengisi ulang dari sumber aliran, Anda dapat menggunakan tipe pemicu yang tersedia sekarang, yang ditentukan oleh. `trigger(availableNow=True)`

  Untuk daftar lengkap `trigger` jenis, lihat [Pemicu](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) dalam Panduan Pemrograman Streaming Terstruktur Apache Spark.

**Streaming berkelanjutan dan percobaan ulang otomatis menggunakan pemicu berbasis peristiwa**

Prosesor Fitur menggunakan SageMaker Pelatihan sebagai infrastruktur komputasi dan memiliki batas waktu proses maksimum 28 hari. Anda dapat menggunakan pemicu berbasis peristiwa untuk memperpanjang streaming berkelanjutan Anda untuk jangka waktu yang lebih lama dan pulih dari kegagalan sementara. Untuk informasi selengkapnya tentang eksekusi berdasarkan jadwal dan acara, lihat[Eksekusi terjadwal dan berbasis acara untuk pipeline Prosesor Fitur](feature-store-feature-processor-schedule-pipeline.md).

Berikut ini adalah contoh pengaturan pemicu berbasis peristiwa untuk menjaga saluran Prosesor Fitur streaming tetap berjalan terus menerus. Ini menggunakan fungsi transformasi streaming yang didefinisikan dalam contoh sebelumnya. Pipeline target dapat dikonfigurasi untuk dipicu ketika `FAILED` peristiwa `STOPPED` atau terjadi untuk eksekusi pipeline sumber. Perhatikan bahwa pipeline yang sama digunakan sebagai sumber dan target sehingga berjalan terus menerus.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```