

# Pipeline management
Pipeline management

 Data pipeline is the core functionality of this guidance. In the Clickstream Analytics on AWS guidance, we define a data pipeline as a sequence of integrated AWS services that ingest, process, and model the clickstream data into a destination data warehouse for analytics and visualization. It is also designed to efficiently and reliably collect data from your websites and apps to a S3-based data lake, where it can be further processed, analyzed, and utilized for additional use cases (such as real-time monitoring, and recommendation). 

To get a basic understanding of the pipeline, you can refer to [terms and concepts](terms-and-concepts.md) for more information.

## Prerequisites
Prerequisites

 You can configure the pipeline in all AWS Regions. For opt-in regions, you need to [enable them](https://docs.aws.amazon.com/general/latest/gr/rande-manage.html) first. 

 Before you start to configure the pipeline in a specific region, make sure you have the following in the target region: 
+  At least one Amazon VPC. 
+  At least two public subnets across two AZs in the VPC. 
+  At least two private (with NAT gateways or instances) subnets across two AZs, or at least two isolated subnets across two AZs in the VPC. If you want to deploy the guidance resources in the isolated subnets, you have to create [VPC endpoints](https://docs.aws.amazon.com/whitepapers/latest/aws-privatelink/what-are-vpc-endpoints.html) for below AWS services, 
  +  s3, logs, ecr.api, ecr.dkr, ecs, ecs-agent, ecs-telemetry. 
  +  kinesis-streams if you use KDS as sink buffer in ingestion module. 
  +  emr-serverless, glue if you enable data processing module. 
  +  redshift-data, sts, dynamodb, states and lambda if you enable Redshift as analytics engine in data modeling module. 
+  An Amazon S3 bucket located in the same region. 
+  If you need to enable Redshift Serverless as analytics engine in data modeling module, you need have subnets across at least three AZs. 
+  QuickSight Enterprise edition subscription is required if the reporting is enabled. 

# Ingestion
Ingestion

 Ingestion module contains a web service that provides an endpoint to collect data through HTTP/HTTPS requests, which mainly is composed of Amazon Application Load Balancer and Amazon Elastic Container Service. It also supports sinking data into a stream service or S3 directly. 

 You can create an ingestion module with the following settings: 
+  [Ingestion endpoint settings](ingestion-endpoint.md): Create a web service as an ingestion endpoint to collect data sent from your SDKs. 
+  Data sink settings: Configure how the guidance sinks the data for downstream consumption. Currently, the guidance supports three types of data sink: 
  +  [Apache Kafka](data-sink-kafka.md) 
  +  [Amazon S3](data-sink---s3.md) 
  +  [Amazon Kinesis Data Stream (KDS)](data-sink-kinesis.md) 

**Throttle**

Currently, there is no built-in throttling feature available with this guidance. If needed, you can configure AWS WAF to implement throttling feature. Please refer to [WAF](https://docs.aws.amazon.com/waf/latest/developerguide/waf-rule-statement-type-rate-based.html) documentation.

# Ingestion endpoint
Ingestion endpoint

 The guidance creates a web service as an ingestion endpoint to collect data sent from your SDKs. You can set below configurations for ingestion endpoint. 
+ **Network type:** Specify the scheme of the application load balancer. Choose **General** for internet-facing or **Private** for internal.
+ **Public subnets (only applicable to General network type) : **Select at least two existing VPC public subnets, and the Amazon Application Load Balancers (ALBs) will be deployed in these subnets. 
+ **Private subnets****: **Select at least two existing VPC private subnets, and the EC2 instances running in ECS will be deployed in these subnets. 
**Note**  
 The availability zones where the public subnets are located must be consistent with those of the private subnets. 
+  **Ingestion capacity**: This configuration sets the capacity of the ingestion server, and the ingestion server will automatically scale up or down based on the utilization of the processing CPU.
  + Ingestion Capacity Unit (ICU): A single Ingestion Compute Unit (ICU) represents billable compute and memory units, approximately 8 gigabytes (GB) of memory and 2 vCPUs. 1 ICU generally can support 4000\$16000 requests per second.
  +  Minimum capacity: The minimum capacity to which the ingestion server will scale down. 
  +  Maximum capacity: The maximum capacity to which the ingestion server will scale up. 
  +  Warm pool: Warm pool gives you the ability to decrease latency for your applications that have exceptionally long boot time. For more information, please refer to [Warm pools for Amazon EC2 Auto Scaling](https://docs.aws.amazon.com/autoscaling/ec2/userguide/ec2-auto-scaling-warm-pools.html). 
+  **Enable HTTPS**: Users can choose HTTPS/HTTP protocol for the Ingestion endpoint. 
**Warning**  
If you switch between enabling HTTPS and disabling HTTPS, there may be interruptions in the ingestion service.
  +  Enable HTTPS: If users choose to enable HTTPS, the ingestion server will provide HTTPS endpoint. 
    +  Domain name: enter a domain name. 
**Note**  
Once the ingestion server is created, use the custom endpoint to create an alias or CNAME mapping in your Domain Name System (DNS) for the custom endpoint. 
    +  SSL Certificate: User need to select an ACM certificate corresponding to the domain name that you input. If there is no ACM certificate, please refer to [create public certificate](https://docs.aws.amazon.com/acm/latest/userguide/gs-acm-request-public.html) to create it. 
  +  Disable HTTPS: If users choose to disable HTTPS, the ingestion server will provide HTTP endpoint. 
**Important**  
 Using HTTP protocol is not secure, because data will be sent without any encryption, and there are high risks of data being leaked or tampered during transmission. Please acknowledge the risk to proceed. 
+ **Cross-Origin Resource Sharing (CORS)**: You can enable CORS to limit requests to data ingestion API from a specific domain. Note that, you need to input a complete internet address, for example, https://www.example.com, http://localhost:8080. Use comma to separate domain if you have multiple domain for this setting.
**Warning**  
CORS is a mandatory setting if you are collecting data from a website. If you do not set value for this parameter, the ingestion server to reject all the requests from Web platform.
+  Additional Settings 
  +  Request path: User can input the path of ingestion endpoint to collect data, the default path is "/collect". 
  +  AWS Global Accelerator: User can choose to create an accelerator to get static IP addresses that act as a global fixed entry point to your ingestion server, which will improves the availability and performance of your ingestion server. Note that additional charges apply. 
  +  Authentication: User can use OIDC provider to authenticate the request sent to your ingestion server. If you plan to enable it, please create an OIDC client in the OIDC provider then create a secret in AWS Secret Manager with information: 
    +  issuer 
    +  token endpoint 
    +  User endpoint 
    +  Authorization endpoint 
    +  App client ID 
    +  App Client Secret 

       The format is like:

      ```
      {
          "issuer":"xxx",
          "userEndpoint":"xxx",
          "authorizationEndpoint":"xxx",
          "tokenEndpoint":"xxx",
          "appClientId":"xxx",
          "appClientSecret":"xxx"
      }
      ```

       In the OIDC provider, you need to add `https://<ingestion server endpoint>/oauth2/idpresponse` to "Allowed callback URLs". 

       If you need to obtain the authentication token directly without inputting credential (username/password) manually, you can refer to [alb headless authentication client code](https://github.com/aws-samples/alb-headless-authentication-client) to set up your client to obtain the authentication token automatically.
**Warning**  
If you switch between enabling Authentication and disabling Authentication, there may be interruptions in the ingestion service.
  +  Access logs: ALB supports delivering detailed logs of all requests it receives. If you enable this option, the guidance will automatically enable access logs for you and store the logs into the S3 bucket you selected in previous step. 
**Important**  
The bucket must have a bucket policy that grants Elastic Load Balancing permission to write the access logs to the bucket. For details, refer to [Step 2: Attach a policy to your S3 bucket](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/enable-access-logging.html#attach-bucket-policy).  
Below is an example policy for the bucket in Regions available before August 2022.   

****  

    ```
    {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "logdelivery.elasticloadbalancing.amazonaws.com"
          },
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::amzn-s3-demo-bucket/prefix/AWSLogs/123456789012/*"
        }
      ]
    }
    ```
You need to replace `elb-account-id` with the ID of the AWS account for Elastic Load Balancing in your Region:  
US East (N. Virginia) – 127311923021
US East (Ohio) – 033677994240
US West (N. California) – 027434742980
US West (Oregon) – 797873946194
Africa (Cape Town) – 098369216593
Asia Pacific (Hong Kong) – 754344448648
Asia Pacific (Jakarta) – 589379963580
Asia Pacific (Mumbai) – 718504428378
Asia Pacific (Osaka) – 383597477331
Asia Pacific (Seoul) – 600734575887
Asia Pacific (Singapore) – 114774131450
Asia Pacific (Sydney) – 783225319266
Asia Pacific (Tokyo) – 582318560864
Canada (Central) – 985666609251
Europe (Frankfurt) – 054676820928
Europe (Ireland) – 156460612806
Europe (London) – 652711504416
Europe (Milan) – 635631232127
Europe (Paris) – 009996457667
Europe (Stockholm) – 897822967062
Middle East (Bahrain) – 076674570225
South America (São Paulo) – 507241528517
China (Beijing) – 638102146993
China (Ningxia) – 037604701340

# Data sink – Kafka
Data sink – Kafka

 This data sink will stream the clickstream data collected by the ingestion endpoint into a topic in a Kafka cluster. Currently, guidance support Amazon Managed Streaming for Apache Kafka (Amazon MSK) or a self-hosted Kafka cluster. 

## Amazon MSK
Amazon MSK
+  **Select an existing Amazon MSK cluster.** Select an MSK cluster from the drop-down list, and the MSK cluster needs to meet the following requirements: 
  +  MSK cluster and this guidance need to be in the same VPC 
  +  Enable **Unauthenticated access** in Access control methods 
  +  Enable **Plaintext** in Encryption 
  +  Set **auto.create.topics.enable** as true in MSK cluster configuration. This configuration sets whether MSK cluster can create topic automatically. 
  +  The value of **default.replication.factor** cannot be larger than the number of MKS cluster brokers 

**Note**  
 If there is no MSK cluster, the user needs to create an MSK Cluster following above requirements. 
+  **Topic**: The user can specify a topic name. By default, the guidance will create a topic with “project-id”. 

## Self-hosted Kafka
Self-hosted Kafka

 Users can also use self-hosted Kafka clusters. To integrate the guidance with Kafka clusters, provide the following configurations: 
+  **Broker link**: Enter the brokers link of Kafka cluster that you wish to connect to. The Kafka cluster needs to meet the following requirements:
  + 
    + The Kafka cluster and this guidance need to be in the same VPC.
    + At least two Kafka cluster brokers are available.
+  **Topic**: User can specify the topic for storing the data 
+  **Security Group**: This VPC security group defines which subnets and IP ranges can access the Kafka cluster. 

## Connector
Connector

 Enable guidance to create Kafka connector and a custom plugin for this connector. This connector will sink the data from Kafka cluster to S3 bucket. 

## Additional Settings
Additional Settings
+  **Sink maximum interval**: Specifies the maximum length of time (in seconds) that records should be buffered before streaming to the AWS service. 
+  **Batch size**: The maximum number of records to deliver in a single batch. 

# Data sink – Kinesis
Data sink – Kinesis

 This data sink will stream the clickstream data collected by the ingestion endpoint into KDS. The guidance will create a KDS in your AWS account based on your specifications. 

## Provision mode
Provision mode

 Two modes are available: **On-demand** and **Provisioned** 
+  **On-demand**: In this mode, KDS shards are provisioned based on the workshop automatically. On-demand mode is suited for workloads with unpredictable and highly-variable traffic patterns. 
+  **Provisioned**: In this mode, KDS shards are set at creation. The provisioned mode is suited for predictable traffic with capacity requirements that are easy to forecast. You can also use the provisioned mode if you want fine-grained control over how data is distributed across shards. 
  +  Shard number: With the provisioned mode, you must specify the number of shards for the data stream. For more information, please refer to [provisioned mode](https://docs.aws.amazon.com/streams/latest/dev/how-do-i-size-a-stream.html#provisionedmode). 

## Addtional settings
Addtional settings
+  **Sink maximum interval**: You can specify the maximum interval (in seconds) that records should be buffered before streaming to the AWS service. 
+  **Batch size**: You can specify the maximum number of records to deliver in a single batch. 

# Data sink - S3
Data sink - S3

 In this option, clickstream data is buffered in the memory of ingestion Server, then sink into a S3 bucket. This option provides the best cost-performance in case real-time data consumption is not required. 

**Note**  
Unlike Kafka and KDS data sink, this option buffers data in the ingestion server and responses 200 code to SDK client before sink into S3, so there is chance data could be lost while ingestion server fails and auto-scaled machine is in the process of creation. But it is worth to note that this probability is very low because of the High-availability design of the guidance. 
+  **Buffer size**: Specify the data size to buffer before sending to Amazon S3. The higher buffer size may be lower in cost with higher latency, while the lower buffer size will be faster in delivery with higher cost. Min: 1 MiB, Max: 50 MiB 
+  **Buffer interval**: Specify the maximum interval (in seconds) for saving buffer to S3. The higher interval allows more time to collect data and the size of data may be bigger. The lower interval sends the data more frequently and may be more advantageous when looking at shorter cycles of data activity. Min: 60 Seconds, Max: 3600 Seconds 

# Data processing
Data processing

 Clickstream Analytics on AWS provides an inbuilt data schema to parse and model the raw event data sent from your web and mobile apps, which makes it easy for you to analyze the data in analytics engines (such as RedShift and Athena). 

 Data Processing module includes two functionalities: 
+  **Transformation**: Extract the data from files sank by ingestion module, then parse each event data and transform them to guidance data model. 
+  **Enrichment**: Add additional dimensions/fields to event data. 

 This chapter includes: 
+  [Data schema](data-schema.md) 
+  [Configure execution parameters](execution-parameters.md) 
+  [Configure custom plugins](processing-plugin.md) 

# Data schema
Data schema

This article explains the data schema and format in Clickstream Analytics on AWS. This guidance uses an **event-based** data model to store and analyze clickstream data. Every activity (such as click and view) on the clients is modeled as an event with dimensions, and each dimension represents a parameter of the event. Dimensions are common for all events.

You can use JSON objects to store custom event parameters as key-value pairs into special dimensions, which helps you to collect information that is specific for your business. Those JSON objects are stored in special data types, which allow you to extract the values in the analytics engines.

## Database and table
Database and table

 For each project, the guidance creates a database with name of `<project-id>` in Amazon Redshift and Athena. Each app will have a schema with name of `app_id`. In Athena, all tables are added partitions of `app_id`, year, month, and day. Based on the event data, the guidance's data-processing module creates the following four base tables:
+ **event-v2:** This table stores event data. Each record represents an individual event.
+ **user-v2:** This table stores the latest user attributes. Each record represents a visitor (pseudonymous user).
+ **item-v2:** This table stores event-item data. Each record represents an event that is associated with an item.
+ **session:** This table stores session data. Each record represents a session for each pseudonymous user.

## Columns
Columns

Each column in the tables represents a specific parameter for an event, user, or item. Some parameters are nested within a `Super` field in Amazon Redshift or a `Map` field in Athena. Those fields (such as `custom_parameters`, `user_properties`) contain parameters that are repeatable. The following table describes the fields.

### Event table fields
Event table fields


|  **Field Name**  |  **Data Type - Redshift**  |  **Data Type - Athena**  |  **Description**  | 
| --- | --- | --- | --- | 
|  event\$1timestamp  |  TIMESTAMP  |  TIMESTAMP  | The timestamp (in microseconds, UTC) when the event was logged on the client. | 
|  event\$1id  |  VARCHAR  |  STRING  |  Unique ID for the event.  | 
| event\$1time\$1msec | BIGINT | BIGINT | The time in UNIX timestamp format (microseconds) when the event was logged on the client. | 
|  event\$1name  |  VARCHAR  |  STRING  |  The name of the event.  | 
|  event\$1value  | DOUBLE PRECISION | FLOAT | The value of the event's "value" parameter. | 
|  event\$1value\$1currency |  VARCHAR  |  STRING  |  The currency of the value associated with the event. | 
|  event\$1bundle\$1sequence\$1id  |  BIGINT  |  BIGINT  |  The sequential ID of the bundle in which these events were uploaded.  | 
|  ingest\$1timestamp  |  BIGINT  |  BIGINT  |  Timestamp offset between collection time and upload time in micros.  | 
|  device.mobile\$1brand\$1name  |  VARCHAR  |  STRING  |  The device brand name.  | 
|  device.mobile\$1model\$1name  |  VARCHAR  |  STRING  |  The device model name.  | 
|  device.manufacturer  |  VARCHAR  |  STRING  |  The device manufacturer name.  | 
|  device.carrier  |  VARCHAR  |  STRING  |  The device network provider name.  | 
|  device.network\$1type  |  VARCHAR  |  STRING  |  The network\$1type of the device, e.g., WIFI, 5G  | 
|  device.operating\$1system  |  VARCHAR  |  STRING  |  The operating system of the device.  | 
|  device.operating\$1system\$1version  |  VARCHAR  |  STRING  |  The OS version.  | 
|  device.vendor\$1id  |  VARCHAR  |  STRING  |  IDFV (present only if IDFA is not collected).  | 
|  device.advertising\$1id  |  VARCHAR  |  STRING  |  Advertising ID/IDFA.  | 
|  device.system\$1language  |  VARCHAR  |  STRING  |  The OS language.  | 
|  device.time\$1zone\$1offset\$1seconds  |  BIGINT  |  BIGINT  |  The offset from GMT in seconds.  | 
|  device.ua\$1browser  |  VARCHAR  |  STRING  |  The browser in which the user viewed content, derived from User Agent string  | 
|  device.ua\$1browser\$1version  |  VARCHAR  |  STRING  |  The version of the browser in which the user viewed content, derive from User Agent  | 
|  device.ua\$1device  |  VARCHAR  |  STRING  |  The device in which user viewed content, derive from User Agent.  | 
|  device.ua\$1device\$1category  |  VARCHAR  |  STRING  |  The device category in which user viewed content, derive from User Agent.  | 
| device.ua\$1os |  VARCHAR  |  STRING  | The operating system of the device in which user viewed content, derive from User Agent. | 
| device.ua\$1os\$1version |  VARCHAR  |  STRING  | The operating system version of the device category in which user viewed content, derive from User Agent. | 
| device.ua | SUPER | MAP | The parsed User Agent in key-value pairs | 
|  device.screen\$1width  |  VARCHAR  |  STRING  |  The screen width of the device.  | 
|  device.screen\$1height  |  VARCHAR  |  STRING  |  The screen height of the device.  | 
| device.viewport\$1width | VARCHAR | STRING | The screen width of the browser viewport. | 
| device.viewport\$1height | VARCHAR | STRING | The screen height of the browser viewport. | 
|  geo.continent  |  VARCHAR  |  STRING  |  The continent from which events were reported, based on IP address.  | 
|  geo.sub\$1continent  |  VARCHAR  |  STRING  |  The subcontinent from which events were reported, based on IP address.  | 
|  geo.country  |  VARCHAR  |  STRING  |  The country from which events were reported, based on IP address.  | 
|  geo.region  |  VARCHAR  |  STRING  |  The region from which events were reported, based on IP address.  | 
|  geo.metro  |  VARCHAR  |  STRING  |  The metro from which events were reported, based on IP address.  | 
|  geo.city  |  VARCHAR  |  STRING  |  The city from which events were reported, based on IP address.  | 
|  geo.locale  |  VARCHAR  |  STRING  |  The locale information obtained from device.  | 
|  traffic\$1source\$1name  |  VARCHAR  |  STRING  |  Name of the marketing campaign that acquired the user when the events were reported.  | 
|  traffic\$1source\$1medium  |  VARCHAR  |  STRING  |  Name of the medium (paid search, organic search, email, etc.) that acquired the user when the events were reported.  | 
|  traffic\$1source\$1campaign  |  VARCHAR  |  STRING  |  The marketing campaign (derive from utm\$1campaign) associated with the event.  | 
| traffic\$1source\$1content | VARCHAR | STRING | The marketing campaign content (derive from utm\$1content) associated with the event. | 
| traffic\$1source\$1term | VARCHAR | STRING | The marketing campaign term (derive from utm\$1term) associated with the event. | 
| traffic\$1source\$1campaign\$1id | VARCHAR | STRING | The marketing campaign id (derive from utm\$1id) associated with the event. | 
| traffic\$1source\$1clid | VARCHAR | STRING | The click id associated with the event. | 
| traffic\$1source\$1clid\$1platform | VARCHAR | STRING | The platform of the click id associated with the event. | 
| traffic\$1source\$1channel\$1group | VARCHAR | STRING | The channel group (assigned by traffic classification rules) associated with the event. | 
| traffic\$1source\$1category | VARCHAR | STRING | The source category (i.e., Search, Social, Video, Shopping) based on the traffic source associated with the event. | 
| user\$1first\$1touch\$1time\$1msec | BIGINT | BIGINT | The time in UNIX timestamp format (microseconds) when the user first touch the app or website. | 
| app\$1package\$1id | VARCHAR | STRING | The package name or bundle ID of the app. | 
| app\$1version | VARCHAR | STRING | The app's versionName (Android) or short bundle version. | 
| app\$1title | VARCHAR | STRING | The app's name. | 
| app\$1id | VARCHAR | STRING | The App ID (created by this guidance) associated with the app. | 
| app\$1install\$1source | VARCHAR | STRING | The store from which user installed the app. | 
|  platform  |  VARCHAR  |  STRING  |  The data stream platform (Web, IOS or Android) from which the event originated.  | 
|  project\$1id  |  VARCHAR  |  STRING  |  The project id associated with the app.  | 
| screen\$1view\$1screen\$1name |  VARCHAR  | STRING | The screen name associated with the event. | 
| screen\$1view\$1screen\$1id |  VARCHAR  | STRING | The screen class id associated with the event. | 
| screen\$1view\$1screen\$1unique\$1id  |  VARCHAR  | STRING | The unique screen id associated with the event | 
| screen\$1view\$1previous\$1screen\$1name |  VARCHAR  | STRING | The previous unique screen id associated with the event. | 
| screen\$1view\$1previous\$1screen\$1id | VARCHAR | STRING | The previous unique screen id associated with the event. | 
| screen\$1view\$1previous\$1screen\$1unique\$1id  | VARCHAR | STRING | The previous unique screen id associated with the event. | 
| screen\$1view\$1entrances  | BOOLEAN | BOOLEAN | Whether the screen is the entrance view of the session. | 
| page\$1view\$1page\$1referrer  | VARCHAR | STRING | The referrer page url.  | 
| page\$1view\$1page\$1referrer\$1title | VARCHAR | STRING | The referrer page title. | 
|  page\$1view\$1previous\$1time\$1msec | BIGINT | BIGINT | The timestamp of the previous page\$1view event.  | 
|  page\$1view\$1engagement\$1time\$1msec  | BIGINT | BIGINT | The previous page\$1view duration in milliseconds. | 
| page\$1view\$1page\$1title | VARCHAR | STRING | The title of the webpage associated with the event. | 
|  page\$1view\$1page\$1url  | VARCHAR | STRING | The url of the webpage associated with the event.  | 
| page\$1view\$1page\$1url\$1path | VARCHAR  | STRING | The url path of the webpage associated with the event.  | 
|  page\$1view\$1page\$1url\$1query\$1parameters  | SUPER  |  MAP  | The query parameters in key-value pairs of the page url associated with the event.  | 
| page\$1view\$1hostname  | VARCHAR  | STRING  | The host name of the web page associated with the event.  | 
| page\$1view\$1latest\$1referrer  | VARCHAR  | STRING  | The url of the latest external referrer.  | 
| page\$1view\$1latest\$1referrer\$1host  | VARCHAR  | STRING  | The hostname of the latest external referrer.  | 
| page\$1view\$1entrances  | BOOLEAN  | BOOLEAN  | Whether the page is the entrance view of the session.  | 
| app\$1start\$1is\$1first\$1time | BOOLEAN  | BOOLEAN | Whether the app start is a new app launch.  | 
| upgrade\$1previous\$1app\$1version  | VARCHAR | STRING | Previous app version before app upgrade event.  | 
| upgrade\$1previous\$1os\$1version  | VARCHAR | STRING | Previous os version before OS upgrade event.  | 
| search\$1key  | VARCHAR | STRING | The name of the keyword in the URL when user perform search on web site.  | 
| search\$1term  | VARCHAR  | STRING  | The search content in the URL when user perform search on web site.  | 
| outbound\$1link\$1classes  | VARCHAR | STRING  | The content of class in tag that associated with the outbound link.  | 
| outbound\$1link\$1domain  |  VARCHAR  | STRING  | The domain of href in tag that associated with the outbound link.  | 
| outbound\$1link\$1id  | VARCHAR  | STRING  | The content of id in tag that associated with the outbound link.  | 
| outbound\$1link\$1url  | VARCHAR  | STRING  | The content of href in tag that associated with the outbound link.  | 
| outbound\$1link  | BOOLEAN  | BOOLEAN  | Whether the link is outbound link or not.  | 
| user\$1engagement\$1time\$1msec  | BIGINT  | BIGINT  | The user engagement duration in milliseconds.  | 
| user\$1id  | VARCHAR  | STRING  | The unique ID assigned to a user through setUserId() API.  | 
| user\$1pseudo\$1id  | VARCHAR  | STRING  | The pseudonymous id generated by SDK for the user.  | 
| session\$1id  | VARCHAR  | STRING  | The session id associated with the event.  | 
| session\$1start\$1time\$1msec  | BIGINT  | BIGINT  | The start time in UNIX timestamp of the session.  | 
| session\$1duration  | BIGINT  | BIGINT  | The duration the session lasts, in milliseconds.  | 
| session\$1number  | BIGINT  | BIGINT  | Number of the sessions generated from the client.  | 
| scroll\$1engagement\$1time\$1msec  | BIGINT  | BIGINT  | The engagement time on the web page until user scroll.  | 
| sdk\$1error\$1code  | VARCHAR  | STRING  | The error code generated by SDK when an event is invalid in some way.  | 
| sdk\$1error\$1message  | VARCHAR  | STRING  | The error message generated by SDK an event is invalid in some way.  | 
| sdk\$1version  | VARCHAR  | STRING  | The version of the SDK.  | 
| sdk\$1name  | VARCHAR  | STRING  | The name of the SDK.  | 
| app\$1exception\$1message  | VARCHAR  | STRING  | The exception message when the app crashes or throws an exception.  | 
| app\$1exception\$1stack  | VARCHAR  | STRING  | The exception stack trace when the app crashes or throws an exception.  | 
| custom\$1parameters\$1json\$1str  | VARCHAR  | STRING  | All the custom event parameters stored in key-value pairs.  | 
| custom\$1parameters | SUPER  | MAP  | All the custom event parameters stored in key-value pairs.  | 
|  process\$1info  | SUPER  | MAP  | Store information about the data processing. | 
| created\$1time | TIMESTAMP | TIMESTAMP | Store information about the data processing.  | 

### User table fields
User table fields


|  **Field Name**  |  **Data Type - Redshift**  |  **Data Type - Athena**  |  **Description**  | 
| --- | --- | --- | --- | 
|  event\$1timestamp  |  BIGINT  |  STRING  |  The timestamp of when the user attributes was collected.  | 
|  user\$1id  |  VARCHAR  |  STRING  |  The unique ID assigned to a user through setUserId() API.  | 
|  user\$1pseudo\$1id  |  VARCHAR  |  STRING  |  The pseudonymous id generated by SDK for the user.  | 
|  user\$1properties  |  SUPER  |  ARRAY  |  Properties of the user.  | 
| user\$1properties\$1json\$1str | VARCHAR | STRING | Properties of the user. | 
| first\$1touch\$1timestamp | BIGINT | BIGINT | The time (in microseconds) at which the user first opened the app or visited the site. | 
| first\$1visit\$1date  |  Date  |  Date  |  Date of the user's first visit. | 
| first\$1referer  |  VARCHAR  |  STRING  |  The first referer detected for the user. | 
| first\$1traffic\$1source |  VARCHAR  |  STRING  | The the network source that acquired the user that was first detected for the user, e.g., Google, Baidu  | 
| first\$1traffic\$1source\$1medium  |  VARCHAR  |  STRING  | The medium of the network source that acquired the user that was first detected for the user, e.g., paid search, organic search, email, etc | 
| first\$1traffic\$1source\$1campaign  | VARCHAR | STRING | The name of the marketing campaign that acquired the user that was first detected for the user. | 
| first\$1traffic\$1source\$1content  | VARCHAR | STRING | The marketing campaign content that acquired the user that was first detected for the user. | 
| first\$1traffic\$1source\$1term  | VARCHAR | STRING | The keyword of the marketing ads that acquired the user that was first detected for the user. | 
| first\$1traffic\$1source\$1campaign\$1id  | VARCHAR | STRING | The id of the marketing campaign that acquired the user that was first detected for the user. | 
| first\$1traffic\$1source\$1clid\$1platform  | VARCHAR | STRING | The click id platform of the marketing campaign that acquired the user that was first detected for the user.  | 
| first\$1traffic\$1source\$1clid  | VARCHAR | STRING | The click id of the marketing campaign that acquired the user that was first detected for the user.  | 
| first\$1traffic\$1source\$1channel\$1group first\$1traffic\$1source\$1category | VARCHAR | STRING | The channel group of the traffic source that acquired the user that was first detected for the user.  | 
|  first\$1app\$1install\$1source  | VARCHAR | STRING | The source category (i.e., Search, Social, Video, Shopping) based on the traffic source that acquired the user for the first time.  | 
| process\$1info  | SUPER  | MAP  | The install channel for the user, e.g., Google Play Store information about the data processing. | 
| created\$1time  | TIMESTAMP | TIMESTAMP |  Store information about the data processing. | 

### Session table fields
User table fields


|  **Field Name**  |  **Data Type - Redshift**  |  **Data Type - Athena**  |  **Description**  | 
| --- | --- | --- | --- | 
| event\$1timestamp  | TIMESTAMP  | STRING | The timestamp of when the event occurred.  | 
| user\$1pseudo\$1id  | VARCHAR | STRING | The pseudonymous ID generated by the SDK for the user. | 
| session\$1id  | VARCHAR | STRING | The ID assigned to a session. | 
| user\$1id  | VARCHAR | STRING | The unique ID assigned to a user through the setUserId() API.  | 
| session\$1number  | BIGINT | INT | The sequence number of the session in the client. | 
| session\$1start\$1time\$1msec  | BIGINT | BIGINT | The start time of the session in milliseconds.  | 
| session\$1source  | VARCHAR | STRING  | The traffic source of the session. | 
| session\$1medium  | VARCHAR | STRING  | The traffic source medium of the session. | 
| session\$1campaign  | VARCHAR | STRING  | The traffic source campaign of the session. | 
| session\$1content  | VARCHAR | STRING  | The traffic source content of the session. | 
| session\$1term  | VARCHAR | STRING | The traffic source term of the session.  | 
| session\$1campaign\$1id  | VARCHAR | STRING | The traffic source campaign ID of the session. | 
| session\$1clid\$1platform  | VARCHAR | STRING | The platform of the CLID (Click ID) of the session. | 
| session\$1clid  | VARCHAR | STRING | The CLID (Click ID) of the session. | 
| session\$1channel\$1group  | VARCHAR | STRING | The traffic source channel group of the session.  | 
| session\$1source\$1category  | VARCHAR | STRING | The traffic source category of the session source.  | 
| process\$1info  | SUPER | MAP | Additional data processing information. | 
| created\$1time  | TIMESTAMP | STRING  | The timestamp of when the session data was created. | 

### Item table fields
Item table fields


|  **Field Name**  |  **Data Type - Redshift**  |  **Data Type - Athena**  |  **Description**  | 
| --- | --- | --- | --- | 
|  event\$1timestamp  | TIMESTAMP | STRING | The timestamp of when the event occurred. | 
| event\$1id  | VARCHAR  | STRING  |  The ID of the event.  | 
| event\$1name  | VARCHAR | STRING | The name of the event. | 
| platform | VARCHAR | STRING | The platform associated with the event. | 
| user\$1pseudo\$1id | VARCHAR | STRING | The pseudonymous ID generated by the SDK for the user. | 
| user\$1id | VARCHAR | STRING | The unique ID assigned to a user through the setUserId() API. | 
| item\$1id | VARCHAR | STRING | The ID of the item. | 
| name | VARCHAR | STRING | The name of the item. | 
| brand | VARCHAR | STRING | The brand of the item.  | 
| currency | VARCHAR | STRING | The currency associated with the item price. | 
| price | DOUBLE PRECISION  | DOUBLE | The price of the item.  | 
| quantity | DOUBLE PRECISION  | DOUBLE | The quantity of the item in the event. | 
| creative\$1name | VARCHAR | STRING | The name of the creative associated with the item. | 
| creative\$1slot | VARCHAR | STRING | The slot of the creative associated with the item. | 
| location\$1id  | VARCHAR | STRING | The ID of the location associated with the item.  | 
| category | VARCHAR | STRING | The category of the item. | 
| category2 | VARCHAR | STRING | The second category of the item.  | 
| category3 | VARCHAR | STRING | The third category of the item.  | 
| category4 | VARCHAR | STRING | The fourth category of the item.  | 
| category5 | VARCHAR | STRING | The fifth category of the item. | 
| custom\$1parameters\$1json\$1str | VARCHAR | STRING | The JSON string representation of custom parameters.  | 
| custom\$1parameters  | SUPER  | MAP  | Additional custom parameters.  | 
| process\$1info | SUPER  | MAP  | Additional process information. | 
| created\$1time | TIMESTAMP | STRING | The timestamp of when the item data was created. | 

# Execution parameters
Execution parameters

 Execution parameters control how the transformation and enrichment jobs are orchestrated. 

## Parameters
Parameters

 You can configure the following **Execution parameters** after you turn on **Enable data processing**. 


|  **Parameter**  |  **Description**  |  **Values**  | 
| --- | --- | --- | 
|  Data processing interval/Fixed Rate  |  Specify the interval to batch the data for data processing by fixed rate  |  1 hour 12 hours 1 day  | 
|  Data processing interval/Cron Expression  |  Specify the interval to batch the data for data processing by cron expression  |  cron(0 \$1 \$1 ? \$1)  cron(0 0,12 \$1 ? \$1) cron(0 0 \$1 ? \$1)   | 
|  Event freshness  |  Specify the days after which the guidance will ignore the event data. For example, if you specify 3 days for this parameter, the guidance will ignore any event which arrived more than 3 days after the events are triggered  |  3 days 5 days 30 days   | 

## Cron expression syntax
Cron expression syntax

 Syntax 

 cron(minutes hours day-of-month month day-of-week year) 

 For more information, refer to [Cron-based schedules](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html?icmpid=docs_console_unmapped#cron-based). 

## Config Spark job parameters
Config Spark job parameters

 By default, the Clickstream pipeline automatically adjusts EMR job parameters based on the dataset volume that requires processing. In most of time, you do not need to adjust the EMR job parameters, but if you want to override the EMR job parameters, you can put `spark-config.json` file in S3 bucket to set your job parameters. 

 To add your customized the EMR job parameters, you can add a file `s3://{PipelineS3Bucket}/{PipelineS3Prefix}{ProjectId}/config/spark-config.json` in the S3 bucket. 

 Please replace `{PipelineS3Bucket}`, `{PipelineS3Prefix}`, and `{ProjectId}` with the values of your data pipeline. These values are found in the `Clickstream-DataProcessing-<uuid>` stack's `Parameters`. 

 Also, you can get these values by running the below commands, 

```
stackNames=$(aws cloudformation list-stacks --stack-status-filter CREATE_COMPLETE UPDATE_COMPLETE --no-paginate  | jq -r '.StackSummaries[].StackName' | grep  Clickstream-DataProcessing  | grep -v Nested)

echo -e "$stackNames" | while read stackName; do
    aws cloudformation describe-stacks --stack-name $stackName  | jq '.Stacks[].Parameters' | jq 'map(select(.ParameterKey == "PipelineS3Bucket" or .ParameterKey == "PipelineS3Prefix" or .ParameterKey == "ProjectId"))'
done
```

 Here is an example of the file `spark-config.json`: 

```
{
   "sparkConfig": [
        "spark.emr-serverless.executor.disk=200g",
        "spark.executor.instances=16",
        "spark.dynamicAllocation.initialExecutors=16",
        "spark.executor.memory=100g",
        "spark.executor.cores=16",
        "spark.network.timeout=10000000",
        "spark.executor.heartbeatInterval=10000000",
        "spark.shuffle.registration.timeout=120000",
        "spark.shuffle.registration.maxAttempts=5",
        "spark.shuffle.file.buffer=2m",
        "spark.shuffle.unsafe.file.output.buffer=1m"
    ],
    "inputRePartitions": 2000
}
```

 Please make sure your account has enough emr-serverless quotas, you can view the quotas via emr-serverless-quotas in the Region us-east-1. For more configurations, please refer to [Spark job properties](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/jobs-spark.html#spark-defaults) and application [worker config](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/app-behavior.html#worker-configs). 

# Processing plugin
Processing plugin

 There are two types of plugins: **transformer** and **enrichment**. You can choose to have only one **transformer**, and zero or multiple **enrichment**. 

## Built-in plugins
Built-in plugins

 Below plugins are provided by Clickstream Analytics on AWS. 


|  **Plugin name**  |  **Type**  |  **Description**  | 
| --- | --- | --- | 
|  UAEnrichment  |  enrichment  |  User-agent enrichment, use ua\$1parser Java library to enrich User-Agent in the HTTP header to ua\$1browser,ua\$1browser\$1version,ua\$1os,ua\$1os\$1version,ua\$1device  | 
|  IpEnrichment  |  enrichment  |  IP address enrichment, use GeoLite2 data by MaxMind to enrich IP to city, continent, country  | 

 The UAEnrichment uses [UA Parser](https://mvnrepository.com/artifact/ua_parser/ua-parser) to parse user-agent in Http header 

 The IpEnrichment plugin uses [GeoLite2-City data](https://cdn.jsdelivr.net/npm/geolite2-city@1.0.0/GeoLite2-City.mmdb.gz) created by MaxMind, available from [https://www.maxmind.com](https://www.maxmind.com/) 

## Custom plugins
Custom plugins

 You can add custom plugins to transform raw event data or enrich the data for your need. 

**Note**  
 To add custom plugins, you must develop your own plugins firstly, see [Develop Custom Plugins](#develop-custom-plugins). 

 To add your plugins, choose **Add Plugin**, which will open a new window, in which you can upload your plugins. 

1.  Enter the plugin **Name** and **Description** 

1.  Choose **Plugin Type** 

1.  **Enrichment**: Plugin to add fields into event data collected by SDK (both Clickstream SDK or third-party SDK) 

1.  **Transformation**: A plugin used to transform a third-party SDK’s raw data into guidance built-in schema 

1.  Upload plugin java JAR file 

1.  (Optional) Upload the dependency files if any 

1.  **Main function class**: fill the full class name of your plugin class name, e.g. com.company.sol.CustomTransformer 

## Develop Custom Plugins
Develop Custom Plugins

 The simplest way to develop custom plugins is making changes based on our example project. 

1.  Clone/Fork the example project. 

   ```
   git clone https://github.com/aws-solutions-library-samples/guidance-for-clickstream-analytics-on-aws.git
   cd examples/custom-plugins
   ```
   +  For enrichment plugin, please refer to the example: `custom-enrich/ `
   +  For transformer plugin, please refer to the example: `custom-sdk-transformer/ `

1.  Change packages and classes name as you desired. 

1.  Implement the method `public Dataset<row> transform(Dataset<row> dataset)` to do transformation or enrichment. 

1.  (Optional) Write test code. 

1.  Run gradle to package code to jar `./gradlew clean build`. 

1.  Get the jar file in build output directory `./build/libs/`. 

# Data modeling
Data modeling

 Once the data pipeline processes the event data, you can load the data into an analytics engine for data modeling, such as Redshift or Athena, where data will be aggregated and organized into different views (such as event, device, session), as well as calculated metrics that are commonly used. Below are the preset data views this guidance provides if you choose to enable data modeling module. 

You can choose to use Redshift or Athena, or both. 

**Note**  
We recommended you select both, that is, using Redshift for hot data modeling and using Athena for all-time data analysis. 

 You can set below configurations for Redshift. 
+  **Redshift Mode**: Select Redshift serverless or provisioned mode. 
  +  **Serverless mode** 
    +  **Base RPU**: RPU stands for Redshift Processing Unit. Amazon Redshift Serverless measures data warehouse capacity in RPUs, which are resources used to handle workloads. The base capacity specifies the base data warehouse capacity Amazon Redshift uses to serve queries and is specified in RPUs. Setting higher base capacity improves query performance, especially for data processing jobs that consume a lot of resources. 
    +  **VPC**: A virtual private cloud (VPC) based on the Amazon VPC service is your private, logically isolated network in the AWS Cloud. 
**Note**  
If you place the cluster within the isolated subnets, the VPC must have VPC endpoints for S3, Logs, Dynamodb, STS, States, Redshift and Redshift-data service. 
    +  **Security Group**: This VPC security group defines which subnets and IP ranges can access the endpoint of Redshift cluster. 
    +  **Subnets**: Select at least three existing VPC subnets. 
**Note**  
  We recommend using private subnets to deploy in accordance with the security best practices. 
+  **Provisioned mode**
  + **Redshift Cluster**: With a provisioned Amazon Redshift cluster, you build a cluster with node types that meet your cost and performance specifications. You have to set up, tune, and manage Amazon Redshift provisioned clusters.
  + **Database user**: The guidance requires permissions to access and create database in Redshift cluster. By default, it grants Redshift Data API with the permissions of the admin user to execute the commands to create DB, tables, and views, as well as loading data.
+  **Data range**: Considering the cost performance issue of having Redshift to save all the data, we recommend that Redshift save hot data and that all data are stored in S3. It is necessary to delete expired data in Redshift on a regular basis. 
+  **Athena**: Choose Athena to query all data on S3 using the table created in the AWS Glue Data Catalog. 

# Reporting
Reporting

Once the data are processed and modeled by the data pipeline, you can enable the Analytics Studio for the pipeline, which will allow the guidance create out-of-the-box dashboards in QuickSight, provide advanced analytics model for user to query their clickstream data, and data management functionalities.

**Note**  
 To enable this module, your AWS account needs to have subscription in QuickSight. If it hasn't, please follow this [sign up for Quick](https://docs.aws.amazon.com/quicksight/latest/user/signing-up.html) to create a subscription first. 

You need to make the following configuration for Reporting.
+ **Create sample dashboard in QuickSight**: Enabling this feature allows the guidance to create sample dashboards in your QuickSight account.
+ **QuickSight user**: Select an admin user for the guidance to create QuickSight resources. (Only required for AWS China Regions)

# App registration
App registration

After the data pipeline transitions to an **Active** state, register your application(s) with the pipeline to enable the reception of clickstream data. After the application is added, you can integrate the SDK into your application to send data to the pipeline. Complete the following steps to register an application to a data pipeline.

**Steps**

1. Sign in to Clickstream Analytics on AWS Console. 

1. In the left navigation pane, choose **Projects**, then select a project you want to register app, click its title, and it will bring you to the project page. 

1. Click on **\$1 Add application** to start adding application to the pipeline. 

1. Complete the form by filling in the following fields: 
   + **App name:** Provide a name for your app. 
   + **App ID: **The system will generate one ID based on the name, which you can customize if needed. 
   + **App reporting time zone:** Specify a reporting time zone for your app. This time zone is used by the preset dashboard to calculate daily metrics. 
   + **Description:** Provide a description for your app. 
   + **Android package name:** Provide a package name if your app has an Android client. 
   + **App Bundle ID: **Provide the bundle ID if your app has an iOS client. 

1. Choose **Register App & Generate SDK Instruction**, and wait for the registration to be completed. 

1. You should now see tabs for SDK integration for different platforms. Choose the relevant tab to view detailed instructions on adding the SDK to your application. Follow the provided steps to integrate the SDK. For certain platforms, you can choose the **Download the config json file** button to obtain the configuration file. 

The pipeline update with the newly added application takes approximately 3 to 5 minutes. After the pipeline status returns to **Active**, it is ready to receive data from your application. 

# Pipeline maintenance
Pipeline maintenance

 This guidance provides three features to help you manage and operate the data pipeline after it gets created. 

## Monitoring and Alarms
Monitoring and Alarms

 The guidance collects metrics from each resource in the data pipeline and creates monitoring dashboards in CloudWatch, which provides you a comprehensive view into the pipeline status. It also provides a set of alarms that will notify project owner if anything goes abnormal. 

 Following are steps to view monitoring dashboards and alarms. 

### Monitoring dashboards
Monitoring dashboards

 To view monitoring dashboard for a data pipeline, follows below steps: 

1.  Go to project detail page. 

1.  Choose project id or **View Details**, which will direct to the pipeline detail page. 

1.  Select the "**Monitoring**" tab. 

1.  In the tab, choose **View in CloudWatch**, which will direct you to the monitoring dashboard. 

### Alarms
Alarms

 To view alarms for a data pipeline, follows below steps: 

1.  Go to project detail page. 

1.  Choose project id or **View Details**, which will direct to the pipeline detail page. 

1.  Select the "**Alarms**" tab. 

1.  In the tab, you can view all the alarms. You can also choose **View in CloudWatch**, which will direct you to CloudWatch alarm pages to view alarm details. 

1.  You can also enable or disable an alarm by selecting the alarm then choosing **Enable** or **Disable**. 

## Pipeline modification
Pipeline modification

 You are able to modify some configuration the data pipeline after it created, follow below steps to update a pipeline. 

1.  Go to project detail page. 

1.  Choose project id or **View Details**, which will direct to the pipeline detail page. 

1.  In the project details page, choose **Edit**, which will bring you to the pipeline creation wizard page. Note that some configuration are in disable mode, which means they cannot be updated after creation. 

1.  If needed, update those configuration options which are editable. 

1.  After editing the configuration, choose **Next** until you reach last page, and choose **Save**. 

 You will see pipeline is in Updating status. 

## Pipeline upgrade
Pipeline upgrade

For detailed procedure, see [upgrade the guidance](upgrade-the-solution.md).