

# 查询 AWS 服务日志
<a name="querying-aws-service-logs"></a>

本部分包含使用 Amazon Athena 查询常见数据集的几个过程，例如 AWS CloudTrail 日志、Amazon CloudFront 日志、经典负载均衡器日志、Application Load Balancer 日志、Amazon VPC 流日志和网络负载均衡器日志。

本节中的任务使用 Athena 控制台，但您也可以使用其他工具，例如 [Athena JDBC 驱动程序](connect-with-jdbc.md)、[AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/athena/) 或 [Amazon Athena API 参考](https://docs.aws.amazon.com/athena/latest/APIReference/)。

有关使用 AWS CloudFormation 自动创建 AWS 服务 日志表、分区和 Athena 中示例查询的信息，请参阅 AWS 大数据博客中的《[使用 Amazon Athena 实现自动化 AWS 服务 日志表创建并使用 Amazon Athena 进行查询](https://aws.amazon.com/blogs/big-data/automating-aws-service-logs-table-creation-and-querying-them-with-amazon-athena/)》。要了解如何将 Python 库用于 AWS Glue 以创建一个用于处理 AWS 服务 日志的常用框架，并在 Athena 中查询它们，请参阅 [使用 Amazon Athena 轻松查询 AWS 服务 日志](https://aws.amazon.com/blogs/big-data/easily-query-aws-service-logs-using-amazon-athena/)。

本节中的主题假定您已配置适当权限来访问 Athena 和要查询的数据所在的 Amazon S3 存储桶。有关更多信息，请参阅[设置、管理和编程访问](setting-up.md) 和[开始使用](getting-started.md)。

**Topics**
+ [应用程序负载均衡器](application-load-balancer-logs.md)
+ [Elastic Load Balancing](elasticloadbalancer-classic-logs.md)
+ [CloudFront](cloudfront-logs.md)
+ [CloudTrail](cloudtrail-logs.md)
+ [Amazon EMR](emr-logs.md)
+ [Global Accelerator](querying-global-accelerator-flow-logs.md)
+ [GuardDuty](querying-guardduty.md)
+ [Network Firewall](querying-network-firewall-logs.md)
+ [网络负载均衡器](networkloadbalancer-classic-logs.md)
+ [Route 53](querying-r53-resolver-logs.md)
+ [Amazon SES](querying-ses-logs.md)
+ [Amazon VPC](vpc-flow-logs.md)
+ [AWS WAF](waf-logs.md)

# 查询应用程序负载均衡器日志
<a name="application-load-balancer-logs"></a>

Application Load Balancer 是 Elastic Load Balancing 的负载均衡选项，它允许使用容器在微服务部署中实现流量分配。通过查询 Application Load Balancer 日志，您可以查看进出 Elastic Load Balancing 实例和后端应用程序的流量来源、延迟和传输字节。有关更多信息，请参阅《User Guide for Application Load Balancers》**中的 [Access logs for your Application Load Balancer](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html) 和 [Connection logs for your Application Load Balancer](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-connection-logs.html)。

## 先决条件
<a name="application-load-balancer-logs-prerequisites"></a>
+ 启用[访问日志记录](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html)功能或[连接日志记录](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-connection-logs.html)功能，以便将应用程序负载均衡器日志保存到 Amazon S3 存储桶。
+ 用于保存您将为 Athena 创建的表的数据库。要创建数据库，你可以使用 Athena 或 AWS Glue 控制台。有关更多信息，请参阅本指南中的 [在 Athena 中创建数据库](creating-databases.md) 或《AWS Glue 开发人员指南**》中的 [通过 AWS Glue 控制台使用数据库](https://docs.aws.amazon.com/glue/latest/dg/console-databases.html)。

**Topics**
+ [先决条件](#application-load-balancer-logs-prerequisites)
+ [为 ALB 访问日志创建表](create-alb-access-logs-table.md)
+ [使用分区投影功能在 Athena 中为 ALB 访问日志创建表](create-alb-access-logs-table-partition-projection.md)
+ [ALB 访问日志的示例查询](query-alb-access-logs-examples.md)
+ [为 ALB 连接日志创建表](create-alb-connection-logs-table.md)
+ [使用分区投影功能在 Athena 中为 ALB 连接日志创建表](create-alb-connection-logs-table-partition-projection.md)
+ [ALB 连接日志的示例查询](query-alb-connection-logs-examples.md)
+ [其他资源](application-load-balancer-logs-additional-resources.md)

# 为 ALB 访问日志创建表
<a name="create-alb-access-logs-table"></a>

1. 将以下 `CREATE TABLE` 语句复制并粘贴到 Athena 控制台的查询编辑器中，然后根据您自己的日志条目要求进行必要的修改。有关 Amazon 控制台入门的更多信息，请参阅 [开始使用](getting-started.md)。将 `LOCATION` 子句中的路径替换为 Amazon S3 访问日志文件夹位置。有关访问日志文件位置的更多信息，请参阅《User Guide for Application Load Balancers》**中的 [Access log files](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-file-format)。

   有关每个日志文件字段的信息，请参阅《*应用程序负载均衡器用户指南*》中的[访问日志条目](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-entry-format)。
**注意**  
以下示例 `CREATE TABLE` 语句包括最近添加的 `classification`、`classification_reason` 和 `conn_trace_id`（“可追溯性 ID”或 TID）列。要为不包含这些条目的应用程序负载均衡器访问日志创建表，请从 `CREATE TABLE` 语句中删除相应的列，并相应地修改正则表达式。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS alb_access_logs (
               type string,
               time string,
               elb string,
               client_ip string,
               client_port int,
               target_ip string,
               target_port int,
               request_processing_time double,
               target_processing_time double,
               response_processing_time double,
               elb_status_code int,
               target_status_code string,
               received_bytes bigint,
               sent_bytes bigint,
               request_verb string,
               request_url string,
               request_proto string,
               user_agent string,
               ssl_cipher string,
               ssl_protocol string,
               target_group_arn string,
               trace_id string,
               domain_name string,
               chosen_cert_arn string,
               matched_rule_priority string,
               request_creation_time string,
               actions_executed string,
               redirect_url string,
               lambda_error_reason string,
               target_port_list string,
               target_status_code_list string,
               classification string,
               classification_reason string,
               conn_trace_id string
               )
               ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
               WITH SERDEPROPERTIES (
               'serialization.format' = '1',
               'input.regex' = 
           '([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*)[:-]([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) (.*) (- |[^ ]*)\" \"([^\"]*)\" ([A-Z0-9-_]+) ([A-Za-z0-9.-]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" ([-.0-9]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\" \"([^ ]*)\" \"([^\\s]+?)\" \"([^\\s]+)\" \"([^ ]*)\" \"([^ ]*)\" ?([^ ]*)? ?( .*)?'
               )
               LOCATION 's3://amzn-s3-demo-bucket/access-log-folder-path/'
   ```
**注意**  
建议始终保留 `input.regex` 参数末尾的模式 *`?( .*)?`*，以便在添加新的 ALB 日志字段时处理新来的日志条目。

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `alb_access_logs` 表，使其中的数据可以供您发出查询。

# 使用分区投影功能在 Athena 中为 ALB 访问日志创建表
<a name="create-alb-access-logs-table-partition-projection"></a>

由于 ALB 访问日志具有一个可以预先指定其分区方案的已知结构，所以您可以使用 Athena 分区投影功能，以此来减少查询运行时间并自动管理分区。当添加新数据时，分区投影会自动添加新分区。这样就不必使用 `ALTER TABLE ADD PARTITION` 手动添加分区了。

从指定日期开始到当前日期为止，以下示例 `CREATE TABLE` 语句会自动在 ALB 访问日志上为单个 AWS 区域使用分区投影。该语句以上一部分中的示例为基础，但添加了 `PARTITIONED BY` 和 `TBLPROPERTIES` 子句以启用分区投影。在 `LOCATION` 和 `storage.location.template` 子句中，将占位符替换为标识 ALB 访问日志在 Amazon S3 存储桶中位置的值。有关访问日志文件位置的更多信息，请参阅《User Guide for Application Load Balancers》**中的 [Access log files](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-file-format)。对于 `projection.day.range`，将 *2022*/*01*/*01* 替换为要使用的开始日期。成功运行查询后，您可以查询表。您无需运行 `ALTER TABLE ADD PARTITION` 来加载分区。有关每个日志文件字段的信息，请参阅 [Access log entries](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-entry-format)。

```
CREATE EXTERNAL TABLE IF NOT EXISTS alb_access_logs (
            type string,
            time string,
            elb string,
            client_ip string,
            client_port int,
            target_ip string,
            target_port int,
            request_processing_time double,
            target_processing_time double,
            response_processing_time double,
            elb_status_code int,
            target_status_code string,
            received_bytes bigint,
            sent_bytes bigint,
            request_verb string,
            request_url string,
            request_proto string,
            user_agent string,
            ssl_cipher string,
            ssl_protocol string,
            target_group_arn string,
            trace_id string,
            domain_name string,
            chosen_cert_arn string,
            matched_rule_priority string,
            request_creation_time string,
            actions_executed string,
            redirect_url string,
            lambda_error_reason string,
            target_port_list string,
            target_status_code_list string,
            classification string,
            classification_reason string,
            conn_trace_id string
            )
            PARTITIONED BY
            (
             day STRING
            )
            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
            WITH SERDEPROPERTIES (
            'serialization.format' = '1',
            'input.regex' = 
        '([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*)[:-]([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) (.*) (- |[^ ]*)\" \"([^\"]*)\" ([A-Z0-9-_]+) ([A-Za-z0-9.-]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" ([-.0-9]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\" \"([^ ]*)\" \"([^\\s]+?)\" \"([^\\s]+)\" \"([^ ]*)\" \"([^ ]*)\" ?([^ ]*)? ?( .*)?'
            )
            LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/<ACCOUNT-NUMBER>/elasticloadbalancing/<REGION>/'
            TBLPROPERTIES
            (
             "projection.enabled" = "true",
             "projection.day.type" = "date",
             "projection.day.range" = "2022/01/01,NOW",
             "projection.day.format" = "yyyy/MM/dd",
             "projection.day.interval" = "1",
             "projection.day.interval.unit" = "DAYS",
             "storage.location.template" = "s3://amzn-s3-demo-bucket/AWSLogs/<ACCOUNT-NUMBER>/elasticloadbalancing/<REGION>/${day}"
            )
```

更多有关分区投影的信息，请参阅 [将分区投影与 Amazon Athena 结合使用](partition-projection.md)。

**注意**  
建议 `input.regex` 参数末尾始终保留 *?( .\$1)?* 模式，以便在添加新的 ALB 日志字段时处理新来的日志条目。

# ALB 访问日志的示例查询
<a name="query-alb-access-logs-examples"></a>

以下查询计算按客户端 IP 地址分组的负载均衡器接收到的 HTTP GET 请求的数目：

```
SELECT COUNT(request_verb) AS
 count,
 request_verb,
 client_ip
FROM alb_access_logs
GROUP BY request_verb, client_ip
LIMIT 100;
```

另一个查询显示 Safari 浏览器用户访问的 URL：

```
SELECT request_url
FROM alb_access_logs
WHERE user_agent LIKE '%Safari%'
LIMIT 10;
```

以下查询显示了 ELB 状态代码值大于或等于 500 的记录。

```
SELECT * FROM alb_access_logs
WHERE elb_status_code >= 500
```

以下示例演示了如何根据 `datetime` 解析日志：

```
SELECT client_ip, sum(received_bytes) 
FROM alb_access_logs
WHERE parse_datetime(time,'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z') 
     BETWEEN parse_datetime('2018-05-30-12:00:00','yyyy-MM-dd-HH:mm:ss') 
     AND parse_datetime('2018-05-31-00:00:00','yyyy-MM-dd-HH:mm:ss') 
GROUP BY client_ip;
```

以下查询会查询在指定日期内对所有 ALB 访问日志使用分区投影的表。

```
SELECT * 
FROM alb_access_logs 
WHERE day = '2022/02/12'
```

# 为 ALB 连接日志创建表
<a name="create-alb-connection-logs-table"></a>

1. 将以下示例 `CREATE TABLE` 语句复制并粘贴到 Athena 控制台的查询编辑器中，然后根据您自己的日志条目要求进行必要的修改。有关 Amazon 控制台入门的更多信息，请参阅 [开始使用](getting-started.md)。将 `LOCATION` 子句中的路径替换为 Amazon S3 连接日志文件夹位置。有关连接日志文件位置的更多信息，请参阅《User Guide for Application Load Balancers》**中的 [Connection log files](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-connection-logs.html#connection-log-file-format)。有关每个日志文件字段的信息，请参阅 [Connection log entries](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-connection-logs.html#connection-log-entry-format)。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS alb_connection_logs (
            time string,
            client_ip string,
            client_port int,
            listener_port int,
            tls_protocol string,
            tls_cipher string,
            tls_handshake_latency double,
            leaf_client_cert_subject string,
            leaf_client_cert_validity string,
            leaf_client_cert_serial_number string,
            tls_verify_status string,
            conn_trace_id string
            ) 
            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
            WITH SERDEPROPERTIES (
            'serialization.format' = '1',
            'input.regex' =
             '([^ ]*) ([^ ]*) ([0-9]*) ([0-9]*) ([A-Za-z0-9.-]*) ([^ ]*) ([-.0-9]*) \"([^\"]*)\" ([^ ]*) ([^ ]*) ([^ ]*) ?([^ ]*)?( .*)?'
            )
            LOCATION 's3://amzn-s3-demo-bucket/connection-log-folder-path/'
   ```

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `alb_connection_logs` 表，使其中的数据可以供您发出查询。

# 使用分区投影功能在 Athena 中为 ALB 连接日志创建表
<a name="create-alb-connection-logs-table-partition-projection"></a>

由于 ALB 日志具有一个可以预先指定其分区方案的已知结构，您可以使用 Athena 分区投影功能，以此来减少查询运行时间并自动管理分区。当添加新数据时，分区投影会自动添加新分区。这样就不必使用 `ALTER TABLE ADD PARTITION` 手动添加分区了。

从指定日期开始到当前日期为止，以下示例 `CREATE TABLE` 语句会自动在 ALB 连接日志上为单个 AWS 区域使用分区投影。该语句以上一部分中的示例为基础，但添加了 `PARTITIONED BY` 和 `TBLPROPERTIES` 子句以启用分区投影。在 `LOCATION` 和 `storage.location.template` 子句中，将占位符替换为标识 ALB 连接日志在 Amazon S3 存储桶中位置的值。有关连接日志文件位置的更多信息，请参阅《User Guide for Application Load Balancers》**中的 [Connection log files](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-connection-logs.html#connection-log-file-format)。对于 `projection.day.range`，将 *2023*/*01*/*01* 替换为要使用的开始日期。成功运行查询后，您可以查询表。您无需运行 `ALTER TABLE ADD PARTITION` 来加载分区。有关每个日志文件字段的信息，请参阅 [Connection log entries](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-connection-logs.html#connection-log-entry-format)。

```
CREATE EXTERNAL TABLE IF NOT EXISTS alb_connection_logs (
         time string,
         client_ip string,
         client_port int,
         listener_port int,
         tls_protocol string,
         tls_cipher string,
         tls_handshake_latency double,
         leaf_client_cert_subject string,
         leaf_client_cert_validity string,
         leaf_client_cert_serial_number string,
         tls_verify_status string,
         conn_trace_id string
         )
            PARTITIONED BY
            (
             day STRING
            )
            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
            WITH SERDEPROPERTIES (
            'serialization.format' = '1',
            'input.regex' =
             '([^ ]*) ([^ ]*) ([0-9]*) ([0-9]*) ([A-Za-z0-9.-]*) ([^ ]*) ([-.0-9]*) \"([^\"]*)\" ([^ ]*) ([^ ]*) ([^ ]*) ?([^ ]*)?( .*)?'
            )
            LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/<ACCOUNT-NUMBER>/elasticloadbalancing/<REGION>/'
            TBLPROPERTIES
            (
             "projection.enabled" = "true",
             "projection.day.type" = "date",
             "projection.day.range" = "2023/01/01,NOW",
             "projection.day.format" = "yyyy/MM/dd",
             "projection.day.interval" = "1",
             "projection.day.interval.unit" = "DAYS",
             "storage.location.template" = "s3://amzn-s3-demo-bucket/AWSLogs/<ACCOUNT-NUMBER>/elasticloadbalancing/<REGION>/${day}"
            )
```

更多有关分区投影的信息，请参阅 [将分区投影与 Amazon Athena 结合使用](partition-projection.md)。

# ALB 连接日志的示例查询
<a name="query-alb-connection-logs-examples"></a>

以下查询对 `tls_verify_status` 值不为 `'Success'` 的情况进行计数，结果按客户端 IP 地址分组：

```
SELECT DISTINCT client_ip, count() AS count FROM alb_connection_logs
WHERE tls_verify_status != 'Success'
GROUP BY client_ip
ORDER BY count() DESC;
```

以下查询搜索指定时间范围内 `tls_handshake_latency` 值超过 2 秒的情况：

```
SELECT * FROM alb_connection_logs
WHERE 
  (
    parse_datetime(time, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z') 
    BETWEEN 
    parse_datetime('2024-01-01-00:00:00', 'yyyy-MM-dd-HH:mm:ss') 
    AND 
    parse_datetime('2024-03-20-00:00:00', 'yyyy-MM-dd-HH:mm:ss') 
  ) 
  AND 
    (tls_handshake_latency >= 2.0);
```

# 其他资源
<a name="application-load-balancer-logs-additional-resources"></a>

有关使用 ALB 日志的更多信息，请参阅以下资源。
+ *AWS 知识中心*中的[如何使用 Amazon Athena 分析应用程序负载均衡器访问日志](https://repost.aws/knowledge-center/athena-analyze-access-logs)。
+ 有关弹性负载均衡 HTTP 状态代码的信息，请参阅《应用程序负载均衡器用户指南**》中的[对应用程序负载均衡器进行问题排查](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-troubleshooting.html)。
+ 在 *AWS 大数据博客*中的[使用 AWS Glue 自定义分类器和 Amazon Athena 更有效地编目和分析应用程序负载均衡器日志](https://aws.amazon.com/blogs/big-data/catalog-and-analyze-application-load-balancer-logs-more-efficiently-with-aws-glue-custom-classifiers-and-amazon-athena/)。

# 查询经典负载均衡器日志
<a name="elasticloadbalancer-classic-logs"></a>

使用经典负载均衡器日志，分析和了解传入和传出 Elastic Load Balancing 实例和后端应用程序的流量模式。您可以查看流量来源、延迟和已传输的字节。

在分析 Elastic Load Balancing 日志之前，请对其进行配置以保存在目标 Amazon S3 存储桶中。有关更多信息，请参阅[为经典负载均衡器启用访问日志](https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/enable-access-logs.html)。

**为 Elastic Load Balancing 日志创建表**

1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。检查 Elastic Load Balancing 日志的[语法](https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/access-log-collection.html#access-log-entry-format)。您可能需要更新以下查询以包含列和最新版本的记录的 Regex 语法。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs (
    
    timestamp string,
    elb_name string,
    request_ip string,
    request_port int,
    backend_ip string,
    backend_port int,
    request_processing_time double,
    backend_processing_time double,
    client_response_time double,
    elb_response_code string,
    backend_response_code string,
    received_bytes bigint,
    sent_bytes bigint,
    request_verb string,
    url string,
    protocol string,
    user_agent string,
    ssl_cipher string,
    ssl_protocol string
   )
   ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
   WITH SERDEPROPERTIES (
    'serialization.format' = '1',
    'input.regex' = '([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*)[:-]([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) \\\"([^ ]*) ([^ ]*) (- |[^ ]*)\\\" (\"[^\"]*\") ([A-Z0-9-]+) ([A-Za-z0-9.-]*)$'
   )
   LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/AWS_account_ID/elasticloadbalancing/';
   ```

1. 修改 `LOCATION` Amazon S3 存储桶以指定您的 Elastic Load Balancing 日志的目标。

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `elb_logs` 表，使其中的数据可以供查询。有关更多信息，请参阅 [示例查询](#query-elb-classic-example)。

## 示例查询
<a name="query-elb-classic-example"></a>

使用类似于以下示例的查询。它列出返回 `4XX` 或 `5XX` 错误响应代码的后端应用程序服务器。使用 `LIMIT` 运算符可限制每次查询的日志数目。

```
SELECT
 timestamp,
 elb_name,
 backend_ip,
 backend_response_code
FROM elb_logs
WHERE backend_response_code LIKE '4%' OR
      backend_response_code LIKE '5%'
LIMIT 100;
```

使用后续查询对按后端 IP 地址和 Elastic Load Balancing 实例名称分组的所有事务的响应时间进行求和。

```
SELECT sum(backend_processing_time) AS
 total_ms,
 elb_name,
 backend_ip
FROM elb_logs WHERE backend_ip <> ''
GROUP BY backend_ip, elb_name
LIMIT 100;
```

有关更多信息，请参阅[使用 Athena 分析 S3 中的数据](https://aws.amazon.com/blogs/big-data/analyzing-data-in-s3-using-amazon-athena/)。

# 查询 Amazon CloudFront 日志
<a name="cloudfront-logs"></a>

您可以配置 Amazon CloudFront CDN 以将 Web 分配访问日志导出到 Amazon Simple Storage Service。使用这些日志可跨 CloudFront 提供的 Web 属性浏览用户的网上冲浪模式。

在开始查询日志之前，请在您的首选 CloudFront 分配上启用 Web 分配访问登录。有关信息，请参阅《Amazon CloudFront 开发人员指南**》中的 [访问日志](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html)。记录保存这些日志的 Amazon S3 存储桶。

**Topics**
+ [为 CloudFront 标准日志创建表（旧版）](create-cloudfront-table-standard-logs.md)
+ [使用手动分区在 Athena 中为使用 JSON 的 CloudFront 日志创建表](create-cloudfront-table-manual-json.md)
+ [使用手动分区在 Athena 中为使用 Parquet 的 CloudFront 日志创建表](create-cloudfront-table-manual-parquet.md)
+ [使用分区投影在 Athena 中为使用 JSON 的 CloudFront 日志创建表](create-cloudfront-table-partition-json.md)
+ [使用分区投影在 Athena 中为使用 Parquet 的 CloudFront 日志创建表](create-cloudfront-table-partition-parquet.md)
+ [为 CloudFront 实时日志创建表](create-cloudfront-table-real-time-logs.md)
+ [其他资源](cloudfront-logs-additional-resources.md)

# 为 CloudFront 标准日志创建表（旧版）
<a name="create-cloudfront-table-standard-logs"></a>

**注意**  
以下过程适用于 CloudFront 中的 Web 分配访问日志。它不适用于 RTMP 分配中的流日志。

**为 CloudFront 标准日志文件字段创建表**

1. 将以下示例 DDL 语句复制并粘贴到 Athena 控制台的查询编辑器中。该示例语句使用《Amazon CloudFront 开发人员指南》**的[标准日志文件字段](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat)部分中记录的日志文件字段。修改用于存储日志的 Amazon S3 存储桶的 `LOCATION`。有关使用查询编辑器的信息，请参阅 [开始使用](getting-started.md)。

   此查询指定了 `ROW FORMAT DELIMITED` 和 `FIELDS TERMINATED BY '\t'`，表示字段由制表符分隔。对于 `ROW FORMAT DELIMITED`，Athena 默认使用 [LazySimpleSerDe](lazy-simple-serde.md)。列 `date` 使用反引号 (`) 转义，因为它是 Athena 中的保留字。有关信息，请参阅[转义查询中的保留关键字](reserved-words.md)。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_standard_logs (
     `date` DATE,
     time STRING,
     x_edge_location STRING,
     sc_bytes BIGINT,
     c_ip STRING,
     cs_method STRING,
     cs_host STRING,
     cs_uri_stem STRING,
     sc_status INT,
     cs_referrer STRING,
     cs_user_agent STRING,
     cs_uri_query STRING,
     cs_cookie STRING,
     x_edge_result_type STRING,
     x_edge_request_id STRING,
     x_host_header STRING,
     cs_protocol STRING,
     cs_bytes BIGINT,
     time_taken FLOAT,
     x_forwarded_for STRING,
     ssl_protocol STRING,
     ssl_cipher STRING,
     x_edge_response_result_type STRING,
     cs_protocol_version STRING,
     fle_status STRING,
     fle_encrypted_fields INT,
     c_port INT,
     time_to_first_byte FLOAT,
     x_edge_detailed_result_type STRING,
     sc_content_type STRING,
     sc_content_len BIGINT,
     sc_range_start BIGINT,
     sc_range_end BIGINT
   )
   ROW FORMAT DELIMITED 
   FIELDS TERMINATED BY '\t'
   LOCATION 's3://amzn-s3-demo-bucket/'
   TBLPROPERTIES ( 'skip.header.line.count'='2' )
   ```

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `cloudfront_standard_logs` 表，使其中的数据可以供您发出查询。

## 示例查询
<a name="query-examples-cloudfront-logs"></a>

以下查询将累计 2018 年 6 月 9 日到 6 月 11 日之间由 CloudFront 提供的字节的数量。将日期列名称用双引号引起来，因为它是保留字。

```
SELECT SUM(bytes) AS total_bytes
FROM cloudfront_standard_logs
WHERE "date" BETWEEN DATE '2018-06-09' AND DATE '2018-06-11'
LIMIT 100;
```

要从查询结果中消除重复的行（例如，重复的空行），您可以使用 `SELECT DISTINCT` 语句，如以下示例所示。

```
SELECT DISTINCT * 
FROM cloudfront_standard_logs 
LIMIT 10;
```

# 使用手动分区在 Athena 中为使用 JSON 的 CloudFront 日志创建表
<a name="create-cloudfront-table-manual-json"></a>

**为使用 JSON 格式的 CloudFront 标准日志文件字段创建表**

1. 将以下示例 DDL 语句复制并粘贴到 Athena 控制台的查询编辑器中。该示例语句使用《Amazon CloudFront 开发人员指南》**的[标准日志文件字段](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat)部分中记录的日志文件字段。修改用于存储日志的 Amazon S3 存储桶的 `LOCATION`。

   此查询使用 OpenX JSON SerDe 和以下 SerDe 属性来正确读取 Athena 中的 JSON 字段。

   ```
   CREATE EXTERNAL TABLE `cf_logs_manual_partition_json`(
     `date` string , 
     `time` string , 
     `x-edge-location` string , 
     `sc-bytes` string , 
     `c-ip` string , 
     `cs-method` string , 
     `cs(host)` string , 
     `cs-uri-stem` string , 
     `sc-status` string , 
     `cs(referer)` string , 
     `cs(user-agent)` string , 
     `cs-uri-query` string , 
     `cs(cookie)` string , 
     `x-edge-result-type` string , 
     `x-edge-request-id` string , 
     `x-host-header` string , 
     `cs-protocol` string , 
     `cs-bytes` string , 
     `time-taken` string , 
     `x-forwarded-for` string , 
     `ssl-protocol` string , 
     `ssl-cipher` string , 
     `x-edge-response-result-type` string , 
     `cs-protocol-version` string , 
     `fle-status` string , 
     `fle-encrypted-fields` string , 
     `c-port` string , 
     `time-to-first-byte` string , 
     `x-edge-detailed-result-type` string , 
     `sc-content-type` string , 
     `sc-content-len` string , 
     `sc-range-start` string , 
     `sc-range-end` string )
   ROW FORMAT SERDE 
     'org.openx.data.jsonserde.JsonSerDe' 
   WITH SERDEPROPERTIES ( 
     'paths'='c-ip,c-port,cs(Cookie),cs(Host),cs(Referer),cs(User-Agent),cs-bytes,cs-method,cs-protocol,cs-protocol-version,cs-uri-query,cs-uri-stem,date,fle-encrypted-fields,fle-status,sc-bytes,sc-content-len,sc-content-type,sc-range-end,sc-range-start,sc-status,ssl-cipher,ssl-protocol,time,time-taken,time-to-first-byte,x-edge-detailed-result-type,x-edge-location,x-edge-request-id,x-edge-response-result-type,x-edge-result-type,x-forwarded-for,x-host-header') 
   STORED AS INPUTFORMAT 
     'org.apache.hadoop.mapred.TextInputFormat' 
   OUTPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   LOCATION
     's3://amzn-s3-demo-bucket/'
   ```

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `cf_logs_manual_partition_json` 表，使其中的数据可以供您发出查询。

## 示例查询
<a name="query-examples-cloudfront-logs-manual-json"></a>

以下查询将累计 2025 年 1 月 15 日由 CloudFront 提供的字节数。

```
SELECT sum(cast("sc-bytes" as BIGINT)) as sc
FROM cf_logs_manual_partition_json
WHERE "date"='2025-01-15'
```

要从查询结果中消除重复的行（例如，重复的空行），您可以使用 `SELECT DISTINCT` 语句，如以下示例所示。

```
SELECT DISTINCT * FROM cf_logs_manual_partition_json
```

# 使用手动分区在 Athena 中为使用 Parquet 的 CloudFront 日志创建表
<a name="create-cloudfront-table-manual-parquet"></a>

**为使用 Parquet 格式的 CloudFront 标准日志文件字段创建表**

1. 将以下示例 DDL 语句复制并粘贴到 Athena 控制台的查询编辑器中。该示例语句使用《Amazon CloudFront 开发人员指南》**的[标准日志文件字段](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#BasicDistributionFileFormat)部分中记录的日志文件字段。

   此查询使用 ParquetHiveSerDe 和以下 SerDe 属性来正确读取 Athena 中的 Parquet 字段。

   ```
   CREATE EXTERNAL TABLE `cf_logs_manual_partition_parquet`(
     `date` string, 
     `time` string, 
     `x_edge_location` string, 
     `sc_bytes` string, 
     `c_ip` string, 
     `cs_method` string, 
     `cs_host` string, 
     `cs_uri_stem` string, 
     `sc_status` string, 
     `cs_referer` string, 
     `cs_user_agent` string, 
     `cs_uri_query` string, 
     `cs_cookie` string, 
     `x_edge_result_type` string, 
     `x_edge_request_id` string, 
     `x_host_header` string, 
     `cs_protocol` string, 
     `cs_bytes` string, 
     `time_taken` string, 
     `x_forwarded_for` string, 
     `ssl_protocol` string, 
     `ssl_cipher` string, 
     `x_edge_response_result_type` string, 
     `cs_protocol_version` string, 
     `fle_status` string, 
     `fle_encrypted_fields` string, 
     `c_port` string, 
     `time_to_first_byte` string, 
     `x_edge_detailed_result_type` string, 
     `sc_content_type` string, 
     `sc_content_len` string, 
     `sc_range_start` string, 
     `sc_range_end` string)
   ROW FORMAT SERDE 
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
   STORED AS INPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
   OUTPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION
     's3://amzn-s3-demo-bucket/'
   ```

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `cf_logs_manual_partition_parquet` 表，使其中的数据可以供您发出查询。

## 示例查询
<a name="query-examples-cloudfront-logs-manual-parquet"></a>

以下查询将累计 2025 年 1 月 19 日由 CloudFront 提供的字节数。

```
SELECT sum(cast("sc_bytes" as BIGINT)) as sc
FROM cf_logs_manual_partition_parquet
WHERE "date"='2025-01-19'
```

要从查询结果中消除重复的行（例如，重复的空行），您可以使用 `SELECT DISTINCT` 语句，如以下示例所示。

```
SELECT DISTINCT * FROM cf_logs_manual_partition_parquet
```

# 使用分区投影在 Athena 中为使用 JSON 的 CloudFront 日志创建表
<a name="create-cloudfront-table-partition-json"></a>

您可以使用 Athena 分区投影功能缩短查询运行时间并自动化管理分区。当添加新数据时，分区投影会自动添加新分区。这样就不必使用 `ALTER TABLE ADD PARTITION` 手动添加分区了。

以下示例 CREATE TABLE 语句将为单个 AWS 区域中截至当前时间的，来自指定 CloudFront 分配的 CloudFront 日志，自动使用分区投影功能。成功运行查询后，您可以查询表。

```
CREATE EXTERNAL TABLE `cloudfront_logs_pp`(
  `date` string, 
  `time` string, 
  `x-edge-location` string, 
  `sc-bytes` string, 
  `c-ip` string, 
  `cs-method` string, 
  `cs(host)` string, 
  `cs-uri-stem` string, 
  `sc-status` string, 
  `cs(referer)` string, 
  `cs(user-agent)` string, 
  `cs-uri-query` string, 
  `cs(cookie)` string, 
  `x-edge-result-type` string, 
  `x-edge-request-id` string, 
  `x-host-header` string, 
  `cs-protocol` string, 
  `cs-bytes` string, 
  `time-taken` string, 
  `x-forwarded-for` string, 
  `ssl-protocol` string, 
  `ssl-cipher` string, 
  `x-edge-response-result-type` string, 
  `cs-protocol-version` string, 
  `fle-status` string, 
  `fle-encrypted-fields` string, 
  `c-port` string, 
  `time-to-first-byte` string, 
  `x-edge-detailed-result-type` string, 
  `sc-content-type` string, 
  `sc-content-len` string, 
  `sc-range-start` string, 
  `sc-range-end` string)
  PARTITIONED BY(
         distributionid string,
         year int,
         month int,
         day int,
         hour int )
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ( 
  'paths'='c-ip,c-port,cs(Cookie),cs(Host),cs(Referer),cs(User-Agent),cs-bytes,cs-method,cs-protocol,cs-protocol-version,cs-uri-query,cs-uri-stem,date,fle-encrypted-fields,fle-status,sc-bytes,sc-content-len,sc-content-type,sc-range-end,sc-range-start,sc-status,ssl-cipher,ssl-protocol,time,time-taken,time-to-first-byte,x-edge-detailed-result-type,x-edge-location,x-edge-request-id,x-edge-response-result-type,x-edge-result-type,x-forwarded-for,x-host-header') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_ID/CloudFront/'
TBLPROPERTIES (
  'projection.distributionid.type'='enum',
  'projection.distributionid.values'='E2Oxxxxxxxxxxx',
  'projection.day.range'='01,31', 
  'projection.day.type'='integer', 
  'projection.day.digits'='2', 
  'projection.enabled'='true', 
  'projection.month.range'='01,12', 
  'projection.month.type'='integer', 
  'projection.month.digits'='2', 
  'projection.year.range'='2025,2026', 
  'projection.year.type'='integer', 
  'projection.hour.range'='00,23',
  'projection.hour.type'='integer',
  'projection.hour.digits'='2',
  'storage.location.template'='s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_ID/CloudFront/${distributionid}/${year}/${month}/${day}/${hour}/')
```

对于上一示例中使用的属性，应注意以下事项。
+ **表名** – 表名 *`cloudfront_logs_pp`* 是可替换的。您可以将其更改为自己喜欢的任何名称。
+ **位置** – 修改 `s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_ID/` 以指向您的 Amazon S3 存储桶。
+ **分配 ID** – 对于 `projection.distributionid.values`，您可以通过用逗号分隔符来指定多个分配 ID。例如，*<distributionID1>*, *<distributionID2>*。
+ **年份范围**–在 `projection.year.range` 中，您可以根据自己的数据设置年份范围。您可以将其调整为任何期间，例如 *2025*、*2026*。
**注意**  
如果包含空分区，例如针对未来日期的分区（例如：2025-2040），则可能会影响查询性能。但是，分区投影旨在有效处理未来日期。为保持最佳性能，请务必要谨慎管理分区，尽可能避免过多空分区。
+ **存储位置模板** – 您必须确保根据以下 CloudFront 分区结构和 S3 路径正确更新 `storage.location.template`。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/create-cloudfront-table-partition-json.html)

  在确认 CloudFront 分区结构和 S3 结构符合所需模式后，请按如下方式更新 `storage.location.template`：

  ```
  'storage.location.template'='s3://amzn-s3-demo-bucket/AWSLogs/account_id/CloudFront/${distributionid}/folder2/${year}/${month}/${day}/${hour}/folder3/'
  ```
**注意**  
正确配置 `storage.location.template` 对于确保正确的数据存储和检索至关重要。

# 使用分区投影在 Athena 中为使用 Parquet 的 CloudFront 日志创建表
<a name="create-cloudfront-table-partition-parquet"></a>

以下示例 CREATE TABLE 语句将为单个 AWS 区域中截至当前时间的，来自指定 CloudFront 分配的 Parquet 格式的 CloudFront 日志，自动使用分区投影功能。成功运行查询后，您可以查询表。

```
CREATE EXTERNAL TABLE `cloudfront_logs_parquet_pp`(
`date` string, 
`time` string, 
`x_edge_location` string, 
`sc_bytes` string, 
`c_ip` string, 
`cs_method` string, 
`cs_host` string, 
`cs_uri_stem` string, 
`sc_status` string, 
`cs_referer` string, 
`cs_user_agent` string, 
`cs_uri_query` string, 
`cs_cookie` string, 
`x_edge_result_type` string, 
`x_edge_request_id` string, 
`x_host_header` string, 
`cs_protocol` string, 
`cs_bytes` string, 
`time_taken` string, 
`x_forwarded_for` string, 
`ssl_protocol` string, 
`ssl_cipher` string, 
`x_edge_response_result_type` string, 
`cs_protocol_version` string, 
`fle_status` string, 
`fle_encrypted_fields` string, 
`c_port` string, 
`time_to_first_byte` string, 
`x_edge_detailed_result_type` string, 
`sc_content_type` string, 
`sc_content_len` string, 
`sc_range_start` string, 
`sc_range_end` string)
PARTITIONED BY(
 distributionid string,
 year int,
 month int,
 day int,
 hour int )
ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_ID/CloudFront/'
TBLPROPERTIES (
'projection.distributionid.type'='enum',
'projection.distributionid.values'='E3OK0LPUNWWO3',
'projection.day.range'='01,31',
'projection.day.type'='integer',
'projection.day.digits'='2',
'projection.enabled'='true',
'projection.month.range'='01,12',
'projection.month.type'='integer',
'projection.month.digits'='2',
'projection.year.range'='2019,2025',
'projection.year.type'='integer',
'projection.hour.range'='01,12',
'projection.hour.type'='integer',
'projection.hour.digits'='2',
'storage.location.template'='s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_ID/CloudFront/${distributionid}/${year}/${month}/${day}/${hour}/')
```

对于上一示例中使用的属性，应注意以下事项。
+ **表名** – 表名 *`cloudfront_logs_pp`* 是可替换的。您可以将其更改为自己喜欢的任何名称。
+ **位置** – 修改 `s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_ID/` 以指向您的 Amazon S3 存储桶。
+ **分配 ID** – 对于 `projection.distributionid.values`，您可以通过用逗号分隔符来指定多个分配 ID。例如，*<distributionID1>*, *<distributionID2>*。
+ **年份范围**–在 `projection.year.range` 中，您可以根据自己的数据设置年份范围。您可以将其调整为任何期间，例如 *2025*、*2026*。
**注意**  
如果包含空分区，例如针对未来日期的分区（例如：2025-2040），则可能会影响查询性能。但是，分区投影旨在有效处理未来日期。为保持最佳性能，请务必要谨慎管理分区，尽可能避免过多空分区。
+ **存储位置模板** – 您必须确保根据以下 CloudFront 分区结构和 S3 路径正确更新 `storage.location.template`。  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/create-cloudfront-table-partition-parquet.html)

  在确认 CloudFront 分区结构和 S3 结构符合所需模式后，请按如下方式更新 `storage.location.template`：

  ```
  'storage.location.template'='s3://amzn-s3-demo-bucket/AWSLogs/account_id/CloudFront/${distributionid}/folder2/${year}/${month}/${day}/${hour}/folder3/'
  ```
**注意**  
正确配置 `storage.location.template` 对于确保正确的数据存储和检索至关重要。

# 为 CloudFront 实时日志创建表
<a name="create-cloudfront-table-real-time-logs"></a>

**为 CloudFront 实时日志文件字段创建表**

1. 将以下示例 DDL 语句复制并粘贴到 Athena 控制台的查询编辑器中。该示例语句使用了《Amazon CloudFront Developer Guide》**的 [Real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) 章节中记录的日志文件字段。修改用于存储日志的 Amazon S3 存储桶的 `LOCATION`。有关使用查询编辑器的信息，请参阅 [开始使用](getting-started.md)。

   此查询指定了 `ROW FORMAT DELIMITED` 和 `FIELDS TERMINATED BY '\t'`，表示字段由制表符分隔。对于 `ROW FORMAT DELIMITED`，Athena 默认使用 [LazySimpleSerDe](lazy-simple-serde.md)。列 `timestamp` 使用反引号 (`) 转义，因为它是 Athena 中的保留字。有关信息，请参阅[转义查询中的保留关键字](reserved-words.md)。

   以下示例包含所有可用字段。若有不需要的字段，可将相应字段注释掉或者删除掉。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_real_time_logs ( 
   `timestamp` STRING,
   c_ip STRING,
   time_to_first_byte BIGINT,
   sc_status BIGINT,
   sc_bytes BIGINT,
   cs_method STRING,
   cs_protocol STRING,
   cs_host STRING,
   cs_uri_stem STRING,
   cs_bytes BIGINT,
   x_edge_location STRING,
   x_edge_request_id STRING,
   x_host_header STRING,
   time_taken BIGINT,
   cs_protocol_version STRING,
   c_ip_version STRING,
   cs_user_agent STRING,
   cs_referer STRING,
   cs_cookie STRING,
   cs_uri_query STRING,
   x_edge_response_result_type STRING,
   x_forwarded_for STRING,
   ssl_protocol STRING,
   ssl_cipher STRING,
   x_edge_result_type STRING,
   fle_encrypted_fields STRING,
   fle_status STRING,
   sc_content_type STRING,
   sc_content_len BIGINT,
   sc_range_start STRING,
   sc_range_end STRING,
   c_port BIGINT,
   x_edge_detailed_result_type STRING,
   c_country STRING,
   cs_accept_encoding STRING,
   cs_accept STRING,
   cache_behavior_path_pattern STRING,
   cs_headers STRING,
   cs_header_names STRING,
   cs_headers_count BIGINT,
   primary_distribution_id STRING,
   primary_distribution_dns_name STRING,
   origin_fbl STRING,
   origin_lbl STRING,
   asn STRING
   )
   ROW FORMAT DELIMITED 
   FIELDS TERMINATED BY '\t'
   LOCATION 's3://amzn-s3-demo-bucket/'
   TBLPROPERTIES ( 'skip.header.line.count'='2' )
   ```

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `cloudfront_real_time_logs` 表，使其中的数据可以供您发出查询。

# 其他资源
<a name="cloudfront-logs-additional-resources"></a>

有关使用 Athena 查询 CloudFront 日志的详细信息，请参阅 [AWS 大数据博客](https://aws.amazon.com/blogs/big-data/)中的以下文章。

[使用 Amazon Athena 轻松查询 AWS 服务 日志](https://aws.amazon.com/blogs/big-data/easily-query-aws-service-logs-using-amazon-athena/)（2019 年 5 月 29 日）。

[大规模分析 Amazon CloudFront 访问日志](https://aws.amazon.com/blogs/big-data/analyze-your-amazon-cloudfront-access-logs-at-scale/)（2018 年 12 月 21 日）。

[使用 AWS Lambda、Amazon Athena 和适用于 Apache Flink 的亚马逊托管服务构建无服务器架构来分析 Amazon CloudFront 访问日志](https://aws.amazon.com/blogs/big-data/build-a-serverless-architecture-to-analyze-amazon-cloudfront-access-logs-using-aws-lambda-amazon-athena-and-amazon-kinesis-analytics/)（2017 年 5 月 26 日）。

# 查询 AWS CloudTrail日志
<a name="cloudtrail-logs"></a>

AWS CloudTrail 是一项服务，可记录 AWS API 调用和 Amazon Web Services 账户的事件。

CloudTrail 日志包含有关对 AWS 服务 进行的任何 API 调用的详细信息，包括控制台。CloudTrail 生成加密的日志文件并将其存储在 Amazon S3 中。有关更多信息，请参阅《[AWS CloudTrail 用户指南](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-user-guide.html)》。

**注意**  
如果您需要跨账户、区域和日期对 CloudTrail 事件信息执行 SQL 查询，请考虑使用 CloudTrail Lake。CloudTrail Lake 是一种可用于创建跟踪的替代 AWS 服务，可将来自企业的信息聚合到单个可搜索的事件数据存储中。它不使用 Amazon S3 存储桶存储，而是将事件存储在某个数据湖中，从而支持更丰富、更快的查询。您可以使用它创建 SQL 查询，以便在自定义时间范围内跨组织和区域搜索事件。由于 CloudTrail Lake 查询是在 CloudTrail 控制台中执行，因此使用 CloudTrail Lake 不需要 Athena。有关更多信息，请参阅 [CloudTrail Lake](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-lake.html) 文档。

将 Athena 与 CloudTrail 日志结合使用是加强对 AWS 服务 活动进行分析的强有力方法。例如，您可以使用查询来确定趋势，并根据属性 (如源 IP 地址或用户) 进一步隔离活动。

常见的应用程序是使用 CloudTrail 日志分析运营活动的安全性和合规性。有关详细示例的信息，请参阅 AWS 大数据博客文章：[使用 AWS CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动](https://aws.amazon.com/blogs/big-data/aws-cloudtrail-and-amazon-athena-dive-deep-to-analyze-security-compliance-and-operational-activity/)。

您可以使用 Athena 通过指定日志文件的 `LOCATION` 直接从 Amazon S3 查询这些日志文件。您可以通过两种方式之一来执行此操作：
+ 通过直接从 CloudTrail 控制台为 CloudTrail 日志文件创建表。
+ 通过在 Athena 控制台中手动为 CloudTrail 日志文件创建表。

**Topics**
+ [了解 CloudTrail 日志和 Athena 表](create-cloudtrail-table-understanding.md)
+ [使用 CloudTrail 控制台为 CloudTrail 日志创建 Athena 表](create-cloudtrail-table-ct.md)
+ [使用手动分区在 Athena 中为 CloudTrail 日志创建表](create-cloudtrail-table.md)
+ [使用手动分区为整个组织范围的跟踪创建表](create-cloudtrail-table-org-wide-trail.md)
+ [使用分区投影在 Athena 中为 CloudTrail 日志创建表](create-cloudtrail-table-partition-projection.md)
+ [CloudTrail 日志查询示例](query-examples-cloudtrail-logs.md)

# 了解 CloudTrail 日志和 Athena 表
<a name="create-cloudtrail-table-understanding"></a>

在开始创建表之前，应了解有关 CloudTrail 以及它如何存储数据的更多信息。这有助于创建所需的表，无论您从 CloudTrail 控制台或从 Athena 创建它们都是如此。

CloudTrail 将日志另存为采用压缩 gzip 格式的 JSON 文本文件（`*.json.gz`）。日志文件的位置取决于您如何设置跟踪、您正在记录的一个 AWS 区域 或多个区域以及其他因素。

有关日志存储位置、JSON 结构和记录文件内容的更多信息，请参阅《[AWS CloudTrail 用户指南](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-user-guide.html)》中的以下主题：
+  [查找 CloudTrail 日志文件](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-find-log-files.html) 
+  [CloudTrail 日志文件示例](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-examples.html) 
+  [CloudTrail 记录内容](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference-record-contents.html)
+  [CloudTrail 事件参考](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference.html) 

要收集日志并将其保存到 Amazon S3，请从 AWS 管理控制台 启用 CloudTrail。有关更多信息，请参阅《AWS CloudTrail 用户指南**》中的 [创建跟踪记录](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-create-a-trail-using-the-console-first-time.html)。

# 使用 CloudTrail 控制台为 CloudTrail 日志创建 Athena 表
<a name="create-cloudtrail-table-ct"></a>

您可以创建未分区的 Athena 表，来直接从较旧的 CloudTrail 控制台查询 CloudTrail 日志。若要从 CloudTrail 控制台创建 Athena 表，您需要使用具有在 Athena 中创建表的足够权限的角色登录。

**注意**  
您不能使用 CloudTrail 控制台为组织跟踪记录日志创建 Athena 表。相反，请使用 Athena 控制台手动创建表，以便指定正确的存储位置。有关组织跟踪的信息，请参阅《*AWS CloudTrail 用户指南*》中的[为组织创建跟踪记录](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/creating-trail-organization.html)。
+ 有关设置 Athena 权限的信息，请参阅 [设置、管理和编程访问](setting-up.md)。
+ 有关创建带分区的表的信息，请参阅[使用手动分区在 Athena 中为 CloudTrail 日志创建表](create-cloudtrail-table.md)。

**使用 CloudTrail 控制台为 CloudTrail 跟踪记录创建 Athena 表**

1. 访问 [https://console.aws.amazon.com/cloudtrail/](https://console.aws.amazon.com/cloudtrail/)，打开 CloudTrail 控制台。

1. 在导航窗格中，选择**事件历史记录**。

1. 选择 **Create Athena table**（创建 Athena 表）。  
![\[选择 Create Athena table（创建 Athena 表）\]](http://docs.aws.amazon.com/zh_cn/athena/latest/ug/images/cloudtrail-logs-create-athena-table.png)

1. 对于 **Storage location**（存储位置），使用向下箭头选择其中存储了供跟踪查询的日志文件的 Amazon S3 存储桶。
**注意**  
要查找与跟踪记录关联的存储桶的名称，请选择 CloudTrail 导航窗格中的 **Trails**（跟踪记录），然后查看跟踪记录的 **S3 bucket**（S3 存储桶）列。要查看 Amazon S3 中存储桶的位置，请在 **S3 bucket**（S3 存储桶）列中选择存储桶的链接。这样，就可以将 Amazon S3 控制台打开到 CloudTrail 存储桶位置。

1. 选择**创建表**。将使用包含 Amazon S3 存储桶名称的默认名称创建表。

# 使用手动分区在 Athena 中为 CloudTrail 日志创建表
<a name="create-cloudtrail-table"></a>

可以在 Athena 控制台中为 CloudTrail 日志文件手动创建表，然后在 Athena 中运行查询。

**使用 Athena 控制台为 CloudTrail 跟踪记录创建 Athena 表**

1. 将以下 DDL 语句复制并粘贴到 Athena 控制台查询编辑器中，然后根据要求进行修改。请注意，由于 CloudTrail 日志文件不是公用 API 调用的有序堆栈跟踪，因此日志文件中的字段不会按照任何特定顺序显示。

   ```
   CREATE EXTERNAL TABLE cloudtrail_logs (
   eventversion STRING,
   useridentity STRUCT<
                  type:STRING,
                  principalid:STRING,
                  arn:STRING,
                  accountid:STRING,
                  invokedby:STRING,
                  accesskeyid:STRING,
                  username:STRING,
                  onbehalfof: STRUCT<
                       userid: STRING,
                       identitystorearn: STRING>,
     sessioncontext:STRUCT<
       attributes:STRUCT<
                  mfaauthenticated:STRING,
                  creationdate:STRING>,
       sessionissuer:STRUCT<  
                  type:STRING,
                  principalid:STRING,
                  arn:STRING, 
                  accountid:STRING,
                  username:STRING>,
       ec2roledelivery:string,
       webidfederationdata: STRUCT<
                  federatedprovider: STRING,
                  attributes: map<string,string>>
     >
   >,
   eventtime STRING,
   eventsource STRING,
   eventname STRING,
   awsregion STRING,
   sourceipaddress STRING,
   useragent STRING,
   errorcode STRING,
   errormessage STRING,
   requestparameters STRING,
   responseelements STRING,
   additionaleventdata STRING,
   requestid STRING,
   eventid STRING,
   resources ARRAY<STRUCT<
                  arn:STRING,
                  accountid:STRING,
                  type:STRING>>,
   eventtype STRING,
   apiversion STRING,
   readonly STRING,
   recipientaccountid STRING,
   serviceeventdetails STRING,
   sharedeventid STRING,
   vpcendpointid STRING,
   vpcendpointaccountid STRING,
   eventcategory STRING,
   addendum STRUCT<
     reason:STRING,
     updatedfields:STRING,
     originalrequestid:STRING,
     originaleventid:STRING>,
   sessioncredentialfromconsole STRING,
   edgedevicedetails STRING,
   tlsdetails STRUCT<
     tlsversion:STRING,
     ciphersuite:STRING,
     clientprovidedhostheader:STRING>
   )
   PARTITIONED BY (region string, year string, month string, day string)
   ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
   STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/Account_ID/';
   ```
**注意**  
建议使用示例中所示的 `org.apache.hive.hcatalog.data.JsonSerDe`。虽然存在 `com.amazon.emr.hive.serde.CloudTrailSerde`，但目前还无法处理一些较新的 CloudTrail 字段。

1. （可选）删除表格中所有非必填字段。如果您只需要读取一组特定的列，则您的表定义可以排除其他列。

1. 修改 `s3://amzn-s3-demo-bucket/AWSLogs/Account_ID/` 以指向包含要查询的日志数据的 Amazon S3 存储桶。该示例使用特定账户的日志的 `LOCATION` 值，但您可以使用最适合您的应用程序的明确度程度。例如：
   + 要分析多个账户中的数据，您可以通过使用 `LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/'`，回滚 `LOCATION` 说明符来指示所有 `AWSLogs`。
   + 要分析特定日期、账户和区域中的数据，请使用 `LOCATION 's3://amzn-s3-demo-bucket/123456789012/CloudTrail/us-east-1/2016/03/14/'.` 
   + 要分析网络活动数据而不是管理事件，请将 `LOCATION` 子句中的 `/CloudTrail/` 替换为 `/CloudTrail-NetworkActivity/`。

   如果使用对象层次结构中的最高级别，则可以在使用 Athena 查询时获得最高程度的灵活度。

1. 验证字段是否正确列出。有关 CloudTrail 记录中的完整字段列表的更多信息，请参阅 [CloudTrail 记录内容](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference-record-contents.html)。

   步骤 1 中的示例 `CREATE TABLE` 语句使用 [Hive JSON SerDe](hive-json-serde.md)。在该示例中，字段 `requestparameters`、`responseelements` 和 `additionaleventdata` 作为查询中的 `STRING` 类型列出，但在 JSON 中使用 `STRUCT` 数据类型。因此，要将数据移出这些字段，请使用 `JSON_EXTRACT` 函数。有关更多信息，请参阅 [从字符串中提取 JSON 数据](extracting-data-from-JSON.md)。为了提高性能，此示例按 AWS 区域、年份、月份和日期对数据进行分区。

1. 在 Athena 控制台中运行 `CREATE TABLE` 语句。

1. 使用 [ALTER TABLE ADD PARTITION](alter-table-add-partition.md) 命令加载分区，以便您可以查询它们，如以下示例所示。

   ```
   ALTER TABLE table_name ADD 
      PARTITION (region='us-east-1',
                 year='2019',
                 month='02',
                 day='01')
      LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/Account_ID/CloudTrail/us-east-1/2019/02/01/'
   ```

# 使用手动分区为整个组织范围的跟踪创建表
<a name="create-cloudtrail-table-org-wide-trail"></a>

要在 Athena 中为整个组织的 CloudTrail 日志文件创建表，请按照 [使用手动分区在 Athena 中为 CloudTrail 日志创建表](create-cloudtrail-table.md) 中的步骤操作，但需要按照以下过程中的说明进行修改。

**为整个组织的 CloudTrail 日志记录创建 Athena 表**

1. 在 `CREATE TABLE` 语句中，修改 `LOCATION` 子句以包含组织 ID，如下例所示：

   ```
   LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/organization_id/'
   ```

1. 在 `PARTITIONED BY` 子句中，为账户 ID 添加一个字符串条目，如下例所示：

   ```
   PARTITIONED BY (account string, region string, year string, month string, day string)
   ```

   以下示例显示的是综合结果：

   ```
   ...
   
   PARTITIONED BY (account string, region string, year string, month string, day string) 
   ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
   STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/organization_id/Account_ID/CloudTrail/'
   ```

1. `ALTER TABLE` 语句的 `ADD PARTITION` 子句包含账户 ID，如下例所示：

   ```
   ALTER TABLE table_name ADD
   PARTITION (account='111122223333',
   region='us-east-1',
   year='2022',
   month='08',
   day='08')
   ```

1. `ALTER TABLE` 语句的 `LOCATION` 子句包含组织 ID、账户 ID 以及您要添加的分区，如下例所示：

   ```
   LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/organization_id/Account_ID/CloudTrail/us-east-1/2022/08/08/'
   ```

   以下示例 `ALTER TABLE` 语句显示的是综合结果：

   ```
   ALTER TABLE table_name ADD
   PARTITION (account='111122223333',
   region='us-east-1',
   year='2022',
   month='08',
   day='08')
   LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/organization_id/111122223333/CloudTrail/us-east-1/2022/08/08/'
   ```

请注意，在大型组织中，使用此方法为每个组织帐户 ID 手动添加和维护分区可能很麻烦。在这种情况下，请考虑使用 CloudTrail Lake 而不是 Athena。在这种情况下，CloudTrail Lake 具有以下优势：
+ 自动聚合整个组织的日志
+ 不需要设置或维护分区或 Athena 表
+ 查询直接在 CloudTrail 控制台中运行
+ 使用与 SQL 兼容的查询语言

有关更多信息，请参阅《*AWS CloudTrail 用户指南*》中的[使用 AWS CloudTrail Lake](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-lake.html)。

# 使用分区投影在 Athena 中为 CloudTrail 日志创建表
<a name="create-cloudtrail-table-partition-projection"></a>

由于 CloudTrail 日志具有一个已知结构，您可以预先指定该结构的分区方案，因此可以使用 Athena 分区投影功能减少查询运行时间并自动管理分区。当添加新数据时，分区投影会自动添加新分区。这样就不必使用 `ALTER TABLE ADD PARTITION` 手动添加分区了。

以下示例 `CREATE TABLE` 语句会自动在 CloudTrail 日志上从指定日期开始到当前为单个 AWS 区域 使用分区投影。在 `LOCATION` 和 `storage.location.template` 子句中，将*存储桶*、*account-id* 和 *aws-region* 占位符替换为对应的相同值。对于 `projection.timestamp.range`，将 *2020*/*01*/*01* 替换为要使用的开始日期。成功运行查询后，您可以查询表。您无需运行 `ALTER TABLE ADD PARTITION` 来加载分区。

```
CREATE EXTERNAL TABLE cloudtrail_logs_pp(
    eventversion STRING,
    useridentity STRUCT<
        type: STRING,
        principalid: STRING,
        arn: STRING,
        accountid: STRING,
        invokedby: STRING,
        accesskeyid: STRING,
        username: STRING,
        onbehalfof: STRUCT<
             userid: STRING,
             identitystorearn: STRING>,
        sessioncontext: STRUCT<
            attributes: STRUCT<
                mfaauthenticated: STRING,
                creationdate: STRING>,
            sessionissuer: STRUCT<
                type: STRING,
                principalid: STRING,
                arn: STRING,
                accountid: STRING,
                username: STRING>,
            ec2roledelivery:string,
            webidfederationdata: STRUCT<
                federatedprovider: STRING,
                attributes: map<string,string>>
        >
    >,
    eventtime STRING,
    eventsource STRING,
    eventname STRING,
    awsregion STRING,
    sourceipaddress STRING,
    useragent STRING,
    errorcode STRING,
    errormessage STRING,
    requestparameters STRING,
    responseelements STRING,
    additionaleventdata STRING,
    requestid STRING,
    eventid STRING,
    readonly STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountid: STRING,
        type: STRING>>,
    eventtype STRING,
    apiversion STRING,
    recipientaccountid STRING,
    serviceeventdetails STRING,
    sharedeventid STRING,
    vpcendpointid STRING,
    vpcendpointaccountid STRING,
    eventcategory STRING,
    addendum STRUCT<
      reason:STRING,
      updatedfields:STRING,
      originalrequestid:STRING,
      originaleventid:STRING>,
    sessioncredentialfromconsole STRING,
    edgedevicedetails STRING,
    tlsdetails STRUCT<
      tlsversion:STRING,
      ciphersuite:STRING,
      clientprovidedhostheader:STRING>
  )
PARTITIONED BY (
   `timestamp` string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/AWSLogs/account-id/CloudTrail/aws-region'
TBLPROPERTIES (
  'projection.enabled'='true', 
  'projection.timestamp.format'='yyyy/MM/dd', 
  'projection.timestamp.interval'='1', 
  'projection.timestamp.interval.unit'='DAYS', 
  'projection.timestamp.range'='2020/01/01,NOW', 
  'projection.timestamp.type'='date', 
  'storage.location.template'='s3://amzn-s3-demo-bucket/AWSLogs/account-id/CloudTrail/aws-region/${timestamp}')
```

更多有关分区投影的信息，请参阅 [将分区投影与 Amazon Athena 结合使用](partition-projection.md)。

# CloudTrail 日志查询示例
<a name="query-examples-cloudtrail-logs"></a>

以下示例显示从在 CloudTrail 事件日志创建的表，返回所有匿名（未签名）请求的查询部分。此查询选择 `useridentity.accountid` 匿名并且 `useridentity.arn` 未指定的那些请求：

```
SELECT *
FROM cloudtrail_logs
WHERE 
    eventsource = 's3.amazonaws.com' AND 
    eventname in ('GetObject') AND 
    useridentity.accountid = 'anonymous' AND 
    useridentity.arn IS NULL AND
    requestparameters LIKE '%[your bucket name ]%';
```

有关更多信息，请参阅 AWS 大数据博客文章：[使用 AWS CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动](https://aws.amazon.com/blogs/big-data/aws-cloudtrail-and-amazon-athena-dive-deep-to-analyze-security-compliance-and-operational-activity/)。

## 查询 CloudTrail 日志中的嵌套字段
<a name="cloudtrail-logs-nested-fields"></a>

由于 `userIdentity` 和 `resources` 字段是嵌套的数据类型，查询这些内容需要特殊处理。

`userIdentity` 对象由嵌套 `STRUCT` 类型组成。可以使用点分隔字段以分隔待查询的字段，如下例所示：

```
SELECT 
    eventsource, 
    eventname,
    useridentity.sessioncontext.attributes.creationdate,
    useridentity.sessioncontext.sessionissuer.arn
FROM cloudtrail_logs
WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL
ORDER BY eventsource, eventname
LIMIT 10
```

`resources` 字段是一个 `STRUCT` 对象数组。对于这些数组，请使用 `CROSS JOIN UNNEST` 来取消嵌套数组，以便您可以查询其对象。

下面的示例将返回资源 ARN 以 `example/datafile.txt` 结尾的所有行。为了便于读取，[replace](https://prestodb.io/docs/current/functions/string.html#replace) 函数将从 ARN 中删除初始 `arn:aws:s3:::` 子字符串。

```
SELECT 
    awsregion,
    replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource,
    eventname,
    eventtime,
    useragent
FROM cloudtrail_logs t
CROSS JOIN UNNEST(t.resources) unnested (resources_entry)
WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt'
ORDER BY eventtime
```

以下是 `DeleteBucket` 事件的示例查询。查询将从 `resources` 对象中提取存储桶的名称以及存储桶所属的账户 ID。

```
SELECT 
    awsregion,
    replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket,
    eventtime AS time_deleted,
    useridentity.username, 
    unnested.resources_entry.accountid as bucket_acct_id 
FROM cloudtrail_logs t
CROSS JOIN UNNEST(t.resources) unnested (resources_entry)
WHERE eventname = 'DeleteBucket'
ORDER BY eventtime
```

有关取消嵌套的更多信息，请参阅 [筛选数组](filtering-arrays.md)。

## 有关查询 CloudTrail 日志的提示
<a name="tips-for-querying-cloudtrail-logs"></a>

在探索 CloudTrail 日志数据时考虑以下各项：
+ 在查询日志之前，请验证您的日志表是否看似与[使用手动分区在 Athena 中为 CloudTrail 日志创建表](create-cloudtrail-table.md)中的表一样。如果不是第一个表，请使用以下命令删除现有表：`DROP TABLE cloudtrail_logs`。
+ 删除现有表后，重新创建它。有关更多信息，请参阅 [使用手动分区在 Athena 中为 CloudTrail 日志创建表](create-cloudtrail-table.md)。

  确认正确列出了 Athena 查询中的字段。有关 CloudTrail 记录中的完整字段列表的信息，请参阅 [CloudTrail 记录内容](https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference-record-contents.html)。

  如果您的查询包含 JSON 格式的字段，例如 `STRUCT`，请从 JSON 中提取数据。有关更多信息，请参阅 [从字符串中提取 JSON 数据](extracting-data-from-JSON.md)。

  关于针对 CloudTrail 表发布查询的一些建议如下：
+ 首先查看哪些用户调用了哪些 API 操作以及来自哪些源 IP 地址。
+ 将以下基本 SQL 查询用作您的模板。将查询粘贴到 Athena 控制台并运行它。

  ```
  SELECT
   useridentity.arn,
   eventname,
   sourceipaddress,
   eventtime
  FROM cloudtrail_logs
  LIMIT 100;
  ```
+ 修改查询以进一步探索您的数据。
+ 为了提高性能，请包含 `LIMIT` 子句以返回指定的行子集。

# 查询 Amazon EMR 日志
<a name="emr-logs"></a>

Amazon EMR 和在 Amazon EMR 上运行的大数据应用程序会生成日志文件。日志文件写入[主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-master-core-task-nodes.html)，您还可以配置 Amazon EMR 将日志文件自动归档到 Amazon S3。您可以使用 Amazon Athena 查询这些日志，以确定应用程序和集群的事件和趋势。有关 Amazon EMR 中日志文件类型以及将其保存到 Simple Storage Service (Amazon S3) 的更多信息，请参阅《Amazon EMR 管理指南**》中的[查看日志文件](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html)。

**Topics**
+ [基于 Amazon EMR 日志文件创建和查询基本表](emr-create-table.md)
+ [基于 Amazon EMR 日志创建和查询分区表](emr-create-table-partitioned.md)

# 基于 Amazon EMR 日志文件创建和查询基本表
<a name="emr-create-table"></a>

以下示例基于保存到的 `s3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6/elasticmapreduce/` 日志文件创建基本表 `myemrlogs`。以下示例中使用的 Amazon S3 位置反映了 Amazon Web Services 账户 *123456789012* 在区域 *us-west-2* 中创建的 EMR 集群的默认日志位置的模式。如果使用自定义位置，则模式为 s3://amzn-s3-demo-bucket/*ClusterID*。

有关创建分区表以潜在改善查询性能和减少数据传输的信息，请参阅 [基于 Amazon EMR 日志创建和查询分区表](emr-create-table-partitioned.md)。

```
CREATE EXTERNAL TABLE `myemrlogs`(
  `data` string COMMENT 'from deserializer')
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6'
```

## 示例查询
<a name="emr-example-queries-basic"></a>

以下示例查询可以在上一示例创建的 `myemrlogs` 表上运行。

**Example – 查询步骤日志以查找出现的 ERROR、WARN、INFO、EXCEPTION、FATAL 或 DEBUG**  

```
SELECT data,
        "$PATH"
FROM "default"."myemrlogs"
WHERE regexp_like("$PATH",'s-86URH188Z6B1')
        AND regexp_like(data, 'ERROR|WARN|INFO|EXCEPTION|FATAL|DEBUG') limit 100;
```

**Example – 查询特定实例日志 i-00b3c0a839ece0a9c 以查找 ERROR、WARN、INFO、EXCEPTION、FATAL 或 DEBUG**  

```
SELECT "data",
        "$PATH" AS filepath
FROM "default"."myemrlogs"
WHERE regexp_like("$PATH",'i-00b3c0a839ece0a9c')
        AND regexp_like("$PATH",'state')
        AND regexp_like(data, 'ERROR|WARN|INFO|EXCEPTION|FATAL|DEBUG') limit 100;
```

**Example – 查询 Presto 应用程序日志以查找 ERROR、WARN、INFO、EXCEPTION、FATAL 或 DEBUG**  

```
SELECT "data",
        "$PATH" AS filepath
FROM "default"."myemrlogs"
WHERE regexp_like("$PATH",'presto')
        AND regexp_like(data, 'ERROR|WARN|INFO|EXCEPTION|FATAL|DEBUG') limit 100;
```

**Example – 查询 Namenode 应用程序日志以查找 ERROR、WARN、INFO、EXCEPTION、FATAL 或 DEBUG**  

```
SELECT "data",
        "$PATH" AS filepath
FROM "default"."myemrlogs"
WHERE regexp_like("$PATH",'namenode')
        AND regexp_like(data, 'ERROR|WARN|INFO|EXCEPTION|FATAL|DEBUG') limit 100;
```

**Example – 按日期和小时查询所有日志以查找 ERROR、WARN、INFO、EXCEPTION、FATAL 或 DEBUG**  

```
SELECT distinct("$PATH") AS filepath
FROM "default"."myemrlogs"
WHERE regexp_like("$PATH",'2019-07-23-10')
        AND regexp_like(data, 'ERROR|WARN|INFO|EXCEPTION|FATAL|DEBUG') limit 100;
```

# 基于 Amazon EMR 日志创建和查询分区表
<a name="emr-create-table-partitioned"></a>

这些示例使用相同的日志位置创建 Athena 表，但对表进行了分区，然后为每个日志位置创建一个分区。有关更多信息，请参阅 [对您的数据进行分区](partitions.md)。

以下查询将创建名为 `mypartitionedemrlogs` 的分区表：

```
CREATE EXTERNAL TABLE `mypartitionedemrlogs`(
  `data` string COMMENT 'from deserializer')
 partitioned by (logtype string)
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6'
```

然后，以下查询语句基于 Amazon EMR 在 Amazon S3: 中创建的不同日志类型的子目录创建表分区：

```
ALTER TABLE mypartitionedemrlogs ADD
     PARTITION (logtype='containers')
     LOCATION 's3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6/containers/'
```

```
ALTER TABLE mypartitionedemrlogs ADD
     PARTITION (logtype='hadoop-mapreduce')
     LOCATION 's3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6/hadoop-mapreduce/'
```

```
ALTER TABLE mypartitionedemrlogs ADD
     PARTITION (logtype='hadoop-state-pusher')
     LOCATION 's3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6/hadoop-state-pusher/'
```

```
ALTER TABLE mypartitionedemrlogs ADD
     PARTITION (logtype='node')
     LOCATION 's3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6/node/'
```

```
ALTER TABLE mypartitionedemrlogs ADD
     PARTITION (logtype='steps')
     LOCATION 's3://aws-logs-123456789012-us-west-2/elasticmapreduce/j-2ABCDE34F5GH6/steps/'
```

创建分区后，您可以在表上运行 `SHOW PARTITIONS` 查询以确认：

```
SHOW PARTITIONS mypartitionedemrlogs;
```

## 示例查询
<a name="emr-example-queries-partitioned"></a>

以下示例演示对特定日志条目的查询使用由上述示例创建的表和分区。

**Example – 查询容器分区中的应用程序 application\$11561661818238\$10002 日志以查找 ERROR 或 WARN**  

```
SELECT data,
        "$PATH"
FROM "default"."mypartitionedemrlogs"
WHERE logtype='containers'
        AND regexp_like("$PATH",'application_1561661818238_0002')
        AND regexp_like(data, 'ERROR|WARN') limit 100;
```

**Example – 查询 hadoop-Mapreduce 分区以查找任务 job\$11561661818238\$10004 和失败的减少操作**  

```
SELECT data,
        "$PATH"
FROM "default"."mypartitionedemrlogs"
WHERE logtype='hadoop-mapreduce'
        AND regexp_like(data,'job_1561661818238_0004|Failed Reduces') limit 100;
```

**Example – 查询节点分区中的 Hive 日志以查找查询 ID 056e0609-33e1-4611-956c-7a31b42d2663**  

```
SELECT data,
        "$PATH"
FROM "default"."mypartitionedemrlogs"
WHERE logtype='node'
        AND regexp_like("$PATH",'hive')
        AND regexp_like(data,'056e0609-33e1-4611-956c-7a31b42d2663') limit 100;
```

**Example – 查询节点分区中的 resourcemanager 日志以查找应用程序 1567660019320\$10001\$101\$1000001**  

```
SELECT data,
        "$PATH"
FROM "default"."mypartitionedemrlogs"
WHERE logtype='node'
        AND regexp_like(data,'resourcemanager')
        AND regexp_like(data,'1567660019320_0001_01_000001') limit 100
```

# 查询 AWS Global Accelerator 流日志
<a name="querying-global-accelerator-flow-logs"></a>

可以使用 AWS Global Accelerator 创建加速器，从而将网络流量定向到 AWS 全局网络上的最佳终端节点。有关 Global Accelerator 的更多信息，请参阅[什么是 AWS Global Accelerator](https://docs.aws.amazon.com/global-accelerator/latest/dg/what-is-global-accelerator.html)。

Global Accelerator 流日志允许您捕获有关进出加速器中的网络接口的 IP 地址流量的信息。流日志数据将发布到 Amazon S3，您可以在其中检索和查看您的数据。有关更多信息，请参阅 [AWS Global Accelerator 中的流日志](https://docs.aws.amazon.com/global-accelerator/latest/dg/monitoring-global-accelerator.flow-logs.html)。

可以使用 Athena 查询 Global Accelerator 流日志，方式是创建一个指定流日志在 Amazon S3 中的位置的表。

**为 Global Accelerator 流日志创建表**

1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。此查询指定 *ROW FORMAT DELIMITED* 并且不再指定 [SerDe](serde-reference.md)，这意味着查询将使用 [`LazySimpleSerDe`](lazy-simple-serde.md)。在此查询中，字段由一个空格终止。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS aga_flow_logs (
     version string,
     account string,
     acceleratorid string,
     clientip string,
     clientport int,
     gip string,
     gipport int,
     endpointip string,
     endpointport int,
     protocol string,
     ipaddresstype string,
     numpackets bigint,
     numbytes int,
     starttime int,
     endtime int,
     action string,
     logstatus string,
     agasourceip string,
     agasourceport int,
     endpointregion string,
     agaregion string,
     direction string,
     vpc_id string,
     reject_reason string
   )
   PARTITIONED BY (dt string)
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ' '
   LOCATION 's3://amzn-s3-demo-bucket/prefix/AWSLogs/account_id/globalaccelerator/region/'
   TBLPROPERTIES ("skip.header.line.count"="1");
   ```

1. 修改 `LOCATION` 值以指向包含您的日志数据的 Amazon S3 存储桶。

   ```
   's3://amzn-s3-demo-bucket/prefix/AWSLogs/account_id/globalaccelerator/region_code/'
   ```

1. 在 Athena 控制台中运行查询。查询完成后，Athena 注册 `aga_flow_logs` 表，使其中的数据可用于查询。

1. 创建分区可读取数据，如以下示例查询中所示。此查询创建指定日期的单个分区。替换日期和位置的占位符。

   ```
   ALTER TABLE aga_flow_logs
   ADD PARTITION (dt='YYYY-MM-dd')
   LOCATION 's3://amzn-s3-demo-bucket/prefix/AWSLogs/account_id/globalaccelerator/region_code/YYYY/MM/dd';
   ```

## AWS Global Accelerator 流日志的示例查询
<a name="querying-global-accelerator-flow-logs-examples"></a>

**Example – 列出通过特定边缘站点的请求**  
以下示例查询列出了已通过 LHR 边缘站点的请求。使用 `LIMIT` 运算符可限制一次查询的日志数目。  

```
SELECT 
  clientip, 
  agaregion, 
  protocol, 
  action 
FROM 
  aga_flow_logs 
WHERE 
  agaregion LIKE 'LHR%' 
LIMIT 
  100;
```

**Example – 列出接收大多数 HTTPS 请求的端点 IP 地址**  
要查看哪些终端节点 IP 地址收到的 HTTPS 请求最多，请使用以下查询。此查询计算在 HTTPS 端口 443 上接收的数据包数，按目标 IP 地址对它们进行分组，并返回前 10 个 IP 地址。  

```
SELECT 
  SUM(numpackets) AS packetcount, 
  endpointip 
FROM 
  aga_flow_logs 
WHERE 
  endpointport = 443 
GROUP BY 
  endpointip 
ORDER BY 
  packetcount DESC 
LIMIT 
  10;
```

# 查询 Amazon GuardDuty 调查发现
<a name="querying-guardduty"></a>

[Amazon GuardDuty](https://aws.amazon.com/guardduty/) 是一项安全监控服务，用于帮助识别您 AWS 环境中的意外活动和潜在的未经授权或恶意活动。当检测到意外和潜在恶意活动时，GuardDuty 会生成安全[调查结果](https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_findings.html)，您可以将其导出到 Amazon S3 进行存储和分析。将调查结果导出到 Amazon S3 后，您可以使用 Athena 进行查询。本文介绍如何在 Athena 中为 GuardDuty 调查结果创建表并查询它们。

有关 Amazon GuardDuty 更多信息，请参阅《[Amazon GuardDuty 用户指南](https://docs.aws.amazon.com/guardduty/latest/ug/)》。

## 先决条件
<a name="querying-guardduty-prerequisites"></a>
+ 启用 GuardDuty 功能以将调查结果导出到 Amazon S3。有关步骤，请参阅《Amazon GuardDuty 用户指南》中[导出调查结果](https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_exportfindings.html)。

## 在 Athena 中为 GuardDuty 调查发现创建表
<a name="querying-guardduty-creating-a-table-in-athena-for-guardduty-findings"></a>

要从 Athena 查询您的 GuardDuty 调查结果，您必须为它们创建一个表。

**要在 Athena 中为 GuardDuty 调查结果创建表**

1. 从 [https://console.aws.amazon.com/athena/](https://console.aws.amazon.com/athena/home) 打开 Athena 控制台。

1. 将以下 DDL 语句粘贴到 Athena 控制台中。修改 `LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/account-id/GuardDuty/'` 中的值以指向您在 Amazon S3 中的 GuardDuty 调查结果。

   ```
   CREATE EXTERNAL TABLE `gd_logs` (
     `schemaversion` string,
     `accountid` string,
     `region` string,
     `partition` string,
     `id` string,
     `arn` string,
     `type` string,
     `resource` string,
     `service` string,
     `severity` string,
     `createdat` string,
     `updatedat` string,
     `title` string,
     `description` string)
   ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/account-id/GuardDuty/'
    TBLPROPERTIES ('has_encrypted_data'='true')
   ```
**注意**  
SerDe 期望每个 JSON 文档都位于单行文本中，并且不使用行终止字符分隔记录中的字段。如果 JSON 文本采用美观的打印格式，当您在创建表后尝试对其进行查询时，可能会收到类似以下内容的错误消息：HIVE\$1CURSOR\$1ERROR: Row is not a valid JSON Object（HIVE\$1CURSOR\$1ERROR：行不是有效的 JSON 对象）或 HIVE\$1CURSOR\$1ERROR: JsonParseException: Unexpected end-of-input: expected close marker for OBJECT（HIVE\$1CURSOR\$1ERROR：JsonParseException：意外的输入结束：对象的预期关闭标记）。有关更多信息，请参阅 GitHub 上 OpenX SerDe 文档中的 [JSON 数据文件](https://github.com/rcongiu/Hive-JSON-Serde#json-data-files)。

1. 在 Athena 控制台中运行查询以注册 `gd_logs` 表。查询完成后，调查结果准备就绪，可供您从 Athena 查询。

## 示例查询
<a name="querying-guardduty-examples"></a>

以下示例演示了如何从 Athena 查询 GuardDuty 调查结果。

**Example – DNS 数据泄露**  
以下查询返回可能会通过 DNS 查询泄露数据的 Amazon EC2 实例的相关信息。  

```
SELECT
    title,
    severity,
    type,
    id AS FindingID,
    accountid,
    region,
    createdat,
    updatedat,
    json_extract_scalar(service, '$.count') AS Count,
    json_extract_scalar(resource, '$.instancedetails.instanceid') AS InstanceID,
    json_extract_scalar(service, '$.action.actiontype') AS DNS_ActionType,
    json_extract_scalar(service, '$.action.dnsrequestaction.domain') AS DomainName,
    json_extract_scalar(service, '$.action.dnsrequestaction.protocol') AS protocol,
    json_extract_scalar(service, '$.action.dnsrequestaction.blocked') AS blocked
FROM gd_logs
WHERE type = 'Trojan:EC2/DNSDataExfiltration'
ORDER BY severity DESC
```

**Example – 未授权的 IAM 用户访问**  
以下查询返回所有区域中 IAM 委托人的所有 `UnauthorizedAccess:IAMUser` 调查结果类型。  

```
SELECT title,
         severity,
         type,
         id,
         accountid,
         region,
         createdat,
         updatedat,
         json_extract_scalar(service, '$.count') AS Count, 
         json_extract_scalar(resource, '$.accesskeydetails.username') AS IAMPrincipal, 
         json_extract_scalar(service,'$.action.awsapicallaction.api') AS APIActionCalled
FROM gd_logs
WHERE type LIKE '%UnauthorizedAccess:IAMUser%' 
ORDER BY severity desc;
```

## 有关查询 GuardDuty 调查结果的提示
<a name="querying-guardduty-tips"></a>

创建查询时，请记住以下几点。
+ 要从嵌套 JSON 字段中提取数据，请使用 Presto `json_extract` 或 `json_extract_scalar` 函数。有关更多信息，请参阅 [从字符串中提取 JSON 数据](extracting-data-from-JSON.md)。
+ 请确保 JSON 字段中的所有字符均为小写字符。
+  有关下载查询结果的信息，请参阅[使用 Athena 控制台下载查询结果文件](saving-query-results.md)。

# 查询 AWS Network Firewall日志
<a name="querying-network-firewall-logs"></a>

AWS Network Firewall 是一种托管式服务，您可以使用它为您的 Amazon Virtual Private Cloud 实例部署必要的网络保护。AWS Network Firewall 与 AWS Firewall Manager 结合使用，以便您可以基于 AWS Network Firewall 规则构建策略，然后在您的 VPC 和账户中集中应用这些策略。有关 AWS Network Firewall 的更多信息，请参阅 [AWS Network Firewall](https://aws.amazon.com/network-firewall/)。

您可以为您转发到防火墙有状态规则引擎的流量配置 AWS Network Firewall 日志记录。日志记录为您提供有关网络流量的详细信息，包括有状态引擎接收数据包的时间、有关数据包的详细信息以及针对数据包采取的任何有状态规则操作。日志将发布到您配置的日志目标，您可以在其中检索和查看日志。有关更多信息，请参阅《*AWS Network Firewall 开发人员指南*》中的[录入来自 AWS Network Firewall 的网络流量](https://docs.aws.amazon.com/network-firewall/latest/developerguide/firewall-logging.html)。

**Topics**
+ [为警报日志创建和查询表](querying-network-firewall-logs-sample-alert-logs-table.md)
+ [为 netflow 日志创建和查询表](querying-network-firewall-logs-sample-netflow-logs-table.md)

# 为警报日志创建和查询表
<a name="querying-network-firewall-logs-sample-alert-logs-table"></a>

1. 修改下面的 DDL 语句示例，使其符合警报日志的结构。您可能需要更新语句以包含最新版本日志的列。有关更多信息，请参阅《*AWS Network Firewall 开发人员指南*》中的[防火墙日志的内容](https://docs.aws.amazon.com/network-firewall/latest/developerguide/firewall-logging.html#firewall-logging-contents)。

   ```
   CREATE EXTERNAL TABLE network_firewall_alert_logs (
     firewall_name string,
     availability_zone string,
     event_timestamp string,
     event struct<
       timestamp:string,
       flow_id:bigint,
       event_type:string,
       src_ip:string,
       src_port:int,
       dest_ip:string,
       dest_port:int,
       proto:string,
       app_proto:string,
       sni:string,
       tls_inspected:boolean,
       tls_error:struct<
         error_message:string>,
       revocation_check:struct<
         leaf_cert_fpr:string,
         status:string,
         action:string>,
       alert:struct<
         alert_id:string,
         alert_type:string,
         action:string,
         signature_id:int,
         rev:int,
         signature:string,
         category:string,
         severity:int,
         rule_name:string,
         alert_name:string,
         alert_severity:string,
         alert_description:string,
         file_name:string,
         file_hash:string,
         packet_capture:string,
         reference_links:array<string>
       >, 
       src_country:string, 
       dest_country:string, 
       src_hostname:string, 
       dest_hostname:string, 
       user_agent:string, 
       url:string
      >
   )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    LOCATION 's3://amzn-s3-demo-bucket/path_to_alert_logs_folder/';
   ```

1. 修改 `LOCATION` 子句以指定您在 Amazon S3 中的日志文件夹。

1. 在 Athena 查询编辑器中运行 `CREATE TABLE` 查询。查询完成后，Athena 将注册 `network_firewall_alert_logs` 表，使其指向的数据可以进行查询。

## 示例查询
<a name="querying-network-firewall-logs-alert-log-sample-query"></a>

本节中的示例警报日志查询会筛选执行了 TLS 检查且警报严重性级别为 2 或更高的事件。

查询使用别名来创建输出列标题，以显示列所属的 `struct`。例如，`event.alert.category` 字段的列标题是 `event_alert_category`，而不只是 `category`。若要进一步自定义列名，可以根据自己的喜好修改别名。例如，可以使用下划线或其他分隔符来分隔 `struct` 名称和字段名称。

请记得根据表定义和查询结果中所需的字段修改列名和 `struct` 引用。

```
SELECT 
  firewall_name,
  availability_zone,
  event_timestamp,
  event.timestamp AS event_timestamp,
  event.flow_id AS event_flow_id,
  event.event_type AS event_type,
  event.src_ip AS event_src_ip,
  event.src_port AS event_src_port,
  event.dest_ip AS event_dest_ip,
  event.dest_port AS event_dest_port,
  event.proto AS event_protol,
  event.app_proto AS event_app_proto,
  event.sni AS event_sni,
  event.tls_inspected AS event_tls_inspected,
  event.tls_error.error_message AS event_tls_error_message,
  event.revocation_check.leaf_cert_fpr AS event_revocation_leaf_cert,
  event.revocation_check.status AS event_revocation_check_status,
  event.revocation_check.action AS event_revocation_check_action,
  event.alert.alert_id AS event_alert_alert_id,
  event.alert.alert_type AS event_alert_alert_type,
  event.alert.action AS event_alert_action,
  event.alert.signature_id AS event_alert_signature_id,
  event.alert.rev AS event_alert_rev,
  event.alert.signature AS event_alert_signature,
  event.alert.category AS event_alert_category,
  event.alert.severity AS event_alert_severity,
  event.alert.rule_name AS event_alert_rule_name,
  event.alert.alert_name AS event_alert_alert_name,
  event.alert.alert_severity AS event_alert_alert_severity,
  event.alert.alert_description AS event_alert_alert_description,
  event.alert.file_name AS event_alert_file_name,
  event.alert.file_hash AS event_alert_file_hash,
  event.alert.packet_capture AS event_alert_packet_capture,
  event.alert.reference_links AS event_alert_reference_links,
  event.src_country AS event_src_country,
  event.dest_country AS event_dest_country,
  event.src_hostname AS event_src_hostname,
  event.dest_hostname AS event_dest_hostname,
  event.user_agent AS event_user_agent,
  event.url AS event_url
FROM 
  network_firewall_alert_logs 
WHERE 
  event.alert.severity >= 2
  AND event.tls_inspected = true 
LIMIT 10;
```

# 为 netflow 日志创建和查询表
<a name="querying-network-firewall-logs-sample-netflow-logs-table"></a>

1. 修改下面的 DDL 语句示例，使其符合 netflow 日志的结构。您可能需要更新语句以包含最新版本日志的列。有关更多信息，请参阅《*AWS Network Firewall 开发人员指南*》中的[防火墙日志的内容](https://docs.aws.amazon.com/network-firewall/latest/developerguide/firewall-logging.html#firewall-logging-contents)。

   ```
   CREATE EXTERNAL TABLE network_firewall_netflow_logs (
     firewall_name string,
     availability_zone string,
     event_timestamp string,
     event struct<
       timestamp:string,
       flow_id:bigint,
       event_type:string,
       src_ip:string,
       src_port:int,
       dest_ip:string,
       dest_port:int,
       proto:string,
       app_proto:string,
       tls_inspected:boolean,
       netflow:struct<
         pkts:int,
         bytes:bigint,
         start:string,
         `end`:string,
         age:int,
         min_ttl:int,
         max_ttl:int,
         tcp_flags:struct<
           syn:boolean,
           fin:boolean,
           rst:boolean,
           psh:boolean,
           ack:boolean,
           urg:boolean
           >
         >
       >
   )
   ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
   LOCATION 's3://amzn-s3-demo-bucket/path_to_netflow_logs_folder/';
   ```

1. 修改 `LOCATION` 子句以指定您在 Amazon S3 中的日志文件夹。

1. 在 Athena 查询编辑器中运行 `CREATE TABLE` 查询。查询完成后，Athena 将注册 `network_firewall_netflow_logs` 表，使其指向的数据可以进行查询。

## 示例查询
<a name="querying-network-firewall-logs-netflow-log-sample-query"></a>

本节中的 netflow 日志查询示例会筛选执行了 TLS 检查的事件。

查询使用别名来创建输出列标题，以显示列所属的 `struct`。例如，`event.netflow.bytes` 字段的列标题是 `event_netflow_bytes`，而不只是 `bytes`。若要进一步自定义列名，可以根据自己的喜好修改别名。例如，可以使用下划线或其他分隔符来分隔 `struct` 名称和字段名称。

请记得根据表定义和查询结果中所需的字段修改列名和 `struct` 引用。

```
SELECT
  event.src_ip AS event_src_ip,
  event.dest_ip AS event_dest_ip,
  event.proto AS event_proto,
  event.app_proto AS event_app_proto,
  event.tls_inspected AS event_tls_inspected,
  event.netflow.pkts AS event_netflow_pkts,
  event.netflow.bytes AS event_netflow_bytes,
  event.netflow.tcp_flags.syn AS event_netflow_tcp_flags_syn 
FROM network_firewall_netflow_logs 
WHERE event.tls_inspected = true
```

# 查询网络负载均衡器日志
<a name="networkloadbalancer-classic-logs"></a>

使用 Athena 分析和处理来自 Network Load Balancer 的日志。这些日志将接收有关发送到 Network Load Balancer 的传输层安全性 (TLS) 请求的详细信息。您可以使用这些访问日志分析流量模式并解决问题。

在分析 Network Load Balancer 访问日志之前，请启用并配置它们以保存在目标 Amazon S3 存储桶中。有关更多信息以及有关每个网络负载均衡器访问日志条目的信息，请参阅[网络负载均衡器的访问日志](https://docs.aws.amazon.com/elasticloadbalancing/latest/network/load-balancer-access-logs.html)。

**为 Network Load Balancer 日志创建表**

1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。检查 Network Load Balancer 日志记录的[语法](https://docs.aws.amazon.com/elasticloadbalancing/latest/network/load-balancer-access-logs.html#access-log-file-format)。根据需要更新语句，以包含与日志记录对应的列和正则表达式。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS nlb_tls_logs (
               type string,
               version string,
               time string,
               elb string,
               listener_id string,
               client_ip string,
               client_port int,
               target_ip string,
               target_port int,
               tcp_connection_time_ms double,
               tls_handshake_time_ms double,
               received_bytes bigint,
               sent_bytes bigint,
               incoming_tls_alert int,
               cert_arn string,
               certificate_serial string,
               tls_cipher_suite string,
               tls_protocol_version string,
               tls_named_group string,
               domain_name string,
               alpn_fe_protocol string,
               alpn_be_protocol string,
               alpn_client_preference_list string,
               tls_connection_creation_time string
               )
               ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
               WITH SERDEPROPERTIES (
               'serialization.format' = '1',
               'input.regex' = 
               '([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-0-9]*) ([-0-9]*) ([-0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ?([^ ]*)?( .*)?'
               )
               LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/AWS_account_ID/elasticloadbalancing/region';
   ```

1. 修改 `LOCATION` Amazon S3 存储桶以指定您的 Network Load Balancer 日志的目标。

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `nlb_tls_logs` 表，使其中的数据可以供查询。

## 示例查询
<a name="query-nlb-example"></a>

要查看证书的使用次数，请使用与以下示例类似的查询：

```
SELECT count(*) AS 
         ct,
         cert_arn
FROM "nlb_tls_logs"
GROUP BY  cert_arn;
```

以下查询显示了所用 TLS 版本低于 1.3 的用户数：

```
SELECT tls_protocol_version,
         COUNT(tls_protocol_version) AS 
         num_connections,
         client_ip
FROM "nlb_tls_logs"
WHERE tls_protocol_version < 'tlsv13'
GROUP BY tls_protocol_version, client_ip;
```

使用以下查询可确定需要较长的 TLS 握手时间的连接：

```
SELECT *
FROM "nlb_tls_logs"
ORDER BY  tls_handshake_time_ms DESC 
LIMIT 10;
```

使用以下查询可识别和统计过去 30 天内协商的 TLS 协议版本和密码套件。

```
SELECT tls_cipher_suite,
         tls_protocol_version,
         COUNT(*) AS ct
FROM "nlb_tls_logs"
WHERE from_iso8601_timestamp(time) > current_timestamp - interval '30' day
        AND NOT tls_protocol_version = '-'
GROUP BY tls_cipher_suite, tls_protocol_version
ORDER BY ct DESC;
```

# 查询 Amazon Route 53 Resolver 查询日志
<a name="querying-r53-resolver-logs"></a>

您可以为 Amazon Route 53 Resolver 查询日志创建 Athena 表，并从 Athena 查询这些日志。

Route 53 解析程序查询日志记录用于记录 VPC 中的资源提出的 DNS 查询、使用入站解析程序端点的本地部署资源、使用出站解析程序端点进行递归 DNS 解析的查询以及使用 Route 53 解析程序 DNS 防火墙规则阻止、允许或监控域列表的查询。有关解析程序查询日志记录的更多信息，请参阅《*Amazon Route 53 开发人员指南*》中的[解析程序查询日志记录](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver-query-logs.html)。有关日志中每个字段的信息，请参阅《**Amazon Route 53 开发人员指南》中的 [显示在解析程序查询日志中的值](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resolver-query-logs-format.html)。

**Topics**
+ [为解析程序查询日志创建表](querying-r53-resolver-logs-creating-the-table.md)
+ [使用分区投影](querying-r53-resolver-logs-partitioning-example.md)
+ [示例查询](querying-r53-resolver-logs-example-queries.md)

# 为解析程序查询日志创建表
<a name="querying-r53-resolver-logs-creating-the-table"></a>

您可以使用 Athena 控制台中的查询编辑器为 Route 53 解析程序查询日志创建和查询表。

**为 Route 53 解析程序查询日志创建和查询 Athena 表**

1. 从 [https://console.aws.amazon.com/athena/](https://console.aws.amazon.com/athena/home) 打开 Athena 控制台。

1. 在 Athena 查询编辑器中，输入以下 `CREATE TABLE` 语句。将 `LOCATION` 子句值替换为与您的解析程序日志在 Amazon S3 中的位置相对应的子句值。

   ```
   CREATE EXTERNAL TABLE r53_rlogs (
     version string,
     account_id string,
     region string,
     vpc_id string,
     query_timestamp string,
     query_name string,
     query_type string,
     query_class
       string,
     rcode string,
     answers array<
       struct<
         Rdata: string,
         Type: string,
         Class: string>
       >,
     srcaddr string,
     srcport int,
     transport string,
     srcids struct<
       instance: string,
       resolver_endpoint: string
       >,
     firewall_rule_action string,
     firewall_rule_group_id string,
     firewall_domain_list_id string
    )
        
   ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
   LOCATION 's3://amzn-s3-demo-bucket/AWSLogs/aws_account_id/vpcdnsquerylogs/{vpc-id}/'
   ```

   由于解析程序查询日志数据是 JSON 格式的，因此 CREATE TABLE 语句将使用 [JSON SerDe 库](json-serde.md)来分析数据。
**注意**  
SerDe 期望每个 JSON 文档都位于单行文本中，并且不使用行终止字符分隔记录中的字段。如果 JSON 文本采用美观的打印格式，当您在创建表后尝试对其进行查询时，可能会收到类似以下内容的错误消息：HIVE\$1CURSOR\$1ERROR: Row is not a valid JSON Object（HIVE\$1CURSOR\$1ERROR：行不是有效的 JSON 对象）或 HIVE\$1CURSOR\$1ERROR: JsonParseException: Unexpected end-of-input: expected close marker for OBJECT（HIVE\$1CURSOR\$1ERROR：JsonParseException：意外的输入结束：对象的预期关闭标记）。有关更多信息，请参阅 GitHub 上 OpenX SerDe 文档中的 [JSON 数据文件](https://github.com/rcongiu/Hive-JSON-Serde#json-data-files)。

1. 选择**运行查询**。该语句创建名为 `r53_rlogs` 的 Athena 表，其中的列表示解析程序日志数据中的每个字段。

1. 在 Athena 控制台查询编辑器中，运行以下查询以验证您的表是否已创建。

   ```
   SELECT * FROM "r53_rlogs" LIMIT 10
   ```

# 使用分区投影
<a name="querying-r53-resolver-logs-partitioning-example"></a>

以下示例显示了 Resolver 查询日志的 `CREATE TABLE` 语句，该语句使用分区投影并按 VPC 和日期进行分区。更多有关分区投影的信息，请参阅 [将分区投影与 Amazon Athena 结合使用](partition-projection.md)。

```
CREATE EXTERNAL TABLE r53_rlogs (
  version string,
  account_id string,
  region string,
  vpc_id string,
  query_timestamp string,
  query_name string,
  query_type string,
  query_class string,
  rcode string,
  answers array<
    struct<
      Rdata: string,
      Type: string,
      Class: string>
    >,
  srcaddr string,
  srcport int,
  transport string,
  srcids struct<
    instance: string,
    resolver_endpoint: string
    >,
  firewall_rule_action string,
  firewall_rule_group_id string,
  firewall_domain_list_id string
)
PARTITIONED BY (
`date` string,
`vpc` string
)
ROW FORMAT SERDE      'org.openx.data.jsonserde.JsonSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT          'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION              's3://amzn-s3-demo-bucket/route53-query-logging/AWSLogs/aws_account_id/vpcdnsquerylogs/'
TBLPROPERTIES(
'projection.enabled' = 'true',
'projection.vpc.type' = 'enum',
'projection.vpc.values' = 'vpc-6446ae02',
'projection.date.type' = 'date',
'projection.date.range' = '2023/06/26,NOW',
'projection.date.format' = 'yyyy/MM/dd',
'projection.date.interval' = '1',
'projection.date.interval.unit' = 'DAYS',
'storage.location.template' = 's3://amzn-s3-demo-bucket/route53-query-logging/AWSLogs/aws_account_id/vpcdnsquerylogs/${vpc}/${date}/'
)
```

# 示例查询
<a name="querying-r53-resolver-logs-example-queries"></a>

以下示例显示了您可以从 Athena 对解析程序查询日志执行的一些查询。

## 示例 1 – 按降序 query\$1timestamp 顺序查询日志
<a name="querying-r53-resolver-logs-example-1-query-logs-in-descending-query_timestamp-order"></a>

以下查询以降序 `query_timestamp` 顺序显示日志结果。

```
SELECT * FROM "r53_rlogs"
ORDER BY query_timestamp DESC
```

## 示例 2 – 指定的开始时间和结束时间内的查询日志
<a name="querying-r53-resolver-logs-example-2-query-logs-within-specified-start-and-end-times"></a>

以下查询查询在 2020 年 9 月 24 日午夜和上午 8 点之间日志。根据您自己的要求替换开始时间和结束时间。

```
SELECT query_timestamp, srcids.instance, srcaddr, srcport, query_name, rcode
FROM "r53_rlogs"
WHERE (parse_datetime(query_timestamp,'yyyy-MM-dd''T''HH:mm:ss''Z')
     BETWEEN parse_datetime('2020-09-24-00:00:00','yyyy-MM-dd-HH:mm:ss') 
     AND parse_datetime('2020-09-24-00:08:00','yyyy-MM-dd-HH:mm:ss'))
ORDER BY query_timestamp DESC
```

## 示例 3 – 基于指定 DNS 查询名称模式的查询日志
<a name="querying-r53-resolver-logs-example-3-query-logs-based-on-a-specified-dns-query-name-pattern"></a>

以下查询选择其查询名称包含字符串“example.com”的记录。

```
SELECT query_timestamp, srcids.instance, srcaddr, srcport, query_name, rcode, answers
FROM "r53_rlogs"
WHERE query_name LIKE '%example.com%'
ORDER BY query_timestamp DESC
```

## 示例 4 – 没有答案的查询日志请求
<a name="querying-r53-resolver-logs-example-4-query-log-requests-with-no-answer"></a>

以下查询选择请求未收到任何答案的日志条目。

```
SELECT query_timestamp, srcids.instance, srcaddr, srcport, query_name, rcode, answers
FROM "r53_rlogs"
WHERE cardinality(answers) = 0
```

## 示例 5 – 查询具有特定答案的日志
<a name="querying-r53-resolver-logs-example-5-query-logs-with-a-specific-answer"></a>

以下查询显示了 `answer.Rdata` 值具有指定的 IP 地址的日志。

```
SELECT query_timestamp, srcids.instance, srcaddr, srcport, query_name, rcode, answer.Rdata
FROM "r53_rlogs"
CROSS JOIN UNNEST(r53_rlogs.answers) as st(answer)
WHERE answer.Rdata='203.0.113.16';
```

# 查询 Amazon SES 事件日志
<a name="querying-ses-logs"></a>

您可以使用 Amazon Athena 查询[Amazon Simple Email Service](https://aws.amazon.com/ses/)（Amazon SES）事件日志。

Amazon SES 是一个电子邮件平台，让您能够使用自己的电子邮件地址和域，经济高效且方便地收发电子邮件。您可以使用事件、指标和统计信息更加细致地监控 Amazon SES 发送活动。

您可以根据自己定义的特征，将 Amazon SES 事件发布到 [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/)、[Amazon Data Firehose](https://aws.amazon.com/kinesis/data-firehose/) 或 [Amazon Simple Notification Service](https://aws.amazon.com/sns/)。将信息存储到 Amazon S3 以后，您可以从 Amazon Athena 进行查询。

[有关 Amazon SES 日志的 Athena `CREATE TABLE` 语句示例，包括如何在 Amazon SES 事件日志数据中创建视图和展平嵌套数组的步骤，请参阅 AWS 博客文章 Analyzing Amazon SES event data with AWS Analytics Services](https://aws.amazon.com/blogs/messaging-and-targeting/analyzing-amazon-ses-event-data-with-aws-analytics-services/) 中的“Step 3: Using Amazon Athena to query the SES event logs”。

# 查询 Amazon VPC 流日志
<a name="vpc-flow-logs"></a>

Amazon Virtual Private Cloud 流日志捕获有关在 VPC 中传入和传出网络接口的 IP 流量的信息。可使用日志来调查网络流量模式，并识别 VPC 网络中的威胁和风险。

若要查询 Amazon VPC 流日志，您有两种选择：

****
+ **Amazon VPC 控制台** – 使用 Amazon VPC 控制台中的 Athena 集成功能生成 CloudFormation 模板，用于创建 Athena 数据库、工作组和流日志表并为您进行分区。该模板还会创建一组[预定义的流日志查询](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-athena.html#predefined-queries)，可用于获取有关流经 VPC 的流量的洞察。

  有关此方法更多信息，请参阅《*Amazon VPC 用户指南*》中的[使用 Amazon Athena 查询流日志](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-athena.html)。
+ **Amazon Athena 控制台** – 直接在 Athena 控制台中创建表和查询。有关更多信息，请继续阅读此页面。

当您开始在 Athena 中查询日志之前，[启用 VPC 流日志](https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs.html)，并将其配置为保存到您的 Amazon S3 存储桶。在您创建日志后，让它们运行几分钟以收集一些数据。这些日志是采用 Athena 允许您直接查询的 GZIP 压缩格式创建的。

创建 VPC 流日志时，当您希望指定在流日志中返回的字段以及这些字段的显示顺序时，请使用自定义格式。有关流日志记录的更多信息，请参阅《*Amazon VPC 用户指南*》中的[流日志记录](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html#flow-log-records)。

## 注意事项和限制
<a name="vpc-flow-logs-common-considerations"></a>

在 Athena 中为 Amazon VPC 流日志创建表时，请记住以下几点：
+ 默认情况下，在 Athena 中，Parquet 将按名称访问列。有关更多信息，请参阅 [处理架构更新](handling-schema-updates-chapter.md)。
+ 使用流日志记录中的名称作为 Athena 中列的名称。Athena 架构中列的名称应与 Amazon VPC 流日志中的字段名称完全匹配，但有以下不同之处：
  + 将 Amazon VPC 日志字段名称中的连字符替换为 Athena 列名称中的下划线。有关 Athena 中可接受的数据库名称、表名以及列名的字符的信息，请参阅[命名数据库、表和列](tables-databases-columns-names.md)。
  + 在 Athena 中，使用反引号将[保留关键字](reserved-words.md)中的流日志记录名称括起来，将其转义。
+ VPC 流日志特定于 AWS 账户。当您将日志文件发布到 Amazon S3 时，在 Amazon S3 中创建的 Amazon VPC 的路径包含用于创建流日志的 AWS 账户 ID。有关更多信息，请参阅 *Amazon VPC 用户指南*中的[将流日志发布到 Amazon S3](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-s3.html)。

**Topics**
+ [注意事项和限制](#vpc-flow-logs-common-considerations)
+ [为 Amazon VPC 流日志创建表并对其进行查询](vpc-flow-logs-create-table-statement.md)
+ [以 Apache Parquet 格式为流日志创建表](vpc-flow-logs-parquet.md)
+ [使用分区投影功能创建和查询 Amazon VPC 流日志表](vpc-flow-logs-partition-projection.md)
+ [使用分区投影功能并以 Apache Parquet 格式为流日志创建表](vpc-flow-logs-partition-projection-parquet-example.md)
+ [其他资源](query-examples-vpc-logs-additional-resources.md)

# 为 Amazon VPC 流日志创建表并对其进行查询
<a name="vpc-flow-logs-create-table-statement"></a>

以下过程将为 Amazon VPC 流日志创建 Amazon VPC 表。使用自定义格式创建流日志时，可以创建一个表，其字段与您在创建流日志时指定的字段相匹配，且字段顺序与您指定的字段顺序相同。

**为 Amazon VPC 流日志创建 Athena 表**

1. 按照 [注意事项和限制](vpc-flow-logs.md#vpc-flow-logs-common-considerations) 部分中的准则，在 Athena 控制台查询编辑器中输入类似以下内容的 DDL 语句。此示例语句创建一个表，其中包含 Amazon VPC 流日志版本 2 到 5 的列，如[流日志记录](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html#flow-log-records)中所示。如果使用不同的列集或列顺序，请相应地修改语句。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS `vpc_flow_logs` (
     version int,
     account_id string,
     interface_id string,
     srcaddr string,
     dstaddr string,
     srcport int,
     dstport int,
     protocol bigint,
     packets bigint,
     bytes bigint,
     start bigint,
     `end` bigint,
     action string,
     log_status string,
     vpc_id string,
     subnet_id string,
     instance_id string,
     tcp_flags int,
     type string,
     pkt_srcaddr string,
     pkt_dstaddr string,
     region string,
     az_id string,
     sublocation_type string,
     sublocation_id string,
     pkt_src_aws_service string,
     pkt_dst_aws_service string,
     flow_direction string,
     traffic_path int
   )
   PARTITIONED BY (`date` date)
   ROW FORMAT DELIMITED
   FIELDS TERMINATED BY ' '
   LOCATION 's3://amzn-s3-demo-bucket/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/'
   TBLPROPERTIES ("skip.header.line.count"="1");
   ```

   请注意以下几点：
   + 此查询指定 `ROW FORMAT DELIMITED` 并省略指定 SerDe。这意味着查询使用[用于 CSV、TSV 和自定义分隔文件的 Lazy Simple SerDe](lazy-simple-serde.md)。在此查询中，字段由一个空格终止。
   + `PARTITIONED BY` 子句使用 `date` 类型。这样，就可以在查询中使用数学运算符来选择比特定日期更早或更新的内容。
**注意**  
因为 `date` 在 DDL 语句中是保留关键字，所以用反引号字符对其进行转义。有关更多信息，请参阅 [转义查询中的保留关键字](reserved-words.md)。
   + 对于具有不同自定义格式的 VPC 流日志，请修改字段以与您创建流日志时指定的字段相匹配。

1. 修改 `LOCATION 's3://amzn-s3-demo-bucket/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/'` 以指向包含您的日志数据的 Amazon S3 存储桶。

1. 在 Athena 控制台中运行查询。查询完成后，Athena 将注册 `vpc_flow_logs` 表，使其中的数据可以供您发出查询。

1. 创建分区以便能够读取数据，如以下示例查询中所示。此示例查询创建指定日期的单个分区。根据需要替换日期和位置的占位符。
**注意**  
此查询仅为您指定的日期创建单个分区。若要自动执行此过程，请使用运行此查询并以这种方式为 `year/month/day` 创建分区的脚本，或使用 `CREATE TABLE` 语句指定[分区投影](vpc-flow-logs-partition-projection.md)。

   ```
   ALTER TABLE vpc_flow_logs
   ADD PARTITION (`date`='YYYY-MM-dd')
   LOCATION 's3://amzn-s3-demo-bucket/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/YYYY/MM/dd';
   ```

## vpc\$1flow\$1logs 表的查询示例
<a name="query-examples-vpc-logs"></a>

使用 Athena 控制台中的查询编辑器在创建的表上运行 SQL 语句。您可以保存查询、查看之前的查询或下载 CSV 格式的查询结果。在以下示例中，将 `vpc_flow_logs` 替换为表名称。根据您的要求修改列值和其他变量。

以下示例查询列出了指定日期的最多 100 个流日志。

```
SELECT * 
FROM vpc_flow_logs 
WHERE date = DATE('2020-05-04') 
LIMIT 100;
```

以下查询列出所有被拒绝的 TCP 连接并使用新创建的日期分区列 `date` 来从中提取这些事件发生的星期几。

```
SELECT day_of_week(date) AS
  day,
  date,
  interface_id,
  srcaddr,
  action,
  protocol
FROM vpc_flow_logs
WHERE action = 'REJECT' AND protocol = 6
LIMIT 100;
```

若要查看哪个服务器收到最大数量的 HTTPS 请求，请使用以下查询。它计算在 HTTPS 端口 443 上接收的数据包数，按目标 IP 地址对它们进行分组，并返回上一周的前 10 个。

```
SELECT SUM(packets) AS
  packetcount,
  dstaddr
FROM vpc_flow_logs
WHERE dstport = 443 AND date > current_date - interval '7' day
GROUP BY dstaddr
ORDER BY packetcount DESC
LIMIT 10;
```

# 以 Apache Parquet 格式为流日志创建表
<a name="vpc-flow-logs-parquet"></a>

以下过程将以 Apache Parquet 格式为 Amazon VPC 流日志创建 Amazon VPC 表。

**以 Parquet 格式为 Amazon VPC 流日志创建 Athena 表**

1. 按照 [注意事项和限制](vpc-flow-logs.md#vpc-flow-logs-common-considerations) 部分中的准则，在 Athena 控制台查询编辑器中输入类似以下内容的 DDL 语句。以下示例语句创建一个表，其中包含 Amazon VPC 流日志版本 2 到 5 的列，如 Parquet 格式的[流日志记录](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html#flow-log-records)中所示，Hive 按小时分区。如果您没有按小时分区，请删除 `PARTITIONED BY` 子句中的 `hour`。

   ```
   CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs_parquet (
     version int,
     account_id string,
     interface_id string,
     srcaddr string,
     dstaddr string,
     srcport int,
     dstport int,
     protocol bigint,
     packets bigint,
     bytes bigint,
     start bigint,
     `end` bigint,
     action string,
     log_status string,
     vpc_id string,
     subnet_id string,
     instance_id string,
     tcp_flags int,
     type string,
     pkt_srcaddr string,
     pkt_dstaddr string,
     region string,
     az_id string,
     sublocation_type string,
     sublocation_id string,
     pkt_src_aws_service string,
     pkt_dst_aws_service string,
     flow_direction string,
     traffic_path int
   )
   PARTITIONED BY (
     `aws-account-id` string,
     `aws-service` string,
     `aws-region` string,
     `year` string, 
     `month` string, 
     `day` string,
     `hour` string
   )
   ROW FORMAT SERDE 
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
   STORED AS INPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
   OUTPUTFORMAT 
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION
     's3://amzn-s3-demo-bucket/prefix/AWSLogs/'
   TBLPROPERTIES (
     'EXTERNAL'='true', 
     'skip.header.line.count'='1'
     )
   ```

1. 修改示例 `LOCATION 's3://amzn-s3-demo-bucket/prefix/AWSLogs/'` 以指向包含您的日志数据的 Amazon S3 路径。

1. 在 Athena 控制台中运行查询。

1. 如果数据采用 Hive 兼容格式，请在 Athena 控制台中运行以下命令，更新并加载元存储中的 Hive 分区。查询完成后，可以查询 `vpc_flow_logs_parquet` 表中的数据。

   ```
   MSCK REPAIR TABLE vpc_flow_logs_parquet
   ```

   如果您使用的不是 Hive 兼容数据，请运行 [ALTER TABLE ADD PARTITION](alter-table-add-partition.md) 以加载分区。

有关使用 Athena 查询 Parquet 格式 Amazon VPC 流日志的更多信息，请参阅 *AWS 大数据博客*中的文章 [使用 Apache Pparquet 格式的 VPC 流日志优化性能并降低网络分析成本](https://aws.amazon.com/blogs/big-data/optimize-performance-and-reduce-costs-for-network-analytics-with-vpc-flow-logs-in-apache-parquet-format/)。

# 使用分区投影功能创建和查询 Amazon VPC 流日志表
<a name="vpc-flow-logs-partition-projection"></a>

使用如下所示的 `CREATE TABLE` 语句创建表、对表进行分区并使用[分区投影](partition-projection.md)自动填充分区。将示例中的表名称 `test_table_vpclogs` 替换为您的表名称。编辑 `LOCATION` 子句以指定包含 Amazon VPC 日志数据的 Amazon S3 存储桶。

以下 `CREATE TABLE` 语句适用于以非 Hive 样式的分区格式传送的 VPC 流日志。该示例支持多账户聚合。要将来自多个账户的 VPC 流日志集中到一个 Amazon S3 存储桶中，必须在 Amazon S3 路径中输入账户 ID。

```
CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs (
  version int,
  account_id string,
  interface_id string,
  srcaddr string,
  dstaddr string,
  srcport int,
  dstport int,
  protocol bigint,
  packets bigint,
  bytes bigint,
  start bigint,
  `end` bigint,
  action string,
  log_status string,
  vpc_id string,
  subnet_id string,
  instance_id string,
  tcp_flags int,
  type string,
  pkt_srcaddr string,
  pkt_dstaddr string,
  az_id string,
  sublocation_type string,
  sublocation_id string,
  pkt_src_aws_service string,
  pkt_dst_aws_service string,
  flow_direction string,
  traffic_path int
)
PARTITIONED BY (accid string, region string, day string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
LOCATION '$LOCATION_OF_LOGS'
TBLPROPERTIES
(
"skip.header.line.count"="1",
"projection.enabled" = "true",
"projection.accid.type" = "enum",
"projection.accid.values" = "$ACCID_1,$ACCID_2",
"projection.region.type" = "enum",
"projection.region.values" = "$REGION_1,$REGION_2,$REGION_3",
"projection.day.type" = "date",
"projection.day.range" = "$START_RANGE,NOW",
"projection.day.format" = "yyyy/MM/dd",
"storage.location.template" = "s3://amzn-s3-demo-bucket/AWSLogs/${accid}/vpcflowlogs/${region}/${day}"
)
```

## test\$1table\$1vpclogs 表的查询示例
<a name="query-examples-vpc-logs-pp"></a>

以下示例查询将查询之前 `CREATE TABLE` 语句创建的 `test_table_vpclogs`。将查询中的 `test_table_vpclogs` 替换为您自己的表名称。根据您的要求修改列值和其他变量。

若要在指定时间段内按时间顺序返回前 100 个访问日志条目，请运行如下所示的查询。

```
SELECT *
FROM test_table_vpclogs
WHERE day >= '2021/02/01' AND day < '2021/02/28'
ORDER BY day ASC
LIMIT 100
```

若要查看哪个服务器在指定时间段内接收前十个 HTTP 数据包，请运行如下所示的查询。该查询计算在 HTTPS 端口 443 上接收的数据包数，按目标 IP 地址对其进行分组，并返回上一周的前 10 个条目。

```
SELECT SUM(packets) AS packetcount, 
       dstaddr
FROM test_table_vpclogs
WHERE dstport = 443
  AND day >= '2021/03/01'
  AND day < '2021/03/31'
GROUP BY dstaddr
ORDER BY packetcount DESC
LIMIT 10
```

若要返回在指定时间段内创建的日志，请运行如下所示的查询。

```
SELECT interface_id,
       srcaddr,
       action,
       protocol,
       to_iso8601(from_unixtime(start)) AS start_time,
       to_iso8601(from_unixtime("end")) AS end_time
FROM test_table_vpclogs
WHERE DAY >= '2021/04/01'
  AND DAY < '2021/04/30'
```

若要返回指定时间段内源 IP 地址的访问日志，请运行如下所示的查询。

```
SELECT *
FROM test_table_vpclogs
WHERE srcaddr = '10.117.1.22'
  AND day >= '2021/02/01'
  AND day < '2021/02/28'
```

若要列出已被拒绝的 TCP 连接，请运行如下所示的查询。

```
SELECT day,
       interface_id,
       srcaddr,
       action,
       protocol
FROM test_table_vpclogs
WHERE action = 'REJECT' AND protocol = 6 AND day >= '2021/02/01' AND day < '2021/02/28'
LIMIT 10
```

若要返回以 `10.117` 开头的 IP 地址范围的访问日志，请运行如下所示的查询。

```
SELECT *
FROM test_table_vpclogs
WHERE split_part(srcaddr,'.', 1)='10'
  AND split_part(srcaddr,'.', 2) ='117'
```

若要返回某个时间范围内目标 IP 地址的访问日志，请运行如下所示的查询。

```
SELECT *
FROM test_table_vpclogs
WHERE dstaddr = '10.0.1.14'
  AND day >= '2021/01/01'
  AND day < '2021/01/31'
```

# 使用分区投影功能并以 Apache Parquet 格式为流日志创建表
<a name="vpc-flow-logs-partition-projection-parquet-example"></a>

以下用于 VPC 流日志的分区投影 `CREATE TABLE` 语句采用 Apache Parquet 格式，不兼容 Hive，按小时和日期而不是按天分区。将示例中的表名称 `test_table_vpclogs_parquet` 替换为您的表名称。编辑 `LOCATION` 子句以指定包含 Amazon VPC 日志数据的 Amazon S3 存储桶。

```
CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs_parquet (
  version int,
  account_id string,
  interface_id string,
  srcaddr string,
  dstaddr string,
  srcport int,
  dstport int,
  protocol bigint,
  packets bigint,
  bytes bigint,
  start bigint,
  `end` bigint,
  action string,
  log_status string,
  vpc_id string,
  subnet_id string,
  instance_id string,
  tcp_flags int,
  type string,
  pkt_srcaddr string,
  pkt_dstaddr string,
  az_id string,
  sublocation_type string,
  sublocation_id string,
  pkt_src_aws_service string,
  pkt_dst_aws_service string,
  flow_direction string,
  traffic_path int
)
PARTITIONED BY (region string, date string, hour string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://amzn-s3-demo-bucket/prefix/AWSLogs/{account_id}/vpcflowlogs/'
TBLPROPERTIES (
"EXTERNAL"="true",
"skip.header.line.count" = "1",
"projection.enabled" = "true",
"projection.region.type" = "enum",
"projection.region.values" = "us-east-1,us-west-2,ap-south-1,eu-west-1",
"projection.date.type" = "date",
"projection.date.range" = "2021/01/01,NOW",
"projection.date.format" = "yyyy/MM/dd",
"projection.hour.type" = "integer",
"projection.hour.range" = "00,23",
"projection.hour.digits" = "2",
"storage.location.template" = "s3://amzn-s3-demo-bucket/prefix/AWSLogs/${account_id}/vpcflowlogs/${region}/${date}/${hour}"
)
```

# 其他资源
<a name="query-examples-vpc-logs-additional-resources"></a>

有关使用 Athena 分析 VPC 流日志的更多信息，请参阅以下 AWS 大数据博客文章：
+ [通过点击式 Amazon Athena 集成分析 VPC 流日志](https://aws.amazon.com/blogs/networking-and-content-delivery/analyze-vpc-flow-logs-with-point-and-click-amazon-athena-integration/) 
+ [使用 Amazon Athena 和 Quick 分析 VPC 流日志](https://aws.amazon.com/blogs/big-data/analyzing-vpc-flow-logs-using-amazon-athena-and-amazon-quicksight/)
+ [使用 Apache Parquet 格式的 VPC 流日志优化性能并降低网络分析成本](https://aws.amazon.com/blogs/big-data/optimize-performance-and-reduce-costs-for-network-analytics-with-vpc-flow-logs-in-apache-parquet-format/)

# 查询 AWS WAF日志
<a name="waf-logs"></a>

AWS WAF 是一个 Web 应用程序防火墙，可以监视和控制受保护的 Web 应用程序从客户端收到的 HTTP 和 HTTPS 请求。您可以通过在 AWS WAF Web 访问控制列表（ACL）中配置规则来定义如何处理 Web 请求。然后，您可以通过将 Web ACL 关联到 Web 应用程序来保护该应用程序。您可以使用 AWS WAF 保护的 Web 应用程序资源的示例包括 Amazon CloudFront 分配、Amazon API Gateway REST API 和应用程序负载均衡器。有关 AWS WAF 的更多信息，请参阅*《AWS WAF Developer Guide》*中的 [AWS WAF](https://docs.aws.amazon.com/waf/latest/developerguide/waf-chapter.html)。

AWS WAF 日志包含您的 Web ACL 所分析的流量相关信息，例如 AWS WAF 从 AWS 资源收到请求的时间，有关请求的详细信息，以及每个请求所匹配的规则的操作。

您可以配置 AWS WAF Web ACL 将日志发布到多个目标中的一个目标，即您可以查询和查看这些日志的地方。有关配置 Web ACL 日志记录和 AWS WAF 日志内容的更多信息，请参阅*《AWS WAF Developer Guide》*中的 [Logging AWS WAF web ACL traffic](https://docs.aws.amazon.com/waf/latest/developerguide/logging.html)。

有关如何使用 Athena 分析 AWS WAF 日志以深入了解威胁检测和潜在安全攻击的信息，请参阅 AWS 网络和内容交付博客文章 [How to use Amazon Athena queries to analyze AWS WAF logs and provide the visibility needed for threat detection](https://aws.amazon.com/blogs/networking-and-content-delivery/how-to-use-amazon-athena-queries-to-analyze-aws-waf-logs-and-provide-the-visibility-needed-for-threat-detection/)。

有关如何将 AWS WAF 日志聚合到中央数据湖存储库并使用 Athena 进行查询的示例，请参阅 AWS 大数据博客文章 [Analyzing AWS WAF logs with OpenSearch Service, Amazon Athena, and Quick](https://aws.amazon.com/blogs/big-data/analyzing-aws-waf-logs-with-amazon-es-amazon-athena-and-amazon-quicksight/)。

本主题提供了分区投影、手动分区和不使用任何分区的 `CREATE TABLE` 语句示例。

**注意**  
本主题中的 `CREATE TABLE` 语句可以用于 v1 和 v2 AWS WAF 日志。在 v1 中，`webaclid` 字段包含一个 ID。在 v2 中，`webaclid` 字段包含完整的 ARN。这里的 `CREATE TABLE` 语句通过使用 `string` 数据类型未知地处理此内容。

**Topics**
+ [使用分区投影为 Athena 中的 AWS WAF S3 日志创建表](create-waf-table-partition-projection.md)
+ [使用手动分区为 Athena 中的 AWS WAF S3 日志创建表](create-waf-table-manual-partition.md)
+ [创建不进行分区的 AWS WAF 日志表](create-waf-table.md)
+ [AWS WAF 日志的示例查询](query-examples-waf-logs.md)

# 使用分区投影为 Athena 中的 AWS WAF S3 日志创建表
<a name="create-waf-table-partition-projection"></a>

由于 AWS WAF 日志具有您可以预先指定其分区方案的已知结构，因此您可以使用 Athena [分区投影](partition-projection.md)功能减少查询运行时间并自动管理分区。当添加新数据时，分区投影会自动添加新分区。这样就不必使用 `ALTER TABLE ADD PARTITION` 手动添加分区了。

以下示例 `CREATE TABLE` 语句会自动在 AWS WAF 日志上从指定日期开始到当前日期为止，为四个不同 AWS 区域使用分区投影。本示例中的 `PARTITION BY` 子句按区域和日期进行分区，但您可以根据自己的要求修改此子句。根据需要修改字段以匹配您的日志输出。在 `LOCATION` 和 `storage.location.template` 子句中，将 *amzn-s3-demo-bucket* 和 *AWS\$1ACCOUNT\$1NUMBER* 占位符替换为值，该值标识 AWS WAF 日志在 Amazon S3 存储桶中的位置。对于 `projection.day.range`，将 *2021*/*01*/*01* 替换为要使用的开始日期。成功运行查询后，您可以查询表。您无需运行 `ALTER TABLE ADD PARTITION` 来加载分区。

```
CREATE EXTERNAL TABLE `waf_logs_partition_projection`(
  `timestamp` bigint, 
  `formatversion` int, 
  `webaclid` string, 
  `terminatingruleid` string, 
  `terminatingruletype` string, 
  `action` string, 
  `terminatingrulematchdetails` array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>, 
  `httpsourcename` string, 
  `httpsourceid` string, 
  `rulegrouplist` array<struct<rulegroupid:string,terminatingrule:struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>>,nonterminatingmatchingrules:array<struct<ruleid:string,action:string,overriddenaction:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>,excludedrules:string>>, 
  `ratebasedrulelist` array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>, 
  `nonterminatingmatchingrules` array<struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>, 
  `requestheadersinserted` array<struct<name:string,value:string>>, 
  `responsecodesent` string, 
  `httprequest` struct<clientip:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpversion:string,httpmethod:string,requestid:string,fragment:string,scheme:string,host:string>,
  `labels` array<struct<name:string>>, 
  `captcharesponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `challengeresponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `ja3fingerprint` string, 
  `ja4fingerprint` string, 
  `oversizefields` string, 
  `requestbodysize` int, 
  `requestbodysizeinspectedbywaf` int)
  PARTITIONED BY ( 
   `log_time` string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/testui/'
TBLPROPERTIES (
 'projection.enabled'='true',
  'projection.log_time.format'='yyyy/MM/dd/HH/mm',
  'projection.log_time.interval'='1',
  'projection.log_time.interval.unit'='minutes',
  'projection.log_time.range'='2025/01/01/00/00,NOW',
  'projection.log_time.type'='date',
  'storage.location.template'='s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/testui/${log_time}')
```

**注意**  
示例中 `LOCATION` 子句中的路径格式是标准格式，但可能因所实施的 AWS WAF 配置而异。例如，以下示例 AWS WAF 日志路径适用于 CloudFront 分配：  

```
s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/cloudfronyt/2025/01/01/00/00/
```
如果您在创建或查询 AWS WAF 日志表时遇到问题，请确认日志数据位置或联系 [支持](https://console.aws.amazon.com/support/home/)。

更多有关分区投影的信息，请参阅 [将分区投影与 Amazon Athena 结合使用](partition-projection.md)。

# 使用手动分区为 Athena 中的 AWS WAF S3 日志创建表
<a name="create-waf-table-manual-partition"></a>

本节介绍了如何使用手动分区来为 AWS WAF 日志创建表。

在 `LOCATION` 和 `storage.location.template` 子句中，将 *amzn-s3-demo-bucket* 和 *AWS\$1ACCOUNT\$1NUMBER* 占位符替换为值，该值标识 AWS WAF 日志在 Amazon S3 存储桶中的位置。

```
CREATE EXTERNAL TABLE `waf_logs_manual_partition`(
  `timestamp` bigint, 
  `formatversion` int, 
  `webaclid` string, 
  `terminatingruleid` string, 
  `terminatingruletype` string, 
  `action` string, 
  `terminatingrulematchdetails` array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>, 
  `httpsourcename` string, 
  `httpsourceid` string, 
  `rulegrouplist` array<struct<rulegroupid:string,terminatingrule:struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>>,nonterminatingmatchingrules:array<struct<ruleid:string,action:string,overriddenaction:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>,excludedrules:string>>, 
  `ratebasedrulelist` array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>, 
  `nonterminatingmatchingrules` array<struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>, 
  `requestheadersinserted` array<struct<name:string,value:string>>, 
  `responsecodesent` string, 
  `httprequest` struct<clientip:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpversion:string,httpmethod:string,requestid:string,fragment:string,scheme:string,host:string>, 
  `labels` array<struct<name:string>>, 
  `captcharesponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `challengeresponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `ja3fingerprint` string, 
  `ja4fingerprint` string, 
  `oversizefields` string, 
  `requestbodysize` int, 
  `requestbodysizeinspectedbywaf` int)
  PARTITIONED BY ( `year` string, `month` string, `day` string, `hour` string, `min` string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/webacl/'
```

# 创建不进行分区的 AWS WAF 日志表
<a name="create-waf-table"></a>

本节介绍如何创建不进行分区或分区投影的 AWS WAF 日志表。

**注意**  
出于性能和成本原因，不建议使用非分区架构进行查询。有关更多信息，请参阅 AWS 大数据博客中的 [Top 10 Performance Tuning Tips for Amazon Athena](https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/)（Amazon Athena 的十大性能优化技巧）。

**创建 AWS WAF 表**

1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。根据需要修改字段以匹配您的日志输出。修改 Amazon S3 存储桶的 `LOCATION` 以对应用于存储日志的存储桶。

   此查询使用 [OpenX JSON SerDe](openx-json-serde.md)。
**注意**  
SerDe 期望每个 JSON 文档都位于单行文本中，并且不使用行终止字符分隔记录中的字段。如果 JSON 文本采用美观的打印格式，当您在创建表后尝试对其进行查询时，可能会收到类似以下内容的错误消息：HIVE\$1CURSOR\$1ERROR: Row is not a valid JSON Object（HIVE\$1CURSOR\$1ERROR：行不是有效的 JSON 对象）或 HIVE\$1CURSOR\$1ERROR: JsonParseException: Unexpected end-of-input: expected close marker for OBJECT（HIVE\$1CURSOR\$1ERROR：JsonParseException：意外的输入结束：对象的预期关闭标记）。有关更多信息，请参阅 GitHub 上 OpenX SerDe 文档中的 [JSON 数据文件](https://github.com/rcongiu/Hive-JSON-Serde#json-data-files)。

   ```
   CREATE EXTERNAL TABLE `waf_logs`(
     `timestamp` bigint,
     `formatversion` int,
     `webaclid` string,
     `terminatingruleid` string,
     `terminatingruletype` string,
     `action` string,
     `terminatingrulematchdetails` array <
                                       struct <
                                           conditiontype: string,
                                           sensitivitylevel: string,
                                           location: string,
                                           matcheddata: array < string >
                                             >
                                        >,
     `httpsourcename` string,
     `httpsourceid` string,
     `rulegrouplist` array <
                         struct <
                             rulegroupid: string,
                             terminatingrule: struct <
                                                 ruleid: string,
                                                 action: string,
                                                 rulematchdetails: array <
                                                                      struct <
                                                                          conditiontype: string,
                                                                          sensitivitylevel: string,
                                                                          location: string,
                                                                          matcheddata: array < string >
                                                                             >
                                                                       >
                                                   >,
                             nonterminatingmatchingrules: array <
                                                                 struct <
                                                                     ruleid: string,
                                                                     action: string,
                                                                     overriddenaction: string,
                                                                     rulematchdetails: array <
                                                                                          struct <
                                                                                              conditiontype: string,
                                                                                              sensitivitylevel: string,
                                                                                              location: string,
                                                                                              matcheddata: array < string >
                                                                                                 >
                                                                      >,
                                                                     challengeresponse: struct <
                                                                               responsecode: string,
                                                                               solvetimestamp: string
                                                                                 >,
                                                                     captcharesponse: struct <
                                                                               responsecode: string,
                                                                               solvetimestamp: string
                                                                                 >
                                                                       >
                                                                >,
                             excludedrules: string
                               >
                          >,
   `ratebasedrulelist` array <
                            struct <
                                ratebasedruleid: string,
                                limitkey: string,
                                maxrateallowed: int
                                  >
                             >,
     `nonterminatingmatchingrules` array <
                                       struct <
                                           ruleid: string,
                                           action: string,
                                           rulematchdetails: array <
                                                                struct <
                                                                    conditiontype: string,
                                                                    sensitivitylevel: string,
                                                                    location: string,
                                                                    matcheddata: array < string >
                                                                       >
                                                                >,
                                           challengeresponse: struct <
                                                               responsecode: string,
                                                               solvetimestamp: string
                                                                >,
                                           captcharesponse: struct <
                                                               responsecode: string,
                                                               solvetimestamp: string
                                                                >
                                             >
                                        >,
     `requestheadersinserted` array <
                                   struct <
                                       name: string,
                                       value: string
                                         >
                                    >,
     `responsecodesent` string,
     `httprequest` struct <
                       clientip: string,
                       country: string,
                       headers: array <
                                   struct <
                                       name: string,
                                       value: string
                                         >
                                    >,
                       uri: string,
                       args: string,
                       httpversion: string,
                       httpmethod: string,
                       requestid: string
                         >,
     `labels` array <
                  struct <
                      name: string
                        >
                   >,
     `captcharesponse` struct <
                           responsecode: string,
                           solvetimestamp: string,
                           failureReason: string
                             >,
     `challengeresponse` struct <
                           responsecode: string,
                           solvetimestamp: string,
                           failureReason: string
                           >,
     `ja3Fingerprint` string,
     `oversizefields` string,
     `requestbodysize` int,
     `requestbodysizeinspectedbywaf` int
   )
   ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
   STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   LOCATION 's3://amzn-s3-demo-bucket/prefix/'
   ```

1. 在 Athena 控制台查询编辑器中运行 `CREATE EXTERNAL TABLE` 语句。这将注册 `waf_logs` 表，并使其中的数据可用于来自 Athena 的查询。

# AWS WAF 日志的示例查询
<a name="query-examples-waf-logs"></a>

本部分中许多示例查询使用之前创建的分区投影表。根据您的要求修改示例中的表名称、列值和其他变量。若要提高查询的性能并降低成本，请在筛选条件中添加分区列。

**Topics**
+ [统计引用站点、IP 地址或匹配的规则](query-examples-waf-logs-count.md)
+ [使用日期和时间进行查询](query-examples-waf-logs-date-time.md)
+ [查询被阻止的请求或地址](query-examples-waf-logs-blocked-requests.md)

# 统计引用站点、IP 地址或匹配的规则
<a name="query-examples-waf-logs-count"></a>

本部分中的示例查询相关日志项的计数。
+ [Count the number of referrers that contain a specified term](#waf-example-count-referrers-with-specified-term)
+ [Count all matched IP addresses in the last 10 days that have matched excluded rules](#waf-example-count-matched-ip-addresses)
+ [Group all counted managed rules by the number of times matched](#waf-example-group-managed-rules-by-times-matched)
+ [Group all counted custom rules by number of times matched](#waf-example-group-custom-rules-by-times-matched)

**Example – 统计包含指定术语的引用站点数量**  
以下查询计算指定日期范围内包含“amazon”一词的引用者数量。  

```
WITH test_dataset AS 
  (SELECT header FROM waf_logs
    CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE "date" >= '2021/03/01'
    AND "date" < '2021/03/31')
SELECT COUNT(*) referer_count 
FROM test_dataset 
WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'
```

**Example – 统计过去 10 天内与排除规则匹配的所有匹配 IP 地址**  
以下查询计算过去 10 天内 IP 地址与规则组中排除规则匹配的次数。  

```
WITH test_dataset AS 
  (SELECT * FROM waf_logs 
    CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups))
SELECT 
  COUNT(*) AS count, 
  "httprequest"."clientip", 
  "allrulegroups"."excludedrules",
  "allrulegroups"."ruleGroupId"
FROM test_dataset 
WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day
GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules"
ORDER BY count DESC
```

**Example - 按匹配次数对所有已计数的托管规则进行分组**  
如果您在 2022 年 10 月 27 日之前在 Web ACL 配置中将规则组规则操作设置为“计数”，AWS WAF 在 Web ACL JSON 中将覆盖内容保存为 `excludedRules`。现在，用于将规则替换为“计数”的 JSON 设置位于 `ruleActionOverrides` 设置中。有关更多信息，请参阅《AWS WAF 开发人员指南》**中的[规则组中的操作覆盖](https://docs.aws.amazon.com/waf/latest/developerguide/web-acl-rule-group-override-options.html)。要从新的日志结构中提取计数模式下的托管规则，请在 `ruleGroupList` 部分而不是 `excludedRules` 字段中查询 `nonTerminatingMatchingRules`，如下例所示。  

```
SELECT
 count(*) AS count,
 httpsourceid,
 httprequest.clientip,
 t.rulegroupid, 
 t.nonTerminatingMatchingRules
FROM "waf_logs" 
CROSS JOIN UNNEST(rulegrouplist) AS t(t) 
WHERE action <> 'BLOCK' AND cardinality(t.nonTerminatingMatchingRules) > 0 
GROUP BY t.nonTerminatingMatchingRules, action, httpsourceid, httprequest.clientip, t.rulegroupid 
ORDER BY "count" DESC 
Limit 50
```

**Example - 按匹配次数对所有已计数的自定义规则进行分组**  
以下查询按匹配次数对所有已计数的自定义规则进行分组。  

```
SELECT
  count(*) AS count,
         httpsourceid,
         httprequest.clientip,
         t.ruleid,
         t.action
FROM "waf_logs" 
CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) 
WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 
GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip 
ORDER BY "count" DESC
Limit 50
```

有关自定义规则和托管规则组的日志位置的信息，请参阅《AWS WAF 开发人员指南》**中的[监控和调整](https://docs.aws.amazon.com/waf/latest/developerguide/web-acl-testing-activities.html)。

# 使用日期和时间进行查询
<a name="query-examples-waf-logs-date-time"></a>

本部分中的示例包括使用日期和时间值的查询。
+ [Return the timestamp field in human-readable ISO 8601 format](#waf-example-return-human-readable-timestamp)
+ [Return records from the last 24 hours](#waf-example-return-records-last-24-hours)
+ [Return records for a specified date range and IP address](#waf-example-return-records-date-range-and-ip)
+ [For a specified date range, count the number of IP addresses in five minute intervals](#waf-example-count-ip-addresses-in-date-range)
+ [Count the number of X-Forwarded-For IP in the last 10 days](#waf-example-count-x-forwarded-for-ip)

**Example – 以人类可读 ISO 8601 格式返回时间戳字段**  
以下查询使用 `from_unixtime` 和 `to_iso8601` 函数以人类可读的 ISO 8601 格式返回 `timestamp` 字段（例如 `2019-12-13T23:40:12.000Z` 而不是 `1576280412771`）。该查询还返回 HTTP 源名称、源 ID 和请求。  

```
SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601,
       httpsourcename,
       httpsourceid,
       httprequest
FROM waf_logs
LIMIT 10;
```

**Example – 返回过去 24 小时的记录**  
以下查询使用 `WHERE` 子句中的筛选条件返回过去 24 小时内记录的 HTTP 源名称、HTTP 源 ID 和 HTTP 请求字段。  

```
SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, 
       httpsourcename, 
       httpsourceid, 
       httprequest 
FROM waf_logs
WHERE from_unixtime(timestamp/1000) > now() - interval '1' day
LIMIT 10;
```

**Example – 返回指定日期范围和 IP 地址的记录**  
以下查询列出了指定的客户端 IP 地址在指定日期范围内的记录。  

```
SELECT * 
FROM waf_logs 
WHERE httprequest.clientip='53.21.198.66' AND "date" >= '2021/03/01' AND "date" < '2021/03/31'
```

**Example – 对于指定的日期范围，计算在五分钟间隔内的 IP 地址数**  
对于特定日期范围，以下查询计算在五分钟间隔内的 IP 地址数。  

```
WITH test_dataset AS 
  (SELECT 
     format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts,
     "httprequest"."clientip" 
     FROM waf_logs 
     WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31')
SELECT five_minutes_ts,"clientip",count(*) ip_count 
FROM test_dataset 
GROUP BY five_minutes_ts,"clientip"
```

**Example – 计算过去 10 天内 X-Forwarded-For IP 的数量**  
以下查询将筛选请求标头，并统计过去 10 天内 X-Forwarded-For IP 的数量。  

```
WITH test_dataset AS
  (SELECT header
   FROM waf_logs
   CROSS JOIN UNNEST (httprequest.headers) AS t(header)
   WHERE from_unixtime("timestamp"/1000) > now() - interval '10' DAY) 
SELECT header.value AS ip,
       count(*) AS COUNT 
FROM test_dataset 
WHERE header.name='X-Forwarded-For' 
GROUP BY header.value 
ORDER BY COUNT DESC
```

有关日期和时间函数的更多信息，请参阅 Trino 文档中的 [日期与时间函数和运算符](https://trino.io/docs/current/functions/datetime.html)。

# 查询被阻止的请求或地址
<a name="query-examples-waf-logs-blocked-requests"></a>

本部分中的示例查询被阻止的请求或地址。
+ [Extract the top 100 IP addresses blocked by a specified rule type](#waf-example-extract-top-100-blocked-ip-by-rule)
+ [Count the number of times a request from a specified country has been blocked](#waf-example-count-request-blocks-from-country)
+ [Count the number of times a request has been blocked, grouping by specific attributes](#waf-example-count-request-blocks-by-attribute)
+ [Count the number of times a specific terminating rule ID has been matched](#waf-example-count-terminating-rule-id-matches)
+ [Retrieve the top 100 IP addresses blocked during a specified date range](#waf-example-top-100-ip-addresses-blocked-for-date-range)

**Example – 提取被指定规则类型阻止的前 100 个 IP 地址**  
下面的查询将提取并统计在指定的日期范围内被 `RATE_BASED` 终止规则阻止的前 100 个 IP 地址。  

```
SELECT COUNT(httpRequest.clientIp) as count,
httpRequest.clientIp
FROM waf_logs
WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and "date" >= '2021/03/01'
AND "date" < '2021/03/31'
GROUP BY httpRequest.clientIp
ORDER BY count DESC
LIMIT 100
```

**Example – 计算来自指定国家/地区的请求被阻止的次数**  
以下查询针对来自属于爱尔兰 (IE) IP 地址的请求，计算请求到达但被 `RATE_BASED` 终止规则阻止的次数。  

```
SELECT 
  COUNT(httpRequest.country) as count, 
  httpRequest.country 
FROM waf_logs
WHERE 
  terminatingruletype='RATE_BASED' AND 
  httpRequest.country='IE'
GROUP BY httpRequest.country
ORDER BY count
LIMIT 100;
```

**Example – 计算请求被阻止的次数，按特定属性分组**  
以下查询计算请求被阻止的次数，并按照 WebACL、RuleId、ClientIP 和 HTTP 请求 URI 对结果分组。  

```
SELECT 
  COUNT(*) AS count,
  webaclid,
  terminatingruleid,
  httprequest.clientip,
  httprequest.uri
FROM waf_logs
WHERE action='BLOCK'
GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri
ORDER BY count DESC
LIMIT 100;
```

**Example – 计算特定终止规则 ID 匹配的次数**  
以下查询计算特定终止规则 ID 匹配的次数 (`WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e'`)。然后，查询按照 WebACL、操作、ClientIP 和 HTTP 请求 URI 对结果分组。  

```
SELECT 
  COUNT(*) AS count,
  webaclid,
  action,
  httprequest.clientip,
  httprequest.uri
FROM waf_logs
WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e'
GROUP BY webaclid, action, httprequest.clientip, httprequest.uri
ORDER BY count DESC
LIMIT 100;
```

**Example – 检索指定日期范围内被阻止的前 100 个 IP 地址**  
以下查询将提取在指定日期范围内被阻止的前 100 个 IP 地址。该查询还列出了 IP 地址被阻止的次数。  

```
SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country"
FROM waf_logs
WHERE "action" = 'BLOCK' and "date" >= '2021/03/01'
AND "date" < '2021/03/31'
GROUP BY "httprequest"."clientip", "httprequest"."country"
ORDER BY "ipcount" DESC limit 100
```

有关查询 Amazon S3 日志的更多信息，请参阅以下主题：
+ AWS 知识中心中的[如何使用 Athena 分析 Amazon S3 服务器访问日志？](https://aws.amazon.com/premiumsupport/knowledge-center/analyze-logs-athena/)
+ 《Amazon Simple Storage Service 用户指南》中的[使用 Amazon Athena 查询请求的 Amazon S3 访问日志](https://docs.aws.amazon.com/AmazonS3/latest/dev/using-s3-access-logs-to-identify-requests.html#querying-s3-access-logs-for-requests)
+ 《Amazon Simple Storage Service 用户指南》中的[使用 AWS CloudTrail 识别 Amazon S3 请求](https://docs.aws.amazon.com/AmazonS3/latest/dev/cloudtrail-request-identification.html)