

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Security Lake 查询
<a name="subscriber-query-examples"></a>

您可以查询 Security Lake 存储在 AWS Lake Formation 数据库和表中的数据。您还可以在 Security Lake 控制台、API 或 AWS CLI中创建第三方订阅用户。第三方订阅用户还可以从您指定的来源查询 Lake Formation 数据。

Lake Formation 数据湖管理员必须向查询数据的 IAM 身份授予相关数据库和表的 `SELECT` 权限。订阅用户也必须是在 Security Lake 中创建的，然后才能查询数据。有关如何创建具有查询权限的订阅用户的更多信息，请参阅[管理 Security Lake 订阅用户的查询访问权限](subscriber-query-access.md)。

**使用保留设置查询数据**   
[Amazon S3 生命周期设置](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html)会影响数据的保存时间，而这反过来又会影响您可以查询多久以前。如果您在 Security Lake 中配置了保留设置，则必须在查询中包含基于时间的筛选器，以确保结果集的范围仅限于未过期的数据文件。有关 Security Lake 中数据保留的更多信息，请参阅[生命周期管理](lifecycle-management.md)。

以下各节中的查询示例包括基于时间的过滤器，例如`eventDay`或`time_dt`，以演示这种最佳实践。

**Topics**
+ [

# Security Lake 查询AWS源版本 1 (OCSF 1.0.0-rc.2)
](subscriber-query-examples1.md)
+ [

# AWS源版本 2 的 Security Lake 查询 (OCSF 1.1.0)
](subscriber-query-examples2.md)

# Security Lake 查询AWS源版本 1 (OCSF 1.0.0-rc.2)
<a name="subscriber-query-examples1"></a>

以下部分提供了有关从 Security Lake 中查询数据的指导，并包括源版本 1 中原生支持的AWSAWS源代码的一些查询示例。这些查询旨在检索特定数据AWS 区域。示例使用的是 us-east-1，即美国东部（弗吉尼亚州北部）。此外，示例查询使用 `LIMIT 25` 参数，最多返回 25 条记录。您可以省略该参数或根据自己的偏好进行调整。有关更多示例，请参阅 [Amazon Security Lake OCSF 查询 GitHub 目录](https://github.com/awslabs/aws-security-analytics-bootstrap/tree/main/AWSSecurityAnalyticsBootstrap/amazon_security_lake_queries)。

以下查询包括基于时间的过滤器，`eventDay`用于确保您的查询在配置的保留设置范围内。有关更多信息，请参阅 [Querying data with retention settings](subscriber-query-examples.md#security-lake-retention-setting-query-data)。

例如，如果超过 60 天的数据已过期，则您的查询应包含时间限制，以防止访问过期的数据。对于 60 天的保留期，请在查询中加入以下子句：

```
...
WHERE eventDay BETWEEN cast(date_format(current_date - INTERVAL '59' day, '%Y%m%d') AS varchar) 
                   AND cast(date_format(current_date, '%Y%m%d') AS varchar)
...
```

该条款使用 59 天（而不是 60 天）来避免 Amazon S3 和 Apache Iceberg 之间出现任何数据或时间重叠。

## 日志源表
<a name="log-source-table"></a>

查询 Security Lake 数据时，您必须将数据所在的 Lake Formation 表的名称包含在内。

```
SELECT *
   FROM amazon_security_lake_glue_db_DB_Region.amazon_security_lake_table_DB_Region_SECURITY_LAKE_TABLE
   WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
   LIMIT 25
```

日志源表的常见值包括以下内容：
+ `cloud_trail_mgmt_1_0`—AWS CloudTrail管理活动
+ `lambda_execution_1_0`— Lambda CloudTrail 的数据事件
+ `s3_data_1_0`— S3 CloudTrail 的数据事件
+ `route53_1_0` – Amazon Route 53 Resolver 查询日志
+ `sh_findings_1_0`—AWS Security Hub CSPM调查结果
+ `vpc_flow_1_0` – Amazon Virtual Private Cloud (Amazon VPC) 流日志

**示例：表中所有来自 us-east `sh_findings_1_0` -1 区域的 Security Hub CSPM 调查结果**

```
SELECT *
   FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
   WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
   LIMIT 25
```

## 数据库区域
<a name="database-region"></a>

查询 Security Lake 数据时，您必须将要从中查询数据的数据库区域名称包含在内。有关当前提供 Security Lake 的数据库区域的完整列表，请参阅 [Amazon Security Lake 端点](https://docs.aws.amazon.com/general/latest/gr/securitylake.html)。

**示例：列出来自源 IP AWS CloudTrail的活动**

以下示例列出了在（2023 年 3 月 1 日）之后*20230301*（2023 年 3 月 1 日）记录的*cloud\$1trail\$1mgmt\$11\$10*来自源 IP *192.0.2.1* 的所有 CloudTrail 活动*us-east-1*`DB_Region`。

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > '20230301' AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

## 分区日期
<a name="partition-date"></a>

通过对数据进行分区，您可以限制每次查询所扫描的数据量，从而提高性能并降低成本。Security Lake 通过 `eventDay`、`region` 和 `accountid` 参数实施分区。`eventDay` 分区采用格式 `YYYYMMDD`。

以下是使用 `eventDay` 分区的查询示例：

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > '20230301'
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
```

`eventDay` 的常见值包括以下内容：

**过去 1 年内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '1' year, '%Y%m%d%H') as varchar)`

**过去 1 个月内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '1' month, '%Y%m%d%H') as varchar)`

**过去 30 天内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar)`

**过去 12 个小时内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '12' hour, '%Y%m%d%H') as varchar)`

**过去 5 分钟内发生的事件**  
`> cast(date_format(current_timestamp - INTERVAL '5' minute, '%Y%m%d%H') as varchar)`

**7-14 天前发生的事件**  
`BETWEEN cast(date_format(current_timestamp - INTERVAL '14' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar)`

**在特定日期当天或之后发生的事件**  
`>= '20230301'`

**示例：表中列出了 2023 年 3 月 1 日当天或之后来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay >= '20230301'
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

**示例：表中列出了过去 30 天内来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar) 
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

# Security Lake 查询 CloudTrail 数据的示例
<a name="cloudtrail-query-examples"></a>

AWS CloudTrail跟踪中的用户活动和 API 使用情况AWS 服务。订阅者可以查询 CloudTrail 数据以了解以下类型的信息：

以下是AWS源版本 1 的一些 CloudTrail 数据查询示例：

**过去 7 天AWS 服务内未经授权的企图**

```
SELECT 
      time, 
      api.service.name, 
      api.operation, 
      api.response.error, 
      api.response.message, 
      unmapped['responseElements'], 
      cloud.region, 
      actor.user.uuid, 
      src_endpoint.ip, 
      http_request.user_agent
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND api.response.error in (
        'Client.UnauthorizedOperation',
        'Client.InvalidPermission.NotFound',
        'Client.OperationNotPermitted',
        'AccessDenied')
    ORDER BY time desc
    LIMIT 25
```

**过去 7 天`192.0.2.1`内来自源 IP 的所有 CloudTrail 活动清单**

```
SELECT 
      api.request.uid, 
      time, 
      api.service.name, 
      api.operation, 
      cloud.region, 
      actor.user.uuid, 
      src_endpoint.ip, 
      http_request.user_agent
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    AND src_endpoint.ip = '127.0.0.1.'
    ORDER BY time desc
    LIMIT 25
```

**过去 7 天内所有 IAM 活动的列表**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
      AND api.service.name = 'iam.amazonaws.com'
    ORDER BY time desc
    LIMIT 25
```

**过去 7 天内使用过凭证 `AIDACKCEVSQ6C2EXAMPLE` 的实例**

```
SELECT 
      actor.user.uid, 
      actor.user.uuid, 
      actor.user.account_uid, 
      cloud.region
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
      AND actor.user.credential_uid = 'AIDACKCEVSQ6C2EXAMPLE'
      LIMIT 25
```

**过去 7 天内失败的 CloudTrail 记录列表**

```
SELECT 
      actor.user.uid, 
      actor.user.uuid, 
      actor.user.account_uid, 
      cloud.region
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE status='failed' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    ORDER BY time DESC
    LIMIT 25
```

# Route 53 解析器查询日志的安全湖查询示例
<a name="route53_1_0-query-examples"></a>

Amazon Route 53 Resolver 查询日志可以跟踪由 Amazon VPC 中的资源进行的 DNS 查询。订阅用户可以查询 Route 53 Resolver 查询日志，以了解以下类型的信息：

以下是AWS源版本 1 的 Route 53 解析器查询日志的一些示例查询：

**过去 7 天 CloudTrail 内的 DNS 查询列表**

```
SELECT 
      time,
      src_endpoint.instance_uid,
      src_endpoint.ip,
      src_endpoint.port,
      query.hostname,
      rcode
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
    ORDER BY time DESC
    LIMIT 25
```

**过去 7 天内与 `s3.amazonaws.com` 匹配的 DNS 查询的列表**

```
SELECT 
      time,
      src_endpoint.instance_uid,
      src_endpoint.ip,
      src_endpoint.port,
      query.hostname,
      rcode,
      answers
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    WHERE query.hostname LIKE 's3.amazonaws.com.' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    ORDER BY time DESC
    LIMIT 25
```

**过去 7 天内未解析的 DNS 查询的列表**

```
SELECT 
      time, 
      src_endpoint.instance_uid, 
      src_endpoint.ip, 
      src_endpoint.port, 
      query.hostname, 
      rcode, 
      answers
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    WHERE cardinality(answers) = 0 and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

**过去 7 天内解析到 `192.0.2.1`** 的 DNS 查询的列表

```
SELECT 
      time, 
      src_endpoint.instance_uid, 
      src_endpoint.ip, 
      src_endpoint.port, 
      query.hostname, 
      rcode, 
      answer.rdata
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0
    CROSS JOIN UNNEST(answers) as st(answer)
    WHERE answer.rdata='192.0.2.1' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

# Security Lake 对 Security Hub CSPM 调查结果的查询示例
<a name="security-hub-query-examples"></a>

Security Hub CSPM 为您提供安全状态的全面视图，AWS并帮助您根据安全行业标准和最佳实践检查您的环境。Security Hub CSPM 会生成用于安全检查的结果，并接收来自第三方服务的调查结果。

以下是 Security Hub CSPM 调查结果的一些示例查询：

**过去 7 天内严重性等级大于或等于 `MEDIUM` 的新调查发现**

```
SELECT 
      time,
      finding,
      severity
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0_findings
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND severity_id >= 3
      AND state_id = 1
    ORDER BY time DESC
    LIMIT 25
```

**过去 7 天内的重复调查发现**

```
SELECT 
    finding.uid,
    MAX(time) AS time,
    ARBITRARY(region) AS region,
    ARBITRARY(accountid) AS accountid,
    ARBITRARY(finding) AS finding,
    ARBITRARY(vulnerabilities) AS vulnerabilities
FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
GROUP BY finding.uid
LIMIT 25
```

**过去 7 天内的所有非信息性调查发现**

```
SELECT 
      time,
      finding.title,
      finding,
      severity
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE severity != 'Informational' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

**资源为 Amazon S3 存储桶的调查发现（无时间限制）**

```
SELECT *
   FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
   WHERE any_match(resources, element -> element.type = 'amzn-s3-demo-bucket')
   LIMIT 25
```

**通用漏洞评分系统 (CVSS) 得分大于 `1` 的调查发现（无时间限制）**

```
SELECT *
   FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
   WHERE any_match(vulnerabilities, element -> element.cve.cvss.base_score > 1.0)
   LIMIT 25
```

**符合通用漏洞披露 (CVE) `CVE-0000-0000` 的调查发现（无时间限制）**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE any_match(vulnerabilities, element -> element.cve.uid = 'CVE-0000-0000')
    LIMIT 25
```

**过去 7 天内从 Security Hub CSPM 发送调查结果的产品数量**

```
SELECT 
      metadata.product.feature.name,
      count(*)
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    GROUP BY metadata.product.feature.name
    ORDER BY metadata.product.feature.name DESC
    LIMIT 25
```

**过去 7 天内调查发现中的资源类型数量**

```
SELECT 
      count(*),
      resource.type
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
        CROSS JOIN UNNEST(resources) as st(resource)
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    GROUP BY resource.type
    LIMIT 25
```

**过去 7 天内调查发现中的易受攻击软件包**

```
SELECT 
      vulnerability
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0,
    UNNEST(vulnerabilities) as t(vulnerability)  
    WHERE vulnerabilities is not null
    LIMIT 25
```

**过去 7 天内发生更改的调查发现**

```
SELECT 
    finding.uid,
    finding.created_time,
    finding.first_seen_time,
    finding.last_seen_time,
    finding.modified_time,
    finding.title,
    state
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar)
    LIMIT 25
```

# Amazon VPC 流日志的安全湖查询示例
<a name="vpc-query-examples"></a>

Amazon Virtual Private Cloud (Amazon VPC) 提供有关进出 VPC 网络接口的 IP 流量的详细信息。

以下是AWS源版本 1 的 Amazon VPC 流日志的一些查询示例：

**最近 7 天的具体AWS 区域流量**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND region in ('us-east-1','us-east-2','us-west-2')
    LIMIT 25
```

**过去 7 天内来自源 IP `192.0.2.1` 和源端口 `22` 的活动的列表**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND src_endpoint.ip = '192.0.2.1'
      AND src_endpoint.port = 22
    LIMIT 25
```

**过去 7 天内不同目标 IP 地址的数量**

```
SELECT
    COUNT(DISTINCT dst_endpoint.ip) 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
    LIMIT 25
```

**过去 7 天内源自 198.51.100.0/24 的流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
    AND split_part(src_endpoint.ip,'.', 1)='198'AND split_part(src_endpoint.ip,'.', 2)='51'
    LIMIT 25
```

**过去 7 天内的所有 HTTPS 流量**

```
SELECT
      dst_endpoint.ip as dst, 
      src_endpoint.ip as src, 
      traffic.packets 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND dst_endpoint.port = 443
    GROUP BY 
      dst_endpoint.ip, 
      traffic.packets, 
      src_endpoint.ip 
    ORDER BY traffic.packets DESC 
    LIMIT 25
```

**按过去 7 天内发送到端口 `443` 的连接的数据包数量排序**

```
SELECT
      traffic.packets,
      dst_endpoint.ip
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND dst_endpoint.port = 443 
    GROUP BY 
      traffic.packets,
      dst_endpoint.ip
    ORDER BY traffic.packets DESC
    LIMIT 25
```

**过去 7 天内 IP `192.0.2.1` 和 `192.0.2.2` 之间的所有流量**

```
SELECT
      start_time, 
      end_time, 
      src_endpoint.interface_uid, 
      connection_info.direction,
      src_endpoint.ip,
      dst_endpoint.ip,
      src_endpoint.port,
      dst_endpoint.port,
      traffic.packets,
      traffic.bytes
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND(
        src_endpoint.ip = '192.0.2.1'
        AND dst_endpoint.ip = '192.0.2.2')
      OR (
        src_endpoint.ip = '192.0.2.2'
        AND dst_endpoint.ip = '192.0.2.1')
    ORDER BY start_time ASC
    LIMIT 25
```

**过去 7 天内的所有入站流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND connection_info.direction = 'ingress'
    LIMIT 25
```

**过去 7 天的所有出站流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND connection_info.direction = 'egress'
    LIMIT 25
```

**过去 7 天内所有被拒绝的流量**

```
SELECT * 
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0
    WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) 
      AND type_uid = 400105
    LIMIT 25
```

# AWS源版本 2 的 Security Lake 查询 (OCSF 1.1.0)
<a name="subscriber-query-examples2"></a>

以下部分提供了有关从 Security Lake 中查询数据的指导，并包括源版本 2 原生支持的AWSAWS源代码的一些查询示例。这些查询旨在检索特定数据AWS 区域。示例使用的是 us-east-1，即美国东部（弗吉尼亚州北部）。此外，示例查询使用 `LIMIT 25` 参数，最多返回 25 条记录。您可以省略该参数或根据自己的偏好进行调整。有关更多示例，请参阅 [Amazon Security Lake OCSF 查询 GitHub 目录](https://github.com/awslabs/aws-security-analytics-bootstrap/tree/main/AWSSecurityAnalyticsBootstrap/amazon_security_lake_queries)。

您可以查询 Security Lake 存储在AWS Lake Formation数据库和表中的数据。您还可以在 Security Lake 控制台、API 或AWS CLI中创建第三方订阅用户。第三方订阅用户还可以从您指定的来源查询 Lake Formation 数据。

Lake Formation 数据湖管理员必须向查询数据的 IAM 身份授予相关数据库和表的 `SELECT` 权限。订阅用户也必须是在 Security Lake 中创建的，然后才能查询数据。有关如何创建具有查询权限的订阅用户的更多信息，请参阅[管理 Security Lake 订阅用户的查询访问权限](subscriber-query-access.md)。

以下查询包括基于时间的过滤器，`eventDay`用于确保您的查询在配置的保留设置范围内。有关更多信息，请参阅 [Querying data with retention settings](subscriber-query-examples.md#security-lake-retention-setting-query-data)。

例如，如果超过 60 天的数据已过期，则您的查询应包含时间限制，以防止访问过期的数据。对于 60 天的保留期，请在查询中加入以下子句：

```
...
WHERE time_dt > DATE_ADD('day', -59, CURRENT_TIMESTAMP)
...
```

该条款使用 59 天（而不是 60 天）来避免 Amazon S3 和 Apache Iceberg 之间出现任何数据或时间重叠。

## 日志源表
<a name="log-source-table"></a>

查询 Security Lake 数据时，您必须将数据所在的 Lake Formation 表的名称包含在内。

```
SELECT *
FROM "amazon_security_lake_glue_db_DB_Region"."amazon_security_lake_table_DB_Region_SECURITY_LAKE_TABLE"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

日志源表的常见值包括以下内容：
+ `cloud_trail_mgmt_2_0`—AWS CloudTrail管理活动
+ `lambda_execution_2_0`— Lambda CloudTrail 的数据事件
+ `s3_data_2_0`— S3 CloudTrail 的数据事件
+ `route53_2_0` – Amazon Route 53 Resolver 查询日志
+ `sh_findings_2_0`—AWS Security Hub CSPM调查结果
+ `vpc_flow_2_0` – Amazon Virtual Private Cloud (Amazon VPC) 流日志
+ `eks_audit_2_0`— 亚马逊 Elastic Kubernetes Service（亚马逊 EKS）审计日志
+ `waf_2_0`—AWS WAF v2 日志

**示例：表中所有来自 us-east `sh_findings_2_0` -1 区域的 Security Hub CSPM 调查结果**

```
SELECT *
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
    WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

## 数据库区域
<a name="database-region"></a>

查询 Security Lake 数据时，您必须将要从中查询数据的数据库区域名称包含在内。有关当前提供 Security Lake 的数据库区域的完整列表，请参阅 [Amazon Security Lake 端点](https://docs.aws.amazon.com/general/latest/gr/securitylake.html)。

**示例：列出来自来源 IP 的亚马逊 Virtual Private Cloud 活动**

以下示例列出了在（2023 年 3 月 1 日）之后*20230301*（2023 年 3 月 1 日）记录的*vpc\$1flow\$12\$10*来自源 IP *192.0.2.1* 的所有 Amazon VPC 活动*us-west-2*`DB_Region`。

```
SELECT * 
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
    WHERE time_dt > TIMESTAMP '2023-03-01' 
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time_dt desc
LIMIT 25
```

## 分区日期
<a name="partition-date"></a>

通过对数据进行分区，您可以限制每次查询所扫描的数据量，从而提高性能并降低成本。与 Security Lake 1.0 相比，Security Lake 2.0 中的分区工作 Security Lake 现在通过`time_dt``region`、和`accountid`实现分区。而 Security Lake 1.0 通过`eventDay``region`、和`accountid`参数实现了分区。

查询`time_dt`将自动生成来自 S3 的日期分区，并且可以像 Athena 中任何基于时间的字段一样进行查询。

以下是使用` time_dt`分区查询 2023 年 3 月 1 日之后的日志的查询示例：

```
SELECT *
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt > TIMESTAMP '2023-03-01'
AND src_endpoint.ip = '192.0.2.1'
ORDER BY time desc
LIMIT 25
```

`time_dt` 的常见值包括以下内容：

**过去 1 年内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '1' YEAR`

**过去 1 个月内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '1' MONTH`

**过去 30 天内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '30' DAY`

**过去 12 个小时内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '12' HOUR`

**过去 5 分钟内发生的事件**  
`WHERE time_dt > CURRENT_TIMESTAMP - INTERVAL '5' MINUTE`

**7-14 天前发生的事件**  
`WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '14' DAY AND CURRENT_TIMESTAMP - INTERVAL '7' DAY`

**在特定日期当天或之后发生的事件**  
`WHERE time_dt >= TIMESTAMP '2023-03-01'`

**示例：表中列出了 2023 年 3 月 1 日当天或之后来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay >= '20230301'
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

**示例：表中列出了过去 30 天内来自源 IP `192.0.2.1` 的所有 CloudTrail 活动 `cloud_trail_mgmt_1_0`**

```
SELECT *
    FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0
    WHERE eventDay > cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar) 
    AND src_endpoint.ip = '192.0.2.1'
    ORDER BY time desc
    LIMIT 25
```

## 查询安全湖观测数据
<a name="querying-observables-examples"></a>

Observables 是 Security Lake 2.0 现已推出的一项新功能。可观察对象是一个枢轴元素，其中包含在事件中许多地方发现的相关信息。通过查询可观察数据，用户可以从其数据集中获得高级安全见解。

通过查询可观察对象中的特定元素，您可以将数据集限制为诸如特定用户名、资源 UIDs IPs、哈希值和其他 IOC 类型信息之类的内容

这是一个使用 observables 数组查询包含 IP 值 “172.01.02.03” 的 VPC Flow 和 Route53 表中的日志的示例查询

```
WITH a AS 
    (SELECT 
    time_dt,
    observable.name,
    observable.value
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0",
    UNNEST(observables) AS t(observable)
    WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
    AND observable.value='172.01.02.03'
    AND observable.name='src_endpoint.ip'),
b as 
    (SELECT 
    time_dt,
    observable.name,
    observable.value
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_route53_2_0",
    UNNEST(observables) AS t(observable)
    WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
    AND observable.value='172.01.02.03'
    AND observable.name='src_endpoint.ip')
SELECT * FROM a
LEFT JOIN b ON a.value=b.value and a.name=b.name
LIMIT 25
```

# Security Lake 查询 CloudTrail 数据的示例
<a name="cloudtrail-query-examples-sourceversion2"></a>

AWS CloudTrail跟踪中的用户活动和 API 使用情况AWS 服务。订阅者可以查询 CloudTrail 数据以了解以下类型的信息：

以下是一些针对AWS源版本 2 CloudTrail 的数据查询示例：

**过去 7 天AWS 服务内未经授权的企图**

```
SELECT
    time_dt, 
    api.service.name, 
    api.operation, 
    api.response.error, 
    api.response.message, 
    api.response.data, 
    cloud.region, 
    actor.user.uid, 
    src_endpoint.ip, 
    http_request.user_agent
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND api.response.error in (
    'Client.UnauthorizedOperation',
    'Client.InvalidPermission.NotFound',
    'Client.OperationNotPermitted',
    'AccessDenied')
ORDER BY time desc
LIMIT 25
```

**过去 7 天`192.0.2.1`内来自源 IP 的所有 CloudTrail 活动清单**

```
SELECT
    api.request.uid, 
    time_dt, 
    api.service.name, 
    api.operation, 
    cloud.region, 
    actor.user.uid, 
    src_endpoint.ip, 
    http_request.user_agent
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND src_endpoint.ip = '192.0.2.1.'
ORDER BY time desc
LIMIT 25
```

**过去 7 天内所有 IAM 活动的列表**

```
SELECT *
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND api.service.name = 'iam.amazonaws.com'
ORDER BY time desc
LIMIT 25
```

**过去 7 天内使用过凭证 `AIDACKCEVSQ6C2EXAMPLE` 的实例**

```
SELECT 
      actor.user.uid, 
      actor.user.uid_alt, 
      actor.user.account.uid, 
      cloud.region
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND actor.user.credential_uid = 'AIDACKCEVSQ6C2EXAMPLE'
LIMIT 25
```

**最近 7 天内失败的 CloudTrail 记录列表**

```
SELECT 
      actor.user.uid, 
      actor.user.uid_alt, 
      actor.user.account.uid, 
      cloud.region
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_cloud_trail_mgmt_2_0"
WHERE status='failed' and time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
ORDER BY time DESC
LIMIT 25
```

# Route 53 Resolver 查询日志的查询示例
<a name="route53_1_0-query-examples-sourceversion2"></a>

Amazon Route 53 Resolver 查询日志可以跟踪由 Amazon VPC 中的资源进行的 DNS 查询。订阅用户可以查询 Route 53 Resolver 查询日志，以了解以下类型的信息：

以下是AWS源版本 2 的 Route 53 reesolver 查询日志的一些查询示例：

**过去 7 天 CloudTrail 内的 DNS 查询列表**

```
SELECT 
    time_dt,
    src_endpoint.instance_uid,
    src_endpoint.ip,
    src_endpoint.port,
    query.hostname,
    rcode
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_route53_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
ORDER BY time DESC
LIMIT 25
```

**过去 7 天内与 `s3.amazonaws.com` 匹配的 DNS 查询的列表**

```
SELECT 
    time_dt,
    src_endpoint.instance_uid,
    src_endpoint.ip,
    src_endpoint.port,
    query.hostname,
    rcode,
    answers
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_route53_2_0"
WHERE query.hostname LIKE 's3.amazonaws.com.' and time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
ORDER BY time DESC
LIMIT 25
```

**过去 7 天内未解析的 DNS 查询的列表**

```
SELECT 
    time_dt,
    src_endpoint.instance_uid, 
    src_endpoint.ip, 
    src_endpoint.port, 
    query.hostname, 
    rcode, 
    answers
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_route53_2_0"
WHERE cardinality(answers) = 0 and time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

**过去 7 天内解析到 `192.0.2.1`** 的 DNS 查询的列表

```
SELECT 
    time_dt,
    src_endpoint.instance_uid, 
    src_endpoint.ip, 
    src_endpoint.port, 
    query.hostname, 
    rcode, 
    answer.rdata
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_route53_2_0",
UNNEST(answers) as st(answer)
WHERE answer.rdata='192.0.2.1' 
AND time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

# Security Lake 对 Security Hub CSPM 调查结果的查询示例
<a name="security-hub-query-examples-sourceversion2"></a>

Security Hub CSPM 为您提供安全状态的全面视图，AWS并帮助您根据安全行业标准和最佳实践检查您的环境。Security Hub CSPM 会生成用于安全检查的结果，并接收来自第三方服务的调查结果。

以下是AWS源版本 2 的 Security Hub CSPM 发现结果的一些查询示例：

**过去 7 天内严重性等级大于或等于 `MEDIUM` 的新调查发现**

```
SELECT
    time_dt,
    finding_info,
    severity_id,
    status
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
    AND severity_id >= 3
    AND status = 'New'
ORDER BY time DESC
LIMIT 25
```

**过去 7 天内的重复调查发现**

```
SELECT 
    finding_info.uid,
    MAX(time_dt) AS time,
    ARBITRARY(region) AS region,
    ARBITRARY(accountid) AS accountid,
    ARBITRARY(finding_info) AS finding,
    ARBITRARY(vulnerabilities) AS vulnerabilities
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
GROUP BY finding_info.uid
LIMIT 25
```

**过去 7 天内的所有非信息性调查发现**

```
SELECT 
    time_dt,
    finding_info.title,
    finding_info,
    severity
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE severity != 'Informational' and time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

**资源为 Amazon S3 存储桶的调查发现（无时间限制）**

```
SELECT *
   FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE any_match(resources, element -> element.type = 'amzn-s3-demo-bucket')
LIMIT 25
```

**通用漏洞评分系统 (CVSS) 得分大于 `1` 的调查发现（无时间限制）**

```
SELECT
    DISTINCT finding_info.uid
    time_dt,
    metadata,
    finding_info,
    vulnerabilities,
    resource
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0",
UNNEST(vulnerabilities) AS t(vulnerability),
UNNEST(vulnerability.cve.cvss) AS t(cvs)
WHERE cvs.base_score > 1.0
AND vulnerabilities is NOT NULL
LIMIT 25
```

**符合通用漏洞披露 (CVE) `CVE-0000-0000` 的调查发现（无时间限制）**

```
SELECT *
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE any_match(vulnerabilities, element -> element.cve.uid = 'CVE-0000-0000')
LIMIT 25
```

**过去 7 天内从 Security Hub CSPM 发送调查结果的产品数量**

```
SELECT 
    metadata.product.name,
    count(*)
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
GROUP BY metadata.product.name
ORDER BY metadata.product.name DESC
LIMIT 25
```

**过去 7 天内调查发现中的资源类型数量**

```
SELECT
    count(*) AS "Total",
    resource.type
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
GROUP BY resource.type
ORDER BY count(*) DESC
LIMIT 25
```

**过去 7 天内调查发现中的易受攻击软件包**

```
SELECT 
    vulnerabilities
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND vulnerabilities is NOT NULL
LIMIT 25
```

**过去 7 天内发生更改的调查发现**

```
SELECT 
    status,
    finding_info.title,
    finding_info.created_time_dt,
    finding_info,
    finding_info.uid,
    finding_info.first_seen_time_dt,
    finding_info.last_seen_time_dt,
    finding_info.modified_time_dt
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
LIMIT 25
```

# Amazon VPC 流日志的安全湖查询示例
<a name="vpc-query-examples-sourceversion2"></a>

Amazon Virtual Private Cloud (Amazon VPC) 提供有关进出 VPC 网络接口的 IP 流量的详细信息。

以下是AWS源版本 2 的 Amazon VPC 流日志的一些查询示例：

**最近 7 天的具体AWS 区域流量**

```
SELECT *
    FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND region in ('us-east-1','us-east-2','us-west-2')
LIMIT 25
```

**过去 7 天内来自源 IP `192.0.2.1` 和源端口 `22` 的活动的列表**

```
SELECT *
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND src_endpoint.ip = '192.0.2.1'
AND src_endpoint.port = 22
LIMIT 25
```

**过去 7 天内不同目标 IP 地址的数量**

```
SELECT 
    COUNT(DISTINCT dst_endpoint.ip) AS "Total"
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
LIMIT 25
```

**过去 7 天内源自 198.51.100.0/24 的流量**

```
SELECT *
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND split_part(src_endpoint.ip,'.', 1)='198'AND split_part(src_endpoint.ip,'.', 2)='51'
LIMIT 25
```

**过去 7 天内的所有 HTTPS 流量**

```
SELECT 
    dst_endpoint.ip as dst, 
    src_endpoint.ip as src, 
    traffic.packets
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND dst_endpoint.port = 443
GROUP BY 
    dst_endpoint.ip, 
    traffic.packets, 
    src_endpoint.ip 
ORDER BY traffic.packets DESC 
LIMIT 25
```

**按过去 7 天内发送到端口 `443` 的连接的数据包数量排序**

```
SELECT 
    traffic.packets,
    dst_endpoint.ip
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND dst_endpoint.port = 443 
GROUP BY 
    traffic.packets,
    dst_endpoint.ip
ORDER BY traffic.packets DESC
LIMIT 25
```

**过去 7 天内 IP `192.0.2.1` 和 `192.0.2.2` 之间的所有流量**

```
SELECT 
    start_time_dt, 
    end_time_dt, 
    src_endpoint.interface_uid, 
    connection_info.direction,
    src_endpoint.ip,
    dst_endpoint.ip,
    src_endpoint.port,
    dst_endpoint.port,
    traffic.packets,
    traffic.bytes
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND(
    src_endpoint.ip = '192.0.2.1'
AND dst_endpoint.ip = '192.0.2.2')
OR (
    src_endpoint.ip = '192.0.2.2'
AND dst_endpoint.ip = '192.0.2.1')
ORDER BY start_time_dt ASC
LIMIT 25
```

**过去 7 天内的所有入站流量**

```
SELECT *
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND connection_info.direction = 'Inbound'
LIMIT 25
```

**过去 7 天的所有出站流量**

```
SELECT *
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND connection_info.direction = 'Outbound'
LIMIT 25
```

**过去 7 天内所有被拒绝的流量**

```
SELECT *
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_vpc_flow_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND action = 'Denied'
LIMIT 25
```

## Amazon EKS 审核日志的安全湖查询示例
<a name="example-queries-eks-sourceversion2"></a>

Amazon EKS 日志跟踪控制平面活动直接从 Amazon EKS 控制平面向您的账户提供审计和诊断 CloudWatch 日志。这些日志可让您轻松地保护和运行您的集群。订阅者可以查询 EKS 日志以了解以下类型的信息。

以下是AWS源版本 2 的 Amazon EKS 审核日志的一些查询示例：

**过去 7 天内对特定 URL 的请求**

```
SELECT 
    time_dt,
    actor.user.name,
    http_request.url.path,
    activity_name
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND activity_name = 'get'
and http_request.url.path = '/apis/coordination.k8s.io/v1/'
LIMIT 25
```

**更新过去 7 天来自 “10.0.97.167” 的请求**

```
SELECT 
    activity_name,
    time_dt,
    api.request,
    http_request.url.path,
    src_endpoint.ip,
    resources
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND src_endpoint.ip = '10.0.97.167'
AND activity_name = 'Update'
LIMIT 25
```

**过去 7 天内与资源 “kube-controller-manager” 关联的请求和响应**

```
SELECT 
    activity_name,
    time_dt,
    api.request,
    api.response,
    resource.name
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_eks_audit_2_0",
UNNEST(resources) AS t(resource)
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP 
AND resource.name = 'kube-controller-manager'
LIMIT 25
```

# AWS WAF v2 日志的 Security Lake 查询示例
<a name="example-queries-waf-sourceversion2"></a>

AWS WAF是一种 Web 应用程序防火墙，可用于监控最终用户向您的应用程序发送的 Web 请求并控制对您的内容的访问。

以下是AWS源版本 2 的AWS WAF v2 日志查询示例：

**在过去 7 天内发布来自特定源 IP 的请求**

```
SELECT 
    time_dt,
    activity_name,
    src_endpoint.ip,
    http_request.url.path,
    http_request.url.hostname,
    http_request.http_method,
    http_request.http_headers
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_waf_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND src_endpoint.ip = '100.123.123.123'
AND activity_name = 'Post'
LIMIT 25
```

**过去 7 天内与防火墙类型 MANAGED\$1RULE\$1GROUP 匹配的请求**

```
SELECT 
    time_dt,
    activity_name,
    src_endpoint.ip,
    http_request.url.path,
    http_request.url.hostname,
    http_request.http_method,
    firewall_rule.uid,
    firewall_rule.type,
    firewall_rule.condition,
    firewall_rule.match_location,
    firewall_rule.match_details,
    firewall_rule.rate_limit
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_waf_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND firewall_rule.type = 'MANAGED_RULE_GROUP'
LIMIT 25
```

**过去 7 天内与防火墙规则中的正则表达式匹配的请求**

```
SELECT 
    time_dt,
    activity_name,
    src_endpoint.ip,
    http_request.url.path,
    http_request.url.hostname,
    http_request.http_method,
    firewall_rule.uid,
    firewall_rule.type,
    firewall_rule.condition,
    firewall_rule.match_location,
    firewall_rule.match_details,
    firewall_rule.rate_limit
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_waf_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND firewall_rule.condition = 'REGEX'
LIMIT 25
```

**拒绝获取过去 7 天内触发AWS WAF规则的AWS凭证请求**

```
SELECT 
    time_dt,
    activity_name,
    action,
    src_endpoint.ip,
    http_request.url.path,
    http_request.url.hostname,
    http_request.http_method,
    firewall_rule.uid,
    firewall_rule.type
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_waf_2_0" 
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY AND CURRENT_TIMESTAMP
AND http_request.url.path = '/.aws/credentials'
AND action = 'Denied'
LIMIT 25
```

**获取过去 7 天内按国家/地区分组的AWS凭证申请**

```
SELECT count(*) as Total,
    src_endpoint.location.country AS Country,
    activity_name,
    action,
    src_endpoint.ip,
    http_request.url.path,
    http_request.url.hostname,
    http_request.http_method
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_waf_2_0"
WHERE time_dt BETWEEN CURRENT_TIMESTAMP - INTERVAL '7' DAY
    AND CURRENT_TIMESTAMP
    AND activity_name = 'Get'
    AND http_request.url.path = '/.aws/credentials'
GROUP BY src_endpoint.location.country,
    activity_name,
    action,
    src_endpoint.ip,
    http_request.url.path,
    http_request.url.hostname,
    http_request.http_method
```