

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Hudi 数据集
<a name="emr-hudi-work-with-dataset"></a>

Hudi 支持通过 Spark 在 Hudi 数据集中插入、更新和删除数据。有关更多信息，请参阅 Apache Hudi 文档中的[写入 Hudi 表格](https://hudi.apache.org/docs/writing_data.html)。

以下示例演示如何启动交互式 Spark Shell、使用 Spark 提交，或如何使用 Amazon EMR Notebooks 在 Amazon EMR 上使用 Hudi。您也可以使用 Hudi DeltaStreamer 实用程序或其他工具写入数据集。在本节中，示例演示使用 Spark shell 处理数据集，同时使用 SSH 作为默认 `hadoop` 用户连接到主节点。

## 使用 Amazon EMR 6.7 及更高版本启动 Spark Shell
<a name="hudi-datasets-67"></a>

运行 `spark-shell`、`spark-submit` 或 `spark-sql` 使用 Amazon EMR 6.7.0 或更高版本时，传递以下命令。

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi 0.11.0-amzn-0](https://hudi.apache.org/)，相比于之前的 Hudi 版本有明显改进。有关更多信息，请参阅 [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)（《Apache Hudi 0.11.0 迁移指南》）。此选项卡上的示例反映了这些更改。

**在主节点上打开 Spark Shell**

1. 使用 SSH 连接到主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳，请*spark-shell*替换为*pyspark*。

   ```
   spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \
   --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \    
   --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"  \
   --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
   ```

## 使用 Amazon EMR 6.6 及更早版本启动 Spark Shell
<a name="hudi-datasets-67"></a>

运行 `spark-shell`、`spark-submit` 或 `spark-sql` 使用 Amazon EMR 6.6.x 或更早版本时，传递以下命令。

**注意**  
Amazon EMR 6.2 和 5.31 及更高版本（Hudi 0.6.x 及更高版本）可以在配置中省略 `spark-avro.jar`。
Amazon EMR 6.5 和 5.35 及更高版本（Hudi 0.9.x 及更高版本）可以从配置中省略 `spark.sql.hive.convertMetastoreParquet=false`。
Amazon EMR 6.6 和 5.36 及更高版本（Hudi 0.10.x 及更高版本）必须包含 [Version: 0.10.0 Spark Guide](https://hudi.apache.org/docs/0.10.0/quick-start-guide/)（《版本：0.10.0 Spark 指南》）中所述的 `HoodieSparkSessionExtension` 配置：  

  ```
  --conf  "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
  ```

**在主节点上打开 Spark Shell**

1. 使用 SSH 连接到主节点。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[使用 SSH 连接到主节点](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 输入以下命令以启动 Spark shell。要使用 PySpark 外壳，请*spark-shell*替换为*pyspark*。

   ```
   spark-shell \
   --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
   --conf "spark.sql.hive.convertMetastoreParquet=false" \
   --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
   ```

## 使用 Amazon EMR 6.7 及更高版本将 Hudi 与 Amazon EMR Notebooks 结合使用
<a name="hudi-datasets-notebooks"></a>

要将 Hudi 与 Amazon EMR Notebooks 结合使用，您必须首先将 Hudi jar 文件从本地文件系统复制到 Notebook 集群的主节点 上的 HDFS。然后，您可以使用 Notebook 编辑器来配置 EMR Notebook 以使用 Hudi。

**将 Hudi 与 Amazon EMR Notebooks 搭配使用**

1. 为 Amazon EMR Notebooks 创建并启动集群。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[为 Notebook 创建 Amazon EMR 集群](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-cluster.html)。

1. 使用 SSH 连接到集群的主节点，然后将 jar 文件从本地文件系统复制到 HDFS，如以下示例所示。在此示例中，我们在 HDFS 中创建了一个目录，以便清晰地管理文件。如果需要，您可以在 HDFS 中选择自己的目的地。

   ```
   hdfs dfs -mkdir -p /apps/hudi/lib
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
   ```

1. 打开 Notebook 编辑器，输入以下示例中的代码，然后运行它。

   ```
   %%configure
   { "conf": {
               "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",
               "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
               "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
               "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
             }}
   ```

## 使用 Amazon EMR 6.6 及更早版本将 Hudi 与 Amazon EMR Notebooks 结合使用
<a name="hudi-datasets-notebooks-66"></a>

要将 Hudi 与 Amazon EMR Notebooks 结合使用，您必须首先将 Hudi jar 文件从本地文件系统复制到 Notebook 集群的主节点 上的 HDFS。然后，您可以使用 Notebook 编辑器来配置 EMR Notebook 以使用 Hudi。

**将 Hudi 与 Amazon EMR Notebooks 搭配使用**

1. 为 Amazon EMR Notebooks 创建并启动集群。有关更多信息，请参阅《Amazon EMR 管理指南》**中的[为 Notebook 创建 Amazon EMR 集群](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-cluster.html)。

1. 使用 SSH 连接到集群的主节点，然后将 jar 文件从本地文件系统复制到 HDFS，如以下示例所示。在此示例中，我们在 HDFS 中创建了一个目录，以便清晰地管理文件。如果需要，您可以在 HDFS 中选择自己的目的地。

   ```
   hdfs dfs -mkdir -p /apps/hudi/lib
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
   ```

1. 打开 Notebook 编辑器，输入以下示例中的代码，然后运行它。

   ```
   { "conf": {
               "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
               "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
               "spark.sql.hive.convertMetastoreParquet":"false"
             }}
   ```

## 初始化 Hudi 的 Spark 会话
<a name="emr-hudi-initialize-session"></a>

使用 Scala 时，您必须在 Spark 会话中导入以下类。这需要在每个 Spark 会话中完成一次。

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
```

## 写入 Hudi 数据集
<a name="emr-hudi-dataframe"></a>

以下示例说明如何创建 DataFrame 并将其写为 Hudi 数据集。

**注意**  
要将代码示例粘贴到 Spark shell 中，请在提示符处键入 **:paste**，粘贴示例，然后按 **CTRL** \$1 **D**。

每次向 Hudi 数据集写入时，都必须指定`DataSourceWriteOptions`。 DataFrame 这些选项中的许多选项在写入操作之间可能是相同的。以下示例使用 `hudiOptions` 变量指定常用选项，随后的示例使用这些选项。

### 使用 Amazon EMR 6.7 及更高版本的 Scala 进行写入
<a name="scala-examples-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi 0.11.0-amzn-0](https://hudi.apache.org/)，相比于之前的 Hudi 版本有明显改进。有关更多信息，请参阅 [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)（《Apache Hudi 0.11.0 迁移指南》）。此选项卡上的示例反映了这些更改。

```
// Create a DataFrame
val inputDF = Seq(
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
 ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
 ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
 ).toDF("id", "creation_date", "last_update_time")

//Specify common DataSourceWriteOptions in the single hudiOptions variable 
val hudiOptions = Map[String,String](
  HoodieWriteConfig.TBL_NAME.key -> "tableName",
  DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", 
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  HoodieSyncConfig.META_SYNC_ENABLED.key -> "true",
  HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms",
  HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName",
  HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date"
)

// Write the DataFrame as a Hudi dataset
(inputDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert")
    .mode(SaveMode.Overwrite)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用 Amazon EMR 6.6 及更早版本的 Scala 进行写入
<a name="scala-examples-66"></a>

```
// Create a DataFrame
val inputDF = Seq(
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
 ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
 ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
 ).toDF("id", "creation_date", "last_update_time")

//Specify common DataSourceWriteOptions in the single hudiOptions variable 
val hudiOptions = Map[String,String](
  HoodieWriteConfig.TABLE_NAME -> "tableName",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", 
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

// Write the DataFrame as a Hudi dataset
(inputDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .options(hudiOptions)
    .mode(SaveMode.Overwrite)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用写作 PySpark
<a name="pyspark-examples"></a>

```
# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
'hoodie.table.name': 'tableName',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'creation_date',
'hoodie.datasource.write.precombine.field': 'last_update_time',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'tableName',
'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}

# Write a DataFrame as a Hudi dataset
inputDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'insert') \
.options(**hudiOptions) \
.mode('overwrite') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

**注意**  
您可能会在代码示例和通知中看到“hoodie”而不是 Hudi。Hudi 代码库广泛使用旧的“hoodie”拼写。


**DataSourceWriteOptions Hudi 的参考资料**  

| 选项 | 描述 | 
| --- | --- | 
|  TABLE\$1NAME  |  要在其中注册数据集的表名称。  | 
|  TABLE\$1TYPE\$1OPT\$1KEY  |  可选。指定数据集是创建为 `"COPY_ON_WRITE"` 还是 `"MERGE_ON_READ"`。默认值为 `"COPY_ON_WRITE"`。  | 
|  RECORDKEY\$1FIELD\$1OPT\$1KEY  |  其值将用作 `HoodieKey` 的 `recordKey` 组件的记录键字段。实际值将通过对字段值调用 `.toString()` 来获得。可使用点表示法指定嵌套字段，例如 `a.b.c`。  | 
|  PARTITIONPATH\$1FIELD\$1OPT\$1KEY  |  其值将用作 `HoodieKey` 的 `partitionPath` 组件的分区路径字段。实际值将通过对字段值调用 `.toString()` 来获得。  | 
|  PRECOMBINE\$1FIELD\$1OPT\$1KEY  |  在实际写入之前在预合并中使用的字段。如果两个记录具有相同的键值，Hudi 为预合并选择字段值最大的记录（由 `Object.compareTo(..)` 确定）。  | 

仅在元数据仓中注册 Hudi 数据集表时才需要以下选项。如果您未将 Hudi 数据集注册为 Hive 元数据仓中的表，则不需要这些选项。


**DataSourceWriteOptions Hive 的参考资料**  

| 选项 | 描述 | 
| --- | --- | 
|  HIVE\$1DATABASE\$1OPT\$1KEY  |  要同步到的 Hive 数据库。默认值为 `"default"`。  | 
|  HIVE\$1PARTITION\$1EXTRACTOR\$1CLASS\$1OPT\$1KEY  |  用于将分区字段值提取到 Hive 分区列中的类。  | 
|  HIVE\$1PARTITION\$1FIELDS\$1OPT\$1KEY  |  数据集中用于确定 Hive 分区列的字段。  | 
|  HIVE\$1SYNC\$1ENABLED\$1OPT\$1KEY  |  设置为 `"true"` 时，将向 Apache Hive 元数据仓注册数据集。默认值为 `"false"`。  | 
|  HIVE\$1TABLE\$1OPT\$1KEY  |  必需。Hive 中要同步到的表的名称。例如，`"my_hudi_table_cow"`。  | 
|  HIVE\$1USER\$1OPT\$1KEY  |  可选。同步时要使用的 Hive 用户名。例如 `"hadoop"`。  | 
|  HIVE\$1PASS\$1OPT\$1KEY  |  可选。由 `HIVE_USER_OPT_KEY` 指定的用户的 Hive 密码。  | 
|  HIVE\$1URL\$1OPT\$1KEY  |  Hive 元数据仓 URL。  | 

## 更新插入数据
<a name="emr-hudi-upsert-to-datasets"></a>

以下示例演示如何通过编写 a DataFrame 来更新插入数据。与之前的插入示例不同，`OPERATION_OPT_KEY` 值设置为 `UPSERT_OPERATION_OPT_VAL`。此外，还指定 `.mode(SaveMode.Append)` 以指示应追加记录。

### 使用 Amazon EMR 6.7 及更高版本的 Scala 进行更新插入
<a name="scala-upsert-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi 0.11.0-amzn-0](https://hudi.apache.org/)，相比于之前的 Hudi 版本有明显改进。有关更多信息，请参阅 [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)（《Apache Hudi 0.11.0 迁移指南》）。此选项卡上的示例反映了这些更改。

```
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))

(updateDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用 Amazon EMR 6.6 及更早版本的 Scala 进行更新插入
<a name="scala-upsert-66"></a>

```
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))

(updateDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .options(hudiOptions)
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用 Upsert PySpark
<a name="pyspark-upsert"></a>

```
from pyspark.sql.functions import lit

# Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value'))

updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

## 删除记录
<a name="emr-hudi-delete-from-datasets"></a>

要硬删除记录，您可以更新插入一个空的负载。在这种情况下，`PAYLOAD_CLASS_OPT_KEY` 选项指定 `EmptyHoodieRecordPayload` 类。该示例使用 upsert 示例中使用的相同方法来指定相同的记录。 DataFrame `updateDF`

### 使用 Amazon EMR 6.7 及更高版本的 Scala 进行删除
<a name="scala-delete-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi 0.11.0-amzn-0](https://hudi.apache.org/)，相比于之前的 Hudi 版本有明显改进。有关更多信息，请参阅 [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)（《Apache Hudi 0.11.0 迁移指南》）。此选项卡上的示例反映了这些更改。

```
(updateDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用 Amazon EMR 6.6 及更早版本的 Scala 进行删除
<a name="scala-delete-66"></a>

```
(updateDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用删除 PySpark
<a name="pyspark-delete"></a>

```
updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

您还可以通过以下方式硬删除数据：将 `OPERATION_OPT_KEY ` 设置为 `DELETE_OPERATION_OPT_VAL` 来删除您提交的数据集中的所有记录。有关执行软删除的说明，以及有关删除 Hudi 表中存储的数据的详细信息，请参阅 Apache Hudi 文档中的 [Deletes](https://hudi.apache.org/docs/writing_data.html#deletes)。

## 从 Hudi 数据集读取
<a name="emr-hudi-read-dataset"></a>

要在当前时间点检索数据，Hudi 默认情况下执行快照查询。以下是查询在 [写入 Hudi 数据集](#emr-hudi-dataframe) 中写入 S3 的数据集的示例。*s3://amzn-s3-demo-bucket/myhudidataset*替换为表路径，为每个分区级别添加通配符星号，*再加上一个*星号。在此示例中，有一个分区级别，因此我们添加了两个通配符号。

### 使用 Amazon EMR 6.7 及更高版本的 Scala 进行读取
<a name="scala-read-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi 0.11.0-amzn-0](https://hudi.apache.org/)，相比于之前的 Hudi 版本有明显改进。有关更多信息，请参阅 [Apache Hudi 0.11.0 Migration Guide](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)（《Apache Hudi 0.11.0 迁移指南》）。此选项卡上的示例反映了这些更改。

```
val snapshotQueryDF = spark.read
    .format("hudi")
    .load("s3://amzn-s3-demo-bucket/myhudidataset") 
    .show()
```

### 使用 Amazon EMR 6.6 及更早版本的 Scala 进行读取
<a name="scala-read-66"></a>

```
(val snapshotQueryDF = spark.read
    .format("org.apache.hudi")
    .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*"))

snapshotQueryDF.show()
```

### 使用 “阅读” PySpark
<a name="pyspark-read"></a>

```
snapshotQueryDF = spark.read \
    .format('org.apache.hudi') \
    .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*')
    
snapshotQueryDF.show()
```

### 递增查询
<a name="emr-hudi-incremental-query"></a>

您还可以使用 Hudi 执行增量查询，以获取自给定提交时间戳以来已更改的记录流。为此，请将 `QUERY_TYPE_OPT_KEY` 字段设置为 `QUERY_TYPE_INCREMENTAL_OPT_VAL`。然后，为 `BEGIN_INSTANTTIME_OPT_KEY` 添加一个值，以获取自指定时间以来写入的所有记录。递增查询的效率通常是批处理查询的十倍，因为它们只处理更改的记录。

执行增量查询时，请使用根（基）表路径，而不需要用于快照查询的通配符星号。

**注意**  
Presto 不支持递增查询。

#### 使用 Scala 进行增量查询
<a name="scala-incremental-queries"></a>

```
val incQueryDF = spark.read
    .format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>)
    .load("s3://amzn-s3-demo-bucket/myhudidataset")
     
incQueryDF.show()
```

#### 使用增量查询 PySpark
<a name="pyspark-incremental-queries"></a>

```
readOptions = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': <beginInstantTime>,
}

incQueryDF = spark.read \
    .format('org.apache.hudi') \
    .options(**readOptions) \
    .load('s3://amzn-s3-demo-bucket/myhudidataset')
    
incQueryDF.show()
```

有关从 Hudi 数据集读取的更多信息，请参阅 Apache Hudi 文档中的 [查询 Hudi 表](https://hudi.apache.org/docs/querying_data.html)。