

# Using Lambda with Amazon MSK
<a name="with-msk"></a>

[Amazon Managed Streaming for Apache Kafka (Amazon MSK)](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) is a fully managed service that you can use to build and run applications that use Apache Kafka to process streaming data. Amazon MSK simplifies the setup, scaling, and management of Kafka clusters. Amazon MSK also makes it easier to configure your application for multiple Availability Zones and for security with AWS Identity and Access Management (IAM).

This chapter explains how to use an Amazon MSK cluster as an event source for your Lambda function. The general process for integrating Amazon MSK with Lambda involves the following steps:

1. **[Cluster and network setup](with-msk-cluster-network.md)** – First, set up your [Amazon MSK cluster](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html). This includes the correct networking configuration to allow Lambda to access your cluster.

1. **[Event source mapping setup](with-msk-configure.md)** – Then, create the [event source mapping](invocation-eventsourcemapping.md) resource that Lambda needs to securely connect your Amazon MSK cluster to your function.

1. **[Function and permissions setup](with-msk-permissions.md)** – Finally, ensure that your function is correctly set up, and has the necessary permissions in its [execution role](lambda-intro-execution-role.md).

**Note**  
You can now create and manage your Amazon MSK event source mappings directly from either the Lambda or the Amazon MSK console. Both consoles offer the option to automatically handle the setup of the necessary Lambda execution role permissions for a more streamlined configuration process.

For examples of how to set up a Lambda integration with an Amazon MSK cluster, see [Tutorial: Using an Amazon MSK event source mapping to invoke a Lambda function](services-msk-tutorial.md), [Using Amazon MSK as an event source for AWS Lambda](https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/) on the AWS Compute Blog, and [Amazon MSK Lambda Integration](https://amazonmsk-labs.workshop.aws/en/msklambda.html) in the Amazon MSK Labs.

**Topics**
+ [Example event](#msk-sample-event)
+ [Configuring your Amazon MSK cluster and Amazon VPC network for Lambda](with-msk-cluster-network.md)
+ [Configuring Lambda permissions for Amazon MSK event source mappings](with-msk-permissions.md)
+ [Configuring Amazon MSK event sources for Lambda](with-msk-configure.md)
+ [Tutorial: Using an Amazon MSK event source mapping to invoke a Lambda function](services-msk-tutorial.md)

## Example event
<a name="msk-sample-event"></a>

Lambda sends the batch of messages in the event parameter when it invokes your function. The event payload contains an array of messages. Each array item contains details of the Amazon MSK topic and partition identifier, together with a timestamp and a base64-encoded message.

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```
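As a sketch (not part of the Lambda runtime itself), a Python handler can decode the base64-encoded `value` field and the byte-array header values in this payload:

```python
import base64


def lambda_handler(event, context):
    """Decode the records in an Amazon MSK event payload.

    Field names follow the example event above. Returning the decoded
    records is only for illustration; a real handler would process them.
    """
    decoded = []
    for topic_partition, records in event["records"].items():
        for record in records:
            # The message value is base64-encoded.
            value = base64.b64decode(record["value"]).decode("utf-8")
            # Header values arrive as arrays of byte values.
            headers = {
                key: bytes(byte_values).decode("utf-8")
                for header in record.get("headers", [])
                for key, byte_values in header.items()
            }
            decoded.append({
                "topic": record["topic"],
                "partition": record["partition"],
                "offset": record["offset"],
                "value": value,
                "headers": headers,
            })
    return decoded
```

Note that the `key` field is also base64-encoded, but it may contain binary data that isn't valid UTF-8, so decode it to a string only if you know your producers send text keys.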

# Configuring your Amazon MSK cluster and Amazon VPC network for Lambda
<a name="with-msk-cluster-network"></a>

To connect your AWS Lambda function to your Amazon MSK cluster, you need to correctly configure your cluster and the [Amazon Virtual Private Cloud (VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html) it resides in. This page describes how to configure your cluster and VPC. If your cluster and VPC are already configured properly, see [Configuring Amazon MSK event sources for Lambda](with-msk-configure.md) to configure the event source mapping.

**Topics**
+ [Overview of network configuration requirements for Lambda and MSK integrations](#msk-network-requirements)
+ [Configuring a NAT gateway for an MSK event source](#msk-nat-gateway)
+ [Configuring AWS PrivateLink endpoints for an MSK event source](#msk-vpc-privatelink)

## Overview of network configuration requirements for Lambda and MSK integrations
<a name="msk-network-requirements"></a>

The networking configuration required for a Lambda and MSK integration depends on the network architecture of your application. There are three main resources involved in this integration: the Amazon MSK cluster, the Lambda function, and the Lambda event source mapping. Each of these resources resides in a different VPC:
+ Your Amazon MSK cluster typically resides in a private subnet of a VPC that you manage.
+ Your Lambda function resides in an AWS-managed VPC owned by Lambda.
+ Your Lambda event source mapping resides in another AWS-managed VPC owned by Lambda, separate from the VPC that contains your function.

The [event source mapping](invocation-eventsourcemapping.md) is the intermediary resource between the MSK cluster and the Lambda function. The event source mapping has two primary jobs. First, it polls your MSK cluster for new messages. Then, it invokes your Lambda function with those messages. Since these three resources are in different VPCs, both the poll and invoke operations require cross-VPC network calls.

The network configuration requirements for your event source mapping depend on whether it uses [provisioned mode](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode) or on-demand mode, as shown in the following diagram:

![\[\]](http://docs.aws.amazon.com/lambda/latest/dg/images/MSK-esm-network-overview.png)


The way that the Lambda event source mapping polls your MSK cluster for new messages is the same in both modes. To establish a secure connection between your event source mapping and your MSK cluster, Lambda creates a [hyperplane ENI](configuration-vpc.md#configuration-vpc-enis) (or reuses an existing one, if available) in your private subnet. As illustrated in the diagram, this hyperplane ENI uses the subnet and security group configuration of your MSK cluster, not your Lambda function.

After polling the message from the cluster, the way Lambda invokes your function is different in each mode:
+ In provisioned mode, Lambda automatically handles the connection between the event source mapping VPC and the function VPC. So, you don’t need any additional networking components to successfully invoke your function.
+ In on-demand mode, your Lambda event source mapping invokes your function via a path through your customer-managed VPC. Because of this, you need to configure either a [NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) in the public subnet of your VPC, or [AWS PrivateLink](https://docs.aws.amazon.com/vpc/latest/privatelink/what-is-privatelink.html) endpoints in the private subnet of the VPC that provide access to Lambda, [AWS Security Token Service (STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html), and optionally, [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html). Correctly configuring either one of these options allows a connection between your VPC and the Lambda-managed runtime VPC, which is necessary to invoke your function.

A NAT gateway allows resources in your private subnet to access the public internet. Using this configuration means your traffic traverses the internet before invoking the Lambda function. AWS PrivateLink endpoints allow private subnets to securely connect to AWS services or other private VPC resources without traversing the public internet. See [Configuring a NAT gateway for an MSK event source](#msk-nat-gateway) or [Configuring AWS PrivateLink endpoints for an MSK event source](#msk-vpc-privatelink) for details on how to configure these resources.

So far, we’ve assumed that your MSK cluster resides in a private subnet within your VPC, which is the more common case. However, even if your MSK cluster is in a public subnet within your VPC, you must configure AWS PrivateLink endpoints to enable a secure connection. The following table summarizes the networking configuration requirements based on how you configure your MSK cluster and Lambda event source mapping:


| MSK cluster location (in customer-managed VPC) | Lambda event source mapping scaling mode | Required networking configuration | 
| --- | --- | --- | 
|  Private subnet  |  On-demand mode  |  NAT gateway (in your VPC's public subnet), or AWS PrivateLink endpoints (in your VPC's private subnet) to enable access to Lambda, AWS STS, and optionally, Secrets Manager.  | 
|  Public subnet  |  On-demand mode  |  AWS PrivateLink endpoints (in your VPC's public subnet) to enable access to Lambda, AWS STS, and optionally, Secrets Manager.  | 
|  Private subnet  |  Provisioned mode  |  None  | 
|  Public subnet  |  Provisioned mode  |  None  | 
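The decision in the table above can be expressed as a small lookup. This is only a sketch; the subnet and mode labels are hypothetical strings, not Lambda API values:

```python
def required_networking(cluster_subnet, esm_mode):
    """Summarize the networking configuration table for an MSK integration.

    cluster_subnet: "private" or "public" (where the MSK cluster resides)
    esm_mode: "on-demand" or "provisioned" (event source mapping mode)
    Returns a description of the required extra networking, or None.
    """
    if esm_mode == "provisioned":
        # Provisioned mode: Lambda handles the invoke path for you.
        return None
    if cluster_subnet == "private":
        # On-demand + private subnet: either option works.
        return ("NAT gateway in a public subnet, or AWS PrivateLink endpoints "
                "for Lambda, AWS STS, and optionally Secrets Manager")
    # On-demand + public subnet: PrivateLink endpoints are required.
    return "AWS PrivateLink endpoints for Lambda, AWS STS, and optionally Secrets Manager"
```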

In addition, the security groups associated with your MSK cluster must allow traffic over the correct ports. Ensure that you have the following security group rules configured:
+ **Inbound rules** – Allow all traffic on the default broker port. The port that MSK uses depends on the type of authentication on the cluster: `9098` for IAM authentication, `9096` for SASL/SCRAM, and `9094` for TLS. Alternatively, you can use a self-referencing security group rule to allow access from instances within the same security group.
+ **Outbound rules** – Allow all traffic on port `443` for external destinations if your function needs to communicate with other AWS services. Alternatively, you can use a self-referencing security group rule to limit access to the broker if you don’t need to communicate with other AWS services.
+ **Amazon VPC endpoint inbound rules** – If you’re using an Amazon VPC endpoint, the security group associated with the endpoint must allow inbound traffic on port `443` from the cluster’s security group.
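The port-by-authentication mapping above can be turned into the request parameters for a self-referencing inbound rule, in the `IpPermissions` format that the boto3 `authorize_security_group_ingress` call expects. This is a sketch that only builds the parameters; the security group ID is hypothetical:

```python
# Broker ports by cluster authentication method, from the rules above.
BROKER_PORTS = {"iam": 9098, "sasl_scram": 9096, "tls": 9094}


def self_referencing_ingress_rule(auth_method, security_group_id):
    """Build an IpPermissions entry allowing broker traffic from
    instances in the same security group."""
    port = BROKER_PORTS[auth_method]
    return {
        "IpProtocol": "tcp",
        "FromPort": port,
        "ToPort": port,
        # Referencing the group itself makes the rule self-referencing.
        "UserIdGroupPairs": [{"GroupId": security_group_id}],
    }

# Example (requires AWS credentials):
# import boto3
# ec2 = boto3.client("ec2")
# ec2.authorize_security_group_ingress(
#     GroupId="sg-0123456789abcdef0",
#     IpPermissions=[self_referencing_ingress_rule("iam", "sg-0123456789abcdef0")],
# )
```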

## Configuring a NAT gateway for an MSK event source
<a name="msk-nat-gateway"></a>

You can configure a NAT gateway to allow your event source mapping to poll messages from your cluster, and invoke the function via a path through your VPC. This is required only if your event source mapping uses on-demand mode, and your cluster resides within a private subnet of your VPC. If your cluster resides in a public subnet of your VPC, or your event source mapping uses provisioned mode, you don’t need to configure a NAT gateway.

A [NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html) allows resources in a private subnet to access the public internet. If you need private connectivity to Lambda, see [Configuring AWS PrivateLink endpoints for an MSK event source](#msk-vpc-privatelink) instead.

After you configure your NAT gateway, you must configure the appropriate route tables. This allows traffic from your private subnet to route to the public internet via the NAT gateway.

![\[\]](http://docs.aws.amazon.com/lambda/latest/dg/images/MSK-NAT-Gateway.png)


The following steps guide you through configuring a NAT gateway using the console. Repeat these steps as necessary for each Availability Zone (AZ).

**To configure a NAT gateway and proper routing (console)**

1. Follow the steps in [Create a NAT gateway](https://docs.aws.amazon.com/vpc/latest/userguide/nat-gateway-working-with.html), noting the following:
   + NAT gateways should always reside in a public subnet. Create NAT gateways with [public connectivity](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html).
   + If your MSK cluster is replicated across multiple AZs, create one NAT gateway per AZ. For example, in each AZ, your VPC should have one private subnet containing your cluster, and one public subnet containing your NAT gateway. For a setup with three AZs, you’ll have three private subnets, three public subnets, and three NAT gateways.

1. After you create your NAT gateway, open the [Amazon VPC console](https://console.aws.amazon.com/vpc/) and choose **Route tables** in the left menu.

1. Choose **Create route table**.

1. Associate this route table with the VPC that contains your MSK cluster. Optionally, enter a name for your route table.

1. Choose **Create route table**.

1. Choose the route table you just created.

1. Under the **Subnet associations** tab, choose **Edit subnet associations**.
   + Associate this route table with the private subnet that contains your MSK cluster.

1. Choose **Edit routes**.

1. Choose **Add route**:

   1. For **Destination**, choose `0.0.0.0/0`.

   1. For **Target**, choose **NAT gateway**.

   1. In the search box, choose the NAT gateway you created in step 1. This should be the NAT gateway in the same AZ as the private subnet that contains your MSK cluster (the private subnet that you associated with this route table in step 6).

1. Choose **Save changes**.
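For automation, the console steps above can be sketched with a boto3 EC2 client. The `create_route_table`, `associate_route_table`, and `create_route` calls are real EC2 APIs; the resource IDs in the example are hypothetical:

```python
def route_private_subnet_through_nat(ec2, vpc_id, private_subnet_id, nat_gateway_id):
    """Create a route table in the cluster's VPC, associate it with the
    private subnet, and add a 0.0.0.0/0 route targeting the NAT gateway
    in the same Availability Zone."""
    route_table_id = ec2.create_route_table(VpcId=vpc_id)["RouteTable"]["RouteTableId"]
    ec2.associate_route_table(RouteTableId=route_table_id, SubnetId=private_subnet_id)
    ec2.create_route(
        RouteTableId=route_table_id,
        DestinationCidrBlock="0.0.0.0/0",  # all internet-bound traffic
        NatGatewayId=nat_gateway_id,
    )
    return route_table_id

# Example (requires AWS credentials; repeat per Availability Zone):
# import boto3
# ec2 = boto3.client("ec2")
# route_private_subnet_through_nat(ec2, "vpc-0abc", "subnet-0priv", "nat-0abc")
```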

## Configuring AWS PrivateLink endpoints for an MSK event source
<a name="msk-vpc-privatelink"></a>

You can configure AWS PrivateLink endpoints to allow your event source mapping to poll messages from your cluster and invoke the function via a path through your VPC. These endpoints should allow your MSK cluster to access the following:
+ The Lambda service
+ The [AWS Security Token Service (STS)](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html)
+ Optionally, the [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) service. This is required if the secret used for cluster authentication is stored in Secrets Manager.

Configuring PrivateLink endpoints is required only if your event source mapping uses on-demand mode. If your event source mapping uses provisioned mode, Lambda establishes the required connections for you.

PrivateLink endpoints allow secure, private access to AWS services over AWS PrivateLink. Alternatively, you can configure a NAT gateway to give your MSK cluster access to the public internet. For details, see [Configuring a NAT gateway for an MSK event source](#msk-nat-gateway).

After you configure your VPC endpoints, your MSK cluster should have direct and private access to Lambda, STS, and optionally, Secrets Manager.

![\[\]](http://docs.aws.amazon.com/lambda/latest/dg/images/MSK-PrivateLink-Endpoints.png)


The following steps guide you through configuring a PrivateLink endpoint using the console. Repeat these steps as necessary for each endpoint (Lambda, STS, Secrets Manager).

**To configure VPC PrivateLink endpoints (console)**

1. Open the [Amazon VPC console](https://console.aws.amazon.com/vpc/) and choose **Endpoints** in the left menu.

1. Choose **Create endpoint**.

1. Optionally, enter a name for your endpoint.

1. For **Type**, choose **AWS services**.

1. Under **Services**, start typing the name of the service. For example, to create an endpoint to connect to Lambda, type `lambda` in the search box.

1. In the results, you should see the service endpoint in the current AWS Region. For example, in the US East (N. Virginia) Region, you should see `com.amazonaws.us-east-1.lambda`. Select this service.

1. Under **Network settings**, select the VPC that contains your MSK cluster.

1. Under **Subnets**, select the AZs that your MSK cluster is in.
   + For each AZ, under **Subnet ID**, choose the private subnet that contains your MSK cluster.

1. Under **Security groups**, select the security groups associated with your MSK cluster.

1. Choose **Create endpoint**.
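The endpoint service names you search for in these steps follow the pattern `com.amazonaws.<region>.<service>`. A small sketch listing the names needed for this integration (assuming standard AWS Regions; the helper name is hypothetical):

```python
def interface_endpoint_service_names(region, include_secrets_manager=True):
    """Return the PrivateLink service names required for an on-demand
    MSK event source mapping. Include the Secrets Manager endpoint only
    if your authentication secret is stored there."""
    services = ["lambda", "sts"]
    if include_secrets_manager:
        services.append("secretsmanager")
    return [f"com.amazonaws.{region}.{service}" for service in services]
```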

By default, Amazon VPC endpoints have open IAM policies that allow broad access to resources. As a best practice, restrict these policies to allow only the actions needed through that endpoint. For example, for your Secrets Manager endpoint, you can modify its policy so that it allows only your function's execution role to access the secret.

**Example VPC endpoint policy – Secrets Manager endpoint**  

```
{
    "Statement": [
        {
            "Action": "secretsmanager:GetSecretValue",
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws::iam::123456789012:role/my-role"
                ]
            },
            "Resource": "arn:aws::secretsmanager:us-west-2:123456789012:secret:my-secret"
        }
    ]
}
```

For the AWS STS and Lambda endpoints, you can restrict the calling principal to the Lambda service principal. However, ensure that you use `"Resource": "*"` in these policies.

**Example VPC endpoint policy – AWS STS endpoint**  

```
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

**Example VPC endpoint policy – Lambda endpoint**  

```
{
    "Statement": [
        {
            "Action": "lambda:InvokeFunction",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lambda.amazonaws.com"
                ]
            },
            "Resource": "*"
        }
    ]
}
```

# Configuring Lambda permissions for Amazon MSK event source mappings
<a name="with-msk-permissions"></a>

To access the Amazon MSK cluster, your function and event source mapping need permissions to perform various Amazon MSK API actions. Add these permissions to the function's [execution role](lambda-intro-execution-role.md). If your users need access, add the required permissions to the identity policy for the user or role.

The [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) managed policy contains the minimum required permissions for Amazon MSK Lambda event source mappings. To simplify the permissions process, you can:
+ Attach the [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) managed policy to your execution role.
+ Let the Lambda console generate the permissions for you. When you [create an Amazon MSK event source mapping in the console](msk-esm-create.md#msk-console), Lambda evaluates your execution role and alerts you if any permissions are missing. Choose **Generate permissions** to automatically update your execution role. This doesn't work if you manually created or modified your execution role policies, or if the policies are attached to multiple roles. Note that additional permissions may still be required in your execution role when using advanced features such as [On-Failure Destination](kafka-on-failure.md) or [AWS Glue Schema Registry](services-consume-kafka-events.md).

**Topics**
+ [Required permissions](#msk-required-permissions)
+ [Optional permissions](#msk-optional-permissions)

## Required permissions
<a name="msk-required-permissions"></a>

Your Lambda function execution role must have the following required permissions for Amazon MSK event source mappings. These permissions are included in the [AWSLambdaMSKExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaMSKExecutionRole.html) managed policy.

### CloudWatch Logs permissions
<a name="msk-basic-permissions"></a>

The following permissions allow Lambda to create and store logs in Amazon CloudWatch Logs.
+ [logs:CreateLogGroup](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html)
+ [logs:CreateLogStream](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html)
+ [logs:PutLogEvents](https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)

### MSK cluster permissions
<a name="msk-cluster-permissions"></a>

The following permissions allow Lambda to access your Amazon MSK cluster on your behalf:
+ [kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html)
+ [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html)
+ [kafka:GetBootstrapBrokers](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-bootstrap-brokers.html)

We recommend using [kafka:DescribeClusterV2](https://docs.aws.amazon.com/MSK/2.0/APIReference/v2-clusters-clusterarn.html) instead of [kafka:DescribeCluster](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn.html). The v2 permission works with both provisioned and serverless Amazon MSK clusters. You only need one of these permissions in your policy.

### VPC permissions
<a name="msk-vpc-permissions"></a>

The following permissions allow Lambda to create and manage network interfaces when connecting to your Amazon MSK cluster:
+ [ec2:CreateNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_CreateNetworkInterface.html)
+ [ec2:DescribeNetworkInterfaces](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeNetworkInterfaces.html)
+ [ec2:DescribeVpcs](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html)
+ [ec2:DeleteNetworkInterface](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DeleteNetworkInterface.html)
+ [ec2:DescribeSubnets](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSubnets.html)
+ [ec2:DescribeSecurityGroups](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeSecurityGroups.html)

## Optional permissions
<a name="msk-optional-permissions"></a>

Your Lambda function might also need permissions to:
+ Access cross-account Amazon MSK clusters. For cross-account event source mappings, you need [kafka:DescribeVpcConnection](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connection-arn.html) in the execution role. An IAM principal creating a cross-account event source mapping needs [kafka:ListVpcConnections](https://docs.aws.amazon.com/msk/1.0/apireference/vpc-connections.html).
+ Access your SCRAM secret, if you're using [SASL/SCRAM authentication](msk-cluster-auth.md#msk-sasl-scram). This lets your function use a username and password to connect to Kafka.
+ Describe your Secrets Manager secret, if you're using SASL/SCRAM or [mTLS authentication](msk-cluster-auth.md#msk-mtls). This allows your function to retrieve the credentials or certificates needed for secure connections.
+ Access your AWS KMS customer managed key, if your AWS Secrets Manager secret is encrypted with an AWS KMS customer managed key.
+ Access your schema registry secrets, if you're using a schema registry with authentication:
  + For AWS Glue Schema Registry: Your function needs `glue:GetRegistry` and `glue:GetSchemaVersion` permissions. These allow your function to look up and use the message format rules stored in AWS Glue.
  + For [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/security/index.html) with `BASIC_AUTH` or `CLIENT_CERTIFICATE_TLS_AUTH`: Your function needs `secretsmanager:GetSecretValue` permission for the secret containing the authentication credentials. This lets your function retrieve the username/password or certificates needed to access the Confluent Schema Registry.
  + For private CA certificates: Your function needs `secretsmanager:GetSecretValue` permission for the secret containing the certificate. This allows your function to verify the identity of schema registries that use custom certificates.
+ Access Kafka cluster consumer groups and poll messages from the topic, if you're using IAM authentication for the event source mapping.

These use cases correspond to the following permissions:
+ [kafka:ListScramSecrets](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html) - Allows listing of SCRAM secrets for Kafka authentication
+ [secretsmanager:GetSecretValue](https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html) - Enables retrieval of secrets from Secrets Manager
+ [kms:Decrypt](https://docs.aws.amazon.com/kms/latest/APIReference/API_Decrypt.html) - Permits decryption of encrypted data using AWS KMS
+ [glue:GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html) - Allows access to AWS Glue Schema Registry
+ [glue:GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html) - Enables retrieval of specific schema versions from AWS Glue Schema Registry
+ [kafka-cluster:Connect](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Grants permission to connect and authenticate to the cluster
+ [kafka-cluster:AlterGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Grants permission to join groups on a cluster, equivalent to Apache Kafka's READ GROUP ACL
+ [kafka-cluster:DescribeGroup](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Grants permission to describe groups on a cluster, equivalent to Apache Kafka's DESCRIBE GROUP ACL
+ [kafka-cluster:DescribeTopic](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Grants permission to describe topics on a cluster, equivalent to Apache Kafka's DESCRIBE TOPIC ACL
+ [kafka-cluster:ReadData](https://docs.aws.amazon.com/service-authorization/latest/reference/list_apachekafkaapisforamazonmskclusters.html) - Grants permission to read data from topics on a cluster, equivalent to Apache Kafka's READ TOPIC ACL

Additionally, if you want to send records of failed invocations to an on-failure destination, you'll need the following permissions depending on the destination type:
+ For Amazon SQS destinations: [sqs:SendMessage](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html) - Allows sending messages to an Amazon SQS queue
+ For Amazon SNS destinations: [sns:Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html) - Permits publishing messages to an Amazon SNS topic
+ For Amazon S3 bucket destinations: [s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) and [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBucket.html) - Enables writing and listing objects in an Amazon S3 bucket

For troubleshooting authentication and authorization errors, see [Troubleshooting Kafka event source mapping errors](with-kafka-troubleshoot.md).

# Configuring Amazon MSK event sources for Lambda
<a name="with-msk-configure"></a>

To use an Amazon MSK cluster as an event source for your Lambda function, you create an [event source mapping](invocation-eventsourcemapping.md) that connects the two resources. This page describes how to create an event source mapping for Amazon MSK.

This page assumes that you've already properly configured your MSK cluster and the [Amazon Virtual Private Cloud (VPC)](https://docs.aws.amazon.com/vpc/latest/userguide/what-is-amazon-vpc.html) it resides in. If you need to set up your cluster or VPC, see [Configuring your Amazon MSK cluster and Amazon VPC network for Lambda](with-msk-cluster-network.md). To configure retry behavior for error handling, see [Configuring error handling controls for Kafka event sources](kafka-retry-configurations.md).

**Topics**
+ [Using an Amazon MSK cluster as an event source](#msk-esm-overview)
+ [Configuring Amazon MSK cluster authentication methods in Lambda](msk-cluster-auth.md)
+ [Creating a Lambda event source mapping for an Amazon MSK event source](msk-esm-create.md)
+ [Creating cross-account event source mappings in Lambda](msk-cross-account.md)
+ [All Amazon MSK event source configuration parameters in Lambda](msk-esm-parameters.md)

## Using an Amazon MSK cluster as an event source
<a name="msk-esm-overview"></a>

When you add your Apache Kafka or Amazon MSK cluster as a trigger for your Lambda function, the cluster is used as an [event source](invocation-eventsourcemapping.md).

Lambda reads event data from the Kafka topics that you specify as `Topics` in a [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) request, based on the [starting position](kafka-starting-positions.md) that you specify. After successful processing, Lambda commits the offsets of the processed messages back to your Kafka cluster.

Lambda reads messages sequentially for each Kafka topic partition. A single Lambda payload can contain messages from multiple partitions. When more records are available, Lambda continues processing records in batches, based on the `BatchSize` value that you specify in a [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) request, until your function catches up with the topic.

After Lambda processes each batch, it commits the offsets of the messages in that batch. If your function returns an error for any of the messages in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing.
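Because a failed batch is retried in its entirety, it's a good idea to make record processing idempotent. A minimal sketch that skips records already handled, tracking `(topic, partition, offset)` markers in an in-memory set (a real function would persist the markers in a durable store such as DynamoDB):

```python
def process_batch(records, processed_offsets, handle):
    """Process Kafka records idempotently.

    records: decoded records, each with topic, partition, and offset keys
    processed_offsets: a set of (topic, partition, offset) markers
    handle: the per-record processing callable
    """
    for record in records:
        marker = (record["topic"], record["partition"], record["offset"])
        if marker in processed_offsets:
            # Already handled in an earlier attempt; skip on retry.
            continue
        handle(record)
        processed_offsets.add(marker)
```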

**Note**  
While Lambda functions typically have a maximum timeout limit of 15 minutes, event source mappings for Amazon MSK, self-managed Apache Kafka, Amazon DocumentDB, and Amazon MQ for ActiveMQ and RabbitMQ only support functions with maximum timeout limits of 14 minutes.

# Configuring Amazon MSK cluster authentication methods in Lambda
<a name="msk-cluster-auth"></a>

Lambda needs permission to access your Amazon MSK cluster, retrieve records, and perform other tasks. Amazon MSK supports several ways to authenticate with your MSK cluster.

**Topics**
+ [Unauthenticated access](#msk-unauthenticated)
+ [SASL/SCRAM authentication](#msk-sasl-scram)
+ [Mutual TLS authentication](#msk-mtls)
+ [IAM authentication](#msk-iam-auth)
+ [How Lambda chooses a bootstrap broker](#msk-bootstrap-brokers)

## Unauthenticated access
<a name="msk-unauthenticated"></a>

If no clients access the cluster over the internet, you can use unauthenticated access.

## SASL/SCRAM authentication
<a name="msk-sasl-scram"></a>

Lambda supports [Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM)](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password-tutorial.html) authentication, with the SHA-512 hash function and Transport Layer Security (TLS) encryption. For Lambda to connect to the cluster, store the authentication credentials (username and password) in a Secrets Manager secret, and reference this secret when configuring your event source mapping.

For more information about using Secrets Manager, see [Sign-in credentials authentication with Secrets Manager](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

**Note**  
Amazon MSK doesn't support SASL/PLAIN authentication.
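
Based on the Amazon MSK sign-in credentials documentation, the secret stores the credentials as a JSON object with `username` and `password` fields. The values shown here are placeholders:

```
{
  "username": "my-user",
  "password": "my-password"
}
```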

## Mutual TLS authentication
<a name="msk-mtls"></a>

Mutual TLS (mTLS) provides two-way authentication between the client and the server. The client sends a certificate to the server for the server to verify the client. The server also sends a certificate to the client for the client to verify the server.

For Amazon MSK integrations with Lambda, your MSK cluster acts as the server, and Lambda acts as the client.
+ For Lambda to verify your MSK cluster, you configure a client certificate as a secret in Secrets Manager, and reference this certificate in your event source mapping configuration. The client certificate must be signed by a certificate authority (CA) in the server's trust store.
+ The MSK cluster also sends a server certificate to Lambda. The server certificate must be signed by a certificate authority (CA) in the AWS trust store.

Amazon MSK doesn't support self-signed server certificates. All brokers in Amazon MSK use [public certificates](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html) signed by [Amazon Trust Services CAs](https://www.amazontrust.com/repository/), which Lambda trusts by default.

### Configuring the mTLS secret
<a name="mtls-auth-secret"></a>

The `CLIENT_CERTIFICATE_TLS_AUTH` secret requires a certificate field and a private key field. For an encrypted private key, the secret also requires the private key password. Both the certificate and private key must be in PEM format.

**Note**  
Lambda supports the [PBES1](https://datatracker.ietf.org/doc/html/rfc2898/#section-6.1) (but not PBES2) private key encryption algorithms.

The certificate field must contain a list of certificates, beginning with the client certificate, followed by any intermediate certificates, and ending with the root certificate. Each certificate must start on a new line with the following structure:

```
-----BEGIN CERTIFICATE-----
<certificate contents>
-----END CERTIFICATE-----
```

Secrets Manager supports secrets up to 65,536 bytes, which is enough space for long certificate chains.

The private key must be in [PKCS #8](https://datatracker.ietf.org/doc/html/rfc5208) format, with the following structure:

```
-----BEGIN PRIVATE KEY-----
<private key contents>
-----END PRIVATE KEY-----
```

For an encrypted private key, use the following structure:

```
-----BEGIN ENCRYPTED PRIVATE KEY-----
<private key contents>
-----END ENCRYPTED PRIVATE KEY-----
```

The following example shows the contents of a secret for mTLS authentication using an encrypted private key. For an encrypted private key, you include the private key password in the secret.

```
{
 "privateKeyPassword": "testpassword",
 "certificate": "-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
...
j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
...
rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----",
 "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp
...
QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----"
}
```

For more information about mTLS for Amazon MSK, and instructions on how to generate a client certificate, see [Mutual TLS client authentication for Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.
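
As a sketch of how such a secret value can be assembled programmatically, the following Python helper builds the JSON structure shown above and checks for the required PEM markers. The function name and validation logic are illustrative, not part of any AWS API:

```
import json

def build_mtls_secret(cert_chain_pem, private_key_pem, key_password=None):
    """Assemble the Secrets Manager secret value for mTLS authentication.

    cert_chain_pem: client certificate, then intermediates, then root.
    private_key_pem: PEM-encoded PKCS #8 private key.
    key_password: required only when the private key is encrypted.
    """
    if "-----BEGIN CERTIFICATE-----" not in cert_chain_pem:
        raise ValueError("certificate field must contain PEM certificates")
    if "PRIVATE KEY-----" not in private_key_pem:
        raise ValueError("privateKey field must be a PEM-encoded key")

    secret = {"certificate": cert_chain_pem, "privateKey": private_key_pem}
    if key_password is not None:
        secret["privateKeyPassword"] = key_password
    return json.dumps(secret)
```

You can then store the returned string as the secret value, for example with the Secrets Manager `create-secret` CLI command.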

## IAM authentication
<a name="msk-iam-auth"></a>

You can use AWS Identity and Access Management (IAM) to authenticate the identity of clients that connect to the MSK cluster. With IAM auth, Lambda relies on the permissions in your function's [execution role](lambda-intro-execution-role.md) to connect to the cluster, retrieve records, and perform other required actions. For a sample policy that contains the necessary permissions, see [Create authorization policies for the IAM role](https://docs.aws.amazon.com/msk/latest/developerguide/create-iam-access-control-policies.html) in the *Amazon Managed Streaming for Apache Kafka Developer Guide*.

If IAM auth is active on your MSK cluster, and you don't provide a secret, Lambda automatically defaults to using IAM auth.

For more information about IAM authentication in Amazon MSK, see [IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html).
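
For illustration, an execution role policy for an IAM-authenticated cluster typically grants the connect, describe, and read actions shown below. The Region, account ID, and cluster name in the resource ARNs are placeholders; refer to the linked sample policy for the authoritative version:

```
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeGroup",
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeTopic",
                "kafka-cluster:ReadData",
                "kafka-cluster:DescribeClusterDynamicConfiguration"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/*",
                "arn:aws:kafka:us-east-1:111122223333:topic/my-cluster/*",
                "arn:aws:kafka:us-east-1:111122223333:group/my-cluster/*"
            ]
        }
    ]
}
```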

## How Lambda chooses a bootstrap broker
<a name="msk-bootstrap-brokers"></a>

Lambda chooses a [bootstrap broker](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html) based on the authentication methods available on your cluster, and whether you provide a secret for authentication. If you provide a secret for mTLS or SASL/SCRAM, Lambda automatically chooses that auth method. If you don't provide a secret, Lambda selects the strongest auth method that's active on your cluster. The following is the order of priority in which Lambda selects a broker, from strongest to weakest auth:
+ mTLS (secret provided for mTLS)
+ SASL/SCRAM (secret provided for SASL/SCRAM)
+ SASL IAM (no secret provided, and IAM auth active)
+ Unauthenticated TLS (no secret provided, and IAM auth not active)
+ Plaintext (no secret provided, and both IAM auth and unauthenticated TLS are not active)

**Note**  
If Lambda can't connect to the most secure broker type, Lambda doesn't attempt to connect to a different (weaker) broker type. If you want Lambda to choose a weaker broker type, deactivate all stronger auth methods on your cluster.
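
The selection order above can be expressed as a simple priority chain. The following Python function is purely illustrative (it is not part of any AWS API), using the `SourceAccessConfiguration` type names for the secret-based methods:

```
def select_auth_method(secret_type=None, iam_auth_active=False,
                       unauth_tls_active=False):
    """Mirror Lambda's bootstrap broker selection priority (illustrative)."""
    if secret_type == "CLIENT_CERTIFICATE_TLS_AUTH":
        return "mTLS"                 # secret provided for mTLS
    if secret_type == "SASL_SCRAM_512_AUTH":
        return "SASL/SCRAM"           # secret provided for SASL/SCRAM
    if iam_auth_active:
        return "SASL IAM"             # no secret, IAM auth active
    if unauth_tls_active:
        return "Unauthenticated TLS"  # no secret, IAM auth not active
    return "Plaintext"                # no secret, no IAM, no unauth TLS
```

Note that a provided secret always wins, even if IAM auth is also active on the cluster.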

# Creating a Lambda event source mapping for an Amazon MSK event source
<a name="msk-esm-create"></a>

To create an event source mapping, you can use the Lambda console, the [AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html), or an [AWS SDK](https://aws.amazon.com/getting-started/tools-sdks/).

**Note**  
When you create the event source mapping, Lambda creates a [hyperplane ENI](configuration-vpc.md#configuration-vpc-enis) in the private subnet that contains your MSK cluster, allowing Lambda to establish a secure connection. This hyperplane ENI uses the subnet and security group configuration of your MSK cluster, not of your Lambda function.

The following console steps add an Amazon MSK cluster as a trigger for your Lambda function. Under the hood, this creates an event source mapping resource.

**To add an Amazon MSK trigger to your Lambda function (console)**

1. Open the [Function page](https://console.aws.amazon.com/lambda/home#/functions) of the Lambda console.

1. Choose the name of the Lambda function you want to add an Amazon MSK trigger to.

1. Under **Function overview**, choose **Add trigger**.

1. Under **Trigger configuration**, choose **MSK**.

1. To specify your Kafka cluster details, do the following:

   1. For **MSK cluster**, select your cluster.

   1. For **Topic name**, enter the name of the Kafka topic to consume messages from.

   1. For **Consumer group ID**, enter the ID of a Kafka consumer group to join, if applicable. For more information, see [Customizable consumer group ID in Lambda](kafka-consumer-group-id.md).

1. For **Cluster authentication**, make the necessary configurations. For more information about cluster authentication, see [Configuring Amazon MSK cluster authentication methods in Lambda](msk-cluster-auth.md).
   + Toggle on **Use authentication** if you want Lambda to perform authentication with your MSK cluster when establishing a connection. Authentication is recommended.
   + If you use authentication, for **Authentication method**, choose the authentication method to use.
   + If you use authentication, for **Secrets Manager key**, choose the Secrets Manager key that contains the authentication credentials needed to access your cluster.

1. Under **Event poller configuration**, make the necessary configurations.
   + Choose **Activate trigger** to enable the trigger immediately after creation.
   + Choose whether you want to **Configure provisioned mode** for your event source mapping. For more information, see [Apache Kafka event poller scaling modes in Lambda](kafka-scaling-modes.md).
     + If you configure provisioned mode, enter a value for **Minimum event pollers**, a value for **Maximum event pollers**, and an optional value for `PollerGroupName` to group multiple event source mappings (ESMs) within the same event source VPC.
   + For **Starting position**, choose how you want Lambda to start reading from your stream. For more information, see [Apache Kafka polling and stream starting positions in Lambda](kafka-starting-positions.md).

1. Under **Batching**, make the necessary configurations. For more information about batching, see [Batching behavior](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

   1. For **Batch size**, enter the maximum number of messages to receive in a single batch.

   1. For **Batch window**, enter the maximum number of seconds that Lambda spends gathering records before invoking the function.

1. Under **Filtering**, make the necessary configurations. For more information about filtering, see [Filtering events from Amazon MSK and self-managed Apache Kafka event sources](kafka-filtering.md).
   + For **Filter criteria**, add filter criteria definitions to determine whether or not to process an event.

1. Under **Failure handling**, make the necessary configurations. For more information about failure handling, see [Capturing discarded batches for Amazon MSK and self-managed Apache Kafka event sources](kafka-on-failure.md).
   + For **On-failure destination**, specify the ARN of your on-failure destination.

1. For **Tags**, enter the tags to associate with this event source mapping.

1. To create the trigger, choose **Add**.

You can also create the event source mapping using the AWS CLI with the [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html) command. The following example creates an event source mapping to map the Lambda function `my-kafka-function` to the `AWSKafkaTopic` topic, starting from the `LATEST` message. This command also uses the [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) object to instruct Lambda to use [SASL/SCRAM](msk-cluster-auth.md#msk-sasl-scram) authentication when connecting to the cluster.

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

If the cluster uses [mTLS authentication](msk-cluster-auth.md#msk-mtls), include a [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) object that specifies `CLIENT_CERTIFICATE_TLS_AUTH` and a Secrets Manager key ARN. This is shown in the following command:

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
```

When the cluster uses [IAM authentication](msk-cluster-auth.md#msk-iam-auth), you don't need a [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) object. This is shown in the following command:

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```
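
If you prefer an AWS SDK, you can create the same mapping with the AWS SDK for Python (boto3). The helper function below is a hypothetical sketch that builds the request parameters; `create_event_source_mapping` is the real boto3 operation, while the ARNs shown are placeholders:

```
def build_msk_esm_request(function_name, cluster_arn, topic,
                          secret_arn=None, auth_type="SASL_SCRAM_512_AUTH"):
    """Build CreateEventSourceMapping parameters for an Amazon MSK source.

    Omit secret_arn for a cluster that uses IAM authentication; Lambda
    then defaults to IAM auth, as described earlier in this chapter.
    """
    params = {
        "FunctionName": function_name,
        "EventSourceArn": cluster_arn,
        "Topics": [topic],
        "StartingPosition": "LATEST",
    }
    if secret_arn:
        params["SourceAccessConfigurations"] = [
            {"Type": auth_type, "URI": secret_arn}
        ]
    return params

# Example call (requires boto3, valid AWS credentials, and real ARNs):
# import boto3
# lambda_client = boto3.client("lambda")
# response = lambda_client.create_event_source_mapping(
#     **build_msk_esm_request(
#         "my-kafka-function",
#         "arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2",
#         "AWSKafkaTopic"))
```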

# Creating cross-account event source mappings in Lambda
<a name="msk-cross-account"></a>

You can use [multi-VPC private connectivity](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html) to connect a Lambda function to a provisioned MSK cluster in a different AWS account. Multi-VPC connectivity uses AWS PrivateLink, which keeps all traffic within the AWS network.

**Note**  
You can't create cross-account event source mappings for serverless MSK clusters.

To create a cross-account event source mapping, you must first [configure multi-VPC connectivity for the MSK cluster](https://docs.aws.amazon.com/msk/latest/developerguide/aws-access-mult-vpc.html#mvpc-cluster-owner-action-turn-on). When you create the event source mapping, use the managed VPC connection ARN instead of the cluster ARN, as shown in the following examples. The [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) operation also differs depending on which authentication type the MSK cluster uses.

**Example — Create cross-account event source mapping for cluster that uses IAM authentication**  
When the cluster uses [IAM role-based authentication](msk-cluster-auth.md#msk-iam-auth), you don't need a [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) object. Example:  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function
```

**Example — Create cross-account event source mapping for cluster that uses SASL/SCRAM authentication**  
If the cluster uses [SASL/SCRAM authentication](msk-cluster-auth.md#msk-sasl-scram), you must include a [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) object that specifies `SASL_SCRAM_512_AUTH` and a Secrets Manager secret ARN.  
There are two ways to use secrets for cross-account Amazon MSK event source mappings with SASL/SCRAM authentication:  
+ Create a secret in the Lambda function account and sync it with the cluster secret. [Create a rotation](https://docs.aws.amazon.com/secretsmanager/latest/userguide/rotating-secrets.html) to keep the two secrets in sync. This option allows you to control the secret from the function account.
+ Use the secret that's associated with the MSK cluster. This secret must allow cross-account access to the Lambda function account. For more information, see [Permissions to AWS Secrets Manager secrets for users in a different account](https://docs.aws.amazon.com/secretsmanager/latest/userguide/auth-and-access_examples_cross.html).

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

**Example — Create cross-account event source mapping for cluster that uses mTLS authentication**  
If the cluster uses [mTLS authentication](msk-cluster-auth.md#msk-mtls), you must include a [SourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/api/API_SourceAccessConfiguration.html) object that specifies `CLIENT_CERTIFICATE_TLS_AUTH` and a Secrets Manager secret ARN. The secret can be stored in the cluster account or the Lambda function account.  

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \
  --topics AWSKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
```

# All Amazon MSK event source configuration parameters in Lambda
<a name="msk-esm-parameters"></a>

All Lambda event source types share the same [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) and [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) API operations. However, only some of the parameters apply to Amazon MSK, as shown in the following table.


| Parameter | Required | Default | Notes | 
| --- | --- | --- | --- | 
|  AmazonManagedKafkaEventSourceConfig  |  N  |  Contains the ConsumerGroupId field, which defaults to a unique value.  |  Can set only on Create  | 
|  BatchSize  |  N  |  100  |  Maximum: 10,000  | 
|  DestinationConfig  |  N  |  N/A  |  [Capturing discarded batches for Amazon MSK and self-managed Apache Kafka event sources](kafka-on-failure.md)  | 
|  Enabled  |  N  |  True  |    | 
|  BisectBatchOnFunctionError  |  N  |  False  |  [Configuring error handling controls for Kafka event sources](kafka-retry-configurations.md)  | 
|  FunctionResponseTypes  |  N  |  N/A  |  [Configuring error handling controls for Kafka event sources](kafka-retry-configurations.md)  | 
|  MaximumRecordAgeInSeconds  |  N  |  -1 (infinite)  |  [Configuring error handling controls for Kafka event sources](kafka-retry-configurations.md)  | 
|  MaximumRetryAttempts  |  N  |  -1 (infinite)  |  [Configuring error handling controls for Kafka event sources](kafka-retry-configurations.md)  | 
|  EventSourceArn  |  Y  | N/A |  Can set only on Create  | 
|  FilterCriteria  |  N  |  N/A  |  [Control which events Lambda sends to your function](invocation-eventfiltering.md)  | 
|  FunctionName  |  Y  |  N/A  |    | 
|  KMSKeyArn  |  N  |  N/A  |  [Encryption of filter criteria](invocation-eventfiltering.md#filter-criteria-encryption)  | 
|  MaximumBatchingWindowInSeconds  |  N  |  500 ms  |  [Batching behavior](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)  | 
|  ProvisionedPollersConfig  |  N  |  `MinimumPollers`: default value of 1 if not specified `MaximumPollers`: default value of 200 if not specified `PollerGroupName`: N/A  |  [Provisioned mode](kafka-scaling-modes.md#kafka-provisioned-mode)  | 
|  SourceAccessConfigurations  |  N  |  No credentials  |  SASL/SCRAM or CLIENT\_CERTIFICATE\_TLS\_AUTH (mTLS) authentication credentials for your event source  | 
|  StartingPosition  |  Y  | N/A |  AT\_TIMESTAMP, TRIM\_HORIZON, or LATEST Can set only on Create  | 
|  StartingPositionTimestamp  |  N  |  N/A  |  Required if StartingPosition is set to AT\_TIMESTAMP  | 
|  Tags  |  N  |  N/A  |  [Using tags on event source mappings](tags-esm.md)  | 
|  Topics  |  Y  | N/A |  Kafka topic name Can set only on Create  | 

**Note**  
When you specify a `PollerGroupName`, multiple ESMs within the same Amazon VPC can share Event Poller Unit (EPU) capacity. You can use this option to optimize provisioned mode costs for your ESMs. ESM grouping has the following requirements:  
+ ESMs must be within the same Amazon VPC.
+ A poller group supports a maximum of 100 ESMs.
+ The aggregate maximum pollers across all ESMs in a group cannot exceed 2,000.

You can update the `PollerGroupName` to move an ESM to a different group, or remove an ESM from a group by setting `PollerGroupName` to an empty string ("").
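
You can check a proposed poller group against these limits up front. The following Python helper is purely illustrative (it is not part of any AWS API) and validates a group described as a list of `(vpc_id, maximum_pollers)` pairs, one per ESM:

```
def validate_poller_group(esm_configs):
    """Validate a proposed poller group against the documented limits.

    esm_configs: list of (vpc_id, maximum_pollers) tuples, one per ESM
    that would share the group. Returns the aggregate maximum pollers.
    """
    if len(esm_configs) > 100:
        raise ValueError("a poller group supports at most 100 ESMs")
    vpcs = {vpc for vpc, _ in esm_configs}
    if len(vpcs) > 1:
        raise ValueError("all ESMs in a group must be in the same Amazon VPC")
    total_max = sum(maximum for _, maximum in esm_configs)
    if total_max > 2000:
        raise ValueError("aggregate maximum pollers cannot exceed 2000")
    return total_max
```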

# Tutorial: Using an Amazon MSK event source mapping to invoke a Lambda function
<a name="services-msk-tutorial"></a>

In this tutorial, you will perform the following:
+ Create a Lambda function in the same AWS account as an existing Amazon MSK cluster.
+ Configure networking and authentication for Lambda to communicate with Amazon MSK.
+ Set up a Lambda Amazon MSK event source mapping, which runs your Lambda function when events show up in the topic.

After you finish these steps, events sent to your Amazon MSK topic are processed automatically by a Lambda function running your own custom code.

 **What can you do with this feature?** 

**Example solution: Use an MSK event source mapping to deliver live scores to your customers.**

Consider the following scenario: Your company hosts a web application where your customers can view information about live events, such as sports games. Information updates from the game are provided to your team through a Kafka topic on Amazon MSK. You want to design a solution that consumes updates from the MSK topic to provide an updated view of the live event to customers inside an application you develop. You have decided on the following design approach: your client applications communicate with a serverless backend hosted in AWS, connecting over WebSocket sessions using the Amazon API Gateway WebSocket API.

In this solution, you need a component that reads MSK events, performs some custom logic to prepare those events for the application layer, and then forwards that information to the API Gateway API. You can implement this component with AWS Lambda, by providing your custom logic in a Lambda function, then invoking it with an AWS Lambda Amazon MSK event source mapping.

For more information about implementing solutions using the Amazon API Gateway WebSocket API, see [WebSocket API tutorials](https://docs.aws.amazon.com/apigateway/latest/developerguide/websocket-api-chat-app.html) in the API Gateway documentation.

## Prerequisites
<a name="w2aad101c23c15c35c19"></a>

An AWS account with the following preconfigured resources:

**To fulfill these prerequisites, we recommend following [Getting started using Amazon MSK](https://docs.aws.amazon.com//msk/latest/developerguide/getting-started.html) in the Amazon MSK documentation.**
+ An Amazon MSK cluster. See [Create an Amazon MSK cluster](https://docs.aws.amazon.com//msk/latest/developerguide/create-cluster.html) in *Getting started using Amazon MSK*.
+ The following configuration:
  + Ensure **IAM role-based authentication** is **Enabled** in your cluster security settings. This improves your security by limiting your Lambda function to only access the Amazon MSK resources needed. This is enabled by default on new Amazon MSK clusters.
  + Ensure **Public access** is off in your cluster networking settings. Restricting your Amazon MSK cluster's access to the internet improves your security by limiting how many intermediaries handle your data. Public access is turned off by default on new Amazon MSK clusters.
+ A Kafka topic in your Amazon MSK cluster to use for this solution. See [Create a topic](https://docs.aws.amazon.com//msk/latest/developerguide/create-topic.html) in *Getting started using Amazon MSK*.
+ A Kafka admin host set up to retrieve information from your Kafka cluster and send Kafka events to your topic for testing, such as an Amazon EC2 instance with the Kafka admin CLI and Amazon MSK IAM library installed. See [Create a client machine](https://docs.aws.amazon.com//msk/latest/developerguide/create-client-machine.html) in *Getting started using Amazon MSK*.

Once you have set up these resources, gather the following information from your AWS account to confirm that you are ready to continue.
+ The name of your Amazon MSK cluster. You can find this information in the Amazon MSK console.
+ The cluster UUID, part of the ARN for your Amazon MSK cluster, which you can find in the Amazon MSK console. Follow the procedures in [Listing clusters](https://docs.aws.amazon.com/msk/latest/developerguide/msk-list-clusters.html) in the Amazon MSK documentation to find this information.
+ The security groups associated with your Amazon MSK cluster. You can find this information in the Amazon MSK console. In the following steps, refer to these as your *clusterSecurityGroups*.
+ The ID of the Amazon VPC containing your Amazon MSK cluster. You can find this information by identifying subnets associated with your Amazon MSK cluster in the Amazon MSK console, then identifying the Amazon VPC associated with the subnet in the Amazon VPC console.
+ The name of the Kafka topic used in your solution. You can find this information by calling your Amazon MSK cluster with the Kafka `topics` CLI from your Kafka admin host. For more information about the topics CLI, see [Adding and removing topics](https://kafka.apache.org/documentation/#basic_ops_add_topic) in the Kafka documentation.
+ The name of a consumer group for your Kafka topic, suitable for use by your Lambda function. This group can be created automatically by Lambda, so you don't need to create it with the Kafka CLI. If you do need to manage your consumer groups, to learn more about the consumer-groups CLI, see [Managing Consumer Groups](https://kafka.apache.org/documentation/#basic_ops_consumer_group) in the Kafka documentation.

The following permissions in your AWS account:
+ Permission to create and manage a Lambda function.
+ Permission to create IAM policies and associate them with your Lambda function.
+ Permission to create Amazon VPC endpoints and alter networking configuration in the Amazon VPC hosting your Amazon MSK cluster.

### Install the AWS Command Line Interface
<a name="install_aws_cli"></a>

If you have not yet installed the AWS Command Line Interface, follow the steps at [Installing or updating the latest version of the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) to install it.

The tutorial requires a command line terminal or shell to run commands. In Linux and macOS, use your preferred shell and package manager.

**Note**  
In Windows, some Bash CLI commands that you commonly use with Lambda (such as `zip`) are not supported by the operating system's built-in terminals. To get a Windows-integrated version of Ubuntu and Bash, [install the Windows Subsystem for Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Configure network connectivity for Lambda to communicate with Amazon MSK
<a name="w2aad101c23c15c35c21"></a>

 Use AWS PrivateLink to connect Lambda and Amazon MSK. You can do so by creating interface Amazon VPC endpoints in the Amazon VPC console. For more information about networking configuration, see [Configuring your Amazon MSK cluster and Amazon VPC network for Lambda](with-msk-cluster-network.md). 

When an Amazon MSK event source mapping runs on behalf of a Lambda function, it assumes the Lambda function's execution role. This IAM role authorizes the mapping to access resources secured by IAM, such as your Amazon MSK cluster. Although the components share an execution role, the Amazon MSK mapping and your Lambda function have separate connectivity requirements for their respective tasks, as shown in the following diagram.

![\[\]](http://docs.aws.amazon.com/lambda/latest/dg/images/msk_tut_network.png)


Your event source mapping belongs to your Amazon MSK cluster security group. In this networking step, create Amazon VPC endpoints from your Amazon MSK cluster VPC to connect the event source mapping to the Lambda and STS services. Secure these endpoints to accept traffic from your Amazon MSK cluster security group. Then, adjust the Amazon MSK cluster security groups to allow the event source mapping to communicate with the Amazon MSK cluster.

 You can configure the following steps using the AWS Management Console.

**To configure interface Amazon VPC endpoints to connect Lambda and Amazon MSK**

1. Create a security group for your interface Amazon VPC endpoints, *endpointSecurityGroup*, that allows inbound TCP traffic on 443 from *clusterSecurityGroups*. Follow the procedure in [Create a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#creating-security-group) in the Amazon EC2 documentation to create a security group. Then, follow the procedure in [Add rules to a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) in the Amazon EC2 documentation to add appropriate rules. 

   **Create a security group with the following information:**

   When adding your inbound rules, create a rule for each security group in *clusterSecurityGroups*. For each rule:
   + For **Type**, select **HTTPS**.
   + For **Source**, select one of *clusterSecurityGroups*.

1.  Create an endpoint connecting the Lambda service to the Amazon VPC containing your Amazon MSK cluster. Follow the procedure in [Create an interface endpoint](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html).

   **Create an interface endpoint with the following information:**
   + For **Service name**, select `com.amazonaws.regionName.lambda`, where *regionName* is the AWS Region that hosts your Lambda function.
   + For **VPC**, select the Amazon VPC containing your Amazon MSK cluster.
   + For **Security groups**, select *endpointSecurityGroup*, which you created earlier.
   + For **Subnets**, select the subnets that host your Amazon MSK cluster.
   + For **Policy**, provide the following policy document, which secures the endpoint for use by the Lambda service principal for the `lambda:InvokeFunction` action.

     ```
     {
         "Statement": [
             {
                 "Action": "lambda:InvokeFunction",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + Ensure **Enable DNS name** remains set.

1.  Create an endpoint connecting the AWS STS service to the Amazon VPC containing your Amazon MSK cluster. Follow the procedure in [Create an interface endpoint](https://docs.aws.amazon.com//vpc/latest/privatelink/create-interface-endpoint.html).

   **Create an interface endpoint with the following information:**
   + For **Service name**, select AWS STS.
   + For **VPC**, select the Amazon VPC containing your Amazon MSK cluster.
   + For **Security groups**, select *endpointSecurityGroup*.
   + For **Subnets**, select the subnets that host your Amazon MSK cluster.
   + For **Policy**, provide the following policy document, which secures the endpoint for use by the Lambda service principal for the `sts:AssumeRole` action.

     ```
     {
         "Statement": [
             {
                 "Action": "sts:AssumeRole",
                 "Effect": "Allow",
                 "Principal": {
                     "Service": [
                         "lambda.amazonaws.com"
                     ]
                 },
                 "Resource": "*"
             }
         ]
     }
     ```
   + Ensure that **Enable DNS name** remains selected.

1. For each security group associated with your Amazon MSK cluster (that is, each security group in *clusterSecurityGroups*), allow the following traffic:
   + All inbound and outbound TCP traffic on port 9098 to and from every security group in *clusterSecurityGroups*, including the group itself.
   + All outbound TCP traffic on port 443.

   Some of this traffic is allowed by default security group rules, so if your cluster is attached to a single security group, and that group has default rules, additional rules are not necessary. To adjust security group rules, follow the procedures in [Add rules to a security group](https://docs.aws.amazon.com//AWSEC2/latest/UserGuide/working-with-security-groups.html#adding-security-group-rule) in the Amazon EC2 documentation.

   **Add rules to your security groups with the following information:**
   + For each inbound or outbound rule for port 9098:
     + For **Type**, select **Custom TCP**.
     + For **Port range**, provide **9098**.
     + For **Source** (for inbound rules) or **Destination** (for outbound rules), provide one of *clusterSecurityGroups*.
   + For each outbound rule for port 443, for **Type**, select **HTTPS**.
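The rule set above can also be sketched programmatically. The following is a minimal sketch with hypothetical security group IDs; the permission shape mirrors what the Amazon EC2 `AuthorizeSecurityGroupIngress` and `AuthorizeSecurityGroupEgress` APIs accept:

```python
def port_9098_permissions(cluster_security_group_ids):
    """Build the TCP 9098 rule referencing every security group in
    clusterSecurityGroups, including the group the rule is attached to."""
    return [
        {
            "IpProtocol": "tcp",
            "FromPort": 9098,
            "ToPort": 9098,
            "UserIdGroupPairs": [
                {"GroupId": group_id} for group_id in cluster_security_group_ids
            ],
        }
    ]

# Hypothetical cluster security groups:
permissions = port_9098_permissions(["sg-11111111", "sg-22222222"])
print(permissions)
```

Pass the same structure as both ingress and egress permissions for each group in *clusterSecurityGroups*; the outbound port 443 rule can be expressed the same way with `FromPort` and `ToPort` set to 443.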

## Create an IAM role for Lambda to read from your Amazon MSK topic
<a name="w2aad101c23c15c35c23"></a>

Identify the auth requirements for Lambda to read from your Amazon MSK topic, then define them in a policy. Create a role, *lambdaAuthRole*, that authorizes Lambda to use those permissions. Authorize actions on your Amazon MSK cluster using `kafka-cluster` IAM actions. Then, authorize Lambda to perform Amazon MSK `kafka` and Amazon EC2 actions needed to discover and connect to your Amazon MSK cluster, as well as CloudWatch actions so Lambda can log what it has done.

**To describe the auth requirements for Lambda to read from Amazon MSK**

1. Write an IAM policy document (a JSON document), *clusterAuthPolicy*, that allows Lambda to read from your Kafka topic in your Amazon MSK cluster using your Kafka consumer group. Lambda requires a Kafka consumer group to be set when reading.

   Alter the following template to align with your prerequisites:

------
#### [ JSON ]

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "kafka-cluster:Connect",
                   "kafka-cluster:DescribeGroup",
                   "kafka-cluster:AlterGroup",
                   "kafka-cluster:DescribeTopic",
                   "kafka-cluster:ReadData",
                   "kafka-cluster:DescribeClusterDynamicConfiguration"
               ],
               "Resource": [
                   "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
                   "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName",
                   "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName"
               ]
           }
       ]
   }
   ```

------

   For more information, consult [Configuring Lambda permissions for Amazon MSK event source mappings](with-msk-permissions.md). When writing your policy:
   + Replace *us-east-1* and *111122223333* with the AWS Region and AWS account of your Amazon MSK cluster.
   + For *mskClusterName*, provide the name of your Amazon MSK cluster.
   + For *cluster-uuid*, provide the UUID in the ARN for your Amazon MSK cluster.
   + For *mskTopicName*, provide the name of your Kafka topic.
   + For *mskGroupName*, provide the name of your Kafka consumer group.

1. Identify the Amazon MSK, Amazon EC2, and CloudWatch permissions that Lambda requires to discover and connect to your Amazon MSK cluster, and to log those events.

   The `AWSLambdaMSKExecutionRole` managed policy permissively defines the required permissions. Use it in the following steps.

   In a production environment, assess `AWSLambdaMSKExecutionRole` to restrict your execution role policy based on the principle of least privilege, then write a policy for your role that replaces this managed policy.
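To reduce copy-and-paste errors when filling in the placeholders above, you can build *clusterAuthPolicy* programmatically. The following is a minimal sketch; all argument values shown are the same placeholders as in the template, which you replace with your own values:

```python
import json

def cluster_auth_policy(region, account_id, cluster_name, cluster_uuid,
                        topic_name, group_name):
    """Build clusterAuthPolicy with the placeholder values substituted
    into the cluster, topic, and consumer group ARNs."""
    arn_prefix = f"arn:aws:kafka:{region}:{account_id}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kafka-cluster:Connect",
                    "kafka-cluster:DescribeGroup",
                    "kafka-cluster:AlterGroup",
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:ReadData",
                    "kafka-cluster:DescribeClusterDynamicConfiguration",
                ],
                "Resource": [
                    f"{arn_prefix}:cluster/{cluster_name}/{cluster_uuid}",
                    f"{arn_prefix}:topic/{cluster_name}/{cluster_uuid}/{topic_name}",
                    f"{arn_prefix}:group/{cluster_name}/{cluster_uuid}/{group_name}",
                ],
            }
        ],
    }

# Print the policy document, ready to paste into the IAM policy editor.
print(json.dumps(cluster_auth_policy(
    "us-east-1", "111122223333", "mskClusterName", "cluster-uuid",
    "mskTopicName", "mskGroupName"), indent=4))
```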

For details about the IAM policy language, see the [IAM documentation](https://docs.aws.amazon.com//iam/).

Now that you have written your policy document, create an IAM policy so you can attach it to your role. You can do this using the console with the following procedure.

**To create an IAM policy from your policy document**

1. Sign in to the AWS Management Console and open the IAM console at [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/).

1. In the navigation pane on the left, choose **Policies**. 

1. Choose **Create policy**.

1. In the **Policy editor** section, choose the **JSON** option.

1. Paste *clusterAuthPolicy*.

1. When you are finished adding permissions to the policy, choose **Next**.

1. On the **Review and create** page, type a **Policy Name** and a **Description** (optional) for the policy that you are creating. Review **Permissions defined in this policy** to see the permissions that are granted by your policy.

1. Choose **Create policy** to save your new policy.

For more information, see [Creating IAM policies](https://docs.aws.amazon.com//IAM/latest/UserGuide/access_policies_create.html) in the IAM documentation.

Now that you have appropriate IAM policies, create a role and attach them to it. You can do this using the console with the following procedure.

**To create an execution role in the IAM console**

1. Open the [Roles page](https://console.aws.amazon.com/iam/home#/roles) in the IAM console.

1. Choose **Create role**.

1. Under **Trusted entity type**, choose **AWS service**.

1. Under **Use case**, choose **Lambda**.

1. Choose **Next**.

1. Select the following policies:
   + *clusterAuthPolicy*
   + `AWSLambdaMSKExecutionRole`

1. Choose **Next**.

1. For **Role name**, enter *lambdaAuthRole* and then choose **Create role**.

For more information, see [Defining Lambda function permissions with an execution role](lambda-intro-execution-role.md).

## Create a Lambda function to read from your Amazon MSK topic
<a name="w2aad101c23c15c35c25"></a>

Create a Lambda function configured to use your IAM role. You can create your Lambda function using the console.

**To create a Lambda function using your auth configuration**

1.  Open the Lambda console and select **Create function** from the header. 

1. Select **Author from scratch**.

1. For **Function name**, provide an appropriate name of your choice.

1. For **Runtime**, choose the **Latest supported** version of `Node.js` to use the code provided in this tutorial.

1. Choose **Change default execution role**.

1. Select **Use an existing role**.

1. For **Existing role**, select *lambdaAuthRole*.

In a production environment, you usually need to add further policies to the execution role for your Lambda function to meaningfully process your Amazon MSK events. For more information on adding policies to your role, see [Add or remove identity permissions](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_manage-attach-detach.html#add-policies-console) in the IAM documentation.

## Create an event source mapping to your Lambda function
<a name="w2aad101c23c15c35c27"></a>

Your Amazon MSK event source mapping provides the Lambda service with the information necessary to invoke your function when appropriate Amazon MSK events occur. You can create an Amazon MSK mapping using the console: create a Lambda trigger, and the event source mapping is set up automatically.

**To create a Lambda trigger (and event source mapping)**

1. Navigate to your Lambda function's overview page.

1. In the function overview section, choose **Add trigger** on the bottom left.

1. In the **Select a source** dropdown, select **Amazon MSK**.

1. Don't set **authentication**.

1. For **MSK cluster**, select your cluster's name.

1. For **Batch size**, enter **1**. A batch size of 1 makes the integration easier to test; it is not an ideal value for production.

1. For **Topic name**, provide the name of your Kafka topic.

1. For **Consumer group ID**, provide the ID of your Kafka consumer group.
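The console trigger above creates an event source mapping resource under the hood. As an alternative sketch, the same configuration could be assembled for the Lambda `CreateEventSourceMapping` API. The cluster ARN and names below are placeholders, and the boto3 call is shown commented out because it requires live AWS resources:

```python
def msk_trigger_params(function_name, cluster_arn, topic_name, consumer_group_id):
    """Assemble CreateEventSourceMapping parameters matching the console
    choices above (batch size 1 to make testing easier)."""
    return {
        "FunctionName": function_name,
        "EventSourceArn": cluster_arn,
        "Topics": [topic_name],
        "BatchSize": 1,
        "StartingPosition": "LATEST",
        "AmazonManagedKafkaEventSourceConfig": {
            "ConsumerGroupId": consumer_group_id,
        },
    }

params = msk_trigger_params(
    "my-msk-function",
    "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid",
    "mskTopicName",
    "mskGroupName",
)
print(params)

# import boto3
# boto3.client("lambda").create_event_source_mapping(**params)
```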

## Update your Lambda function to read your streaming data
<a name="w2aad101c23c15c35c29"></a>

Lambda provides information about Kafka events through the event method parameter. For an example structure of an Amazon MSK event, see [Example event](with-msk.md#msk-sample-event). After you understand how to interpret the Amazon MSK events that Lambda forwards, you can alter your Lambda function code to use the information they provide.
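As a quick orientation before editing your handler, the following sketch builds a trimmed, illustrative Amazon MSK event (field values are hypothetical, and real events carry additional fields such as `eventSource`, `eventSourceArn`, `timestamp`, and `headers`) and decodes the base64-encoded record value:

```python
import base64

# A trimmed, illustrative MSK event. Records are grouped under
# "topic-partition" keys, and each record's value is base64-encoded.
sample_event = {
    "records": {
        "mskTopicName-0": [
            {
                "topic": "mskTopicName",
                "partition": 0,
                "offset": 15,
                "value": base64.b64encode(b"hello from kafka").decode(),
            }
        ]
    }
}

for key, records in sample_event["records"].items():
    print("Key:", key)
    for record in records:
        message = base64.b64decode(record["value"]).decode("utf-8")
        print("Message:", message)  # Message: hello from kafka
```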

Provide the following code to your Lambda function to log the contents of an Amazon MSK event for testing purposes:

------
#### [ .NET ]

**SDK for .NET**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using .NET.  

```
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KafkaEvents;


// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace MSKLambda;

public class Function
{
    
    
    /// <param name="input">The event for the Lambda function handler to process.</param>
    /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param>
    /// <returns></returns>
    public void FunctionHandler(KafkaEvent evnt, ILambdaContext context)
    {

        foreach (var record in evnt.Records)
        {
            Console.WriteLine("Key:" + record.Key); 
            foreach (var eventRecord in record.Value)
            {
                var valueBytes = eventRecord.Value.ToArray();    
                var valueText = Encoding.UTF8.GetString(valueBytes);
                
                Console.WriteLine("Message:" + valueText);
            }
        }
    }
    

}
```

------
#### [ Go ]

**SDK for Go V2**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using Go.  

```
package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(event events.KafkaEvent) {
	for key, records := range event.Records {
		fmt.Println("Key:", key)

		for _, record := range records {
			fmt.Println("Record:", record)

			decodedValue, err := base64.StdEncoding.DecodeString(record.Value)
			if err != nil {
				fmt.Println("Error decoding record value:", err)
				continue
			}
			message := string(decodedValue)
			fmt.Println("Message:", message)
		}
	}
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using Java.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord;

import java.util.Base64;
import java.util.Map;

public class Example implements RequestHandler<KafkaEvent, Void> {

    @Override
    public Void handleRequest(KafkaEvent event, Context context) {
        for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) {
            String key = entry.getKey();
            System.out.println("Key: " + key);

            for (KafkaEventRecord record : entry.getValue()) {
                System.out.println("Record: " + record);

                byte[] value = Base64.getDecoder().decode(record.getValue());
                String message = new String(value);
                System.out.println("Message: " + message);
            }
        }

        return null;
    }
}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using JavaScript.  

```
exports.handler = async (event) => {
    // Iterate through keys
    for (let key in event.records) {
      console.log('Key: ', key)
      // Iterate through records
      event.records[key].map((record) => {
        console.log('Record: ', record)
        // Decode base64
        const msg = Buffer.from(record.value, 'base64').toString()
        console.log('Message:', msg)
      }) 
    }
}
```
Consuming an Amazon MSK event with Lambda using TypeScript.  

```
import { MSKEvent, Context } from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "msk-handler-sample",
});

export const handler = async (
  event: MSKEvent,
  context: Context
): Promise<void> => {
  for (const [topic, topicRecords] of Object.entries(event.records)) {
    logger.info(`Processing key: ${topic}`);

    // Process each record in the partition
    for (const record of topicRecords) {
      try {
        // Decode the message value from base64
        const decodedMessage = Buffer.from(record.value, 'base64').toString();

        logger.info({
          message: decodedMessage
        });
      }
      catch (error) {
        logger.error('Error processing event', { error });
        throw error;
      }
    }
  }
}
```

------
#### [ PHP ]

**SDK for PHP**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using PHP.  

```
<?php
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kafka\KafkaEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): void
    {
        $kafkaEvent = new KafkaEvent($event);
        $this->logger->info("Processing records");
        $records = $kafkaEvent->getRecords();

        foreach ($records as $record) {
            try {
                $key = $record->getKey();
                $this->logger->info("Key: $key");

                $values = $record->getValue();
                $this->logger->info(json_encode($values));

                foreach ($values as $value) {
                    $this->logger->info("Value: $value");
                }
                
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using Python.  

```
import base64

def lambda_handler(event, context):
    # Iterate through keys
    for key in event['records']:
        print('Key:', key)
        # Iterate through records
        for record in event['records'][key]:
            print('Record:', record)
            # Decode base64
            msg = base64.b64decode(record['value']).decode('utf-8')
            print('Message:', msg)
```

------
#### [ Ruby ]

**SDK for Ruby**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using Ruby.  

```
require 'base64'

def lambda_handler(event:, context:)
  # Iterate through keys
  event['records'].each do |key, records|
    puts "Key: #{key}"

    # Iterate through records
    records.each do |record|
      puts "Record: #{record}"

      # Decode base64
      msg = Base64.decode64(record['value'])
      puts "Message: #{msg}"
    end
  end
end
```

------
#### [ Rust ]

**SDK for Rust**  
 There's more on GitHub. Find the complete example and learn how to set up and run in the [Serverless examples](https://github.com/aws-samples/serverless-snippets/tree/main/integration-msk-to-lambda) repository. 
Consuming an Amazon MSK event with Lambda using Rust.  

```
use aws_lambda_events::event::kafka::KafkaEvent;
use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent};
use base64::prelude::*;
use serde_json::{Value};
use tracing::{info};

/// Pre-Requisites:
/// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html
/// 2. Add packages tracing, tracing-subscriber, serde_json, base64
///
/// This is the main body for the function.
/// Write your code inside it.
/// There are some code examples at the following URLs:
/// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples
/// - https://github.com/aws-samples/serverless-rust-demo/

async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> {

    let payload = event.payload.records;

    for (_name, records) in payload.iter() {

        for record in records {

         let record_text = record.value.as_ref().ok_or("Value is None")?;
         info!("Record: {}", &record_text);

         // perform Base64 decoding
         let record_bytes = BASE64_STANDARD.decode(record_text)?;
         let message = std::str::from_utf8(&record_bytes)?;
         
         info!("Message: {}", message);
        }

    }

    Ok(().into())
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // required to enable CloudWatch error logging by the runtime
    tracing::init_default_subscriber();
    info!("Setup CW subscriber!");

    run(service_fn(function_handler)).await
}
```

------

You can provide function code to your Lambda using the console.

**To update function code using the console code editor**

1. Open the [Functions page](https://console.aws.amazon.com/lambda/home#/functions) of the Lambda console and select your function.

1. Select the **Code** tab.

1. In the **Code source** pane, select your source code file and edit it in the integrated code editor.

1. In the **DEPLOY** section, choose **Deploy** to update your function's code:  
![\[\]](http://docs.aws.amazon.com/lambda/latest/dg/images/getting-started-tutorial/deploy-console.png)

## Test your Lambda function to verify it is connected to your Amazon MSK topic
<a name="w2aad101c23c15c35c31"></a>

You can now verify whether your Lambda function is being invoked by the event source by inspecting the function's CloudWatch logs.

**To verify whether your Lambda function is being invoked**

1. Use your Kafka admin host to generate Kafka events using the `kafka-console-producer` CLI. For more information, see [Write some events into the topic](https://kafka.apache.org/documentation/#quickstart_send) in the Kafka documentation. Send enough events to fill the batch defined by the batch size of your event source mapping from the previous step; otherwise, Lambda waits for more events before invoking your function.

1. If your function runs, Lambda writes what happened to CloudWatch. In the console, navigate to your Lambda function's detail page.

1. Select the **Configuration** tab.

1. From the sidebar, select **Monitoring and operations tools**.

1. Identify the **CloudWatch log group** under **Logging configuration**. The log group should start with `/aws/lambda`. Choose the link to the log group.

1. In the CloudWatch console, inspect the **Log events** for the log events Lambda has sent to the log stream. Identify if there are log events containing the message from your Kafka event, as in the following image. If there are, you have successfully connected a Lambda function to Amazon MSK with a Lambda event source mapping.  
![\[\]](http://docs.aws.amazon.com/lambda/latest/dg/images/msk_tut_log.png)