

# Amazon Athena MSK 连接器
<a name="connectors-msk"></a>

使用适用于 [Amazon MSK](https://aws.amazon.com/msk/) 的 Amazon Athena 连接器，Amazon Athena 能够对 Apace Kafka 主题运行 SQL 查询。使用此连接器在 Athena 中以表的形式查看 [Apache Kafka](https://kafka.apache.org/) 主题，以行的形式查看消息。有关更多信息，请参阅 AWS 大数据博客中的 [Analyze real-time streaming data in Amazon MSK with Amazon Athena](https://aws.amazon.com/blogs/big-data/analyze-real-time-streaming-data-in-amazon-msk-with-amazon-athena/)（使用 Amazon Athena 分析 Amazon MSK 中的实时流数据）。

此连接器不使用 Glue 连接将配置属性集中保存到 Glue 中。连接配置通过 Lambda 完成。

## 先决条件
<a name="connectors-msk-prerequisites"></a>

可以使用 Athena 控制台或 AWS Serverless Application Repository 将该连接器部署到您的 AWS 账户。有关更多信息，请参阅 [创建数据来源连接](connect-to-a-data-source.md) 或 [使用 AWS Serverless Application Repository 部署数据来源连接器](connect-data-source-serverless-app-repo.md)。

## 限制
<a name="connectors-msk-limitations"></a>
+ 不支持写入 DDL 操作。
+ 任何相关的 Lambda 限制。有关更多信息，请参阅《AWS Lambda 开发人员指南》**中的 [Lambda 配额](https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html)。
+ 必须将筛选条件中的日期和时间戳数据类型转换为适当的数据类型。
+ CSV 文件类型不支持日期和时间戳数据类型，它们被视为 VARCHAR 值。
+ 不支持映射到嵌套 JSON 字段。连接器仅映射顶级字段。
+ 连接器不支持复杂类型。复杂类型解释为字符串。
+ 要提取或处理复杂的 JSON 值，请使用 Athena 中可用的 JSON 相关函数。有关更多信息，请参阅 [从字符串中提取 JSON 数据](extracting-data-from-JSON.md)。
+ 连接器不支持对 Kafka 消息元数据的访问。

## 术语
<a name="connectors-msk-terms"></a>
+ **元数据处理程序** — 从您的数据库实例中检索元数据的 Lambda 处理程序。
+ **记录处理程序** — 从您的数据库实例中检索数据记录的 Lambda 处理程序。
+ **复合处理程序** — 从您的数据库实例中检索元数据和数据记录的 Lambda 处理程序。
+ **Kafka 端点** – 文本字符串，用于建立与 Kafka 实例的连接。

## 集群兼容性
<a name="connectors-msk-cluster-compatibility"></a>

MSK 连接器可用于以下集群类型。
+ **MSK 预置集群** – 您可以手动指定、监控和扩展集群容量。
+ **MSK 无服务器集群** – 提供随应用程序 I/O 扩展而自动扩展的按需容量。
+ **独立 Kafka** - 与 Kafka 直接连接（经过或未经身份验证）。

## 支持的身份验证方法
<a name="connectors-msk-supported-authentication-methods"></a>

连接器支持以下身份验证方法。
+ [SASL/IAM](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html) 
+ [SSL](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)
+ [SASL/SCRAM](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html)
+ SASL/PLAIN
+ SASL/PLAINTEXT
+ NO\_AUTH

  有关更多信息，请参阅 [为 Athena MSK 连接器配置身份验证](#connectors-msk-setup-configuring-authentication)。

## 支持的输入数据格式
<a name="connectors-msk-supported-input-data-formats"></a>

连接器支持以下输入数据格式。
+ JSON
+ CSV

## 参数
<a name="connectors-msk-parameters"></a>

使用本节中的参数来配置 Athena MSK 连接器。

### Athena 数据目录联合连接器
<a name="connectors-msk-connection-legacy"></a>
+ **auth\_type** – 指定集群的身份验证类型。连接器支持以下身份验证类型：
  + **NO\_AUTH** – 无需身份验证即可直接连接到 Kafka（例如，连接到部署在 EC2 实例上的 Kafka 集群，但不使用身份验证）。
  + **SASL\_SSL\_PLAIN** – 此方法使用 `SASL_SSL` 安全协议和 `PLAIN` SASL 机制。
  + **SASL\_PLAINTEXT\_PLAIN** – 此方法使用 `SASL_PLAINTEXT` 安全协议和 `PLAIN` SASL 机制。
**注意**  
Apache Kafka 支持 `SASL_SSL_PLAIN` 和 `SASL_PLAINTEXT_PLAIN` 身份验证类型，但 Amazon MSK 不支持。
  + **SASL\_SSL\_AWS\_MSK\_IAM** – Amazon MSK 的 IAM 访问控制使您能够处理 MSK 集群的身份验证和授权。您用户的 AWS 凭证（密钥和访问密钥）用于与集群连接。有关更多信息，请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 [IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html)（IAM 访问控制）。
  + **SASL\_SSL\_SCRAM\_SHA512** – 您可以使用这种身份验证类型来控制对 Amazon MSK 集群的访问权限。此方法将用户名和密码存储在 AWS Secrets Manager 上。密钥必须与 Amazon MSK 集群相关。有关更多信息，请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的[为 Amazon MSK 集群设置 SASL/SCRAM 身份验证](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html#msk-password-tutorial)。
  + **SSL** – SSL 身份验证使用密钥存储和信任存储文件来连接 Amazon MSK 集群。您必须生成信任存储和密钥存储文件，将其上传到 Amazon S3 存储桶，并在部署连接器时提供对 Amazon S3 的引用。密钥存储、信任存储和 SSL 密钥存储在 AWS Secrets Manager 中。部署连接器时需要提供 AWS 私有密钥。有关更多信息，请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 [Mutual TLS authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html)（双向 TLS 身份验证）。

    有关更多信息，请参阅 [为 Athena MSK 连接器配置身份验证](#connectors-msk-setup-configuring-authentication)。
+ **certificates\_s3\_reference** – 包含证书（密钥存储和信任存储文件）的 Amazon S3 位置。
+ **disable\_spill\_encryption** -（可选）当设置为 `True` 时，将禁用溢出加密。默认值为 `False`，此时将使用 AES-GCM 对溢出到 S3 的数据使用进行加密 - 使用随机生成的密钥，或者使用 KMS 生成密钥。禁用溢出加密可以提高性能，尤其是当您的溢出位置使用[服务器端加密](https://docs.aws.amazon.com/AmazonS3/latest/userguide/serv-side-encryption.html)时。
+ **kafka\_endpoint** – 提供给 Kafka 的端点详细信息。例如，对于 Amazon MSK 集群，您可以为该集群提供[引导 URL](https://docs.aws.amazon.com/msk/latest/developerguide/msk-get-bootstrap-brokers.html)。
+ **secrets\_manager\_secret** – 保存凭证的 AWS 密钥名称。IAM 身份验证不需要此参数。
+ **溢出参数** – Lambda 函数将不适合内存的数据临时存储（“溢出”）到 Amazon S3。由同一 Lambda 函数访问的所有数据库实例都会溢出到同一位置。使用下表中的参数指定溢出位置。  
****    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/connectors-msk.html)

## 数据类型支持
<a name="connectors-msk-data-type-support"></a>

下表显示了 Kafka 和 Apache Arrow 支持的相应数据类型。


****  

| Kafka | Arrow | 
| --- | --- | 
| CHAR | VARCHAR | 
| VARCHAR | VARCHAR | 
| TIMESTAMP | MILLISECOND | 
| DATE | DAY | 
| BOOLEAN | BOOL | 
| SMALLINT | SMALLINT | 
| INTEGER | INT | 
| BIGINT | BIGINT | 
| DECIMAL | FLOAT8 | 
| DOUBLE | FLOAT8 | 

## 分区和拆分
<a name="connectors-msk-partitions-and-splits"></a>

Kafka 主题分为多个分区。每个分区会进行排序。分区中的每条消息都有一个增量 ID，称为*偏移*。每个 Kafka 分区进一步分为多个分区，以进行并行处理。数据在 Kafka 集群中配置的保留期内可用。

## 最佳实践
<a name="connectors-msk-best-practices"></a>

最佳做法是在查询 Athena 时使用谓词下推，如下例所示。

```
SELECT * 
FROM "{{msk_catalog_name}}"."{{glue_schema_registry_name}}"."{{glue_schema_name}}" 
WHERE integercol = 2147483647
```

```
SELECT * 
FROM "{{msk_catalog_name}}"."{{glue_schema_registry_name}}"."{{glue_schema_name}}" 
WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'
```

## 设置 MSK 连接器
<a name="connectors-msk-setup"></a>

在使用连接器之前，您必须设置 Amazon MSK 集群，使用 [AWS Glue 架构注册表](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)定义架构，并为连接器配置身份验证。

**注意**  
如果您将连接器部署到 VPC 中以访问私有资源，并且还想连接到 Confluent 等可公开访问的服务，则必须将连接器关联到某个具有 NAT 网关的私有子网。有关更多信息，请参阅《Amazon VPC 用户指南》中的 [NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)。

使用 AWS Glue 架构注册表时，请注意以下几点：
+ 确保 AWS Glue 架构注册表的 **Description**（描述）字段中的文本包含 `{AthenaFederationMSK}` 字符串。此标记字符串是与 Amazon Athena MSK 连接器一起使用的 AWS Glue 注册表中的必填字段。
+ 为了获得最佳性能，数据库名称和表名称仅限使用小写。使用混合大小写会使连接器执行不区分大小写的搜索，这种搜索的计算密集度更高。

**设置 Amazon MSK 环境和 AWS Glue 架构注册表**

1. 设置 Amazon MSK 环境。有关信息和步骤，请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 [Setting up Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/before-you-begin.html)（设置 Amazon MSK）和 [Getting started using Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)（开始使用 Amazon MSK）。

1. 将 JSON 格式的 Kafka 主题描述文件（即其架构）上传到 AWS Glue 架构注册表。有关更多信息，请参阅《AWS Glue 开发人员指南》中的[与 AWS Glue 架构注册表集成](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html)。有关详细示例，请参阅以下章节。

### AWS Glue 架构注册表的架构示例
<a name="connectors-msk-setup-schema-examples"></a>

将架构上传到 [AWS Glue 架构注册表](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)时，请使用本节中的示例格式。

#### JSON 类型架构示例
<a name="connectors-msk-setup-schema-examples-json"></a>

在以下示例中，要在 AWS Glue 架构注册表中创建的架构指定 `json` 作为 `dataFormat` 的值，并将 `datatypejson` 用于 `topicName`。

**注意**  
`topicName` 的值应使用与 Kafka 中主题名称相同的大小写。

```
{
  "topicName": "datatypejson",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "intcol",
        "mapping": "intcol",
        "type": "INTEGER"
      },
      {
        "name": "varcharcol",
        "mapping": "varcharcol",
        "type": "VARCHAR"
      },
      {
        "name": "booleancol",
        "mapping": "booleancol",
        "type": "BOOLEAN"
      },
      {
        "name": "bigintcol",
        "mapping": "bigintcol",
        "type": "BIGINT"
      },
      {
        "name": "doublecol",
        "mapping": "doublecol",
        "type": "DOUBLE"
      },
      {
        "name": "smallintcol",
        "mapping": "smallintcol",
        "type": "SMALLINT"
      },
      {
        "name": "tinyintcol",
        "mapping": "tinyintcol",
        "type": "TINYINT"
      },
      {
        "name": "datecol",
        "mapping": "datecol",
        "type": "DATE",
        "formatHint": "yyyy-MM-dd"
      },
      {
        "name": "timestampcol",
        "mapping": "timestampcol",
        "type": "TIMESTAMP",
        "formatHint": "yyyy-MM-dd HH:mm:ss.SSS"
      }
    ]
  }
}
```

#### CSV 类型架构示例
<a name="connectors-msk-setup-schema-examples-csv"></a>

在以下示例中，要在 AWS Glue 架构注册表中创建的架构指定 `csv` 作为 `dataFormat` 的值，并将 `datatypecsvbulk` 用于 `topicName`。`topicName` 的值应使用与 Kafka 中主题名称相同的大小写。

```
{
  "topicName": "datatypecsvbulk",
  "message": {
    "dataFormat": "csv",
    "fields": [
      {
        "name": "intcol",
        "type": "INTEGER",
        "mapping": "0"
      },
      {
        "name": "varcharcol",
        "type": "VARCHAR",
        "mapping": "1"
      },
      {
        "name": "booleancol",
        "type": "BOOLEAN",
        "mapping": "2"
      },
      {
        "name": "bigintcol",
        "type": "BIGINT",
        "mapping": "3"
      },
      {
        "name": "doublecol",
        "type": "DOUBLE",
        "mapping": "4"
      },
      {
        "name": "smallintcol",
        "type": "SMALLINT",
        "mapping": "5"
      },
      {
        "name": "tinyintcol",
        "type": "TINYINT",
        "mapping": "6"
      },
      {
        "name": "floatcol",
        "type": "DOUBLE",
        "mapping": "7"
      }
    ]
  }
}
```

### 为 Athena MSK 连接器配置身份验证
<a name="connectors-msk-setup-configuring-authentication"></a>

您可以使用多种方法对 Amazon MSK 集群进行身份验证，包括 IAM、SSL、SCRAM 和独立的 Kafka。

下表显示了连接器的身份验证类型以及每种连接器的安全协议和 SASL 机制。有关更多信息，请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 [Authentication and authorization for Apache Kafka APIs](https://docs.aws.amazon.com/msk/latest/developerguide/kafka_apis_iam.html)（Apache Kafka API 的身份验证和授权）。


****  

| auth\_type | security.protocol | sasl.mechanism | 
| --- | --- | --- | 
| SASL\_SSL\_PLAIN | SASL\_SSL | PLAIN | 
| SASL\_PLAINTEXT\_PLAIN | SASL\_PLAINTEXT | PLAIN | 
| SASL\_SSL\_AWS\_MSK\_IAM | SASL\_SSL | AWS\_MSK\_IAM | 
| SASL\_SSL\_SCRAM\_SHA512 | SASL\_SSL | SCRAM-SHA-512 | 
| SSL | SSL | 不适用 | 

**注意**  
Apache Kafka 支持 `SASL_SSL_PLAIN` 和 `SASL_PLAINTEXT_PLAIN` 身份验证类型，但 Amazon MSK 不支持。

#### SASL/IAM
<a name="connectors-msk-setup-configuring-authentication-sasl-iam"></a>

如果集群使用 IAM 身份验证，则在设置集群时必须为用户配置 IAM policy。有关更多信息，请参阅《Amazon Managed Streaming for Apache Kafka 开发人员指南》中的 [IAM access control](https://docs.aws.amazon.com/msk/latest/developerguide/IAM-access-control.html)（IAM 访问控制）。

要使用此身份验证类型，请将连接器的 `auth_type` Lambda 环境变量设置为 `SASL_SSL_AWS_MSK_IAM`。

#### SSL
<a name="connectors-msk-setup-configuring-authentication-tls"></a>

如果集群经过 SSL 身份验证，则您必须生成信任存储和密钥存储文件，并将其上传到 Amazon S3 存储桶。部署连接器时，必须提供此 Amazon S3 参考资料。密钥存储、信任存储和 SSL 密钥存储在 AWS Secrets Manager 中。部署连接器时需要提供 AWS 密钥。

有关在 Secrets Manager 中创建密钥的信息，请参阅[创建 AWS Secrets Manager 密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html)。

要使用此身份验证类型，请按下表所示设置环境变量。


****  

| 参数 | 值 | 
| --- | --- | 
| auth\_type | SSL | 
| certificates\_s3\_reference | 包含证书的 Amazon S3 位置。 | 
| secrets\_manager\_secret | 您的 AWS 密钥名称。 | 

在 Secrets Manager 中创建密钥后，您可以在 Secrets Manager 控制台中查看该密钥。

**在 Secrets Manager 中查看密钥**

1. 打开 Secrets Manager 控制台，网址为 [https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/)。

1. 在导航窗格中，选择 **Secrets**（密钥）。

1. 在 **Secrets**（密钥）页面，选择密钥链接。

1. 在密钥的详细信息页面上，选择 **Retrieve secret value**（检索密钥值）。

   下图显示了示例密钥，其中包含三个键值对：`keystore_password`、`truststore_password` 和 `ssl_key_password`。  
![在 Secrets Manager 中检索 SSL 密钥](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/images/connectors-msk-setup-1.png)

#### SASL/SCRAM
<a name="connectors-msk-setup-configuring-authentication-sasl-scram"></a>

如果您的集群使用 SCRAM 身份验证，请在部署连接器时提供与集群关联的 Secrets Manager 密钥。用户的 AWS 凭证（密钥和访问密钥）用于与集群进行身份验证。

按下表所示设置环境变量。


****  

| 参数 | 值 | 
| --- | --- | 
| auth\_type | SASL\_SSL\_SCRAM\_SHA512 | 
| secrets\_manager\_secret | 您的 AWS 密钥名称。 | 

下图显示了 Secrets Manager 控制台中的示例密钥，其中包含两个键值对：一个用于 `username`，另一个用于 `password`。

![在 Secrets Manager 中检索 SCRAM 密钥](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/images/connectors-msk-setup-2.png)


## 许可证信息
<a name="connectors-msk-license-information"></a>

使用此连接器，即表示您确认包含第三方组件（这些组件的列表可在此连接器的 [pom.xml](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-msk/pom.xml) 文件中找到），并同意 GitHub.com 上的 [LICENSE.txt](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-msk/LICENSE.txt) 文件中提供的相应第三方许可证中的条款。

## 其他资源
<a name="connectors-msk-additional-resources"></a>

有关此连接器的更多信息，请访问 GitHub.com 上的[相应站点](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-msk)。