

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Hudi 資料集
<a name="emr-hudi-work-with-dataset"></a>

Hudi 支援透過 Spark 插入、更新和刪除 Hudi 資料集中的資料。如需詳細資訊，請參閱 Apache Hudi 文件中的[寫入 Hudi 資料表](https://hudi.apache.org/docs/writing_data.html)。

下列範例示範如何啟動互動式 Spark Shell、使用 Spark 提交，或透過 Amazon EMR Notebooks 在 Amazon EMR 上使用 Hudi。您也可以使用 Hudi DeltaStreamer 公用程式或其他工具來寫入資料集。本節中的範例示範使用 Spark shell 來處理資料集，同時以預設的 `hadoop` 使用者身分使用 SSH 連接到主節點。

## 使用 Amazon EMR 6.7 及更新版本啟動 Spark Shell
<a name="hudi-datasets-67"></a>

在使用 Amazon EMR 6.7.0 或更早版本執行 `spark-shell`、`spark-submit` 或 `spark-sql` 時，請傳遞下列命令。

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0，相比之前的 Hudi 版本有顯著改進。如需詳細資訊，請參閱 [Apache Hudi 0.11.0 遷移指南](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)。此標籤上的範例反映了這些變更。

**在主節點上開啟 Spark Shell**

1. 使用 SSH 連接至主節點。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[使用 SSH 連接至主節點](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 輸入以下命令啟動 Spark shell。若要使用 PySpark Shell，請將 *spark-shell* 取代為 *pyspark*。

   ```
   spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \
   --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \    
   --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog"  \
   --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
   ```

## 使用 Amazon EMR 6.6 及更早版本啟動 Spark Shell
<a name="hudi-datasets-67"></a>

在使用 Amazon EMR 6.6.x 或更早版本執行 `spark-shell`、`spark-submit` 或 `spark-sql` 時，請傳遞下列命令。

**注意**  
Amazon EMR 6.2 和 5.31 及更新版本 (Hudi 0.6.x 及更新版本) 可以從組態中省略 `spark-avro.jar`。
Amazon EMR 6.5 和 5.35 及更新版本 (Hudi 0.9.x 及更新版本) 可以從組態中省略 `spark.sql.hive.convertMetastoreParquet=false`。
Amazon EMR 6.6 和 5.36 及更新版本 (Hudi 0.10.x 及更新版本) 必須包含 `HoodieSparkSessionExtension` 組態，如[版本：0.10.0 Spark 指南](https://hudi.apache.org/docs/0.10.0/quick-start-guide/)中所述：  

  ```
  --conf  "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
  ```

**在主節點上開啟 Spark Shell**

1. 使用 SSH 連接至主節點。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[使用 SSH 連接至主節點](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html)。

1. 輸入以下命令啟動 Spark shell。若要使用 PySpark Shell，請將 *spark-shell* 取代為 *pyspark*。

   ```
   spark-shell \
   --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
   --conf "spark.sql.hive.convertMetastoreParquet=false" \
   --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
   ```

## 將 Hudi 與使用 Amazon EMR 6.7 及更新版本的 Amazon EMR Notebooks 搭配使用
<a name="hudi-datasets-notebooks"></a>

若要將 Hudi 與 Amazon EMR Notebooks 搭配使用，您必須先將 Hudi jar 檔案從本機檔案系統複製到筆記本叢集主節點上的 HDFS。然後，使用筆記本編輯器來設定 EMR 筆記本以使用 Hudi。

**將 Hudi 與 Amazon EMR Notebooks 搭配使用**

1. 為 Amazon EMR Notebooks 建立和啟動叢集。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[為筆記本建立 Amazon EMR 叢集](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-cluster.html)。

1. 使用 SSH 連接到叢集的主節點，然後將 jar 檔案從本機檔案系統複製到 HDFS，如以下範例所示。在此範例中，我們在 HDFS 中建立目錄以便清楚管理檔案。如有需要，您可以在 HDFS 中選擇自己的目的地。

   ```
   hdfs dfs -mkdir -p /apps/hudi/lib
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
   ```

1. 開啟筆記本編輯器，輸入下列範例中的程式碼，然後執行程式碼。

   ```
   %%configure
   { "conf": {
               "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",
               "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
               "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
               "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
             }}
   ```

## 將 Hudi 與使用 Amazon EMR 6.6 及更早版本的 Amazon EMR Notebooks 搭配使用
<a name="hudi-datasets-notebooks-66"></a>

若要將 Hudi 與 Amazon EMR Notebooks 搭配使用，您必須先將 Hudi jar 檔案從本機檔案系統複製到筆記本叢集主節點上的 HDFS。然後，使用筆記本編輯器來設定 EMR 筆記本以使用 Hudi。

**將 Hudi 與 Amazon EMR Notebooks 搭配使用**

1. 為 Amazon EMR Notebooks 建立和啟動叢集。如需詳細資訊，請參閱《Amazon EMR 管理指南》**中的[為筆記本建立 Amazon EMR 叢集](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks-cluster.html)。

1. 使用 SSH 連接到叢集的主節點，然後將 jar 檔案從本機檔案系統複製到 HDFS，如以下範例所示。在此範例中，我們在 HDFS 中建立目錄以便清楚管理檔案。如有需要，您可以在 HDFS 中選擇自己的目的地。

   ```
   hdfs dfs -mkdir -p /apps/hudi/lib
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
   ```

   ```
   hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
   ```

1. 開啟筆記本編輯器，輸入下列範例中的程式碼，然後執行程式碼。

   ```
   { "conf": {
               "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
               "spark.serializer":"org.apache.spark.serializer.KryoSerializer",
               "spark.sql.hive.convertMetastoreParquet":"false"
             }}
   ```

## 初始化 Hudi 的 Spark 工作階段
<a name="emr-hudi-initialize-session"></a>

使用 Scala 時，您必須在 Spark 工作階段中匯入下列類別。這需要在每個 Spark 工作階段都各自完成一次。

```
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
```

## 寫入 Hudi 資料集
<a name="emr-hudi-dataframe"></a>

下列範例顯示如何建立 DataFrame 並將其寫入為 Hudi 資料集。

**注意**  
若要將程式碼範例貼到 Spark shell 中，請在提示字元中輸入 **:paste**、貼上範例，然後按 **CTRL** \$1 **D**。

每次將 DataFrame 寫入至 Hudi 資料集時，都必須指定 `DataSourceWriteOptions`。這些選項有許多在各個寫入操作之間可能是相同的。以下範例會使用 `hudiOptions` 變數指定常用選項，後續範例會使用這些變數。

### 將 Scala 與 Amazon EMR 6.7 及更新版本搭配使用進行寫入
<a name="scala-examples-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0，相比之前的 Hudi 版本有顯著改進。如需詳細資訊，請參閱 [Apache Hudi 0.11.0 遷移指南](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)。此標籤上的範例反映了這些變更。

```
// Create a DataFrame
val inputDF = Seq(
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
 ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
 ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
 ).toDF("id", "creation_date", "last_update_time")

//Specify common DataSourceWriteOptions in the single hudiOptions variable 
val hudiOptions = Map[String,String](
  HoodieWriteConfig.TBL_NAME.key -> "tableName",
  DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", 
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  HoodieSyncConfig.META_SYNC_ENABLED.key -> "true",
  HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms",
  HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName",
  HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date"
)

// Write the DataFrame as a Hudi dataset
(inputDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert")
    .mode(SaveMode.Overwrite)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 將 Scala 與 Amazon EMR 6.6 及更早版本搭配使用進行寫入
<a name="scala-examples-66"></a>

```
// Create a DataFrame
val inputDF = Seq(
 ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
 ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
 ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
 ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
 ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
 ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
 ).toDF("id", "creation_date", "last_update_time")

//Specify common DataSourceWriteOptions in the single hudiOptions variable 
val hudiOptions = Map[String,String](
  HoodieWriteConfig.TABLE_NAME -> "tableName",
  DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", 
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)

// Write the DataFrame as a Hudi dataset
(inputDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .options(hudiOptions)
    .mode(SaveMode.Overwrite)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用 PySpark 進行寫入
<a name="pyspark-examples"></a>

```
# Create a DataFrame
inputDF = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ["id", "creation_date", "last_update_time"]
)

# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
'hoodie.table.name': 'tableName',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'creation_date',
'hoodie.datasource.write.precombine.field': 'last_update_time',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'tableName',
'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}

# Write a DataFrame as a Hudi dataset
inputDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'insert') \
.options(**hudiOptions) \
.mode('overwrite') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

**注意**  
您可能會在程式碼範例和通知中看到 "hoodie" 而不是 Hudi。Hudi 程式碼庫廣泛使用舊的 "hoodie" 拼寫。


**Hudi 的 DataSourceWriteOptions 參考**  

| 選項 | Description | 
| --- | --- | 
|  TABLE\$1NAME  |  在其下註冊資料集的表名。  | 
|  TABLE\$1TYPE\$1OPT\$1KEY  |  選用。指定要將資料集建立為 `"COPY_ON_WRITE"` 或 `"MERGE_ON_READ"`。預設值為 `"COPY_ON_WRITE"`。  | 
|  RECORDKEY\$1FIELD\$1OPT\$1KEY  |  記錄金鑰欄位，其值將用作 `HoodieKey` 的 `recordKey` 元件。實際值將藉由叫用欄位值的 `.toString()` 來取得。您可以使用點符號來指定巢狀欄位，例如 `a.b.c`。  | 
|  PARTITIONPATH\$1FIELD\$1OPT\$1KEY  |  分割區路徑欄位，其值將用作 `HoodieKey` 的 `partitionPath` 元件。實際值將藉由叫用欄位值的 `.toString()` 來取得。  | 
|  PRECOMBINE\$1FIELD\$1OPT\$1KEY  |  在實際寫入之前，會在預先結合中使用此欄位。當兩個記錄具有相同的金鑰值時，Hudi 選取 (由 `Object.compareTo(..)` 決定之) 預先組合欄位中值最大的記錄。  | 

只有在您的中繼存放區中註冊 Hudi 資料集資料表時才需要下列選項。如果您未將 Hudi 資料集註冊為 Hive 中繼存放區中的資料表，則不需要這些選項。


**Hive 的 DataSourceWriteOptions 參考**  

| 選項 | Description | 
| --- | --- | 
|  HIVE\$1DATABASE\$1OPT\$1KEY  |  要同步的 Hive 資料庫。預設值為 `"default"`。  | 
|  HIVE\$1PARTITION\$1EXTRACTOR\$1CLASS\$1OPT\$1KEY  |  用於將分割區欄位值擷取至 Hive 分割區欄的類別。  | 
|  HIVE\$1PARTITION\$1FIELDS\$1OPT\$1KEY  |  資料集中用於判斷 Hive 分割區欄的欄位。  | 
|  HIVE\$1SYNC\$1ENABLED\$1OPT\$1KEY  |  當設定為 `"true"` 時，將資料集註冊至 Apache Hive 中繼存放區。預設值為 `"false"`。  | 
|  HIVE\$1TABLE\$1OPT\$1KEY  |  必要. Hive 中要同步的資料表的名稱。例如 `"my_hudi_table_cow"`。  | 
|  HIVE\$1USER\$1OPT\$1KEY  |  選用。同步時要使用的 Hive 使用者名稱。例如 `"hadoop"`。  | 
|  HIVE\$1PASS\$1OPT\$1KEY  |  選用。由 `HIVE_USER_OPT_KEY` 指定之使用者的 Hive 密碼。  | 
|  HIVE\$1URL\$1OPT\$1KEY  |  Hive 中繼存放區 URL。  | 

## Upsert 資料
<a name="emr-hudi-upsert-to-datasets"></a>

下列範例示範如何撰寫 DataFrame 來進行 upsert 資料。與先前插入範例不同，`OPERATION_OPT_KEY` 值設定為 `UPSERT_OPERATION_OPT_VAL`。此外，指定 `.mode(SaveMode.Append)` 以指示應附加記錄。

### 將 Scala 與 Amazon EMR 6.7 及更新版本搭配使用進行 upsert
<a name="scala-upsert-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0，相比之前的 Hudi 版本有顯著改進。如需詳細資訊，請參閱 [Apache Hudi 0.11.0 遷移指南](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)。此標籤上的範例反映了這些變更。

```
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))

(updateDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 將 Scala 與 Amazon EMR 6.6 及更早版本搭配使用進行 upsert
<a name="scala-upsert-66"></a>

```
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))

(updateDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .options(hudiOptions)
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用 PySpark 進行 upsert
<a name="pyspark-upsert"></a>

```
from pyspark.sql.functions import lit

# Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value'))

updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

## 刪除記錄
<a name="emr-hudi-delete-from-datasets"></a>

若要硬刪除記錄，您可以 upsert 空的承載。在此情況下，`PAYLOAD_CLASS_OPT_KEY` 選項會指定 `EmptyHoodieRecordPayload` 類別。此範例利用在 upsert 範例中使用的相同 DataFrame `updateDF`，來指定相同的記錄。

### 將 Scala 與 Amazon EMR 6.7 及更新版本搭配使用進行刪除
<a name="scala-delete-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0，相比之前的 Hudi 版本有顯著改進。如需詳細資訊，請參閱 [Apache Hudi 0.11.0 遷移指南](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)。此標籤上的範例反映了這些變更。

```
(updateDF.write
    .format("hudi")
    .options(hudiOptions)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 將 Scala 與 Amazon EMR 6.6 及更早版本搭配使用進行刪除
<a name="scala-delete-66"></a>

```
(updateDF.write
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
    .mode(SaveMode.Append)
    .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
```

### 使用 PySpark 進行刪除
<a name="pyspark-delete"></a>

```
updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('s3://amzn-s3-demo-bucket/myhudidataset/')
```

您還可以透過以下方式硬刪除資料：將 `OPERATION_OPT_KEY ` 設定為 `DELETE_OPERATION_OPT_VAL`，來刪除您提交的資料集中的所有記錄。如需有關執行軟刪除的指示，以及有關刪除儲存在 Hudi 資料表中的資料的詳細資訊，請參閱 Apache Hudi 文件中的 [Deletes](https://hudi.apache.org/docs/writing_data.html#deletes)。

## 從 Hudi 資料集讀取
<a name="emr-hudi-read-dataset"></a>

為在目前時間點擷取資料，Hudi 依預設執行快照查詢。以下是查詢在 [寫入 Hudi 資料集](#emr-hudi-dataframe) 中寫入至 S3 的資料集的範例。將 *s3：//amzn-s3-demo-bucket/myhudidataset* 取代為您的資料表路徑，並為每個分割區層級新增萬用字元星號，*加上一個額外的星號*。在此範例中，有一個分割區層級，因此我們新增了兩個萬用字符號。

### 將 Scala 與 Amazon EMR 6.7 及更新版本搭配使用進行讀取
<a name="scala-read-67"></a>

**注意**  
Amazon EMR 6.7.0 使用 [Apache Hudi](https://hudi.apache.org/) 0.11.0-amzn-0，相比之前的 Hudi 版本有顯著改進。如需詳細資訊，請參閱 [Apache Hudi 0.11.0 遷移指南](https://hudi.apache.org/releases/release-0.11.0/#migration-guide)。此標籤上的範例反映了這些變更。

```
val snapshotQueryDF = spark.read
    .format("hudi")
    .load("s3://amzn-s3-demo-bucket/myhudidataset") 
    .show()
```

### 將 Scala 與 Amazon EMR 6.6 及更早版本搭配使用進行讀取
<a name="scala-read-66"></a>

```
(val snapshotQueryDF = spark.read
    .format("org.apache.hudi")
    .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*"))

snapshotQueryDF.show()
```

### 使用 PySpark 進行讀取
<a name="pyspark-read"></a>

```
snapshotQueryDF = spark.read \
    .format('org.apache.hudi') \
    .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*')
    
snapshotQueryDF.show()
```

### 增量查詢
<a name="emr-hudi-incremental-query"></a>

您還可以使用 Hudi 執行增量查詢，以取得自提供遞交時間戳記以來已變更的記錄串流。若要這麼做，請將 `QUERY_TYPE_OPT_KEY` 欄位設定為 `QUERY_TYPE_INCREMENTAL_OPT_VAL`。然後，為 `BEGIN_INSTANTTIME_OPT_KEY` 新增一個值，以取得自指定時間以來寫入的所有記錄。增量查詢的效率通常是批次處理查詢的十倍，因為它們僅處理變更的記錄。

在執行增量查詢時，請使用根 (基本) 資料表路徑，而不需要用於快照查詢的萬用字元星號。

**注意**  
Presto 不支援增量查詢。

#### 使用 Scala 進行增量查詢
<a name="scala-incremental-queries"></a>

```
val incQueryDF = spark.read
    .format("org.apache.hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>)
    .load("s3://amzn-s3-demo-bucket/myhudidataset")
     
incQueryDF.show()
```

#### 使用 PySpark 進行增量查詢
<a name="pyspark-incremental-queries"></a>

```
readOptions = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': <beginInstantTime>,
}

incQueryDF = spark.read \
    .format('org.apache.hudi') \
    .options(**readOptions) \
    .load('s3://amzn-s3-demo-bucket/myhudidataset')
    
incQueryDF.show()
```

如需有關從 Hudi 資料集讀取的詳細資訊，請參閱 Apache Hudi 文件中的[查詢 Hudi 資料表](https://hudi.apache.org/docs/querying_data.html)。