

# Amazon Athena Apache Kafka 连接器
<a name="connectors-kafka"></a>

可通过适用于 Apache Kafka 的 Amazon Athena 连接器支持 Amazon Athena 对 Apace Kafka 主题运行 SQL 查询。使用此连接器在 Athena 中以表的形式查看 [Apache Kafka](https://kafka.apache.org/) 主题，以行的形式查看消息。

此连接器不使用 Glue 连接将配置属性集中保存到 Glue 中。连接配置通过 Lambda 完成。

## 先决条件
<a name="connectors-kafka-prerequisites"></a>

可以使用 Athena 控制台或 AWS Serverless Application Repository 将该连接器部署到您的 AWS 账户。有关更多信息，请参阅 [创建数据来源连接](connect-to-a-data-source.md) 或 [使用 AWS Serverless Application Repository 部署数据来源连接器](connect-data-source-serverless-app-repo.md)。

## 限制
<a name="connectors-kafka-limitations"></a>
+ 不支持写入 DDL 操作。
+ 任何相关的 Lambda 限制。有关更多信息，请参阅《AWS Lambda 开发人员指南》**中的 [Lambda 配额](https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html)。
+ 必须将筛选条件中的日期和时间戳数据类型转换为适当的数据类型。
+ CSV 文件类型不支持日期和时间戳数据类型，它们被视为 VARCHAR 值。
+ 不支持映射到嵌套 JSON 字段。连接器仅映射顶级字段。
+ 连接器不支持复杂类型。复杂类型解释为字符串。
+ 要提取或处理复杂的 JSON 值，请使用 Athena 中可用的 JSON 相关函数。有关更多信息，请参阅 [从字符串中提取 JSON 数据](extracting-data-from-JSON.md)。
+ 连接器不支持对 Kafka 消息元数据的访问。

## 术语
<a name="connectors-kafka-terms"></a>
+ **元数据处理程序** — 从您的数据库实例中检索元数据的 Lambda 处理程序。
+ **记录处理程序** — 从您的数据库实例中检索数据记录的 Lambda 处理程序。
+ **复合处理程序** — 从您的数据库实例中检索元数据和数据记录的 Lambda 处理程序。
+ **Kafka 端点** – 文本字符串，用于建立与 Kafka 实例的连接。

## 集群兼容性
<a name="connectors-kafka-cluster-compatibility"></a>

Kafka 连接器可用于以下集群类型。
+ **独立 Kafka** - 与 Kafka 直接连接（经过或未经身份验证）。
+ **Confluent** - 与 Confluent Kafka 直接连接。有关将 Athena 与 Confluent Kafka 数据配合使用的信息，请参阅 *AWS 商业智能博客*中的 [Visualize Confluent data in Quick using Amazon Athena](https://aws.amazon.com/blogs/business-intelligence/visualize-confluent-data-in-amazon-quicksight-using-amazon-athena/)。

### 连接到 Confluent
<a name="connectors-kafka-connecting-to-confluent"></a>

要连接到 Confluent，需要执行以下步骤：

1. 从 Confluent 生成 API 密钥。

1. 将 Confluent API 密钥的用户名和密码存储到 AWS Secrets Manager 中。

1. 在 Kafka 连接器中提供 `secrets_manager_secret` 环境变量的密钥名称。

1. 按照本文档的 [设置 Kafka 连接器](#connectors-kafka-setup) 节中的步骤执行操作。

## 支持的身份验证方法
<a name="connectors-kafka-supported-authentication-methods"></a>

连接器支持以下身份验证方法。
+ [SSL](https://kafka.apache.org/documentation/#security_ssl)
+ [SASL/SCRAM](https://kafka.apache.org/documentation/#security_sasl_scram)
+ SASL/PLAIN
+ SASL/PLAINTEXT
+ NO\$1AUTH
+ **自行管理的 Kafka 和 Confluent 平台** – SSL、SASL/SCRAM、SASL/PLAINTEXT、NO\$1AUTH
+ **自行管理的 Kafka 和 Confluent Cloud** - SASL/PLAIN

有关更多信息，请参阅 [为 Athena Kafka 连接器配置身份验证](#connectors-kafka-setup-configuring-authentication)。

## 支持的输入数据格式
<a name="connectors-kafka-supported-input-data-formats"></a>

连接器支持以下输入数据格式。
+ JSON
+ CSV
+ AVRO
+ PROTOBUF (PROTOCOL BUFFERS)

## 参数
<a name="connectors-kafka-parameters"></a>

使用本节中的参数来配置 Athena Kafka 连接器。
+ **auth\$1type** – 指定集群的身份验证类型。连接器支持以下身份验证类型：
  + **NO\$1AUTH** - 直接连接到 Kafka（例如，连接到部署在 EC2 实例上的 Kafka 集群，但不使用身份验证）。
  + **SASL\$1SSL\$1PLAIN** – 此方法使用 `SASL_SSL` 安全协议和 `PLAIN` SASL 机制。有关更多信息，请参阅 Apache Kafka 文档中的 [SASL 配置](https://kafka.apache.org/documentation/#security_sasl_config)。
  + **SASL\$1PLAINTEXT\$1PLAIN** – 此方法使用 `SASL_PLAINTEXT` 安全协议和 `PLAIN` SASL 机制。有关更多信息，请参阅 Apache Kafka 文档中的 [SASL 配置](https://kafka.apache.org/documentation/#security_sasl_config)。
  + **SASL\$1SSL\$1SCRAM\$1SHA512** - 您可以使用此身份验证类型控制对 Apache Kafka 集群的访问权限。此方法将用户名和密码存储在 AWS Secrets Manager 中。密钥必须与 Kafka 集群相关。有关更多信息，请参阅 Apache Kafka 文档中的[使用 SASL/SCRAM 进行身份验证](https://kafka.apache.org/documentation/#security_sasl_scram)。
  + **SASL\$1PLAINTEXT\$1SCRAM\$1SHA512** - 此方法使用 `SASL_PLAINTEXT` 安全协议和 `SCRAM_SHA512 SASL` 机制。此方法使用存储在 AWS Secrets Manager 中的用户名和密码。有关更多信息，请参阅 Apache Kafka 文档中的 [SASL 配置](https://kafka.apache.org/documentation/#security_sasl_config)一节。
  + **SSL** - SSL 身份验证使用密钥存储和信任存储文件来连接 Apache Kafka 集群。您必须生成信任存储和密钥存储文件，将其上传到 Amazon S3 存储桶，并在部署连接器时提供对 Amazon S3 的引用。密钥存储、信任存储和 SSL 密钥存储在 AWS Secrets Manager 中。部署连接器时需要提供 AWS 私有密钥。有关更多信息，请参阅 Apache Kafka 文档中的[使用 SSL 进行加密和身份验证](https://kafka.apache.org/documentation/#security_ssl)。

    有关更多信息，请参阅 [为 Athena Kafka 连接器配置身份验证](#connectors-kafka-setup-configuring-authentication)。
+ **certificates\$1s3\$1reference** – 包含证书（密钥存储和信任存储文件）的 Amazon S3 位置。
+ **disable\$1spill\$1encryption** -（可选）当设置为 `True` 时，将禁用溢出加密。默认值为 `False`，此时将使用 AES-GCM 对溢出到 S3 的数据使用进行加密 - 使用随机生成的密钥，或者使用 KMS 生成密钥。禁用溢出加密可以提高性能，尤其是当您的溢出位置使用[服务器端加密](https://docs.aws.amazon.com/AmazonS3/latest/userguide/serv-side-encryption.html)时。
+ **kafka\$1endpoint** – 提供给 Kafka 的端点详细信息。
+ **schema\$1registry\$1url** – 架构注册表的 URL 地址（例如 `http://schema-registry.example.org:8081`）。适用于 `AVRO` 和 `PROTOBUF` 数据格式。Athena 仅支持 Confluent 架构注册表。
+ **secrets\$1manager\$1secret** – 保存凭证的 AWS 密钥名称。
+ **溢出参数** – Lambda 函数将不适合内存的数据临时存储（“溢出”）到 Amazon S3。由同一 Lambda 函数访问的所有数据库实例都会溢出到同一位置。使用下表中的参数指定溢出位置。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/connectors-kafka.html)
+ **子网 ID** - 与 Lambda 函数可用于访问数据来源的子网对应的一个或多个子网 ID。
  + **公有 Kafka 集群或标准 Confluent Cloud 集群** - 将连接器与具有 NAT 网关的私有子网关联。
  + **具有私有连接的 Confluent Cloud 集群** - 将连接器与具有通往 Confluent Cloud 集群的路由的私有子网关联。
    + 对于 [AWS 中转网关](https://docs.confluent.io/cloud/current/networking/aws-transit-gateway.html)，子网必须位于连接到 Confluent Cloud 使用的相同中转网关的 VPC 中。
    + 对于 [VPC 对等](https://docs.confluent.io/cloud/current/networking/peering/aws-peering.html)，子网必须位于与 Confluent Cloud VPC 对等的 VPC 中。
    + 对于 [AWS PrivateLink](https://docs.confluent.io/cloud/current/networking/private-links/aws-privatelink.html)，子网必须位于具有通往 VPC 端点的路由的 VPC 中，该端点连接到 Confluent Cloud。

**注意**  
如果您将连接器部署到 VPC 中以访问私有资源，并且还想连接到 Confluent 等可公开访问的服务，则必须将连接器关联到某个具有 NAT 网关的私有子网。有关更多信息，请参阅《Amazon VPC 用户指南》中的 [NAT 网关](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-nat-gateway.html)。

## 数据类型支持
<a name="connectors-kafka-data-type-support"></a>

下表显示了 Kafka 和 Apache Arrow 支持的相应数据类型。


****  

| Kafka | Arrow | 
| --- | --- | 
| CHAR | VARCHAR | 
| VARCHAR | VARCHAR | 
| TIMESTAMP | MILLISECOND | 
| DATE | DAY | 
| BOOLEAN | BOOL | 
| SMALLINT | SMALLINT | 
| INTEGER | INT | 
| BIGINT | BIGINT | 
| DECIMAL | FLOAT8 | 
| DOUBLE | FLOAT8 | 

## 分区和拆分
<a name="connectors-kafka-partitions-and-splits"></a>

Kafka 主题分为多个分区。每个分区会进行排序。分区中的每条消息都有一个增量 ID，称为*偏移*。每个 Kafka 分区进一步分为多个分区，以进行并行处理。数据在 Kafka 集群中配置的保留期内可用。

## 最佳实践
<a name="connectors-kafka-best-practices"></a>

最佳做法是在查询 Athena 时使用谓词下推，如下例所示。

```
SELECT * 
FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" 
WHERE integercol = 2147483647
```

```
SELECT * 
FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" 
WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'
```

## 设置 Kafka 连接器
<a name="connectors-kafka-setup"></a>

在使用连接器之前，您必须设置 Apache Kafka 集群，使用 [AWS Glue 架构注册表](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)定义架构，并为连接器配置身份验证。

使用 AWS Glue 架构注册表时，请注意以下几点：
+ 确保 AWS Glue 架构注册表的 **Description**（描述）字段中的文本包含 `{AthenaFederationKafka}` 字符串。此标记字符串是与 Amazon Athena Kafka 连接器一起使用的 AWS Glue 注册表中的必需项。
+ 为了获得最佳性能，数据库名称和表名称仅限使用小写。使用混合大小写会使连接器执行不区分大小写的搜索，这种搜索的计算密集度更高。

**设置 Apache Kafka 环境和 AWS Glue 架构注册表**

1. 设置 Apache Kafka 环境。

1. 将 JSON 格式的 Kafka 主题描述文件（即其架构）上传到 AWS Glue 架构注册表。有关更多信息，请参阅《AWS Glue 开发人员指南》中的[与 AWS Glue 架构注册表集成](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html)。

1. 要在 AWS Glue 架构注册表中定义架构时使用 `AVRO` 或 `PROTOBUF` 数据格式，请执行以下操作：
   + 对于**架构名称**，以原始名称相同的大小写输入 Kafka 主题名称。
   + 对于**数据格式**，选择 **Apache Avro** 或 **Protocol Buffers**。

    有关详细示例，请参阅以下章节。

### AWS Glue 架构注册表的架构示例
<a name="connectors-kafka-setup-schema-examples"></a>

将架构上传到 [AWS Glue 架构注册表](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)时，请使用本节中的示例格式。

#### JSON 类型架构示例
<a name="connectors-kafka-setup-schema-examples-json"></a>

在以下示例中，要在 AWS Glue 架构注册表中创建的架构指定 `json` 作为 `dataFormat` 的值，并将 `datatypejson` 用于 `topicName`。

**注意**  
`topicName` 的值应使用与 Kafka 中主题名称相同的大小写。

```
{
  "topicName": "datatypejson",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "intcol",
        "mapping": "intcol",
        "type": "INTEGER"
      },
      {
        "name": "varcharcol",
        "mapping": "varcharcol",
        "type": "VARCHAR"
      },
      {
        "name": "booleancol",
        "mapping": "booleancol",
        "type": "BOOLEAN"
      },
      {
        "name": "bigintcol",
        "mapping": "bigintcol",
        "type": "BIGINT"
      },
      {
        "name": "doublecol",
        "mapping": "doublecol",
        "type": "DOUBLE"
      },
      {
        "name": "smallintcol",
        "mapping": "smallintcol",
        "type": "SMALLINT"
      },
      {
        "name": "tinyintcol",
        "mapping": "tinyintcol",
        "type": "TINYINT"
      },
      {
        "name": "datecol",
        "mapping": "datecol",
        "type": "DATE",
        "formatHint": "yyyy-MM-dd"
      },
      {
        "name": "timestampcol",
        "mapping": "timestampcol",
        "type": "TIMESTAMP",
        "formatHint": "yyyy-MM-dd HH:mm:ss.SSS"
      }
    ]
  }
}
```

#### CSV 类型架构示例
<a name="connectors-kafka-setup-schema-examples-csv"></a>

在以下示例中，要在 AWS Glue 架构注册表中创建的架构指定 `csv` 作为 `dataFormat` 的值，并将 `datatypecsvbulk` 用于 `topicName`。`topicName` 的值应使用与 Kafka 中主题名称相同的大小写。

```
{
  "topicName": "datatypecsvbulk",
  "message": {
    "dataFormat": "csv",
    "fields": [
      {
        "name": "intcol",
        "type": "INTEGER",
        "mapping": "0"
      },
      {
        "name": "varcharcol",
        "type": "VARCHAR",
        "mapping": "1"
      },
      {
        "name": "booleancol",
        "type": "BOOLEAN",
        "mapping": "2"
      },
      {
        "name": "bigintcol",
        "type": "BIGINT",
        "mapping": "3"
      },
      {
        "name": "doublecol",
        "type": "DOUBLE",
        "mapping": "4"
      },
      {
        "name": "smallintcol",
        "type": "SMALLINT",
        "mapping": "5"
      },
      {
        "name": "tinyintcol",
        "type": "TINYINT",
        "mapping": "6"
      },
      {
        "name": "floatcol",
        "type": "DOUBLE",
        "mapping": "7"
      }
    ]
  }
}
```

#### AVRO 类型架构示例
<a name="connectors-kafka-setup-schema-examples-avro"></a>

以下示例用于在 AWS Glue 架构注册表中创建基于 AVRO 的架构。在 AWS Glue 架构注册表中定义架构时，对于**架构名称**，以原始名称相同的大小写输入 Kafka 主题名称；对于**数据格式**，选择 **Apache Avro**。由于您直接在注册表中指定了此信息，因此 `dataformat` 和 `topicName` 字段不是必填字段。

```
{
    "type": "record",
    "name": "avrotest",
    "namespace": "example.com",
    "fields": [{
            "name": "id",
            "type": "int"
        },
        {
            "name": "name",
            "type": "string"
        }
    ]
}
```

#### PROTOBUF 类型架构示例
<a name="connectors-kafka-setup-schema-examples-protobuf"></a>

以下示例用于在 AWS Glue 架构注册表中创建基于 PROTOBUF 的架构。在 AWS Glue 架构注册表中定义架构时，对于**架构名称**，以原始名称相同的大小写输入 Kafka 主题名称；对于**数据格式**，选择 **Protocol Buffers**。由于您直接在注册表中指定了此信息，因此 `dataformat` 和 `topicName` 字段不是必填字段。第一行将架构定义为 PROTOBUF。

```
syntax = "proto3";
message protobuftest {
string name = 1;
int64 calories = 2;
string colour = 3;
}
```

有关在 AWS Glue 架构注册表中添加注册表和架构的更多信息，请参阅 AWS Glue 文档中的[架构注册表入门](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-gs.html)。

### 为 Athena Kafka 连接器配置身份验证
<a name="connectors-kafka-setup-configuring-authentication"></a>

您可以使用多种方法对 Apache Kafka 集群进行身份验证，包括 SSL、SASL/SCRAM、SASL/PLAIN 和 SASL/PLAINTEXT。

下表显示了连接器的身份验证类型以及每种连接器的安全协议和 SASL 机制。有关更多信息，请参阅 Apache Kafka 文档中的[安全性](https://kafka.apache.org/documentation/#security)一节。


****  

| auth\$1type | security.protocol | sasl.mechanism | 集群类型兼容性 | 
| --- | --- | --- | --- | 
| SASL\$1SSL\$1PLAIN | SASL\$1SSL | PLAIN |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/connectors-kafka.html)  | 
| SASL\$1PLAINTEXT\$1PLAIN | SASL\$1PLAINTEXT | PLAIN |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/connectors-kafka.html)  | 
| SASL\$1SSL\$1SCRAM\$1SHA512 | SASL\$1SSL | SCRAM-SHA-512 |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/connectors-kafka.html)  | 
| SASL\$1PLAINTEXT\$1SCRAM\$1SHA512 | SASL\$1PLAINTEXT | SCRAM-SHA-512 |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/connectors-kafka.html)  | 
| SSL | SSL | 不适用 |  [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/connectors-kafka.html)  | 

#### SSL
<a name="connectors-kafka-setup-configuring-authentication-tls"></a>

如果集群经过 SSL 身份验证，则您必须生成信任存储和密钥存储文件，并将其上传到 Amazon S3 存储桶。部署连接器时，必须提供此 Amazon S3 参考资料。密钥存储、信任存储和 SSL 密钥存储在 AWS Secrets Manager 中。部署连接器时需要提供 AWS 密钥。

有关在 Secrets Manager 中创建密钥的信息，请参阅[创建 AWS Secrets Manager 密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_secret.html)。

要使用此身份验证类型，请按下表所示设置环境变量。


****  

| 参数 | 值 | 
| --- | --- | 
| auth\$1type | SSL | 
| certificates\$1s3\$1reference | 包含证书的 Amazon S3 位置。 | 
| secrets\$1manager\$1secret | 您的 AWS 密钥名称。 | 

在 Secrets Manager 中创建密钥后，您可以在 Secrets Manager 控制台中查看该密钥。

**在 Secrets Manager 中查看密钥**

1. 打开 Secrets Manager 控制台，网址为 [https://console.aws.amazon.com/secretsmanager/](https://console.aws.amazon.com/secretsmanager/)。

1. 在导航窗格中，选择 **Secrets**（密钥）。

1. 在 **Secrets**（密钥）页面，选择密钥链接。

1. 在密钥的详细信息页面上，选择 **Retrieve secret value**（检索密钥值）。

   下图显示了示例密钥，其中包含三个键值对：`keystore_password`、`truststore_password` 和 `ssl_key_password`。  
![\[在 Secrets Manager 中检索 SSL 密钥\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/images/connectors-kafka-setup-1.png)

有关在 Kafka 中使用 SSL 的更多信息，请参阅 Apache Kafka 文档中的[使用 SSL 进行加密和身份验证](https://kafka.apache.org/documentation/#security_ssl)。

#### SASL/SCRAM
<a name="connectors-kafka-setup-configuring-authentication-sasl-scram"></a>

如果您的集群使用 SCRAM 身份验证，请在部署连接器时提供与集群关联的 Secrets Manager 密钥。用户的 AWS 凭证（密钥和访问密钥）用于与集群进行身份验证。

按下表所示设置环境变量。


****  

| 参数 | 值 | 
| --- | --- | 
| auth\$1type | SASL\$1SSL\$1SCRAM\$1SHA512 | 
| secrets\$1manager\$1secret | 您的 AWS 密钥名称。 | 

下图显示了 Secrets Manager 控制台中的示例密钥，其中包含两个键值对：一个用于 `username`，另一个用于 `password`。

![\[在 Secrets Manager 中检索 SCRAM 密钥\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/images/connectors-kafka-setup-2.png)


有关在 Kafka 中使用 SASL/SCRAM 的更多信息，请参阅 Apache Kafka 文档中的[使用 SASL/SCRAM 进行身份验证](https://kafka.apache.org/documentation/#security_sasl_scram)。

## 许可证信息
<a name="connectors-kafka-license-information"></a>

使用此连接器，即表示您确认包含第三方组件（这些组件的列表可在此连接器的 [pom.xml](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-kafka/pom.xml) 文件中找到），并同意 GitHub.com 上的 [LICENSE.txt](https://github.com/awslabs/aws-athena-query-federation/blob/master/athena-kafka/LICENSE.txt) 文件中提供的相应第三方许可证中的条款。

## 其他资源
<a name="connectors-kafka-additional-resources"></a>

有关此连接器的更多信息，请访问 GitHub.com 上的[相应站点](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-kafka)。