

# 查询 AWS WAF日志
<a name="waf-logs"></a>

AWS WAF 是一个 Web 应用程序防火墙，可以监视和控制受保护的 Web 应用程序从客户端收到的 HTTP 和 HTTPS 请求。您可以通过在 AWS WAF Web 访问控制列表（ACL）中配置规则来定义如何处理 Web 请求。然后，您可以通过将 Web ACL 关联到 Web 应用程序来保护该应用程序。您可以使用 AWS WAF 保护的 Web 应用程序资源的示例包括 Amazon CloudFront 分配、Amazon API Gateway REST API 和应用程序负载均衡器。有关 AWS WAF 的更多信息，请参阅*《AWS WAF Developer Guide》*中的 [AWS WAF](https://docs.aws.amazon.com/waf/latest/developerguide/waf-chapter.html)。

AWS WAF 日志包含您的 Web ACL 所分析的流量相关信息，例如 AWS WAF 从 AWS 资源收到请求的时间，有关请求的详细信息，以及每个请求所匹配的规则的操作。

您可以配置 AWS WAF Web ACL 将日志发布到多个目标中的一个目标，即您可以查询和查看这些日志的地方。有关配置 Web ACL 日志记录和 AWS WAF 日志内容的更多信息，请参阅*《AWS WAF Developer Guide》*中的 [Logging AWS WAF web ACL traffic](https://docs.aws.amazon.com/waf/latest/developerguide/logging.html)。

有关如何使用 Athena 分析 AWS WAF 日志以深入了解威胁检测和潜在安全攻击的信息，请参阅 AWS 网络和内容交付博客文章 [How to use Amazon Athena queries to analyze AWS WAF logs and provide the visibility needed for threat detection](https://aws.amazon.com/blogs/networking-and-content-delivery/how-to-use-amazon-athena-queries-to-analyze-aws-waf-logs-and-provide-the-visibility-needed-for-threat-detection/)。

有关如何将 AWS WAF 日志聚合到中央数据湖存储库并使用 Athena 进行查询的示例，请参阅 AWS 大数据博客文章 [Analyzing AWS WAF logs with OpenSearch Service, Amazon Athena, and Quick](https://aws.amazon.com/blogs/big-data/analyzing-aws-waf-logs-with-amazon-es-amazon-athena-and-amazon-quicksight/)。

本主题提供了分区投影、手动分区和不使用任何分区的 `CREATE TABLE` 语句示例。

**注意**  
本主题中的 `CREATE TABLE` 语句可以用于 v1 和 v2 AWS WAF 日志。在 v1 中，`webaclid` 字段包含一个 ID。在 v2 中，`webaclid` 字段包含完整的 ARN。这里的 `CREATE TABLE` 语句通过使用 `string` 数据类型未知地处理此内容。

**Topics**
+ [使用分区投影为 Athena 中的 AWS WAF S3 日志创建表](create-waf-table-partition-projection.md)
+ [使用手动分区为 Athena 中的 AWS WAF S3 日志创建表](create-waf-table-manual-partition.md)
+ [创建不进行分区的 AWS WAF 日志表](create-waf-table.md)
+ [AWS WAF 日志的示例查询](query-examples-waf-logs.md)

# 使用分区投影为 Athena 中的 AWS WAF S3 日志创建表
<a name="create-waf-table-partition-projection"></a>

由于 AWS WAF 日志具有您可以预先指定其分区方案的已知结构，因此您可以使用 Athena [分区投影](partition-projection.md)功能减少查询运行时间并自动管理分区。当添加新数据时，分区投影会自动添加新分区。这样就不必使用 `ALTER TABLE ADD PARTITION` 手动添加分区了。

以下示例 `CREATE TABLE` 语句会自动在 AWS WAF 日志上从指定日期开始到当前日期为止，为四个不同 AWS 区域使用分区投影。本示例中的 `PARTITION BY` 子句按区域和日期进行分区，但您可以根据自己的要求修改此子句。根据需要修改字段以匹配您的日志输出。在 `LOCATION` 和 `storage.location.template` 子句中，将 *amzn-s3-demo-bucket* 和 *AWS\$1ACCOUNT\$1NUMBER* 占位符替换为值，该值标识 AWS WAF 日志在 Amazon S3 存储桶中的位置。对于 `projection.day.range`，将 *2021*/*01*/*01* 替换为要使用的开始日期。成功运行查询后，您可以查询表。您无需运行 `ALTER TABLE ADD PARTITION` 来加载分区。

```
CREATE EXTERNAL TABLE `waf_logs_partition_projection`(
  `timestamp` bigint, 
  `formatversion` int, 
  `webaclid` string, 
  `terminatingruleid` string, 
  `terminatingruletype` string, 
  `action` string, 
  `terminatingrulematchdetails` array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>, 
  `httpsourcename` string, 
  `httpsourceid` string, 
  `rulegrouplist` array<struct<rulegroupid:string,terminatingrule:struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>>,nonterminatingmatchingrules:array<struct<ruleid:string,action:string,overriddenaction:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>,excludedrules:string>>, 
  `ratebasedrulelist` array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>, 
  `nonterminatingmatchingrules` array<struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>, 
  `requestheadersinserted` array<struct<name:string,value:string>>, 
  `responsecodesent` string, 
  `httprequest` struct<clientip:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpversion:string,httpmethod:string,requestid:string,fragment:string,scheme:string,host:string>,
  `labels` array<struct<name:string>>, 
  `captcharesponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `challengeresponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `ja3fingerprint` string, 
  `ja4fingerprint` string, 
  `oversizefields` string, 
  `requestbodysize` int, 
  `requestbodysizeinspectedbywaf` int)
  PARTITIONED BY ( 
   `log_time` string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/testui/'
TBLPROPERTIES (
 'projection.enabled'='true',
  'projection.log_time.format'='yyyy/MM/dd/HH/mm',
  'projection.log_time.interval'='1',
  'projection.log_time.interval.unit'='minutes',
  'projection.log_time.range'='2025/01/01/00/00,NOW',
  'projection.log_time.type'='date',
  'storage.location.template'='s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/testui/${log_time}')
```

**注意**  
示例中 `LOCATION` 子句中的路径格式是标准格式，但可能因所实施的 AWS WAF 配置而异。例如，以下示例 AWS WAF 日志路径适用于 CloudFront 分配：  

```
s3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/cloudfronyt/2025/01/01/00/00/
```
如果您在创建或查询 AWS WAF 日志表时遇到问题，请确认日志数据位置或联系 [支持](https://console.aws.amazon.com/support/home/)。

更多有关分区投影的信息，请参阅 [将分区投影与 Amazon Athena 结合使用](partition-projection.md)。

# 使用手动分区为 Athena 中的 AWS WAF S3 日志创建表
<a name="create-waf-table-manual-partition"></a>

本节介绍了如何使用手动分区来为 AWS WAF 日志创建表。

在 `LOCATION` 和 `storage.location.template` 子句中，将 *amzn-s3-demo-bucket* 和 *AWS\$1ACCOUNT\$1NUMBER* 占位符替换为值，该值标识 AWS WAF 日志在 Amazon S3 存储桶中的位置。

```
CREATE EXTERNAL TABLE `waf_logs_manual_partition`(
  `timestamp` bigint, 
  `formatversion` int, 
  `webaclid` string, 
  `terminatingruleid` string, 
  `terminatingruletype` string, 
  `action` string, 
  `terminatingrulematchdetails` array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>, 
  `httpsourcename` string, 
  `httpsourceid` string, 
  `rulegrouplist` array<struct<rulegroupid:string,terminatingrule:struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>>,nonterminatingmatchingrules:array<struct<ruleid:string,action:string,overriddenaction:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>,excludedrules:string>>, 
  `ratebasedrulelist` array<struct<ratebasedruleid:string,limitkey:string,maxrateallowed:int>>, 
  `nonterminatingmatchingrules` array<struct<ruleid:string,action:string,rulematchdetails:array<struct<conditiontype:string,sensitivitylevel:string,location:string,matcheddata:array<string>>>,challengeresponse:struct<responsecode:string,solvetimestamp:string>,captcharesponse:struct<responsecode:string,solvetimestamp:string>>>, 
  `requestheadersinserted` array<struct<name:string,value:string>>, 
  `responsecodesent` string, 
  `httprequest` struct<clientip:string,country:string,headers:array<struct<name:string,value:string>>,uri:string,args:string,httpversion:string,httpmethod:string,requestid:string,fragment:string,scheme:string,host:string>, 
  `labels` array<struct<name:string>>, 
  `captcharesponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `challengeresponse` struct<responsecode:string,solvetimestamp:string,failurereason:string>, 
  `ja3fingerprint` string, 
  `ja4fingerprint` string, 
  `oversizefields` string, 
  `requestbodysize` int, 
  `requestbodysizeinspectedbywaf` int)
  PARTITIONED BY ( `year` string, `month` string, `day` string, `hour` string, `min` string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://amzn-s3-demo-bucket/AWSLogs/AWS_ACCOUNT_NUMBER/WAFLogs/cloudfront/webacl/'
```

# 创建不进行分区的 AWS WAF 日志表
<a name="create-waf-table"></a>

本节介绍如何创建不进行分区或分区投影的 AWS WAF 日志表。

**注意**  
出于性能和成本原因，不建议使用非分区架构进行查询。有关更多信息，请参阅 AWS 大数据博客中的 [Top 10 Performance Tuning Tips for Amazon Athena](https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-tips-for-amazon-athena/)（Amazon Athena 的十大性能优化技巧）。

**创建 AWS WAF 表**

1. 将以下 DDL 语句复制并粘贴到 Athena 控制台中。根据需要修改字段以匹配您的日志输出。修改 Amazon S3 存储桶的 `LOCATION` 以对应用于存储日志的存储桶。

   此查询使用 [OpenX JSON SerDe](openx-json-serde.md)。
**注意**  
SerDe 期望每个 JSON 文档都位于单行文本中，并且不使用行终止字符分隔记录中的字段。如果 JSON 文本采用美观的打印格式，当您在创建表后尝试对其进行查询时，可能会收到类似以下内容的错误消息：HIVE\$1CURSOR\$1ERROR: Row is not a valid JSON Object（HIVE\$1CURSOR\$1ERROR：行不是有效的 JSON 对象）或 HIVE\$1CURSOR\$1ERROR: JsonParseException: Unexpected end-of-input: expected close marker for OBJECT（HIVE\$1CURSOR\$1ERROR：JsonParseException：意外的输入结束：对象的预期关闭标记）。有关更多信息，请参阅 GitHub 上 OpenX SerDe 文档中的 [JSON 数据文件](https://github.com/rcongiu/Hive-JSON-Serde#json-data-files)。

   ```
   CREATE EXTERNAL TABLE `waf_logs`(
     `timestamp` bigint,
     `formatversion` int,
     `webaclid` string,
     `terminatingruleid` string,
     `terminatingruletype` string,
     `action` string,
     `terminatingrulematchdetails` array <
                                       struct <
                                           conditiontype: string,
                                           sensitivitylevel: string,
                                           location: string,
                                           matcheddata: array < string >
                                             >
                                        >,
     `httpsourcename` string,
     `httpsourceid` string,
     `rulegrouplist` array <
                         struct <
                             rulegroupid: string,
                             terminatingrule: struct <
                                                 ruleid: string,
                                                 action: string,
                                                 rulematchdetails: array <
                                                                      struct <
                                                                          conditiontype: string,
                                                                          sensitivitylevel: string,
                                                                          location: string,
                                                                          matcheddata: array < string >
                                                                             >
                                                                       >
                                                   >,
                             nonterminatingmatchingrules: array <
                                                                 struct <
                                                                     ruleid: string,
                                                                     action: string,
                                                                     overriddenaction: string,
                                                                     rulematchdetails: array <
                                                                                          struct <
                                                                                              conditiontype: string,
                                                                                              sensitivitylevel: string,
                                                                                              location: string,
                                                                                              matcheddata: array < string >
                                                                                                 >
                                                                      >,
                                                                     challengeresponse: struct <
                                                                               responsecode: string,
                                                                               solvetimestamp: string
                                                                                 >,
                                                                     captcharesponse: struct <
                                                                               responsecode: string,
                                                                               solvetimestamp: string
                                                                                 >
                                                                       >
                                                                >,
                             excludedrules: string
                               >
                          >,
   `ratebasedrulelist` array <
                            struct <
                                ratebasedruleid: string,
                                limitkey: string,
                                maxrateallowed: int
                                  >
                             >,
     `nonterminatingmatchingrules` array <
                                       struct <
                                           ruleid: string,
                                           action: string,
                                           rulematchdetails: array <
                                                                struct <
                                                                    conditiontype: string,
                                                                    sensitivitylevel: string,
                                                                    location: string,
                                                                    matcheddata: array < string >
                                                                       >
                                                                >,
                                           challengeresponse: struct <
                                                               responsecode: string,
                                                               solvetimestamp: string
                                                                >,
                                           captcharesponse: struct <
                                                               responsecode: string,
                                                               solvetimestamp: string
                                                                >
                                             >
                                        >,
     `requestheadersinserted` array <
                                   struct <
                                       name: string,
                                       value: string
                                         >
                                    >,
     `responsecodesent` string,
     `httprequest` struct <
                       clientip: string,
                       country: string,
                       headers: array <
                                   struct <
                                       name: string,
                                       value: string
                                         >
                                    >,
                       uri: string,
                       args: string,
                       httpversion: string,
                       httpmethod: string,
                       requestid: string
                         >,
     `labels` array <
                  struct <
                      name: string
                        >
                   >,
     `captcharesponse` struct <
                           responsecode: string,
                           solvetimestamp: string,
                           failureReason: string
                             >,
     `challengeresponse` struct <
                           responsecode: string,
                           solvetimestamp: string,
                           failureReason: string
                           >,
     `ja3Fingerprint` string,
     `oversizefields` string,
     `requestbodysize` int,
     `requestbodysizeinspectedbywaf` int
   )
   ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
   STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
   LOCATION 's3://amzn-s3-demo-bucket/prefix/'
   ```

1. 在 Athena 控制台查询编辑器中运行 `CREATE EXTERNAL TABLE` 语句。这将注册 `waf_logs` 表，并使其中的数据可用于来自 Athena 的查询。

# AWS WAF 日志的示例查询
<a name="query-examples-waf-logs"></a>

本部分中许多示例查询使用之前创建的分区投影表。根据您的要求修改示例中的表名称、列值和其他变量。若要提高查询的性能并降低成本，请在筛选条件中添加分区列。

**Topics**
+ [统计引用站点、IP 地址或匹配的规则](query-examples-waf-logs-count.md)
+ [使用日期和时间进行查询](query-examples-waf-logs-date-time.md)
+ [查询被阻止的请求或地址](query-examples-waf-logs-blocked-requests.md)

# 统计引用站点、IP 地址或匹配的规则
<a name="query-examples-waf-logs-count"></a>

本部分中的示例查询相关日志项的计数。
+ [Count the number of referrers that contain a specified term](#waf-example-count-referrers-with-specified-term)
+ [Count all matched IP addresses in the last 10 days that have matched excluded rules](#waf-example-count-matched-ip-addresses)
+ [Group all counted managed rules by the number of times matched](#waf-example-group-managed-rules-by-times-matched)
+ [Group all counted custom rules by number of times matched](#waf-example-group-custom-rules-by-times-matched)

**Example – 统计包含指定术语的引用站点数量**  
以下查询计算指定日期范围内包含“amazon”一词的引用者数量。  

```
WITH test_dataset AS 
  (SELECT header FROM waf_logs
    CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE "date" >= '2021/03/01'
    AND "date" < '2021/03/31')
SELECT COUNT(*) referer_count 
FROM test_dataset 
WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'
```

**Example – 统计过去 10 天内与排除规则匹配的所有匹配 IP 地址**  
以下查询计算过去 10 天内 IP 地址与规则组中排除规则匹配的次数。  

```
WITH test_dataset AS 
  (SELECT * FROM waf_logs 
    CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups))
SELECT 
  COUNT(*) AS count, 
  "httprequest"."clientip", 
  "allrulegroups"."excludedrules",
  "allrulegroups"."ruleGroupId"
FROM test_dataset 
WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day
GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules"
ORDER BY count DESC
```

**Example - 按匹配次数对所有已计数的托管规则进行分组**  
如果您在 2022 年 10 月 27 日之前在 Web ACL 配置中将规则组规则操作设置为“计数”，AWS WAF 在 Web ACL JSON 中将覆盖内容保存为 `excludedRules`。现在，用于将规则替换为“计数”的 JSON 设置位于 `ruleActionOverrides` 设置中。有关更多信息，请参阅《AWS WAF 开发人员指南》**中的[规则组中的操作覆盖](https://docs.aws.amazon.com/waf/latest/developerguide/web-acl-rule-group-override-options.html)。要从新的日志结构中提取计数模式下的托管规则，请在 `ruleGroupList` 部分而不是 `excludedRules` 字段中查询 `nonTerminatingMatchingRules`，如下例所示。  

```
SELECT
 count(*) AS count,
 httpsourceid,
 httprequest.clientip,
 t.rulegroupid, 
 t.nonTerminatingMatchingRules
FROM "waf_logs" 
CROSS JOIN UNNEST(rulegrouplist) AS t(t) 
WHERE action <> 'BLOCK' AND cardinality(t.nonTerminatingMatchingRules) > 0 
GROUP BY t.nonTerminatingMatchingRules, action, httpsourceid, httprequest.clientip, t.rulegroupid 
ORDER BY "count" DESC 
Limit 50
```

**Example - 按匹配次数对所有已计数的自定义规则进行分组**  
以下查询按匹配次数对所有已计数的自定义规则进行分组。  

```
SELECT
  count(*) AS count,
         httpsourceid,
         httprequest.clientip,
         t.ruleid,
         t.action
FROM "waf_logs" 
CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) 
WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 
GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip 
ORDER BY "count" DESC
Limit 50
```

有关自定义规则和托管规则组的日志位置的信息，请参阅《AWS WAF 开发人员指南》**中的[监控和调整](https://docs.aws.amazon.com/waf/latest/developerguide/web-acl-testing-activities.html)。

# 使用日期和时间进行查询
<a name="query-examples-waf-logs-date-time"></a>

本部分中的示例包括使用日期和时间值的查询。
+ [Return the timestamp field in human-readable ISO 8601 format](#waf-example-return-human-readable-timestamp)
+ [Return records from the last 24 hours](#waf-example-return-records-last-24-hours)
+ [Return records for a specified date range and IP address](#waf-example-return-records-date-range-and-ip)
+ [For a specified date range, count the number of IP addresses in five minute intervals](#waf-example-count-ip-addresses-in-date-range)
+ [Count the number of X-Forwarded-For IP in the last 10 days](#waf-example-count-x-forwarded-for-ip)

**Example – 以人类可读 ISO 8601 格式返回时间戳字段**  
以下查询使用 `from_unixtime` 和 `to_iso8601` 函数以人类可读的 ISO 8601 格式返回 `timestamp` 字段（例如 `2019-12-13T23:40:12.000Z` 而不是 `1576280412771`）。该查询还返回 HTTP 源名称、源 ID 和请求。  

```
SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601,
       httpsourcename,
       httpsourceid,
       httprequest
FROM waf_logs
LIMIT 10;
```

**Example – 返回过去 24 小时的记录**  
以下查询使用 `WHERE` 子句中的筛选条件返回过去 24 小时内记录的 HTTP 源名称、HTTP 源 ID 和 HTTP 请求字段。  

```
SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, 
       httpsourcename, 
       httpsourceid, 
       httprequest 
FROM waf_logs
WHERE from_unixtime(timestamp/1000) > now() - interval '1' day
LIMIT 10;
```

**Example – 返回指定日期范围和 IP 地址的记录**  
以下查询列出了指定的客户端 IP 地址在指定日期范围内的记录。  

```
SELECT * 
FROM waf_logs 
WHERE httprequest.clientip='53.21.198.66' AND "date" >= '2021/03/01' AND "date" < '2021/03/31'
```

**Example – 对于指定的日期范围，计算在五分钟间隔内的 IP 地址数**  
对于特定日期范围，以下查询计算在五分钟间隔内的 IP 地址数。  

```
WITH test_dataset AS 
  (SELECT 
     format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts,
     "httprequest"."clientip" 
     FROM waf_logs 
     WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31')
SELECT five_minutes_ts,"clientip",count(*) ip_count 
FROM test_dataset 
GROUP BY five_minutes_ts,"clientip"
```

**Example – 计算过去 10 天内 X-Forwarded-For IP 的数量**  
以下查询将筛选请求标头，并统计过去 10 天内 X-Forwarded-For IP 的数量。  

```
WITH test_dataset AS
  (SELECT header
   FROM waf_logs
   CROSS JOIN UNNEST (httprequest.headers) AS t(header)
   WHERE from_unixtime("timestamp"/1000) > now() - interval '10' DAY) 
SELECT header.value AS ip,
       count(*) AS COUNT 
FROM test_dataset 
WHERE header.name='X-Forwarded-For' 
GROUP BY header.value 
ORDER BY COUNT DESC
```

有关日期和时间函数的更多信息，请参阅 Trino 文档中的 [日期与时间函数和运算符](https://trino.io/docs/current/functions/datetime.html)。

# 查询被阻止的请求或地址
<a name="query-examples-waf-logs-blocked-requests"></a>

本部分中的示例查询被阻止的请求或地址。
+ [Extract the top 100 IP addresses blocked by a specified rule type](#waf-example-extract-top-100-blocked-ip-by-rule)
+ [Count the number of times a request from a specified country has been blocked](#waf-example-count-request-blocks-from-country)
+ [Count the number of times a request has been blocked, grouping by specific attributes](#waf-example-count-request-blocks-by-attribute)
+ [Count the number of times a specific terminating rule ID has been matched](#waf-example-count-terminating-rule-id-matches)
+ [Retrieve the top 100 IP addresses blocked during a specified date range](#waf-example-top-100-ip-addresses-blocked-for-date-range)

**Example – 提取被指定规则类型阻止的前 100 个 IP 地址**  
下面的查询将提取并统计在指定的日期范围内被 `RATE_BASED` 终止规则阻止的前 100 个 IP 地址。  

```
SELECT COUNT(httpRequest.clientIp) as count,
httpRequest.clientIp
FROM waf_logs
WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and "date" >= '2021/03/01'
AND "date" < '2021/03/31'
GROUP BY httpRequest.clientIp
ORDER BY count DESC
LIMIT 100
```

**Example – 计算来自指定国家/地区的请求被阻止的次数**  
以下查询针对来自属于爱尔兰 (IE) IP 地址的请求，计算请求到达但被 `RATE_BASED` 终止规则阻止的次数。  

```
SELECT 
  COUNT(httpRequest.country) as count, 
  httpRequest.country 
FROM waf_logs
WHERE 
  terminatingruletype='RATE_BASED' AND 
  httpRequest.country='IE'
GROUP BY httpRequest.country
ORDER BY count
LIMIT 100;
```

**Example – 计算请求被阻止的次数，按特定属性分组**  
以下查询计算请求被阻止的次数，并按照 WebACL、RuleId、ClientIP 和 HTTP 请求 URI 对结果分组。  

```
SELECT 
  COUNT(*) AS count,
  webaclid,
  terminatingruleid,
  httprequest.clientip,
  httprequest.uri
FROM waf_logs
WHERE action='BLOCK'
GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri
ORDER BY count DESC
LIMIT 100;
```

**Example – 计算特定终止规则 ID 匹配的次数**  
以下查询计算特定终止规则 ID 匹配的次数 (`WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e'`)。然后，查询按照 WebACL、操作、ClientIP 和 HTTP 请求 URI 对结果分组。  

```
SELECT 
  COUNT(*) AS count,
  webaclid,
  action,
  httprequest.clientip,
  httprequest.uri
FROM waf_logs
WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e'
GROUP BY webaclid, action, httprequest.clientip, httprequest.uri
ORDER BY count DESC
LIMIT 100;
```

**Example – 检索指定日期范围内被阻止的前 100 个 IP 地址**  
以下查询将提取在指定日期范围内被阻止的前 100 个 IP 地址。该查询还列出了 IP 地址被阻止的次数。  

```
SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country"
FROM waf_logs
WHERE "action" = 'BLOCK' and "date" >= '2021/03/01'
AND "date" < '2021/03/31'
GROUP BY "httprequest"."clientip", "httprequest"."country"
ORDER BY "ipcount" DESC limit 100
```