

# Feature Processing


Amazon SageMaker Feature Store Feature Processing is a capability with which you can transform raw data into machine learning (ML) features. It provides you with a Feature Processor SDK with which you can transform and ingest data from batch data sources into your feature groups. With this capability, Feature Store takes care of the underlying infrastructure including provisioning the compute environments and creating and maintaining Pipelines to load and ingest data. This way you can focus on your feature processor definitions that includes a transformation function (for example, count of product views, mean of transaction value), sources (where to apply this transformation on), and sinks (where to write the computed feature values to).

Feature Processor pipeline is a Pipelines pipeline. As a Pipelines, you can also track scheduled Feature Processor pipelines with SageMaker AI lineage in the console. For more information on SageMaker AI Lineage, see [Amazon SageMaker ML Lineage Tracking](lineage-tracking.md) This includes tracking scheduled executions, visualizing lineage to trace features back to their data sources, and viewing shared feature processors in a single environment. For information on using Feature Store with the console, see [View pipeline executions from the console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

**Topics**
+ [

# Feature Store Feature Processor SDK
](feature-store-feature-processor-sdk.md)
+ [

# Running Feature Store Feature Processor remotely
](feature-store-feature-processor-execute-remotely.md)
+ [

# Creating and running Feature Store Feature Processor pipelines
](feature-store-feature-processor-create-execute-pipeline.md)
+ [

# Scheduled and event based executions for Feature Processor pipelines
](feature-store-feature-processor-schedule-pipeline.md)
+ [

# Monitor Amazon SageMaker Feature Store Feature Processor pipelines
](feature-store-feature-processor-monitor-pipeline.md)
+ [

# IAM permissions and execution roles
](feature-store-feature-processor-iam-permissions.md)
+ [

# Feature Processor restrictions, limits, and quotas
](feature-store-feature-processor-quotas.md)
+ [

# Data sources
](feature-store-feature-processor-data-sources.md)
+ [

# Example Feature Processing code for common use cases
](feature-store-feature-processor-examples.md)

# Feature Store Feature Processor SDK


Declare a Feature Store Feature Processor definition by decorating your transformation functions with the `@feature_processor` decorator. The SageMaker AI SDK for Python (Boto3) automatically loads data from the configured input data sources, applies the decorated transformation function, and then ingests the transformed data to a target feature group. Decorated transformation functions must conform to the expected signature of the `@feature_processor` decorator. For more information about the `@feature_processor` decorator, see [@feature\$1processor Decorator](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-decorator) in the Amazon SageMaker Feature Store Read the Docs. 

With the `@feature_processor` decorator, your transformation function runs in a Spark runtime environment where the input arguments provided to your function and its return value are Spark DataFrames. The number of input parameters in your transformation function must match the number of inputs configured in the `@feature_processor` decorator. 

For more information on the `@feature_processor` decorator, see the [Feature Processor Feature Store SDK for Python (Boto3)](https://github.com/aws/sagemaker-python-sdk/tree/master/src/sagemaker/feature_store/feature_processor).

The following code are basic examples on how to use the `@feature_processor` decorator. For more specific example usage cases, see [Example Feature Processing code for common use cases](feature-store-feature-processor-examples.md).

The Feature Processor SDK can be installed from the SageMaker Python SDK and its extras using the following command. 

```
pip install sagemaker[feature-processor]
```

In the following examples, `us-east-1` is the region of the resource, `111122223333` is the resource owner account ID, and `your-feature-group-name` is the feature group name.

The following is a basic feature processor definition, where the `@feature_processor` decorator configures a CSV input from Amazon S3 to be loaded and provided to your transformation function (for example, `transform`), and prepares it for ingestion to a feature group. The last line runs it.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

The `@feature_processor` parameters include:
+ `inputs` (List[str]): A list of data sources that are used in your Feature Store Feature Processor. If your data sources are feature groups or stored in Amazon S3 you may be able to use Feature Store provided data source definitions for feature processor. For a full list of Feature Store provided data source definitions, see the [Feature Processor Data Source](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#feature-processor-data-source) in the Amazon SageMaker Feature Store Read the Docs.
+ `output` (str): The ARN of the feature group to ingest the output of the decorated function.
+ `target_stores` (Optional[List[str]]): A list of stores (for example, `OnlineStore` or `OfflineStore`) to ingest to the output. If unspecified, data is ingested to all of the output feature group’s enabled stores.
+ `parameters` (Dict[str, Any]): A dictionary to be provided to your transformation function. 
+ `enable_ingestion` (bool): A flag to indicate whether the transformation function’s outputs are ingested to the output feature group. This flag is useful during the development phase. If unspecified, ingestion is enabled.

Optional wrapped function parameters (provided as an argument if provided in the function signature) include:
+ `params` (Dict[str, Any]): The dictionary defined in the `@feature_processor` parameters. It also contains system configured parameters that can be referenced with the key `system`, such as the `scheduled_time` parameter.
+ `spark` (SparkSession): A reference to the SparkSession instance initialized for the Spark Application.

The following code is an example of using the `params` and `spark` parameters.

```
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://your-bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name' 

@feature_processor(inputs=[CSV_DATA_SOURCE], output=OUTPUT_FG)
def transform(csv_input_df, params, spark):
   
   scheduled_time = params['system']['scheduled_time']
   csv_input_df.createOrReplaceTempView('csv_input_df')
   return spark.sql(f'''
        SELECT *
        FROM csv_input_df
        WHERE date_add(event_time, 1) >= {scheduled_time}
   ''')
   
transform()
```

The `scheduled_time` system parameter (provided in the `params` argument to your function) is an important value to support retrying each execution. The value can help to uniquely identify the Feature Processor’s execution and can be used as a reference point for daterange–based inputs (for example, only loading the last 24 hours worth of data) to guarantee the input range independent of the code’s actual execution time. If the Feature Processor runs on a schedule (see [Scheduled and event based executions for Feature Processor pipelines](feature-store-feature-processor-schedule-pipeline.md)) then its value is fixed to the time it is scheduled to run. The argument can be overridden during synchronous execution using the SDK’s execute API to support use cases such as data backfills or re-running a missed past execution. Its value is the current time if the Feature Processor runs any other way.

For information about authoring Spark code, see the [ Spark SQL Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html).

For more code samples for common use-cases, see the [Example Feature Processing code for common use cases](feature-store-feature-processor-examples.md). 

Note that transformation functions decorated with `@feature_processor` do not return a value. To programmatically test your function, you can remove or monkey patch the `@feature_processor` decorator such that it acts as a pass-through to the wrapped function. For more details on the `@feature_processor` decorator, see [Amazon SageMaker Feature Store Python SDK](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_featurestore.html). 

# Running Feature Store Feature Processor remotely


To run your Feature Processors on large data sets that require hardware more powerful than what is locally available, you can decorate your code with the `@remote` decorator to run your local Python code as a single or multi-node distributed SageMaker training job. For more information on running your code as a SageMaker training job, see [Run your local code as a SageMaker training job](train-remote-decorator.md). 

The following is a usage example of the `@remote` decorator along with the `@feature_processor` decorator.

```
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.remote_function import remote
from sagemaker.feature_store.feature_processor import CSVDataSource, feature_processor

CSV_DATA_SOURCE = CSVDataSource('s3://bucket/prefix-to-csv/')
OUTPUT_FG = 'arn:aws:sagemaker:us-east-1:123456789012:feature-group/feature-group'

@remote(
    spark_config=SparkConfig(), 
    instance_type="ml.m5.2xlarge",
    dependencies="/local/requirements.txt"
)
@feature_processor(
    inputs=[CSV_DATA_SOURCE], 
    output=OUTPUT_FG,
)
def transform(csv_input_df):
   return csv_input_df
   
transform()
```

The `spark_config` parameter indicates that the remote job runs as a Spark application. The `SparkConfig` instance can be used to configure the Spark Configuration and provide additional dependencies to the Spark application such as Python files, JARs, and files.

For faster iterations when developing your feature processing code, you can specify the `keep_alive_period_in_seconds` argument in the `@remote` decorator to retain configured resources in a warm pool for subsequent training jobs. For more information on warm pools, see `[KeepAlivePeriodInSeconds](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ResourceConfig.html#sagemaker-Type-ResourceConfig-KeepAlivePeriodInSeconds)` in the API Reference guide.

The following code is an example of local `requirements.txt:`

```
sagemaker>=2.167.0
```

This will install the corresponding SageMaker SDK version in remote job which is required for executing the method annotated by `@feature-processor`. 

# Creating and running Feature Store Feature Processor pipelines


The Feature Processor SDK provides APIs to promote your Feature Processor Definitions into a fully managed SageMaker AI Pipeline. For more information on Pipelines, see [Pipelines overview](pipelines-overview.md). To convert your Feature Processor Definitions in to a SageMaker AI Pipeline, use the `to_pipeline` API with your Feature Processor definition. You can schedule executions of your Feature Processor Definition can be scheduled, operationally monitor them with CloudWatch metrics, and integrate them with EventBridge to act as event sources or subscribers. For more information about monitoring pipelines created with Pipelines, see [Monitor Amazon SageMaker Feature Store Feature Processor pipelines](feature-store-feature-processor-monitor-pipeline.md).

To view your Feature Processor pipelines, see [View pipeline executions from the console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-executions-studio).

If your function is also decorated with the `@remote` decorator, then its configurations is carried over to the Feature Processor pipeline. You can specify advanced configurations such as compute instance type and count, runtime dependencies, network and security configurations using the `@remote` decorator.

The following example uses the `to_pipeline` and `execute` APIs.

```
from sagemaker.feature_store.feature_processor import (
    execute, to_pipeline, describe, TransformationCode
)

pipeline_name="feature-processor-pipeline"
pipeline_arn = to_pipeline(
    pipeline_name=pipeline_name,
    step=transform,
    transformation_code=TransformationCode(s3_uri="s3://bucket/prefix"),
)

pipeline_execution_arn = execute(
    pipeline_name=pipeline_name
)
```

The `to_pipeline` API is semantically an upsert operation. It updates the pipeline if it already exists; otherwise, it creates a pipeline.

The `to_pipeline` API optionally accepts an Amazon S3 URI that references a file containing the Feature Processor definition to associate it with the Feature Processor pipeline to track the transformation function and its versions in its SageMaker AI machine learning lineage.

To retrieve a list of every Feature Processor pipeline in your account, you can use the `list_pipelines` API. A subsequent request to the `describe` API returns details related to the Feature Processor pipeline including, but not limited to, Pipelines and schedule details.

The following example uses the `list_pipelines` and `describe` APIs.

```
from sagemaker.feature_store.feature_processor import list_pipelines, describe

feature_processor_pipelines = list_pipelines()

pipeline_description = describe(
    pipeline_name = feature_processor_pipelines[0]
)
```

# Scheduled and event based executions for Feature Processor pipelines


Amazon SageMaker Feature Store Feature Processing pipeline executions can be configured to start automatically and asynchronously based on a preconfigured schedule or as a result of another AWS service event. For example, you can schedule Feature Processing pipelines to execute on the first of every month or chain two pipelines together so that a target pipeline is executed automatically after a source pipeline execution completes.

**Topics**
+ [

## Schedule based executions
](#feature-store-feature-processor-schedule-pipeline-schedule-based)
+ [

## Event based executions
](#feature-store-feature-processor-schedule-pipeline-event-based)

## Schedule based executions


The Feature Processor SDK provides a [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API to run Feature Processor pipelines on a recurring basis with Amazon EventBridge Scheduler integration. The schedule can be specified with an `at`, `rate`, or `cron` expression using the [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression) parameter with the same expressions supported by Amazon EventBridge. The schedule API is semantically an upsert operation in that it updates the schedule if it already exists; otherwise, it creates it. For more information on the EventBridge expressions and examples, see [Schedule types on EventBridge Scheduler](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html) in the EventBridge Scheduler User Guide.

The following examples use the Feature Processor [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API, using the `at`, `rate`, and `cron` expressions.

```
from sagemaker.feature_store.feature_processor import schedule
pipeline_name='feature-processor-pipeline'

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="at(2020-11-30T00:00:00)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="rate(24 hours)"
)

event_bridge_schedule_arn = schedule(
    pipeline_name=pipeline_name, 
    schedule_expression="cron(0 0-23/1 ? * * 2023-2024)"
)
```

The default timezone for date and time inputs in the `schedule` API are in UTC. For more information about EventBridge Scheduler schedule expressions, see [https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression](https://docs.aws.amazon.com/scheduler/latest/APIReference/API_CreateSchedule.html#scheduler-CreateSchedule-request-ScheduleExpression) in the EventBridge Scheduler API Reference documentation.

Scheduled Feature Processor pipeline executions provide your transformation function with the scheduled execution time, to be used as an idempotency token or a fixed reference point for date range–based inputs. To disable (i.e., pause) or re-enable a schedule, use the `state` parameter of the [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.schedule) API with `‘DISABLED’` or `‘ENABLED’`, respectively.

For information about Feature Processor, see [Feature Processor SDK data sources](feature-store-feature-processor-data-sources-sdk.md). 

## Event based executions


A Feature Processing pipeline can be configured to automatically execute when an AWS event occurs. The Feature Processing SDK provides a [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.put_trigger) function that accepts a list of source events and a target pipeline. The source events must be instances of [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent), that specifies a pipeline and [execution status](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html#sagemaker-DescribePipelineExecution-response-PipelineExecutionStatus) events. 

The `put_trigger` function configures an Amazon EventBridge rule and target to route events and allows you to specify an EventBridge event pattern to respond to any AWS event. For information on these concepts, see Amazon EventBridge [rules](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-rules.html), [targets](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-targets.html), and [event patterns](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-event-patterns.html).

Triggers can be enabled or disabled. EventBridge will start a target pipeline execution using the role provided in the `role_arn` parameter of the `put_trigger` API. The execution role is used by default if the SDK is used in a Amazon SageMaker Studio Classic or Notebook environment. For information on how to get your execution role, see [Get your execution role](sagemaker-roles.md#sagemaker-roles-get-execution-role).

The following example sets up:
+ A SageMaker AI Pipeline using the `to_pipeline` API, that takes in your target pipeline name (`target-pipeline`) and your transformation function (`transform`). For information on your Feature Processor and transform function, see [Feature Processor SDK data sources](feature-store-feature-processor-data-sources-sdk.md).
+ A trigger using the `put_trigger` API, that takes in `FeatureProcessorPipelineEvent` for the event and your target pipeline name (`target-pipeline`). 

  The `FeatureProcessorPipelineEvent` defines the trigger for when the status of your source pipeline (`source-pipeline`) becomes `Succeeded`. For information on the Feature Processor Pipeline event function, see [https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent](https://sagemaker.readthedocs.io/en/stable/api/prep_data/feature_store.html#sagemaker.feature_store.feature_processor.FeatureProcessorPipelineEvent) in the Feature Store Read the Docs. 

```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent

to_pipeline(pipeline_name="target-pipeline", step=transform)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name="source-pipeline",
            status=["Succeeded"]
        )
    ],
    target_pipeline="target-pipeline"
)
```

For an example of using event based triggers to create continuous executions and automatic retries for your Feature Processor pipeline, see [Continuous executions and automatic retries using event based triggers](feature-store-feature-processor-examples.md#feature-store-feature-processor-examples-continuous-execution-automatic-retries).

For an example of using event based triggers to create continuous *streaming* and automatic retries using event based triggers, see [Streaming custom data source examples](feature-store-feature-processor-data-sources-custom-examples.md#feature-store-feature-processor-data-sources-custom-examples-streaming). 

# Monitor Amazon SageMaker Feature Store Feature Processor pipelines


AWS provides monitoring tools to watch your Amazon SageMaker AI resources and applications in real time, report when something goes wrong, and take automatic actions when appropriate. Feature Store Feature Processor pipelines are Pipelines, so the standard monitoring mechanisms and integrations are available. Operational metrics such as execution failures can be monitored via Amazon CloudWatch metrics and Amazon EventBridge events. 

For more information on how to monitor and operationalize Feature Store Feature Processor, see the following resources:
+ [Monitoring AWS resources in Amazon SageMaker AI](monitoring-overview.md) - General guidance on monitoring and auditing activity for SageMaker AI resources.
+ [SageMaker pipelines metrics](monitoring-cloudwatch.md#cloudwatch-metrics-pipelines) - CloudWatch Metrics emitted by Pipelines.
+ [SageMaker pipeline execution state change](automating-sagemaker-with-eventbridge.md#eventbridge-pipeline) - EventBridge events emitted for Pipelines and executions.
+ [Troubleshooting Amazon SageMaker Pipelines](pipelines-troubleshooting.md) - General debugging and troubleshooting tips for Pipelines.

Feature Store Feature Processor execution logs can be found in Amazon CloudWatch Logs under the `/aws/sagemaker/TrainingJobs` log group, where you can find the execution log streams using lookup conventions. For executions created by directly invoking the `@feature_processor` decorated function, you can find logs in your local execution environment’s console. For` @remote` decorated executions, the CloudWatch Logs stream name contains the name of the function and the execution timestamp. For Feature Processor pipeline executions, the CloudWatch Logs stream for the step contains the `feature-processor` string and the pipeline execution ID.

Feature Store Feature Processor pipelines and recent execution statuses can be found in Amazon SageMaker Studio Classic for a given feature group in the Feature Store UI. Feature groups related to the Feature Processor pipelines as either inputs or outputs are displayed in the UI. In addition, the lineage view can provide context into upstream executions, such as data producing Feature Processor pipelines and data sources, for further debugging. For more information on using the lineage view using Studio Classic, see [View lineage from the console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).

# IAM permissions and execution roles


To use the The Amazon SageMaker Python SDK requires permissions to interact with AWS services. The following policies are required for full Feature Processor functionality. You can attach the [AmazonSageMakerFullAccess](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AmazonSageMakerFullAccess.html) and [AmazonEventBridgeSchedulerFullAccess](https://docs.aws.amazon.com/scheduler/latest/UserGuide/security_iam_id-based-policy-examples.html#security_iam_id-based-policies-managed-policies) AWS Managed Policies attached to your IAM role. For information on attaching policies to your IAM role, see [Adding policies to your IAM role](feature-store-adding-policies.md). See the following examples for details.

The trust policy of the role to which this policy is applied must allow the "scheduler.amazonaws.com", "sagemaker.amazonaws.com", and "glue.amazonaws.com" principles.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com",
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

# Feature Processor restrictions, limits, and quotas


Amazon SageMaker Feature Store Feature Processing relies on SageMaker AI machine learning (ML) lineage tracking. The Feature Store Feature Processor uses lineage contexts to represent and track Feature Processing Pipelines and Pipeline versions. Each Feature Store Feature Processor consumes at least two lineage contexts (one for the Feature Processing Pipeline and another for the version). If the input or output data source of a Feature Processing Pipeline changes, an additional lineage context is created. You can update SageMaker AI ML lineage limits by reaching out to AWS support for a limit increase. Default limits for resources used by Feature Store Feature Processor are as follows. For information on SageMaker AI ML lineage tracking, see [Amazon SageMaker ML Lineage Tracking](lineage-tracking.md).

For more information on SageMaker AI quotas, see [Amazon SageMaker AI endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/sagemaker.html).

Lineage limits per Region
+ Contexts – 500 (soft limit)
+ Artifacts – 6,000 (soft limit)
+ Associations – 6,000 (soft limit)

Training Limits per Region
+ Longest run time for a training job – 432,000 seconds
+ Maximum number of instances per training job – 20
+ The maximum number of `CreateTrainingJob` requests that you can make, per second, in this account in the current Region – 1 TPS
+ Keep alive period for cluster reuse – 3,600 seconds

Maximum number of Pipelines and concurrent pipeline executions per Region
+ Maximum number of pipelines allowed per account – 500
+ Maximum number of concurrent pipeline executions allowed per account – 20
+ Time at which pipeline executions time out – 672 hours

# Data sources


Amazon SageMaker Feature Store Feature Processing supports multiple data sources. The Feature Processor SDK for Python (Boto3) provides constructs to load data from feature groups or objects stored in Amazon S3. In addition, you can author custom data sources to load data from other data sources. For information about Feature Store provided data sources, see [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

**Topics**
+ [

# Feature Processor SDK data sources
](feature-store-feature-processor-data-sources-sdk.md)
+ [

# Custom data sources
](feature-store-feature-processor-data-sources-custom.md)
+ [

# Custom data source examples
](feature-store-feature-processor-data-sources-custom-examples.md)

# Feature Processor SDK data sources


The Amazon SageMaker Feature Store Feature Processor SDK for Python (Boto3) provides constructs to load data from feature groups or objects stored in Amazon S3. For a full list of Feature Store provided data source definitions, see the [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

For examples on how to use the Feature Store Python SDK data source definitions, see [Example Feature Processing code for common use cases](feature-store-feature-processor-examples.md).

## FeatureGroupDataSource


The `FeatureGroupDataSource` is used to specify a feature group as an input data source for a Feature Processor. Data can be loaded from an offline store feature group. Attempting to load your data from an online store feature group will result in a validation error. You can specify start and end offsets to limit the data that is loaded to a specific time range. For example, you can specify a start offset of ‘14 days' to load only the last two weeks of data, and you can additionally specify an end offset of '7 days' to limit the input to the previous week of data.

## Feature Store provided data source definitions


The Feature Store Python SDK contain data source definitions that can be used to specify various input data sources for a Feature Processor. These include CSV, Parquet, and Iceberg table sources. For a full list of Feature Store provided data source definitions, see the [Feature Processor data source Feature Store Python SDK](https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/feature_store/feature_processor/_data_source.py). 

# Custom data sources


On this page we will describe how to create a custom data source class and show some usage examples. With custom data sources, you can use the SageMaker AI SDK for Python (Boto3) provided APIs in the same way as if you are using Amazon SageMaker Feature Store provided data sources. 

To use a custom data source to transform and ingest data into a feature group using Feature Processing, you will need to extend the `PySparkDataSource` class with the following class members and function.
+ `data_source_name` (str): an arbitrary name for the data source. For example, Amazon Redshift, Snowflake, or a Glue Catalog ARN.
+ `data_source_unique_id` (str): a unique identifier that refers to the specific resource being accessed. For example, table name, DDB Table ARN, Amazon S3 prefix. All usage of the same `data_source_unique_id` in custom data sources will be associated to the same data source in the lineage view. Lineage includes information about the execution code of a feature processing workflow, what data sources were used, and how they are ingested into the feature group or feature. For information about viewing lineage of a feature group in **Studio**, see [View lineage from the console](feature-store-use-with-studio.md#feature-store-view-feature-processor-pipeline-lineage-studio).
+ `read_data` (func): a method used to connect with the feature processor. Returns a Spark data frame. For examples, see [Custom data source examples](feature-store-feature-processor-data-sources-custom-examples.md).

Both `data_source_name` and `data_source_unique_id` are used to uniquely identify your lineage entity. The following is an example for a custom data source class named `CustomDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from pyspark.sql import DataFrame

class CustomDataSource(PySparkDataSource):
    
    data_source_name = "custom-data-source-name"
    data_source_unique_id = "custom-data-source-id"
    
    def read_data(self, parameter, spark) -> DataFrame:
        your own code here to read data into a Spark dataframe
        return dataframe
```

# Custom data source examples


This section provides examples of custom data sources implementations for Feature Processors. For more information on custom data sources, see [Custom data sources](feature-store-feature-processor-data-sources-custom.md).

Security is a shared responsibility between AWS and our customers. AWS is responsible for protecting the infrastructure that runs the services in the AWS Cloud. Customers are responsible for all of their necessary security configuration and management tasks. For example, secrets such as access credentials to data stores should not be hard coded in your custom data sources. You can use AWS Secrets Manager to manage these credentials. For information about Secrets Manager, see [What is AWS Secrets Manager?](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) in the AWS Secrets Manager user guide. The following examples will use Secrets Manager for your credentials.

**Topics**
+ [

## Amazon Redshift Clusters (JDBC) custom data source examples
](#feature-store-feature-processor-data-sources-custom-examples-redshift)
+ [

## Snowflake custom data source examples
](#feature-store-feature-processor-data-sources-custom-examples-snowflake)
+ [

## Databricks (JDBC) custom data source examples
](#feature-store-feature-processor-data-sources-custom-examples-databricks)
+ [

## Streaming custom data source examples
](#feature-store-feature-processor-data-sources-custom-examples-streaming)

## Amazon Redshift Clusters (JDBC) custom data source examples


Amazon Redshift offers a JDBC driver that can be used to read data with Spark. For information about how to download the Amazon Redshift JDBC driver, see [Download the Amazon Redshift JDBC driver, version 2.1](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html). 

To create the custom Amazon Redshift data source class, you will need to overwrite the `read_data` method from the [Custom data sources](feature-store-feature-processor-data-sources-custom.md). 

To connect with an Amazon Redshift cluster you need your:
+ Amazon Redshift JDBC URL (`jdbc-url`)

  For information about obtaining your Amazon Redshift JDBC URL, see [Getting the JDBC URL](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-obtain-url.html) in the Amazon Redshift Database Developer Guide.
+ Amazon Redshift user name (`redshift-user`) and password (`redshift-password`)

  For information about how to create and manage database users using the Amazon Redshift SQL commands, see [Users](https://docs.aws.amazon.com/redshift/latest/dg/r_Users.html) in the Amazon Redshift Database Developer Guide.
+ Amazon Redshift table name (`redshift-table-name`)

  For information about how to create a table with some examples, see [CREATE TABLE](https://docs.aws.amazon.com/redshift/latest/dg/r_CREATE_TABLE_NEW.html) in the Amazon Redshift Database Developer Guide.
+ (Optional) If using Secrets Manager, you’ll need the secret name (`secret-redshift-account-info`) where you store your Amazon Redshift access username and password on Secrets Manager.

  For information about Secrets Manager, see [Find secrets in AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) in the AWS Secrets Manager User Guide. 
+ AWS Region (`your-region`)

  For information about obtaining your current session’s region name using SDK for Python (Boto3), see [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in the Boto3 documentation.

The following example demonstrates how to retrieve the JDBC URL and personal access token from Secrets Manager and override the `read_data` for your custom data source class, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class RedshiftDataSource(PySparkDataSource):
    
    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"
    
    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password'])
        
        return spark.read \
             .format("jdbc") \
             .option("url", url) \
             .option("driver", "com.amazon.redshift.Driver") \
             .option("dbtable", "redshift-table-name") \
             .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
             .option("aws_iam_role", aws_iam_role_arn) \
             .load()
```

The following example shows how to connect `RedshiftDataSource` to your `feature_processor` decorator.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
```

To run the feature processor job remotely, you need to provide the jdbc driver by defining `SparkConfig` and pass it to the `@remote` decorator.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Snowflake custom data source examples


Snowflake provides a Spark connector that can be used for your `feature_processor` decorator. For information about Snowflake connector for Spark, see [Snowflake Connector for Spark](https://docs.snowflake.com/en/user-guide/spark-connector) in the Snowflake documentation.

To create the custom Snowflake data source class, you will need to override the `read_data` method from the [Custom data sources](feature-store-feature-processor-data-sources-custom.md) and add the Spark connector packages to the Spark classpath. 

To connect with a Snowflake data source you need:
+ Snowflake URL (`sf-url`)

  For information about URLs for accessing Snowflake web interfaces, see [Account Identifiers](https://docs.snowflake.com/en/user-guide/admin-account-identifier) in the Snowflake documentation.
+ Snowflake database (`sf-database`) 

  For information about obtaining the name of your database using Snowflake, see [CURRENT\$1DATABASE](https://docs.snowflake.com/en/sql-reference/functions/current_database) in the Snowflake documentation.
+ Snowflake database schema (`sf-schema`) 

  For information about obtaining the name of your schema using Snowflake, see [CURRENT\$1SCHEMA](https://docs.snowflake.com/en/sql-reference/functions/current_schema) in the Snowflake documentation.
+ Snowflake warehouse (`sf-warehouse`)

  For information about obtaining the name of your warehouse using Snowflake, see [CURRENT\$1WAREHOUSE](https://docs.snowflake.com/en/sql-reference/functions/current_warehouse) in the Snowflake documentation.
+ Snowflake table name (`sf-table-name`)
+ (Optional) If using Secrets Manager, you’ll need the secret name (`secret-snowflake-account-info`) where you store your Snowflake access username and password on Secrets Manager. 

  For information about Secrets Manager, see [Find secrets in AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) in the AWS Secrets Manager User Guide. 
+ AWS Region (`your-region`)

  For information about obtaining your current session’s region name using SDK for Python (Boto3), see [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in the Boto3 documentation.

The following example demonstrates how to retrieve the Snowflake user name and password from Secrets Manager and override the `read_data` function for your custom data source class `SnowflakeDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3


class SnowflakeDataSource(PySparkDataSource):
    
    sf_options = { 
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        
        return spark.read.format("net.snowflake.spark.snowflake") \
                        .options(**self.sf_options) \
                        .option("dbtable", "sf-table-name") \
                        .load()
```

The following example shows how to connect `SnowflakeDataSource` to your `feature_processor` decorator.

```
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
```

To run the feature processor job remotely, you need to provide the packages via defining `SparkConfig` and pass it to `@remote` decorator. The Spark packages in the following example are such that `spark-snowflake_2.12` is the Feature Processor Scala version, `2.12.0` is the Snowflake version you wish to use, and `spark_3.3` is the Feature Processor Spark version. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn>",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Databricks (JDBC) custom data source examples


Spark can read data from Databricks by using the Databricks JDBC driver. For information about the Databricks JDBC driver, see [Configure the Databricks ODBC and JDBC drivers](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#configure-the-databricks-odbc-and-jdbc-drivers) in the Databricks documentation.

**Note**  
You can read data from any other database by including the corresponding JDBC driver in Spark classpath. For more information, see [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html) in the Spark SQL Guide.

To create the custom Databricks data source class, you will need to override the `read_data` method from the [Custom data sources](feature-store-feature-processor-data-sources-custom.md) and add the JDBC jar to the Spark classpath. 

To connect with a Databricks data source you need:
+ Databricks URL (`databricks-url`)

  For information about your Databricks URL, see [Building the connection URL for the Databricks driver](https://docs.databricks.com/en/integrations/jdbc-odbc-bi.html#building-the-connection-url-for-the-databricks-driver) in the Databricks documentation.
+ Databricks personal access token (`personal-access-token`)

  For information about your Databricks access token, see [Databricks personal access token authentication](https://docs.databricks.com/en/dev-tools/auth.html#pat) in the Databricks documentation.
+ Data catalog name (`db-catalog`) 

  For information about your Databricks catalog name, see [Catalog name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#catalog-name) in the Databricks documentation.
+ Schema name (`db-schema`)

  For information about your Databricks schema name, see [Schema name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#schema-name) in the Databricks documentation.
+ Table name (`db-table-name`)

  For information about your Databricks table name, see [Table name](https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html#table-name) in the Databricks documentation.
+ (Optional) If using Secrets Manager, you’ll need the secret name (`secret-databricks-account-info`) where you store your Databricks access username and password on Secrets Manager. 

  For information about Secrets Manager, see [Find secrets in AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_search-secret.html) in the AWS Secrets Manager User Guide. 
+ AWS Region (`your-region`)

  For information about obtaining your current session’s region name using SDK for Python (Boto3), see [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in the Boto3 documentation.

The following example demonstrates how to retrieve the JDBC URL and personal access token from Secrets Manager and overwrite the `read_data` for your custom data source class, `DatabricksDataSource`.

```
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3


class DatabricksDataSource(PySparkDataSource):
    
    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"
    
    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"

        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
         
        return spark.read.format("jdbc") \
                        .option("url", jdbc_url) \
                        .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \
                        .option("driver", "com.simba.spark.jdbc.Driver") \
                        .load()
```

The following example shows how to upload the JDBC driver jar, `jdbc-jar-file-name.jar`, to Amazon S3 in order to add it to the Spark classpath. For information about downloading the Spark JDBC driver (`jdbc-jar-file-name.jar`) from Databricks, see [Download JDBC Driver](https://www.databricks.com/spark/jdbc-drivers-download)in the Databricks website.

```
from sagemaker.feature_store.feature_processor import feature_processor
    
@feature_processor(
    inputs=[DatabricksDataSource()],
    output=feature-group-arn,
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
```

To run the feature processor job remotely, you need to provide the jars by defining `SparkConfig` and pass it to the `@remote` decorator.

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
```

## Streaming custom data source examples


You can connect to streaming data sources like Amazon Kinesis, and author transforms with Spark Structured Streaming to read from streaming data sources. For information about the Kinesis connector, see [Kinesis Connector for Spark Structured Streaming](https://github.com/roncemer/spark-sql-kinesis) in GitHub. For information about Amazon Kinesis, see [What Is Amazon Kinesis Data Streams?](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) in the Amazon Kinesis Developer Guide.

To create the custom Amazon Kinesis data source class, you will need to extend the `BaseDataSource` class and override the `read_data` method from [Custom data sources](feature-store-feature-processor-data-sources-custom.md).

To connect to an Amazon Kinesis data stream you need:
+ Kinesis ARN (`kinesis-resource-arn`) 

  For information on Kinesis data stream ARNs, see [Amazon Resource Names (ARNs) for Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html#kinesis-using-iam-arn-format) in the Amazon Kinesis Developer Guide.
+ Kinesis data stream name (`kinesis-stream-name`)
+ AWS Region (`your-region`)

  For information about obtaining your current session’s region name using SDK for Python (Boto3), see [region\$1name](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.region_name) in the Boto3 documentation.

```
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"
    
    def read_data(self, spark, params): 
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com")
            .load()
```

The following example demonstrates how to connect `KinesisDataSource` to your `feature_processor` decorator. 

```
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults", 
            "Properties":{
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):    
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
```

In the example code above we use a few Spark Structured Streaming options while streaming micro-batches into your feature group. For a full list of options, see the [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) in the Apache Spark documentation. 
+ The `foreachBatch` sink mode is a feature that allows you to apply operations and write logic on the output data of each micro-batch of a streaming query. 

  For information on `foreachBatch`, see [Using Foreach and ForeachBatch](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch) in the Apache Spark Structured Streaming Programming Guide. 
+ The `checkpointLocation` option periodically saves the state of the streaming application. The streaming log is saved in checkpoint location `s3a://checkpoint-path`.

  For information on the `checkpointLocation` option, see [Recovering from Failures with Checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) in the Apache Spark Structured Streaming Programming Guide. 
+ The `trigger` setting defines how often the micro-batch processing is triggered in a streaming application. In the example, the processing time trigger type is used with one-minute micro-batch intervals, specified by `trigger(processingTime="1 minute")`. To backfill from a stream source, you can use the available-now trigger type, specified by `trigger(availableNow=True)`.

  For a full list of `trigger` types, see [Triggers](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) in the Apache Spark Structured Streaming Programming Guide.

**Continuous streaming and automatic retries using event based triggers**

The Feature Processor uses SageMaker Training as compute infrastructure and it has a maximum runtime limit of 28 days. You can use event based triggers to extend your continuous streaming for a longer period of time and recover from transient failures. For more information on schedule and event based executions, see [Scheduled and event based executions for Feature Processor pipelines](feature-store-feature-processor-schedule-pipeline.md).

The following is an example of setting up an event based trigger to keep the streaming Feature Processor pipeline running continuously. This uses the streaming transform function defined in the previous example. A target pipeline can be configured to be triggered when a `STOPPED` or `FAILED` event occurs for a source pipeline execution. Note that the same pipeline is used as the source and target so that it run continuously.

```
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=source_pipeline_name, 
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=target_pipeline_name
)
```

# Example Feature Processing code for common use cases


The following examples provide sample Feature Processing code for common use cases. For a more detailed example notebook showcasing specific use cases, see [Amazon SageMaker Feature Store Feature Processing notebook](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-featurestore/feature_store_feature_processor.ipynb).

In the following examples, `us-east-1` is the region of the resource, `111122223333` is the resource owner account ID, and `your-feature-group-name` is the feature group name.

The `transactions` data set used in the following examples has the following schema:

```
'FeatureDefinitions': [
  {'FeatureName': 'txn_id', 'FeatureType': 'String'},
  {'FeatureName': 'txn_time', 'FeatureType': 'String'},
  {'FeatureName': 'credit_card_num', 'FeatureType': 'String'},
  {'FeatureName': 'txn_amount', 'FeatureType': 'Fractional'}
]
```

**Topics**
+ [

## Joining data from multiple data sources
](#feature-store-feature-processor-examples-joining-multiple-sources)
+ [

## Sliding window aggregates
](#feature-store-feature-processor-examples-sliding-window-aggregates)
+ [

## Tumbling window aggregates
](#feature-store-feature-processor-examples-tumbling-window-aggregates)
+ [

## Promotion from the offline store to online store
](#feature-store-feature-processor-examples-promotion-offline-to-online-store)
+ [

## Transformations with the Pandas library
](#feature-store-feature-processor-examples-transforms-with-pandas-library)
+ [

## Continuous executions and automatic retries using event based triggers
](#feature-store-feature-processor-examples-continuous-execution-automatic-retries)

## Joining data from multiple data sources


```
@feature_processor(
    inputs=[
        CSVDataSource('s3://bucket/customer'), 
        FeatureGroupDataSource('transactions')
    ],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def join(transactions_df, customer_df):
  '''Combine two data sources with an inner join on a common column'''

  return transactions_df.join(
    customer_df, transactions_df.customer_id == customer_df.customer_id, "inner"
  )
```

## Sliding window aggregates


```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def sliding_window_aggregates(transactions_df):
    '''Aggregates over 1-week windows, across 1-day sliding windows.'''
    from pyspark.sql.functions import window, avg, count
    
    return (
        transactions_df
            .groupBy("credit_card_num", window("txn_time", "1 week", "1 day"))
            .agg(avg("txn_amount").alias("avg_week"), count("*").alias("count_week")) 
            .orderBy("window.start")
            .select("credit_card_num", "window.start", "avg_week", "count_week")
    )
```

## Tumbling window aggregates


```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/your-feature-group-name'
)
def tumbling_window_aggregates(transactions_df, spark):
    '''Aggregates over 1-week windows, across 1-day tumbling windows, as a SQL query.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT credit_card_num, window.start, AVG(amount) AS avg, COUNT(*) AS count  
        FROM transactions
        GROUP BY credit_card_num, window(txn_time, "1 week")  
        ORDER BY window.start
    ''')
```

## Promotion from the offline store to online store


```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def offline_to_online():
    '''Move data from the offline store to the online store of the same feature group.'''

    transactions_df.createOrReplaceTempView('transactions')
    return spark.sql(f'''
        SELECT txn_id, txn_time, credit_card_num, amount
        FROM
            (SELECT *,
            row_number()
            OVER
                (PARTITION BY txn_id
                ORDER BY "txn_time" DESC, Api_Invocation_Time DESC, write_time DESC)
            AS row_number
            FROM transactions)
        WHERE row_number = 1
    ''')
```

## Transformations with the Pandas library


**Transformations with the Pandas library**

```
@feature_processor(
    inputs=[FeatureGroupDataSource('transactions')],
    target_stores=['OnlineStore'],
    output='arn:aws:sagemaker:us-east-1:111122223333:feature-group/transactions'
)
def pandas(transactions_df):
    '''Author transformations using the Pandas interface.
    
    Requires PyArrow to be installed via pip.
    For more details: https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark
    '''
    import pyspark.pandas as ps
    
    # PySpark DF to Pandas-On-Spark DF (Distributed DF with Pandas interface).
    pandas_on_spark_df = transactions_df.pandas_api()
    # Pandas-On-Spark DF to Pandas DF (Single Machine Only).
    pandas_df = pandas_on_spark_df.to_pandas()
    
    # Reverse: Pandas DF to Pandas-On-Spark DF
    pandas_on_spark_df = ps.from_pandas(pandas_df)
    # Reverse: Pandas-On-Spark DF to PySpark DF
    spark_df = pandas_on_spark_df.to_spark()
    
    return spark_df
```

## Continuous executions and automatic retries using event based triggers


```
from sagemaker.feature_store.feature_processor import put_trigger, to_pipeline, FeatureProcessorPipelineEvent
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "target-pipeline"

to_pipeline(
    pipeline_name=streaming_pipeline_name,
    step=transform
)

put_trigger(
    source_pipeline_events=[
        FeatureProcessorPipelineEvent(
            pipeline_name=streaming_pipeline_name, 
            pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
        )
    ],
    target_pipeline=streaming_pipeline_name
)
```