

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 处理延迟到达的数据
<a name="scheduledqueries-patterns-latearrive"></a>

在某些情况下，您的数据可能会延迟很长时间到达，例如，与摄取的行关联的时间戳相比，将数据提取到 Tim LiveAnalytics estream 的时间会明显延迟。在之前的示例中，您已经了解如何使用 @scheduled\_runtime 参数定义的时间范围来处理部分延迟到达的数据。然而，如果存在数据可能延迟数小时或数天的使用案例，则需要采用不同的模式，以确保派生表中的预先计算能及时更新，从而准确反映这些延迟到达的数据。有关延迟到达数据的一般信息，请参阅[写入数据（插入和更新插入）](writes.md#writes.writing-data-inserts-upserts)。

下面将介绍两种处理延迟到达数据的不同方法。
+ 如果数据到达存在可预测的延迟，则可使用另一项“追赶”计划计算来更新聚合，以处理延迟到达的数据。
+ 如果遇到不可预测的延迟偶发性延迟到达的数据，可通过手动执行来更新派生表。

本讨论涵盖数据延迟到达的各种情况。然而，同样的原则也适用于数据更正，即您修改了源表中的数据，并希望更新派生表中的聚合。

**Topics**
+ [计划追赶查询](#scheduledqueries-patterns-latearrive-schedcatchup)
+ [手动执行不可预测的延迟到达数据](#scheduledqueries-patterns-latearrive-manual)

## 计划追赶查询
<a name="scheduledqueries-patterns-latearrive-schedcatchup"></a>

### 查询及时到达的聚合数据
<a name="scheduledqueries-patterns-latearrive-schedcatchup-1"></a>

以下模式介绍如何在数据到达存在可预测延迟的情况下，通过自动化方式更新聚合数据。以下是先前对实时数据进行计划计算的示例。此计划计算每 30 分钟刷新一次派生表，并已考虑最多延迟一小时的数据。

```
{
    "Name": "MultiPT30mPerHrPerTimeseriesDPCount",
    "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 1h AND @scheduled_runtime + 1h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)",
    "ScheduleConfiguration": {
        "ScheduleExpression": "cron(0/30 * * * ? *)"
    },
    "NotificationConfiguration": {
        "SnsConfiguration": {
            "TopicArn": "******"
        }
    },
    "TargetConfiguration": {
        "TimestreamConfiguration": {
            "DatabaseName": "derived",
            "TableName": "dp_per_timeseries_per_hr",
            "TimeColumn": "hour",
            "DimensionMappings": [
                {
                    "Name": "region",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "cell",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "silo",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "availability_zone",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "microservice_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "instance_type",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "os_version",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "instance_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "process_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "jdk_version",
                    "DimensionValueType": "VARCHAR"
                }
            ],
            "MultiMeasureMappings": {
                "TargetMultiMeasureName": "numDataPoints",
                "MultiMeasureAttributeMappings": [
                    {
                        "SourceColumn": "numDataPoints",
                        "MeasureValueType": "BIGINT"
                    }
                ]
            }
        }
    },
    "ErrorReportConfiguration": {
        "S3Configuration" : {
            "BucketName" : "******",
            "ObjectKeyPrefix": "errors",
            "EncryptionOption": "SSE_S3"
        }
    },
    "ScheduledQueryExecutionRoleArn": "******"
}
```

### 追赶查询为延迟到达的数据更新聚合
<a name="scheduledqueries-patterns-latearrive-schedcatchup-2"></a>

现在，如果考虑数据可能延迟约 12 小时的情况。以下是同一查询的变体。但是，不同之处在于，该计算基于延迟长达 12 小时的数据进行聚合，相较于计划计算触发时所用的数据存在时间差。例如，在以下示例查询中，查询的目标时间范围是触发查询前 2 小时至 14 小时之间。此外，如果您注意到计划表达式 ccron(0 0,12 \* \* ? \*)，其每天在 UTC 00:00 和 UTC 12:00 触发计算。因此，当查询在 2021-12-01 00:00:00 触发时，查询会更新 2021-11-30 10:00:00 到 2021-11-30 22:00:00 范围内的聚合。定时查询使用类似于 Timestream for 写入 LiveAnalytics的 upsert 语义，如果窗口中有延迟到达的数据或者找到了较新的聚合（例如，此聚合中出现了一个新的分组，而该分组在触发原始计划计算时并不存在），则此追赶查询将使用较新的值更新聚合值，然后将新的聚合插入到派生表中。同样，如果下一个实例在 2021-12-01 12:00:00 触发，该实例将更新 2021-11-30 22:00:00 到 2021-12-01 10:00:00 范围内的聚合。

```
       {
    "Name": "MultiPT12HPerHrPerTimeseriesDPCountCatchUp",
    "QueryString": "SELECT region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h) as hour, SUM(CASE WHEN measure_name = 'metrics' THEN 20 ELSE 5 END) as numDataPoints FROM raw_data.devops WHERE time BETWEEN bin(@scheduled_runtime, 1h) - 14h AND bin(@scheduled_runtime, 1h) - 2h GROUP BY region, cell, silo, availability_zone, microservice_name, instance_type, os_version, instance_name, process_name, jdk_version, bin(time, 1h)",
    "ScheduleConfiguration": {
        "ScheduleExpression": "cron(0 0,12 * * ? *)"
    },
    "NotificationConfiguration": {
        "SnsConfiguration": {
            "TopicArn": "******"
        }
    },
    "TargetConfiguration": {
        "TimestreamConfiguration": {
            "DatabaseName": "derived",
            "TableName": "dp_per_timeseries_per_hr",
            "TimeColumn": "hour",
            "DimensionMappings": [
                {
                    "Name": "region",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "cell",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "silo",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "availability_zone",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "microservice_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "instance_type",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "os_version",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "instance_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "process_name",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "Name": "jdk_version",
                    "DimensionValueType": "VARCHAR"
                }
            ],
            "MultiMeasureMappings": {
                "TargetMultiMeasureName": "numDataPoints",
                "MultiMeasureAttributeMappings": [
                    {
                        "SourceColumn": "numDataPoints",
                        "MeasureValueType": "BIGINT"
                    }
                ]
            }
        }
    },
    "ErrorReportConfiguration": {
        "S3Configuration" : {
            "BucketName" : "******",
            "ObjectKeyPrefix": "errors",
            "EncryptionOption": "SSE_S3"
        }
    },
    "ScheduledQueryExecutionRoleArn": "******"
}
```

上述示例假设延迟到达时间不超过 12 小时，且对于超出实时窗口的数据，可以每 12 小时更新一次衍生表。您可以调整此模式，使派生表每小时更新一次，从而让派生表更快地反映延迟到达的数据。同样，可将时间范围调整为超过 12 小时，例如一天甚至一周或更长时间，以处理可预测的延迟到达数据。

## 手动执行不可预测的延迟到达数据
<a name="scheduledqueries-patterns-latearrive-manual"></a>

在某些情况下，可能会出现不可预测的延迟到达，或者您对源数据进行了更改并在事后更新了一些值。所有这些情况下，您都可以手动触发计划查询以更新派生表。以下是如何实现这一目标的示例。

假设使用案例是将计算写入派生表 dp\_per\_timeseries\_per\_hr。您在 devops 表中的基础数据已在 2021-11-30 23:00:00 到 2021-12-01 00:00:00 的时间范围内进行更新。有两种不同的计划查询可用于更新此派生表：Multi PT30m PerHrPerTimeseries DPCount 和 Multi PT12 HPer HrPerTimeseries DPCount CatchUp。您在 Timestream 中为其创建的每个计划计算都 LiveAnalytics 有一个唯一的 ARN，您可以在创建计算或执行列表操作时获得该值。您可以使用计算的 ARN 以及查询中 @scheduled\_runtime 参数的值来执行此操作。

假设 Multi 的计算PT30mPerHrPerTimeseriesDPCount 具有 ARN arn\_1，并且您想使用此计算来更新派生表。由于前面的计划计算会更新 @scheduled\_runtime 值前 1 小时和后 1 小时的聚合数据，因此您可以使用 2021-12-01 00:00:00 的 @scheduled\_runtime 参数值，覆盖更新的时间范围（2021-11-30 23:00:00 到 2021-12-01 00:00:00） 您可以使用 ExecuteScheduledQuery API 传递此计算的 ARN 和以纪元秒（UTC）为单位的时间参数值，以实现此目的。以下是使用 AWS CLI 的示例，你可以使用 Timestream SDKs 支持的任何一种来遵循相同的模式。 LiveAnalytics

```
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1
```

在前面的示例中，配置文件是具有相应权限进行此 API 调用的 AWS 配置文件，1638316800 对应于 2021-12-01 00:00:00:00 的时代秒。假设系统在所需时间段触发了此调用，则此手动触发器的工作方式几乎与自动触发器相同。

如果更新周期较长，例如基础数据更新时间为2021-11-30 23:00:00 到 2021-12-01 11:00:00，则可多次触发前置查询以覆盖整个时间范围。例如，您可以执行以下六种不同的操作。

```
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638316800 --profile profile --region us-east-1

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638324000 --profile profile --region us-east-1

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638331200 --profile profile --region us-east-1

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638338400 --profile profile --region us-east-1

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638345600 --profile profile --region us-east-1

aws timestream-query execute-scheduled-query --scheduled-query-arn arn_1 --invocation-time 1638352800 --profile profile --region us-east-1
```

前六个命令对应于在 2021-12-01 00:00:00、2021-12-01 02:00:00、2021-12-01 04:0:00、2021-12-01 06:00:00、2021-12-01 08:00:00、2021-12-01 10:00 调用的计划计算：

或者，您可以使用在 2021-12-01 13:00:00 PT12 HPer HrPerTimeseries DPCount CatchUp 触发的计算单次执行来更新整个 12 小时时间范围内的聚合。例如，如果该计算的 ARN 为 arn\_2，则可通过 CLI 执行以下命令。

```
aws timestream-query execute-scheduled-query --scheduled-query-arn arn_2 --invocation-time 1638363600 --profile profile --region us-east-1
```

值得注意的是，对于手动触发器，您可以为调用时间参数使用时间戳，该时间戳无需与自动触发器的时间戳保持一致。例如，在前面的示例中，您在时间戳 2021-12-01 13:00:00 触发了计算，尽管自动计划仅在 2021-12-01 10:00:00、2021-12-01 12:00:00 以及 2021-12-02 00:00:00 这三个时间戳触发。Timestream for 使您可以 LiveAnalytics 灵活地根据手动操作的需要使用适当的值来触发它。

以下是使用 ExecuteScheduledQuery API 时的一些重要注意事项。
+ 如果您要触发多个此类调用，则需确保这些调用不会在重叠的时间范围内生成结果。例如，在前面的示例中，共有六次调用。每次调用都覆盖 2 小时的时间范围，因此调用时间戳间隔设置为每两小时一次，以避免更新内容产生任何重叠。这可确保派生表中的数据最终状态与源表中的聚合相匹配。如果无法确保时间范围不重叠，请确保这些执行按顺序依次触发。如果您同时触发多个执行且其时间范围存在重叠，则可能观察到触发器失败，在这些执行的错误报告中，您可能会看到版本冲突的提示。计划查询调用生成的结果会根据调用触发的时间分配版本号。因此，由较新调用生成的行具有更高的版本号。较高版本的记录可覆盖较低版本的记录。对于自动触发的计划查询，Timestream fo LiveAnalytics r 会自动管理计划，这样即使后续调用的时间范围重叠，您也不会看到这些问题。
+ 如前所述，您可以使用 @scheduled\_runtime 的任何时间戳值触发调用。因此，您有责任正确设置这些值，以便派生表中与源表中数据更新范围相对应的时间范围得到相应更新。
+ 您也可以使用这些手动触发器以处理处于 DISABLED 状态的计划查询。这使您能够定义特殊查询，这些查询不会在自动化计划中执行，因为其处于 DISABLED 状态。相反，您可以使用这些查询的手动触发器，以管理数据更正或数据延迟到达的使用案例。