

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 AWS Glue ETL 任務，在 Data Catalog 中更新結構描述並新增新的分區
<a name="update-from-job"></a>

您的擷取、轉換和載入 (ETL) 任務可能會在目標資料存放區中建立新的資料表分割區。您的資料集結構描述可能會隨著時間演進，而與 AWS Glue 資料目錄結構描述有所不同。AWS GlueETL 任務目前提供數個功能，您可以在 ETL 指令碼中使用這些功能來更新資料目錄中的 ETL 結構描述和分割區。這些功能可讓您在資料目錄中查看 ETL 運作的結果，無需重新執行爬蟲程式。

## 新分割區
<a name="update-from-job-partitions"></a>

如果您想要在 中檢視新的分割區 AWS Glue Data Catalog，您可以執行下列其中一項操作：
+ 任務完成後，重新執行爬蟲程式，並在爬蟲程式完成後在主控台上檢視新的分割區。
+ 任務完成後，立即在主控台上檢視新的分割區，而不重新執行爬蟲程式。您可以在 ETL 指令碼中加入幾行程式碼，以啟用此功能，如下列範例所示。程式碼會使用 `enableUpdateCatalog` 引數來指出在資料目錄建立新分割區時，要在任務執行期間更新。

**方法 1**  
以選項引數傳遞 `enableUpdateCatalog` 和 `partitionKeys`。  

```
additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["region", "year", "month", "day"]


sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>,
                                                    table_name=<target_table_name>, transformation_ctx="write_sink",
                                                    additional_options=additionalOptions)
```

```
val options = JsonOptions(Map(
    "path" -> <S3_output_path>, 
    "partitionKeys" -> Seq("region", "year", "month", "day"), 
    "enableUpdateCatalog" -> true))
val sink = glueContext.getCatalogSink(
    database = <target_db_name>, 
    tableName = <target_table_name>, 
    additionalOptions = options)sink.writeDynamicFrame(df)
```

**方法 2**  
以 `getSink()` 傳遞 `enableUpdateCatalog` 和 `partitionKeys`，並呼叫 `DataSink` 物件上的 `setCatalogInfo()`。  

```
sink = glueContext.getSink(
    connection_type="s3", 
    path="<S3_output_path>",
    enableUpdateCatalog=True,
    partitionKeys=["region", "year", "month", "day"])
sink.setFormat("json")
sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>)
sink.writeFrame(last_transform)
```

```
val options = JsonOptions(
   Map("path" -> <S3_output_path>, 
       "partitionKeys" -> Seq("region", "year", "month", "day"), 
       "enableUpdateCatalog" -> true))
val sink = glueContext.getSink("s3", options).withFormat("json")
sink.setCatalogInfo(<target_db_name>, <target_table_name>)
sink.writeDynamicFrame(df)
```

現在，您可以建立新的目錄資料表、使用修改的結構描述更新現有資料表，並使用 AWS Glue ETL 任務在資料目錄新增新的資料表分割區，而無需重新執行爬蟲程式。

## 更新資料表結構描述
<a name="update-from-job-updating-table-schema"></a>

如果您想覆寫資料目錄資料表的結構描述，可以執行以下任一作業：
+ 任務完成後，重新執行爬蟲程式，並確定您的爬蟲程式已設定為更新資料表定義。當爬蟲程式完成時，檢視主控台上的新分割區以及任何結構描述更新。如需詳細資訊，請參閱[使用 API 設定爬蟲程式](https://docs.aws.amazon.com/glue/latest/dg/crawler-configuration.html#crawler-configure-changes-api)。
+ 任務完成後，立即在主控台上檢視修改的結構描述，無需重新執行爬蟲程式。您可以在 ETL 指令碼中加入幾行程式碼，以啟用此功能，如下列範例所示。程式碼使用設為 true 的 `enableUpdateCatalog`，並同時將 `updateBehavior` 設為 `UPDATE_IN_DATABASE`，表示在任務執行期間，在資料目錄中覆寫結構描述並新增新的分割區。

------
#### [ Python ]

```
additionalOptions = {
    "enableUpdateCatalog": True, 
    "updateBehavior": "UPDATE_IN_DATABASE"}
additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"]

sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>,
    table_name=<dst_tbl_name>, transformation_ctx="write_sink",
    additional_options=additionalOptions)
job.commit()
```

------
#### [ Scala ]

```
val options = JsonOptions(Map(
    "path" -> outputPath, 
    "partitionKeys" -> Seq("partition_0", "partition_1"), 
    "enableUpdateCatalog" -> true))
val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options)
sink.writeDynamicFrame(df)
```

------

如果您想防止資料表結構描述遭到覆寫，但仍想要新增新的分割區，您也可以將 `updateBehavior` 值設為 `LOG`。`updateBehavior` 的預設值為 `UPDATE_IN_DATABASE`，因此，如果您未明確定該值，則會覆寫資料表結構描述。

如果 `enableUpdateCatalog` 未設為 true，無論針對 `updateBehavior` 選取哪個選項，ETL 任務都不會在資料目錄中更新資料表。

## 建立新的資料表
<a name="update-from-job-creating-new-tables"></a>

您也可以使用相同選項，在資料目錄中建立新資料表。您可以使用 `setCatalogInfo` 指定資料庫和新的資料表名稱。

------
#### [ Python ]

```
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data",
    enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["partition_key0", "partition_key1"])
sink.setFormat("<format>")
sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>)
sink.writeFrame(last_transform)
```

------
#### [ Scala ]

```
val options = JsonOptions(Map(
    "path" -> outputPath, 
    "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), 
    "enableUpdateCatalog" -> true, 
    "updateBehavior" -> "UPDATE_IN_DATABASE"))
val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>")
sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”)
sink.writeDynamicFrame(df)
```

------

## 限制
<a name="update-from-job-restrictions"></a>

請注意下列限制：
+ 僅支援 Amazon Simple Storage Service (Amazon S3) 的目標。
+ 控管的資料表不支援 `enableUpdateCatalog` 功能。
+ 僅支援下列格式：`json`、`csv`、`avro` 和 `parquet`。
+ 若要建立或更新具有 `parquet` 分類的資料表，您必須使用適用於 DynamicFrames 的 AWS Glue 最佳化 Parquet 寫入器。這可以透過以下方法之一來達成：
  + 如果要使用 `parquet` 分類來更新目錄中的現有資料表，則在更新之前，資料表的 `"useGlueParquetWriter"` 資料表屬性必須設定為 `true`。您可以透過 AWS Glue APIs/SDK、主控台或 Athena DDL 陳述式來設定此屬性。  
![\[AWS Glue 主控台中的目錄資料表屬性編輯欄位。\]](http://docs.aws.amazon.com/zh_tw/glue/latest/dg/images/edit-table-property.png)

    設定目錄資料表屬性後，您便可以使用以下程式碼片段，以新資料更新目錄資料表：

    ```
    glueContext.write_dynamic_frame.from_catalog(
        frame=frameToWrite,
        database="dbName",
        table_name="tableName",
        additional_options={
            "enableUpdateCatalog": True,
            "updateBehavior": "UPDATE_IN_DATABASE"
        }
    )
    ```
  + 如果目錄中尚不存在資料表，您可以在指令碼中使用 `getSink()` 方法和 `connection_type="s3"` 將資料表及其分割區新增至目錄中，並將資料寫入 Amazon S3。為您的工作流程提供適當的 `partitionKeys` 和 `compression`。

    ```
    s3sink = glueContext.getSink(
        path="s3://bucket/folder/",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=[],
        compression="snappy",
        enableUpdateCatalog=True
    )
        
    s3sink.setCatalogInfo(
        catalogDatabase="dbName", catalogTableName="tableName"
    )
        
    s3sink.setFormat("parquet", useGlueParquetWriter=True)
    s3sink.writeFrame(frameToWrite)
    ```
  + `glueparquet` 格式值是啟用 AWS Glue parquet 寫入器的舊版方法。
+ 將 `updateBehavior` 設為 `LOG` 後，只有在 `DynamicFrame` 結構描述等同於或包含資料目錄資料表結構描述中定義之欄的子集時，才會新增新的分割區。
+ 未分割資料表不支援結構描述更新 (不使用 "partitionKeys" 選項)。
+ 在 ETL 指令碼中傳遞的參數和資料目錄資料表結構描述中的 partitionKeys 之間，您的 partitionKeys 必須等效，而且順序相同。
+ 此功能目前尚不支援更新/建立更新巢狀結構描述的資料表 (例如，結構內的陣列)。

如需詳細資訊，請參閱[Spark 指令碼程式設計](aws-glue-programming.md)。

# 在 ETL 任務中使用 MongoDB 連線
<a name="integrate-with-mongo-db"></a>

您可以為 MongoDB 建立一個連線，然後在 AWS Glue 任務中使用該連線。如需詳細資訊，請參閱 AWS Glue 程式設計指南[MongoDB 連線](aws-glue-programming-etl-connect-mongodb-home.md)中的 。連線 `url`、`username` 和 `password` 儲存在 MongoDB 連線中。其他選項可以在 ETL 任務指令碼中使用 `glueContext.getCatalogSource` 的 `additionalOptions` 參數指定。其他選項包括：
+ `database`：(必要) 讀取的 MongoDB 資料庫。
+ `collection`：(必要) 讀取的 MongoDB 集合。

藉由將 `database` 和 `collection` 資訊放在 ETL 任務指令碼中，您可以在多個任務中使用相同的連線。

1. 建立 MongoDB 資料來源的 AWS Glue Data Catalog 連線。請參閱 ["connectionType": "mongodb"](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-mongodb) 以取得此連線參數的描述。您可以使用主控台、API 或 CLI 來建立連線。

1. 在 中建立資料庫 AWS Glue Data Catalog ，以存放 MongoDB 資料的資料表定義。如需詳細資訊，請參閱[建立資料庫](define-database.md)。

1. 建立爬蟲程式，使用連接到 MongoDB 的連線中的資訊，網路爬取 MongoDB 中的資料。爬蟲程式會在 中建立資料表 AWS Glue Data Catalog ，描述您在任務中使用的 MongoDB 資料庫中的資料表。如需詳細資訊，請參閱[使用編目程式填入 Data Catalog](add-crawler.md)。

1. 使用自訂指令碼來建立任務。您可以使用主控台、API 或 CLI 來建立它們。如需詳細資訊，請參閱[在 AWS Glue 新增任務](https://docs.aws.amazon.com/glue/latest/dg/add-job.html)。

1. 選擇任務的資料目標。代表資料目標的資料表可在您的資料目錄中定義，或者您的任務可在執行時建立目標資料表。選擇編寫任務時的目標位置。如果目標需要連線，您的任務也會參照此連線。如果您的任務需要多個資料目標，可在之後編輯指令碼以新增來源。

1. 為任務及產生的指令碼提供引數，以自訂任務處理環境。

   這裡提供了從基於資料目錄中定義之資料表結構的 MongoDB 資料庫建立 `DynamicFrame` 的範例。程式碼使用 `additionalOptions` 以提供其他資料來源資訊：

------
#### [  Scala  ]

   ```
   val resultFrame: DynamicFrame = glueContext.getCatalogSource(
           database = catalogDB, 
           tableName = catalogTable, 
           additionalOptions = JsonOptions(Map("database" -> DATABASE_NAME, 
                   "collection" -> COLLECTION_NAME))
         ).getDynamicFrame()
   ```

------
#### [  Python  ]

   ```
   glue_context.create_dynamic_frame_from_catalog(
           database = catalogDB,
           table_name = catalogTable,
           additional_options = {"database":"database_name", 
               "collection":"collection_name"})
   ```

------

1. 隨需或透過觸發執行任務。