

# DynamicFrame 类
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame"></a>

Apache Spark 中的主要抽象之一是 SparkSQL `DataFrame`，它类似于在 R 和 Pandas 中找到的 `DataFrame` 构造。`DataFrame` 类似于表格，支持函数风格（map/reduce/filter/等）操作和 SQL 操作（select、project、aggregate）。

尽管 `DataFrames` 功能强大且运用广泛，但在提取、转换和加载 (ETL) 操作方面存在局限性。最明显的是，它们需要指定了架构后才能加载任何数据。SparkSQL 通过对数据进行两次扫描解决了这一问题 – 第一次为了推断架构，第二次为了加载数据。然而，此推断仍有局限性，且不能解决数据混乱的实际问题。例如，同样的字段在不同记录中可能属于不同类型。对此，Apache Spark 通常没有好的办法，只能使用原始字段文本将类型报告为 `string`。这或许不正确，同时您可能希望更精细地控制解决架构差异的方式。对于大型数据集而言，每扫描一次源数据都会付出高昂代价。

为了解决这些局限性，AWS Glue 推出了 `DynamicFrame`。`DynamicFrame` 类似于 `DataFrame`，不同之处在于每个记录都是自描述的，因此初始并不需要架构。相反，AWS Glue 会在需要时实时计算一个架构，并使用选择（或联合）类型显式编码架构不一致之处。您可以解决这些不一致之处，以使您的数据集兼容需要固定架构的数据存储。

同样，一个 `DynamicRecord` 表示 `DynamicFrame` 中的一条逻辑记录。它类似于 Spark `DataFrame` 中的一行，只不过它是自描述的，可用于不符合固定架构的数据。将 AWS Glue 与 PySpark 一起使用时，通常不需要处理独立 `DynamicRecords`。相反，您需要通过 `DynamicFrame` 一起转换数据集。

在解决任何架构不一致问题后，就可以在 `DynamicFrames` 和 `DataFrames` 之间来回转换。

##  – 构造 –
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-_constructing"></a>
+ [\$1\$1init\$1\$1](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-__init__)
+ [fromDF](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-fromDF)
+ [toDF](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-toDF)

## \$1\$1init\$1\$1
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-__init__"></a>

**`__init__(jdf, glue_ctx, name)`**
+ `jdf` – 对 Java 虚拟机 (JVM) 中数据帧的引用。
+ `glue_ctx` – 一个 [GlueContext 类](aws-glue-api-crawler-pyspark-extensions-glue-context.md) 对象。
+ `name` – 可选的名称字符串，默认为空。

## fromDF
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-fromDF"></a>

**`fromDF(dataframe, glue_ctx, name)`**

通过将 `DataFrame` 字段转换为 `DynamicRecord` 字段将 `DataFrame` 转换为 `DynamicFrame`。返回新的 `DynamicFrame`。

一个 `DynamicRecord` 表示 `DynamicFrame` 中的一条逻辑记录。它类似于 Spark `DataFrame` 中的一行，只不过它是自描述的，可用于不符合固定架构的数据。

此函数期望您 `DataFrame` 中名称重复的列已经被解析。
+ `dataframe` – 要转换的 Apache Spark SQL `DataFrame` (必需)。
+ `glue_ctx` – 为此转换指定上下文的 [GlueContext 类](aws-glue-api-crawler-pyspark-extensions-glue-context.md) 对象 (必需)。
+ `name` – 生成的 `DynamicFrame` 的名称（从 AWS Glue 3.0 起是可选的）。

## toDF
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-toDF"></a>

**`toDF(options)`**

通过将 `DynamicRecords` 转换为 `DataFrame` 字段将 `DynamicFrame` 转换为 Apache Spark `DataFrame`。返回新的 `DataFrame`。

一个 `DynamicRecord` 表示 `DynamicFrame` 中的一条逻辑记录。它类似于 Spark `DataFrame` 中的一行，只不过它是自描述的，可用于不符合固定架构的数据。
+  `options` – 指定在转换过程中如何解析选择类型的 `ResolveOption` 对象列表。此参数用于处理架构不一致，不适用于像 CSV 解析这样的格式选项。

   对于 CSV 解析和其他格式选项，请在创建 DynamicFrame 时在 `from_options` 方法中指定这些选项，而不是在 `toDF` 方法中指定这些选项。

   以下是处理 CSV 格式选项的正确方法的示例：

  ```
  from awsglue.context import GlueContext
  from awsglue.dynamicframe import DynamicFrame
  from pyspark.context import SparkContext
  
  sc = SparkContext()
  glueContext = GlueContext(sc)
  
  # Correct: Specify format options in from_options
  csv_dyf = glueContext.create_dynamic_frame.from_options(
      connection_type="s3",
      connection_options={"paths": ["s3://my-bucket/path/to/csv/"]},
      format="csv",
      format_options={
          "withHeader": True,
          "separator": ",",
          "inferSchema": True
      }
  )
  
  # Convert to DataFrame (no format options needed here)
  csv_df = csv_dyf.toDF()
  ```

   `toDF` 中的 `options` 参数专门用于解析选择类型。如果您选择 `Project` 和 `Cast` 操作类型，则指定目标类型。示例包括以下内容。

  ```
  >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")])
  >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])
  ```

##  – 信息 –
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-_informational"></a>
+ [count](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-count)
+ [架构](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-schema)
+ [printSchema](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-printSchema)
+ [show](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-show)
+ [repartition](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-repartition)
+ [coalesce](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-coalesce)

## count
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-count"></a>

`count( )` – 返回底层 `DataFrame` 中的行数。

## 架构
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-schema"></a>

`schema( )` – 返回此 `DynamicFrame` 的架构，如果此架构不可用，则返回底层 `DataFrame` 的架构。

有关构成此架构的 `DynamicFrame` 类型的更多信息，请参阅 [PySpark 扩展类型](aws-glue-api-crawler-pyspark-extensions-types.md)。

## printSchema
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-printSchema"></a>

`printSchema( )` – 打印底层 `DataFrame` 的架构。

## show
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-show"></a>

`show(num_rows)` – 打印底层 `DataFrame` 中的行数。

## repartition
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-repartition"></a>

`repartition(numPartitions)` – 返回具有 `numPartitions` 个分区的新 `DynamicFrame`。

## coalesce
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-coalesce"></a>

`coalesce(numPartitions)` – 返回具有 `numPartitions` 个分区的新 `DynamicFrame`。

##  – 转换 –
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-_transforms"></a>
+ [apply\$1mapping](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-apply_mapping)
+ [drop\$1fields](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-drop_fields)
+ [筛选](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-filter)
+ [join](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-join)
+ [映射](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-map)
+ [mergeDynamicFrame](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-merge)
+ [relationalize](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-relationalize)
+ [rename\$1field](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-rename_field)
+ [resolveChoice](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-resolveChoice)
+ [select\$1fields](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-select_fields)
+ [spigot](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-spigot)
+ [split\$1fields](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-split_fields)
+ [split\$1rows](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-split_rows)
+ [unbox](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-unbox)
+ [联合](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-union)
+ [unnest](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-unnest)
+ [unnest\$1ddb\$1json](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-unnest_ddb_json)
+ [write](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-write)

## apply\$1mapping
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-apply_mapping"></a>

**`apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

将声明映射应用于此 `DynamicFrame` 并返回已将那些映射应用于所指定字段的新 `DynamicFrame`。新 `DynamicFrame` 中省略了未指定的字段。
+ `mappings` – 映射元组列表（必需）。每个元组包括：（源列，源类型，目标列，目标类型）。

  如果源列的名称有一个点“`.`”，则必须在名称外加上反引号“````”。例如，要将 `this.old.name`（字符串）映射到 `thisNewName`，可以使用以下元组：

  ```
  ("`this.old.name`", "string", "thisNewName", "string")
  ```
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 apply\$1mapping 重命名字段并更改字段类型
<a name="pyspark-apply_mapping-example"></a>

以下代码示例展示了如何使用 `apply_mapping` 方法重命名选定字段并更改字段类型。

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：对数据进行联接和关系化](aws-glue-programming-python-samples-legislators.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-legislators.md#aws-glue-programming-python-samples-legislators-crawling) 中的说明进行操作。

```
# Example: Use apply_mapping to reshape source data into
# the desired column names and types as a new DynamicFrame

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Create a DynamicFrame and view its schema
persons = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="persons_json"
)
print("Schema for the persons DynamicFrame:")
persons.printSchema()

# Select and rename fields, change field type
print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:")
persons_mapped = persons.apply_mapping(
    [
        ("family_name", "String", "last_name", "String"),
        ("name", "String", "first_name", "String"),
        ("birth_date", "String", "date_of_birth", "Date"),
    ]
)
persons_mapped.printSchema()
```

#### Output
<a name="apply_mapping-example-output"></a>

```
Schema for the persons DynamicFrame:
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string

Schema for the persons_mapped DynamicFrame, created with apply_mapping:
root
|-- last_name: string
|-- first_name: string
|-- date_of_birth: date
```

## drop\$1fields
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-drop_fields"></a>

**`drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

调用 [FlatMap 类](aws-glue-api-crawler-pyspark-transforms-flat-map.md) 转换以从 `DynamicFrame` 中删除字段。返回指定字段已删除的新的 `DynamicFrame`。
+ `paths` – 字符串列表。每个字符串都包含要删除的字段节点的完整路径。您可以使用点表示法来指定嵌套字段。例如，如果字段 `first` 在树结构中是字段 `name` 的子字段，则为路径指定 `"name.first"`。

  如果字段节点的名称有文字 `.`，则必须用反引号（```）将名称括起来。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 drop\$1fields 从 `DynamicFrame` 中删除字段
<a name="pyspark-drop_fields-example"></a>

此代码示例使用 `drop_fields` 方法从 `DynamicFrame` 中移除选定的顶级字段和嵌套字段。

**示例数据集**

该示例使用以下数据集，其在代码中由 `EXAMPLE-FRIENDS-DATA` 表表示：

```
{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []}
{"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]}
{"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]}
{"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}}
{"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}
```

**示例代码**

```
# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame.
# Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name.
# Replace EXAMPLE-FRIENDS-DATA with your table name.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Create a DynamicFrame from Glue Data Catalog
glue_source_database = "MY-EXAMPLE-DATABASE"
glue_source_table = "EXAMPLE-FRIENDS-DATA"

friends = glueContext.create_dynamic_frame.from_catalog(
    database=glue_source_database, table_name=glue_source_table
)
print("Schema for friends DynamicFrame before calling drop_fields:")
friends.printSchema()

# Remove location.county, remove friends.age, remove age
friends = friends.drop_fields(paths=["age", "location.county", "friends.age"])
print("Schema for friends DynamicFrame after removing age, county, and friend age:")
friends.printSchema()
```

#### Output
<a name="drop_fields-example-output"></a>

```
Schema for friends DynamicFrame before calling drop_fields:
root
|-- name: string
|-- age: int
|-- location: struct
|    |-- state: string
|    |-- county: string
|-- friends: array
|    |-- element: struct
|    |    |-- name: string
|    |    |-- age: int

Schema for friends DynamicFrame after removing age, county, and friend age:
root
|-- name: string
|-- location: struct
|    |-- state: string
|-- friends: array
|    |-- element: struct
|    |    |-- name: string
```

## 筛选
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-filter"></a>

**`filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

返回新 `DynamicFrame`，其中包含输入 `DynamicFrame` 中满足指定谓词函数 `f` 的所有 `DynamicRecords`。
+ `f` – 应用到 `DynamicFrame` 的谓词函数。该函数必须采用 `DynamicRecord` 作为参数，如果 `DynamicRecord` 满足筛选要求，则返回 True，否则返回 False (必需)。

  一个 `DynamicRecord` 表示 `DynamicFrame` 中的一条逻辑记录。其类似于 Spark `DataFrame` 中的一行，但其具有自描述性，可用于不符合固定架构的数据。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用筛选条件获取筛选后的字段选择
<a name="pyspark-filter-example"></a>

此示例用 `filter` 方法创建新 `DynamicFrame`，其中包括筛选后的其他 `DynamicFrame` 字段选择。

类似于 `map` 方法，`filter` 采用一个函数作为实际参数，以将其应用于原 `DynamicFrame` 中的每个记录。该函数将记录作为输入并返回布尔值。如果返回值为 true，则该记录将包含在生成的 `DynamicFrame` 中。如果返回值为 false，则该记录将被排除在外。

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备](aws-glue-programming-python-samples-medicaid.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-medicaid.md#aws-glue-programming-python-samples-medicaid-crawling) 中的说明进行操作。

```
# Example: Use filter to create a new DynamicFrame
# with a filtered selection of records

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Create DynamicFrame from Glue Data Catalog
medicare = glueContext.create_dynamic_frame.from_options(
    "s3",
    {
        "paths": [
            "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"
        ]
    },
    "csv",
    {"withHeader": True},
)

# Create filtered DynamicFrame with custom lambda
# to filter records by Provider State and Provider City
sac_or_mon = medicare.filter(
    f=lambda x: x["Provider State"] in ["CA", "AL"]
    and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"]
)

# Compare record counts
print("Unfiltered record count: ", medicare.count())
print("Filtered record count:  ", sac_or_mon.count())
```

#### Output
<a name="filter-example-output"></a>

```
Unfiltered record count:  163065
Filtered record count:   564
```

## join
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-join"></a>

**`join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

执行与另一个 `DynamicFrame` 的等式联接并返回生成的 `DynamicFrame`。
+ `paths1` – 此帧中要联接的键列表。
+ `paths2` – 另一帧中要联接的键列表。
+ `frame2` – 要联接的另一个 `DynamicFrame`。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用联接来组合 `DynamicFrames`
<a name="pyspark-join-example"></a>

此示例使用 `join` 方法对三个 `DynamicFrames` 执行联接。AWSGlue 根据您提供的字段键执行联接。由此生成的 `DynamicFrame` 包含来自指定密钥相匹配的两个原始帧中的行。

请注意，`join` 转换会让所有字段保持不变。这意味着您指定要匹配的字段会出现在生成的 DynamicFrame 中，即使这些字段是多余的并且包含相同的密钥。在此示例中，我们在联接后使用 `drop_fields` 移除这些冗余密钥。

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：对数据进行联接和关系化](aws-glue-programming-python-samples-legislators.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-legislators.md#aws-glue-programming-python-samples-legislators-crawling) 中的说明进行操作。

```
# Example: Use join to combine data from three DynamicFrames

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Load DynamicFrames from Glue Data Catalog
persons = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="persons_json"
)
memberships = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="memberships_json"
)
orgs = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="organizations_json"
)
print("Schema for the persons DynamicFrame:")
persons.printSchema()
print("Schema for the memberships DynamicFrame:")
memberships.printSchema()
print("Schema for the orgs DynamicFrame:")
orgs.printSchema()

# Join persons and memberships by ID
persons_memberships = persons.join(
    paths1=["id"], paths2=["person_id"], frame2=memberships
)

# Rename and drop fields from orgs
# to prevent field name collisions with persons_memberships
orgs = (
    orgs.drop_fields(["other_names", "identifiers"])
    .rename_field("id", "org_id")
    .rename_field("name", "org_name")
)

# Create final join of all three DynamicFrames
legislators_combined = orgs.join(
    paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships
).drop_fields(["person_id", "org_id"])

# Inspect the schema for the joined data
print("Schema for the new legislators_combined DynamicFrame:")
legislators_combined.printSchema()
```

#### Output
<a name="join-example-output"></a>

```
Schema for the persons DynamicFrame:
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string

Schema for the memberships DynamicFrame:
root
|-- area_id: string
|-- on_behalf_of_id: string
|-- organization_id: string
|-- role: string
|-- person_id: string
|-- legislative_period_id: string
|-- start_date: string
|-- end_date: string

Schema for the orgs DynamicFrame:
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string

Schema for the new legislators_combined DynamicFrame:
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- start_date: string
|-- family_name: string
|-- id: string
|-- death_date: string
|-- end_date: string
```

## 映射
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-map"></a>

**`map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

返回由于将指定映射函数应用到原始 `DynamicFrame` 中的所有记录而产生的新的 `DynamicFrame`。
+ `f` – 应用到 `DynamicFrame` 中所有记录的映射函数。该函数必须采用 `DynamicRecord` 作为参数并返回一个新的 `DynamicRecord` (必需)。

  一个 `DynamicRecord` 表示 `DynamicFrame` 中的一条逻辑记录。它类似于 Apache Spark `DataFrame` 中的一行，但其具有自描述性，可用于不符合固定架构的数据。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与转换中的错误关联的字符串（可选）。
+ `stageThreshold` – 在转换出错之前可能在其中发生的最大错误数（可选）。默认值为 0。
+ `totalThreshold` – 在处理出错之前可能全面发生的最大错误数（可选）。默认值为 0。

### 示例：使用映射将函数应用于 `DynamicFrame` 中的每个记录
<a name="pyspark-map-example"></a>

此示例展示了如何使用 `map` 方法将函数应用于 `DynamicFrame` 的每个记录。具体而言，此示例将名为 `MergeAddress` 的函数应用于每个记录，以便将多个地址字段合并为一个 `struct` 类型。

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备](aws-glue-programming-python-samples-medicaid.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-medicaid.md#aws-glue-programming-python-samples-medicaid-crawling) 中的说明进行操作。

```
# Example: Use map to combine fields in all records
# of a DynamicFrame

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Create a DynamicFrame and view its schema
medicare = glueContext.create_dynamic_frame.from_options(
        "s3",
        {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]},
        "csv",
        {"withHeader": True})
print("Schema for medicare DynamicFrame:")
medicare.printSchema()

# Define a function to supply to the map transform
# that merges address fields into a single field
def MergeAddress(rec):
  rec["Address"] = {}
  rec["Address"]["Street"] = rec["Provider Street Address"]
  rec["Address"]["City"] = rec["Provider City"]
  rec["Address"]["State"] = rec["Provider State"]
  rec["Address"]["Zip.Code"] = rec["Provider Zip Code"]
  rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]]
  del rec["Provider Street Address"]
  del rec["Provider City"]
  del rec["Provider State"]
  del rec["Provider Zip Code"]
  return rec


# Use map to apply MergeAddress to every record
mapped_medicare = medicare.map(f = MergeAddress)
print("Schema for mapped_medicare DynamicFrame:")
mapped_medicare.printSchema()
```

#### Output
<a name="map-example-output"></a>

```
Schema for medicare DynamicFrame:
root
|-- DRG Definition: string
|-- Provider Id: string
|-- Provider Name: string
|-- Provider Street Address: string
|-- Provider City: string
|-- Provider State: string
|-- Provider Zip Code: string
|-- Hospital Referral Region Description: string
|-- Total Discharges: string
|-- Average Covered Charges: string
|-- Average Total Payments: string
|-- Average Medicare Payments: string

Schema for mapped_medicare DynamicFrame:
root
|-- Average Total Payments: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
|    |-- Zip.Code: string
|    |-- City: string
|    |-- Array: array
|    |    |-- element: string
|    |-- State: string
|    |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
```

## mergeDynamicFrame
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-merge"></a>

**`mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)`**

基于指定主键的将此 `DynamicFrame` 与暂存 `DynamicFrame` 合并以标识记录。不会对重复记录（具有相同主键的记录）去除重复。如果暂存帧中没有匹配的记录，则从源中保留所有记录（包括重复记录）。如果暂存帧具有匹配的记录，则暂存帧中的记录将覆盖 AWS Glue 中的源中的记录。
+ `stage_dynamic_frame` – 要合并的暂存 `DynamicFrame`。
+ `primary_keys` – 要匹配源和暂存动态帧中的记录的主键字段列表。
+ `transformation_ctx` – 用于检索有关当前转换的元数据的唯一字符串（可选）。
+ `options` – 为此转换提供其他信息的 JSON 名称-值对的字符串。当前未使用此参数。
+ `info` – 一个 `String`。要与此转换中的错误关联的任何字符串。
+ `stageThreshold` – 一个 `Long`。给定转换中处理需要排除的错误的数目。
+ `totalThreshold` – 一个 `Long`。此转换中处理需要排除的错误的总数。

该方法将返回通过将此 `DynamicFrame` 与暂存 `DynamicFrame` 合并获取的新 `DynamicFrame`。

在这些情况下，返回的 `DynamicFrame` 将包含记录 A：
+ 如果 `A` 在源帧和暂存帧中都存在，则返回暂存帧中的 `A`。
+ 如果 `A` 在源表中，且 `A.primaryKeys` 不在 `stagingDynamicFrame` 中，这意味着未在暂存表中更新 `A`。

源帧和暂存帧不需要具有相同的架构。

### 示例：根据主键使用 mergeDynamicFrame 合并两个 `DynamicFrames`
<a name="pyspark-mergeDynamicFrame-example"></a>

以下代码示例显示了如何使用 `mergeDynamicFrame` 方法根据主键 `id` 将 `DynamicFrame` 与“staging”`DynamicFrame` 合并。

**示例数据集**

该示例使用了名为 `split_rows_collection` 的 `DynamicFrameCollection` 中的两个 `DynamicFrames`。以下是 `split_rows_collection` 中的键列表。

```
dict_keys(['high', 'low'])
```

**示例代码**

```
# Example: Use mergeDynamicFrame to merge DynamicFrames
# based on a set of specified primary keys

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import SelectFromCollection

# Inspect the original DynamicFrames
frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low")
print("Inspect the DynamicFrame that contains rows where ID < 10")
frame_low.toDF().show()

frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high")
print("Inspect the DynamicFrame that contains rows where ID > 10")
frame_high.toDF().show()

# Merge the DynamicFrames based on the "id" primary key
merged_high_low = frame_high.mergeDynamicFrame(
    stage_dynamic_frame=frame_low, primary_keys=["id"]
)

# View the results where the ID is 1 or 20
print("Inspect the merged DynamicFrame that contains the combined rows")
merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
```

#### Output
<a name="mergeDynamicFrame-example-output"></a>

```
Inspect the DynamicFrame that contains rows where ID < 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
|  1|    0|                     fax|             202-225-3307|
|  1|    1|                   phone|             202-225-5731|
|  2|    0|                     fax|             202-225-3307|
|  2|    1|                   phone|             202-225-5731|
|  3|    0|                     fax|             202-225-3307|
|  3|    1|                   phone|             202-225-5731|
|  4|    0|                     fax|             202-225-3307|
|  4|    1|                   phone|             202-225-5731|
|  5|    0|                     fax|             202-225-3307|
|  5|    1|                   phone|             202-225-5731|
|  6|    0|                     fax|             202-225-3307|
|  6|    1|                   phone|             202-225-5731|
|  7|    0|                     fax|             202-225-3307|
|  7|    1|                   phone|             202-225-5731|
|  8|    0|                     fax|             202-225-3307|
|  8|    1|                   phone|             202-225-5731|
|  9|    0|                     fax|             202-225-3307|
|  9|    1|                   phone|             202-225-5731|
| 10|    0|                     fax|             202-225-6328|
| 10|    1|                   phone|             202-225-4576|
+---+-----+------------------------+-------------------------+
only showing top 20 rows

Inspect the DynamicFrame that contains rows where ID > 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 11|    0|                     fax|             202-225-6328|
| 11|    1|                   phone|             202-225-4576|
| 11|    2|                 twitter|           RepTrentFranks|
| 12|    0|                     fax|             202-225-6328|
| 12|    1|                   phone|             202-225-4576|
| 12|    2|                 twitter|           RepTrentFranks|
| 13|    0|                     fax|             202-225-6328|
| 13|    1|                   phone|             202-225-4576|
| 13|    2|                 twitter|           RepTrentFranks|
| 14|    0|                     fax|             202-225-6328|
| 14|    1|                   phone|             202-225-4576|
| 14|    2|                 twitter|           RepTrentFranks|
| 15|    0|                     fax|             202-225-6328|
| 15|    1|                   phone|             202-225-4576|
| 15|    2|                 twitter|           RepTrentFranks|
| 16|    0|                     fax|             202-225-6328|
| 16|    1|                   phone|             202-225-4576|
| 16|    2|                 twitter|           RepTrentFranks|
| 17|    0|                     fax|             202-225-6328|
| 17|    1|                   phone|             202-225-4576|
+---+-----+------------------------+-------------------------+
only showing top 20 rows

Inspect the merged DynamicFrame that contains the combined rows
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
|  1|    0|                     fax|             202-225-3307|
|  1|    1|                   phone|             202-225-5731|
| 20|    0|                     fax|             202-225-5604|
| 20|    1|                   phone|             202-225-6536|
| 20|    2|                 twitter|                USRepLong|
+---+-----+------------------------+-------------------------+
```

## relationalize
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-relationalize"></a>

**`relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

将 `DynamicFrame` 转换为适用于关系数据库的形式。将数据从 DynamoDB 等 NoSQL 环境移动到 MySQL 等关系数据库时，对 `DynamicFrame` 进行关系化尤其有用。

该转换将通过取消嵌套列的嵌套并透视数组列来生成帧列表。可使用在取消嵌套阶段生成的联接键将透视数组列联接到根表。
+ `root_table_name` – 根表的名称。
+ `staging_path` – 要将 CSV 格式的透视表分区存储到的路径（可选）。从该路径读取透视表。
+ `options` – 可选参数的词典。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 relatialize 在 `DynamicFrame` 中展平嵌套架构
<a name="pyspark-relationalize-example"></a>

此代码示例使用 `relationalize` 方法将嵌套架构展平为适用于关系数据库的形式。

**示例数据集**

该示例通过以下架构使用了名为 `legislators_combined` 的 `DynamicFrame`。`legislators_combined` 有多个嵌套字段，例如 `links`、`images` 和 `contact_details`，这些字段将通过 `relationalize` 转换进行展平。

```
root
|-- role: string
|-- seats: int
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- type: string
|-- sort_name: string
|-- area_id: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- on_behalf_of_id: string
|-- other_names: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- name: string
|    |    |-- lang: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- name: string
|-- birth_date: string
|-- organization_id: string
|-- gender: string
|-- classification: string
|-- legislative_period_id: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- image: string
|-- given_name: string
|-- start_date: string
|-- family_name: string
|-- id: string
|-- death_date: string
|-- end_date: string
```

**示例代码**

```
# Example: Use relationalize to flatten
# a nested schema into a format that fits
# into a relational database.
# Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Apply relationalize and inspect new tables
legislators_relationalized = legislators_combined.relationalize(
    "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir"
)
legislators_relationalized.keys()

# Compare the schema of the contact_details
# nested field to the new relationalized table that
# represents it
legislators_combined.select_fields("contact_details").printSchema()
legislators_relationalized.select("l_root_contact_details").toDF().where(
    "id = 10 or id = 75"
).orderBy(["id", "index"]).show()
```

#### Output
<a name="relationalize-example-output"></a>

以下输出可让您将名为 `contact_details` 的嵌套字段的架构与 `relationalize` 转换创建的表进行比较。请注意，表记录可使用名为 `id` 的外键和表示数组位置的 `index` 列链接回主表。

```
dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers'])

root
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string

+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 10|    0|                     fax|             202-225-4160|
| 10|    1|                   phone|             202-225-3436|
| 75|    0|                     fax|             202-225-6791|
| 75|    1|                   phone|             202-225-2861|
| 75|    2|                 twitter|               RepSamFarr|
+---+-----+------------------------+-------------------------+
```

## rename\$1field
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-rename_field"></a>

**`rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

重命名此 `DynamicFrame` 中的一个字段并返回包含该重命名字段的新的 `DynamicFrame`。
+ `oldName` – 要重命名的节点的完整路径。

  如果旧名称中包含点，则 `RenameField` 将不起作用，除非使用反引号（```）将其引起来。例如，要将 `this.old.name` 替换为 `thisNewName`，应按如下方式调用 rename\$1field。

  ```
  newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  ```
+ `newName` – 新名称，作为完整路径。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 rename\$1field 重命名 `DynamicFrame` 中的字段
<a name="pyspark-rename_field-example"></a>

此代码示例可使用 `rename_field` 方法重命名 `DynamicFrame` 中的字段。请注意，该示例可使用方法链同时重命名多个字段。

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：对数据进行联接和关系化](aws-glue-programming-python-samples-legislators.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-legislators.md#aws-glue-programming-python-samples-legislators-crawling) 中的说明进行操作。

**示例代码**

```
# Example: Use rename_field to rename fields
# in a DynamicFrame

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Inspect the original orgs schema
orgs = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="organizations_json"
)
print("Original orgs schema: ")
orgs.printSchema()

# Rename fields and view the new schema
orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name")
print("New orgs schema with renamed fields: ")
orgs.printSchema()
```

#### Output
<a name="rename_field-example-output"></a>

```
Original orgs schema: 
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- id: string
|-- classification: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string

New orgs schema with renamed fields: 
root
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- classification: string
|-- org_id: string
|-- org_name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- image: string
|-- seats: int
|-- type: string
```

## resolveChoice
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-resolveChoice"></a>

**`resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)`**

解析此 `DynamicFrame` 内的一个选择类型并返回新的 `DynamicFrame`。
+ `specs` – 要解析的特定歧义列表，每个歧义均采用元组形式：`(field_path, action)`。

  可通过两种方式使用 `resolveChoice`。第一种是使用 `specs` 参数指定一系列特定字段以及如何解析它们。`resolveChoice` 的另一种模式是使用 `choice` 参数为所有 `ChoiceTypes` 指定单个解析方法。

  `specs` 的值被指定为由 `(field_path, action)` 对组成的元组。`field_path` 值标识特定歧义元素，`action` 值标识相应解析。可能的操作如下：
  + `cast:type` – 尝试将所有值转换为指定的类型。例如：`cast:int`。
  + `make_cols` – 将每个不同的类型转换为名为 `columnName_type` 的列。通过展平数据来解析潜在的歧义。例如，如果 `columnA` 是 `int` 或 `string`，则解析就是在结果 `DynamicFrame` 中生成名为 `columnA_int` 和 `columnA_string` 的两个列。
  + `make_struct` – 使用 `struct` 表示数据，解析潜在的歧义。例如，如果某个列中的数据是 `int` 或 `string`，则使用 `make_struct` 操作会在结果 `DynamicFrame` 中生成结构列。每个结构都包含 `int` 和 `string`。
  + `project:type` – 将所有数据投影到可能的数据类型之一，解析潜在的歧义。例如，如果某个列中的数据是 `int` 或 `string`，则使用 `project:string` 操作会在结果 `DynamicFrame` 中生成一个列，其中所有 `int` 值都已转换为字符串。

  如果 `field_path` 识别到数组，则在数组名称后放置一个空的方括号可避免歧义。例如，假设您正在使用结构如下的数据：

  ```
  "myList": [
    { "price": 100.00 },
    { "price": "$100.00" }
  ]
  ```

  可以通过将 `field_path` 设置为 `"myList[].price"`、将 `action` 设置为 `"cast:double"` 来选择价格的数字而非字符串版本。
**注意**  
只能使用 `specs` 和 `choice` 参数之一。如果 `specs` 参数不为 `None`，则 `choice` 参数必须为空字符串。反过来，如果 `choice` 不为空字符串，则 `specs` 参数必须为 `None`。
+ `choice` – 为所有 `ChoiceTypes` 指定单个解析方法。这可以在运行前不知道 `ChoiceTypes` 的完整列表的情况下使用。除了之前为 `specs` 列出的操作外，此参数还支持以下操作：
  + `match_catalog` – 尝试将每个 `ChoiceType` 转换为指定数据目录表中的对应类型。
+ `database` – 与 `match_catalog` 操作一起使用的数据目录数据库。
+ `table_name` – 与 `match_catalog` 操作一起使用的数据目录表。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认为零，表示此过程应该不会出错。
+ `catalog_id` – 正在访问的数据目录的目录 ID（数据目录的账户 ID）。如果设置为 `None`（默认值），它使用调用账户的目录 ID。

### 示例：使用 resolveChoice 处理包含多种类型的列
<a name="pyspark-resolveChoice-example"></a>

此代码示例使用 `resolveChoice` 方法来指定如何处理包含多种类型值的 `DynamicFrame` 列。该示例演示了处理不同类型的列的两种常用方法：
+ 将该列转换为单一数据类型。
+ 在单独的列中保留所有类型。

**示例数据集**

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：使用 ResolveChoice、Lambda 和 ApplyMapping 进行数据准备](aws-glue-programming-python-samples-medicaid.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-medicaid.md#aws-glue-programming-python-samples-medicaid-crawling) 中的说明进行操作。

该示例通过以下架构使用了名为 `medicare` 的 `DynamicFrame`：

```
root
|-- drg definition: string
|-- provider id: choice
|    |-- long
|    |-- string
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string
```

**示例代码**

```
# Example: Use resolveChoice to handle
# a column that contains multiple types

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Load the input data and inspect the "provider id" column
medicare = glueContext.create_dynamic_frame.from_catalog(
    database="payments", table_name="medicare_hospital_provider_csv"
)
print("Inspect the provider id column:")
medicare.toDF().select("provider id").show()

# Cast provider id to type long
medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")])
print("Schema after casting provider id to type long:")
medicare_resolved_long.printSchema()
medicare_resolved_long.toDF().select("provider id").show()

# Create separate columns
# for each provider id type
medicare_resolved_cols = medicare.resolveChoice(choice="make_cols")
print("Schema after creating separate columns for each type:")
medicare_resolved_cols.printSchema()
medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
```

#### Output
<a name="resolveChoice-example-output"></a>

```
Inspect the 'provider id' column:
+-----------+
|provider id|
+-----------+
|   [10001,]|
|   [10005,]|
|   [10006,]|
|   [10011,]|
|   [10016,]|
|   [10023,]|
|   [10029,]|
|   [10033,]|
|   [10039,]|
|   [10040,]|
|   [10046,]|
|   [10055,]|
|   [10056,]|
|   [10078,]|
|   [10083,]|
|   [10085,]|
|   [10090,]|
|   [10092,]|
|   [10100,]|
|   [10103,]|
+-----------+
only showing top 20 rows

Schema after casting 'provider id' to type long:
root
|-- drg definition: string
|-- provider id: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string

+-----------+
|provider id|
+-----------+
|      10001|
|      10005|
|      10006|
|      10011|
|      10016|
|      10023|
|      10029|
|      10033|
|      10039|
|      10040|
|      10046|
|      10055|
|      10056|
|      10078|
|      10083|
|      10085|
|      10090|
|      10092|
|      10100|
|      10103|
+-----------+
only showing top 20 rows

Schema after creating separate columns for each type:
root
|-- drg definition: string
|-- provider id_string: string
|-- provider id_long: long
|-- provider name: string
|-- provider street address: string
|-- provider city: string
|-- provider state: string
|-- provider zip code: long
|-- hospital referral region description: string
|-- total discharges: long
|-- average covered charges: string
|-- average total payments: string
|-- average medicare payments: string

+----------------+------------------+
|provider id_long|provider id_string|
+----------------+------------------+
|           10001|              null|
|           10005|              null|
|           10006|              null|
|           10011|              null|
|           10016|              null|
|           10023|              null|
|           10029|              null|
|           10033|              null|
|           10039|              null|
|           10040|              null|
|           10046|              null|
|           10055|              null|
|           10056|              null|
|           10078|              null|
|           10083|              null|
|           10085|              null|
|           10090|              null|
|           10092|              null|
|           10100|              null|
|           10103|              null|
+----------------+------------------+
only showing top 20 rows
```

## select\$1fields
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-select_fields"></a>

**`select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

返回包含选定字段的新 `DynamicFrame`。
+ `paths` – 字符串列表。每个字符串就是一个指向您要选择的顶层节点的路径。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 select\$1fields 以利用选定字段创建新 `DynamicFrame`
<a name="pyspark-select_fields-example"></a>

以下代码示例展示了如何使用 `select_fields` 方法以利用从现有 `DynamicFrame` 中选定的字段列表来创建新 `DynamicFrame`。

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：对数据进行联接和关系化](aws-glue-programming-python-samples-legislators.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-legislators.md#aws-glue-programming-python-samples-legislators-crawling) 中的说明进行操作。

```
# Example: Use select_fields to select specific fields from a DynamicFrame

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Create a DynamicFrame and view its schema
persons = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="persons_json"
)
print("Schema for the persons DynamicFrame:")
persons.printSchema()

# Create a new DynamicFrame with chosen fields
names = persons.select_fields(paths=["family_name", "given_name"])
print("Schema for the names DynamicFrame, created with select_fields:")
names.printSchema()
names.toDF().show()
```

#### Output
<a name="select_fields-example-output"></a>

```
Schema for the persons DynamicFrame:
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string

Schema for the names DynamicFrame:
root
|-- family_name: string
|-- given_name: string

+-----------+----------+
|family_name|given_name|
+-----------+----------+
|    Collins|   Michael|
|   Huizenga|      Bill|
|    Clawson|    Curtis|
|    Solomon|    Gerald|
|     Rigell|    Edward|
|      Crapo|   Michael|
|      Hutto|      Earl|
|      Ertel|     Allen|
|     Minish|    Joseph|
|    Andrews|    Robert|
|     Walden|      Greg|
|      Kazen|   Abraham|
|     Turner|   Michael|
|      Kolbe|     James|
|  Lowenthal|      Alan|
|    Capuano|   Michael|
|   Schrader|      Kurt|
|     Nadler|   Jerrold|
|     Graves|       Tom|
|   McMillan|      John|
+-----------+----------+
only showing top 20 rows
```

## simplify\$1ddb\$1json
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-simplify"></a>

**`simplify_ddb_json(): DynamicFrame`**

简化 `DynamicFrame` 中的嵌套列，其具体位于 DynamoDB JSON 结构中，并返回一个新的简化 `DynamicFrame`。如果 List 类型中有多种类型或 Map 类型，则不会简化 List 中的元素。请注意，这是一种特定类型的转换，其行为与常规 `unnest` 转换不同，并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息，请参阅 [DynamoDB JSON](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.Output.html#DataExport.Output.Data)。

例如，使用 DynamoDB JSON 结构读取导出的架构可能如下所示：

```
root
|-- Item: struct
|    |-- parentMap: struct
|    |    |-- M: struct
|    |    |    |-- childMap: struct
|    |    |    |    |-- M: struct
|    |    |    |    |    |-- appName: struct
|    |    |    |    |    |    |-- S: string
|    |    |    |    |    |-- packageName: struct
|    |    |    |    |    |    |-- S: string
|    |    |    |    |    |-- updatedAt: struct
|    |    |    |    |    |    |-- N: string
|    |-- strings: struct
|    |    |-- SS: array
|    |    |    |-- element: string
|    |-- numbers: struct
|    |    |-- NS: array
|    |    |    |-- element: string
|    |-- binaries: struct
|    |    |-- BS: array
|    |    |    |-- element: string
|    |-- isDDBJson: struct
|    |    |-- BOOL: boolean
|    |-- nullValue: struct
|    |    |-- NULL: boolean
```

`simplify_ddb_json()` 转换会将此转换为：

```
root
|-- parentMap: struct
|    |-- childMap: struct
|    |    |-- appName: string
|    |    |-- packageName: string
|    |    |-- updatedAt: string
|-- strings: array
|    |-- element: string
|-- numbers: array
|    |-- element: string
|-- binaries: array
|    |-- element: string
|-- isDDBJson: boolean
|-- nullValue: null
```

### 示例：使用 simplify\$1ddb\$1json 调用 DynamoDB JSON simplify 命令
<a name="pyspark-simplify-ddb-json-example"></a>

以下代码示例使用 `simplify_ddb_json` 方法来使用 AWS Glue DynamoDB 导出连接器、调用 DynamoDB JSON simplify 命令，以及打印分区数量。

**示例代码**

```
from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext()
glueContext = GlueContext(sc)

dynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type = "dynamodb",
    connection_options = {
        'dynamodb.export': 'ddb',
        'dynamodb.tableArn': '<table arn>',
        'dynamodb.s3.bucket': '<bucket name>',
        'dynamodb.s3.prefix': '<bucket prefix>',
        'dynamodb.s3.bucketOwner': '<account_id of bucket>'
    }
)
simplified = dynamicFrame.simplify_ddb_json()
print(simplified.getNumPartitions())
```

## spigot
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-spigot"></a>

**`spigot(path, options={})`**

将示例记录写入指定的目标，以帮助您验证作业执行的转换。
+ `path` – 要写入的目标的路径（必需）。
+ `options` – 指定选项的键值对（可选）。`"topk"` 选项指定应写入第一条 `k` 记录。`"prob"` 选项指定选择任何给定记录的可能性（以十进制数字形式表示）。您可以在选择要写入的记录时使用它。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。

### 示例：使用 spigot 将 `DynamicFrame` 中的示例字段写入 Amazon S3
<a name="pyspark-spigot-example"></a>

此代码示例使用 `spigot` 方法在应用了 `select_fields` 转换后将示例记录写入 Amazon S3 存储桶。

**示例数据集**

**注意**  
要访问本示例中使用的数据集，请参阅 [代码示例：对数据进行联接和关系化](aws-glue-programming-python-samples-legislators.md) 并按照 [步骤 1：爬取 Amazon S3 存储桶中的数据](aws-glue-programming-python-samples-legislators.md#aws-glue-programming-python-samples-legislators-crawling) 中的说明进行操作。

该示例通过以下架构使用了名为 `persons` 的 `DynamicFrame`：

```
root
|-- family_name: string
|-- name: string
|-- links: array
|    |-- element: struct
|    |    |-- note: string
|    |    |-- url: string
|-- gender: string
|-- image: string
|-- identifiers: array
|    |-- element: struct
|    |    |-- scheme: string
|    |    |-- identifier: string
|-- other_names: array
|    |-- element: struct
|    |    |-- lang: string
|    |    |-- note: string
|    |    |-- name: string
|-- sort_name: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string
|-- given_name: string
|-- birth_date: string
|-- id: string
|-- contact_details: array
|    |-- element: struct
|    |    |-- type: string
|    |    |-- value: string
|-- death_date: string
```

**示例代码**

```
# Example: Use spigot to write sample records
# to a destination during a transformation
# from pyspark.context import SparkContext.
# Replace DOC-EXAMPLE-BUCKET with your own location.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Load table data into a DynamicFrame
persons = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="persons_json"
)

# Perform the select_fields on the DynamicFrame
persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"])

# Use spigot to write a sample of the transformed data
# (the first 10 records)
spigot_output = persons.spigot(
    path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10}
)
# Example: Use spigot to write sample records
# to a destination during a transformation
# from pyspark.context import SparkContext.
# Replace DOC-EXAMPLE-BUCKET with your own location.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Load table data into a DynamicFrame
persons = glueContext.create_dynamic_frame.from_catalog(
    database="legislators", table_name="persons_json"
)

# Perform the select_fields on the DynamicFrame
persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"])

# Use spigot to write a sample of the transformed data
# (the first 10 records)
spigot_output = persons.spigot(
    path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10}
)
```

#### Output
<a name="spigot-example-output"></a>

以下是 `spigot` 写入 Amazon S3 的数据示例。由于指定了示例代码 `options={"topk": 10}`，因此示例数据包含前 10 条记录。

```
{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"}
{"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"}
{"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"}
{"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"}
{"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"}
{"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"}
{"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"}
{"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"}
{"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"}
{"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}
```

## split\$1fields
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-split_fields"></a>

**`split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

返回一个新的 `DynamicFrameCollection`，其中包含两个 `DynamicFrames`。第一个 `DynamicFrame` 包含所有已拆分的节点，第二个包含其余节点。
+ `paths` – 字符串列表，每个字符串是到一个您要拆分为新的 `DynamicFrame` 的节点的完整路径。
+ `name1` – 拆分的 `DynamicFrame` 的名称字符串。
+ `name2` – 指定节点拆分后留存的 `DynamicFrame` 的名称字符串。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 split\$1fields 将选定字段拆分为单独的 `DynamicFrame`
<a name="pyspark-split_fields-example"></a>

此代码示例使用 `split_fields` 方法将指定字段列表拆分为单独的 `DynamicFrame`。

**示例数据集**

该示例使用了集合 `legislators_relationalized` 中名为 `l_root_contact_details` 的 `DynamicFrame`。

`l_root_contact_details` 有以下架构和条目。

```
root
|-- id: long
|-- index: int
|-- contact_details.val.type: string
|-- contact_details.val.value: string

+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
|  1|    0|                   phone|             202-225-5265|
|  1|    1|                 twitter|              kathyhochul|
|  2|    0|                   phone|             202-225-3252|
|  2|    1|                 twitter|            repjackyrosen|
|  3|    0|                     fax|             202-225-1314|
|  3|    1|                   phone|             202-225-3772|
...
```

**示例代码**

```
# Example: Use split_fields to split selected
# fields into a separate DynamicFrame

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Load the input DynamicFrame and inspect its schema
frame_to_split = legislators_relationalized.select("l_root_contact_details")
print("Inspect the input DynamicFrame schema:")
frame_to_split.printSchema()

# Split id and index fields into a separate DynamicFrame
split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right")

# Inspect the resulting DynamicFrames
print("Inspect the schemas of the DynamicFrames created with split_fields:")
split_fields_collection.select("left").printSchema()
split_fields_collection.select("right").printSchema()
```

#### Output
<a name="split_fields-example-output"></a>

```
Inspect the input DynamicFrame's schema:
root
|-- id: long
|-- index: int
|-- contact_details.val.type: string
|-- contact_details.val.value: string

Inspect the schemas of the DynamicFrames created with split_fields:
root
|-- id: long
|-- index: int

root
|-- contact_details.val.type: string
|-- contact_details.val.value: string
```

## split\$1rows
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-split_rows"></a>

**`split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

将 `DynamicFrame` 中的一个或多个行拆分到新的 `DynamicFrame` 中。

该方法可返回一个新的 `DynamicFrameCollection`，其中包含两个 `DynamicFrames`。第一个 `DynamicFrame` 包含所有已拆分的行，第二个包含其余行。
+ `comparison_dict` – 一个词典，其中的键是到一个列的路径，值是另一个词典，用于将比较器映射到与该列值所比较的值。例如，`{"age": {">": 10, "<": 20}}` 拆分其年龄列中的值大于 10 但小于 20 的行。
+ `name1` – 拆分的 `DynamicFrame` 的名称字符串。
+ `name2` – 指定节点拆分后留存的 `DynamicFrame` 的名称字符串。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 split\$1rows 拆分 `DynamicFrame` 中的行
<a name="pyspark-split_rows-example"></a>

此代码示例使用 `split_rows` 方法根据 `id` 字段值拆分 `DynamicFrame` 中的行。

**示例数据集**

该示例使用了从集合 `legislators_relationalized` 中选择的名为 `l_root_contact_details` 的 `DynamicFrame`。

`l_root_contact_details` 有以下架构和条目。

```
root
|-- id: long
|-- index: int
|-- contact_details.val.type: string
|-- contact_details.val.value: string

+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
|  1|    0|                   phone|             202-225-5265|
|  1|    1|                 twitter|              kathyhochul|
|  2|    0|                   phone|             202-225-3252|
|  2|    1|                 twitter|            repjackyrosen|
|  3|    0|                     fax|             202-225-1314|
|  3|    1|                   phone|             202-225-3772|
|  3|    2|                 twitter|          MikeRossUpdates|
|  4|    0|                     fax|             202-225-1314|
|  4|    1|                   phone|             202-225-3772|
|  4|    2|                 twitter|          MikeRossUpdates|
|  5|    0|                     fax|             202-225-1314|
|  5|    1|                   phone|             202-225-3772|
|  5|    2|                 twitter|          MikeRossUpdates|
|  6|    0|                     fax|             202-225-1314|
|  6|    1|                   phone|             202-225-3772|
|  6|    2|                 twitter|          MikeRossUpdates|
|  7|    0|                     fax|             202-225-1314|
|  7|    1|                   phone|             202-225-3772|
|  7|    2|                 twitter|          MikeRossUpdates|
|  8|    0|                     fax|             202-225-1314|
+---+-----+------------------------+-------------------------+
```

**示例代码**

```
# Example: Use split_rows to split up 
# rows in a DynamicFrame based on value

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Retrieve the DynamicFrame to split
frame_to_split = legislators_relationalized.select("l_root_contact_details")

# Split up rows by ID
split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low")

# Inspect the resulting DynamicFrames
print("Inspect the DynamicFrame that contains IDs < 10")
split_rows_collection.select("low").toDF().show()
print("Inspect the DynamicFrame that contains IDs > 10")
split_rows_collection.select("high").toDF().show()
```

#### Output
<a name="split_rows-example-output"></a>

```
Inspect the DynamicFrame that contains IDs < 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
|  1|    0|                   phone|             202-225-5265|
|  1|    1|                 twitter|              kathyhochul|
|  2|    0|                   phone|             202-225-3252|
|  2|    1|                 twitter|            repjackyrosen|
|  3|    0|                     fax|             202-225-1314|
|  3|    1|                   phone|             202-225-3772|
|  3|    2|                 twitter|          MikeRossUpdates|
|  4|    0|                     fax|             202-225-1314|
|  4|    1|                   phone|             202-225-3772|
|  4|    2|                 twitter|          MikeRossUpdates|
|  5|    0|                     fax|             202-225-1314|
|  5|    1|                   phone|             202-225-3772|
|  5|    2|                 twitter|          MikeRossUpdates|
|  6|    0|                     fax|             202-225-1314|
|  6|    1|                   phone|             202-225-3772|
|  6|    2|                 twitter|          MikeRossUpdates|
|  7|    0|                     fax|             202-225-1314|
|  7|    1|                   phone|             202-225-3772|
|  7|    2|                 twitter|          MikeRossUpdates|
|  8|    0|                     fax|             202-225-1314|
+---+-----+------------------------+-------------------------+
only showing top 20 rows

Inspect the DynamicFrame that contains IDs > 10
+---+-----+------------------------+-------------------------+
| id|index|contact_details.val.type|contact_details.val.value|
+---+-----+------------------------+-------------------------+
| 11|    0|                   phone|             202-225-5476|
| 11|    1|                 twitter|            RepDavidYoung|
| 12|    0|                   phone|             202-225-4035|
| 12|    1|                 twitter|           RepStephMurphy|
| 13|    0|                     fax|             202-226-0774|
| 13|    1|                   phone|             202-225-6335|
| 14|    0|                     fax|             202-226-0774|
| 14|    1|                   phone|             202-225-6335|
| 15|    0|                     fax|             202-226-0774|
| 15|    1|                   phone|             202-225-6335|
| 16|    0|                     fax|             202-226-0774|
| 16|    1|                   phone|             202-225-6335|
| 17|    0|                     fax|             202-226-0774|
| 17|    1|                   phone|             202-225-6335|
| 18|    0|                     fax|             202-226-0774|
| 18|    1|                   phone|             202-225-6335|
| 19|    0|                     fax|             202-226-0774|
| 19|    1|                   phone|             202-225-6335|
| 20|    0|                     fax|             202-226-0774|
| 20|    1|                   phone|             202-225-6335|
+---+-----+------------------------+-------------------------+
only showing top 20 rows
```

## unbox
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-unbox"></a>

**`unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)`**

取消装箱（重新格式化）`DynamicFrame` 中的一个字符串字段并返回包含已取消装箱 `DynamicRecords` 的新的 `DynamicFrame`。

一个 `DynamicRecord` 表示 `DynamicFrame` 中的一条逻辑记录。它类似于 Apache Spark `DataFrame` 中的一行，但其具有自描述性，可用于不符合固定架构的数据。
+ `path` – 要取消装箱的字符串节点的完整路径。
+ `format` – 格式规范（可选）。您可将其用于 Amazon S3 或支持多种格式的 AWS Glue 连接。相关受支持的格式，请参阅 [AWS Glue for Spark 中的输入和输出的数据格式选项](aws-glue-programming-etl-format.md)。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `options` – 下列一个或多个：
  + `separator` – 包含分隔符字符的字符串。
  + `escaper` – 包含转义字符的字符串。
  + `skipFirst` – 指示是否跳过第一个实例的布尔值。
  + `withSchema` - 包含节点架构的 JSON 表示形式的字符串。架构的 JSON 表示格式由 `StructType.json()` 的输出定义。
  + `withHeader` – 指示是否包括标头的布尔值。

### 示例：使用 unbox 将字符串字段拆开为结构
<a name="pyspark-unbox-example"></a>

此代码示例使用 `unbox` 方法将 `DynamicFrame` 中的字符串字段*拆开*或重新格式化为结构类型的字段。

**示例数据集**

该示例通过以下架构和条目使用了名为 `DynamicFrame` 的 `mapped_with_string`。

请注意名为 `AddressString` 的字段。这是拆开为结构示例中的字段。

```
root
|-- Average Total Payments: string
|-- AddressString: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
|    |-- Zip.Code: string
|    |-- City: string
|    |-- Array: array
|    |    |-- element: string
|    |-- State: string
|    |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string

+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
|Average Total Payments|       AddressString|Average Covered Charges|      DRG Definition|Average Medicare Payments|Hospital Referral Region Description|             Address|Provider Id|Total Discharges|       Provider Name|
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
|              $5777.24|{"Street": "1108 ...|              $32963.07|039 - EXTRACRANIA...|                 $4763.73|                         AL - Dothan|[36301, DOTHAN, [...|      10001|              91|SOUTHEAST ALABAMA...|
|              $5787.57|{"Street": "2505 ...|              $15131.85|039 - EXTRACRANIA...|                 $4976.71|                     AL - Birmingham|[35957, BOAZ, [25...|      10005|              14|MARSHALL MEDICAL ...|
|              $5434.95|{"Street": "205 M...|              $37560.37|039 - EXTRACRANIA...|                 $4453.79|                     AL - Birmingham|[35631, FLORENCE,...|      10006|              24|ELIZA COFFEE MEMO...|
|              $5417.56|{"Street": "50 ME...|              $13998.28|039 - EXTRACRANIA...|                 $4129.16|                     AL - Birmingham|[35235, BIRMINGHA...|      10011|              25|   ST VINCENT'S EAST|
...
```

**示例代码**

```
# Example: Use unbox to unbox a string field
# into a struct in a DynamicFrame

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

unboxed = mapped_with_string.unbox("AddressString", "json")
unboxed.printSchema()
unboxed.toDF().show()
```

#### Output
<a name="unbox-example-output"></a>

```
root
|-- Average Total Payments: string
|-- AddressString: struct
|    |-- Street: string
|    |-- City: string
|    |-- State: string
|    |-- Zip.Code: string
|    |-- Array: array
|    |    |-- element: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
|    |-- Zip.Code: string
|    |-- City: string
|    |-- Array: array
|    |    |-- element: string
|    |-- State: string
|    |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string

+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
|Average Total Payments|       AddressString|Average Covered Charges|      DRG Definition|Average Medicare Payments|Hospital Referral Region Description|             Address|Provider Id|Total Discharges|       Provider Name|
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
|              $5777.24|[1108 ROSS CLARK ...|              $32963.07|039 - EXTRACRANIA...|                 $4763.73|                         AL - Dothan|[36301, DOTHAN, [...|      10001|              91|SOUTHEAST ALABAMA...|
|              $5787.57|[2505 U S HIGHWAY...|              $15131.85|039 - EXTRACRANIA...|                 $4976.71|                     AL - Birmingham|[35957, BOAZ, [25...|      10005|              14|MARSHALL MEDICAL ...|
|              $5434.95|[205 MARENGO STRE...|              $37560.37|039 - EXTRACRANIA...|                 $4453.79|                     AL - Birmingham|[35631, FLORENCE,...|      10006|              24|ELIZA COFFEE MEMO...|
|              $5417.56|[50 MEDICAL PARK ...|              $13998.28|039 - EXTRACRANIA...|                 $4129.16|                     AL - Birmingham|[35235, BIRMINGHA...|      10011|              25|   ST VINCENT'S EAST|
|              $5658.33|[1000 FIRST STREE...|              $31633.27|039 - EXTRACRANIA...|                 $4851.44|                     AL - Birmingham|[35007, ALABASTER...|      10016|              18|SHELBY BAPTIST ME...|
|              $6653.80|[2105 EAST SOUTH ...|              $16920.79|039 - EXTRACRANIA...|                 $5374.14|                     AL - Montgomery|[36116, MONTGOMER...|      10023|              67|BAPTIST MEDICAL C...|
|              $5834.74|[2000 PEPPERELL P...|              $11977.13|039 - EXTRACRANIA...|                 $4761.41|                     AL - Birmingham|[36801, OPELIKA, ...|      10029|              51|EAST ALABAMA MEDI...|
|              $8031.12|[619 SOUTH 19TH S...|              $35841.09|039 - EXTRACRANIA...|                 $5858.50|                     AL - Birmingham|[35233, BIRMINGHA...|      10033|              32|UNIVERSITY OF ALA...|
|              $6113.38|[101 SIVLEY RD, H...|              $28523.39|039 - EXTRACRANIA...|                 $5228.40|                     AL - Huntsville|[35801, HUNTSVILL...|      10039|             135| HUNTSVILLE HOSPITAL|
|              $5541.05|[1007 GOODYEAR AV...|              $75233.38|039 - EXTRACRANIA...|                 $4386.94|                     AL - Birmingham|[35903, GADSDEN, ...|      10040|              34|GADSDEN REGIONAL ...|
|              $5461.57|[600 SOUTH THIRD ...|              $67327.92|039 - EXTRACRANIA...|                 $4493.57|                     AL - Birmingham|[35901, GADSDEN, ...|      10046|              14|RIVERVIEW REGIONA...|
|              $5356.28|[4370 WEST MAIN S...|              $39607.28|039 - EXTRACRANIA...|                 $4408.20|                         AL - Dothan|[36305, DOTHAN, [...|      10055|              45|    FLOWERS HOSPITAL|
|              $5374.65|[810 ST VINCENT'S...|              $22862.23|039 - EXTRACRANIA...|                 $4186.02|                     AL - Birmingham|[35205, BIRMINGHA...|      10056|              43|ST VINCENT'S BIRM...|
|              $5366.23|[400 EAST 10TH ST...|              $31110.85|039 - EXTRACRANIA...|                 $4376.23|                     AL - Birmingham|[36207, ANNISTON,...|      10078|              21|NORTHEAST ALABAMA...|
|              $5282.93|[1613 NORTH MCKEN...|              $25411.33|039 - EXTRACRANIA...|                 $4383.73|                         AL - Mobile|[36535, FOLEY, [1...|      10083|              15|SOUTH BALDWIN REG...|
|              $5676.55|[1201 7TH STREET ...|               $9234.51|039 - EXTRACRANIA...|                 $4509.11|                     AL - Huntsville|[35609, DECATUR, ...|      10085|              27|DECATUR GENERAL H...|
|              $5930.11|[6801 AIRPORT BOU...|              $15895.85|039 - EXTRACRANIA...|                 $3972.85|                         AL - Mobile|[36608, MOBILE, [...|      10090|              27| PROVIDENCE HOSPITAL|
|              $6192.54|[809 UNIVERSITY B...|              $19721.16|039 - EXTRACRANIA...|                 $5179.38|                     AL - Tuscaloosa|[35401, TUSCALOOS...|      10092|              31|D C H REGIONAL ME...|
|              $4968.00|[750 MORPHY AVENU...|              $10710.88|039 - EXTRACRANIA...|                 $3898.88|                         AL - Mobile|[36532, FAIRHOPE,...|      10100|              18|     THOMAS HOSPITAL|
|              $5996.00|[701 PRINCETON AV...|              $51343.75|039 - EXTRACRANIA...|                 $4962.45|                     AL - Birmingham|[35211, BIRMINGHA...|      10103|              33|BAPTIST MEDICAL C...|
+----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+
only showing top 20 rows
```

## 联合
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-union"></a>

**`union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)`**

联合两个 DynamicFrame。返回 DynamicFrame，其中包含来自两个输入 DynamicFrame 的所有记录。这种转换可能会从两个 DataFrame 与等效数据的合并中返回不同的结果。如果您需要 Spark DataFrame 联合行为，可以考虑使用 `toDF`。
+ `frame1` – 第一个要联合的 DynamicFrame。
+ `frame2` – 第二个要联合的 DynamicFrame。
+ `transformation_ctx` –（可选）用于标识统计信息/状态信息的唯一字符串 
+ `info` –（可选）与转换中的错误关联的任何字符串 
+ `stageThreshold` –（可选）在处理出错之前转换中出现的最大错误数 
+ `totalThreshold` –（可选）在处理出错之前出现的最大总错误数 

## unnest
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-unnest"></a>

**`unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**

取消嵌套 `DynamicFrame` 中的嵌套对象，使其成为顶级对象，并返回新的取消嵌套的 `DynamicFrame`。
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数（可选）。默认值为零，则表示进程不应出错。

### 示例：使用 unnest 将嵌套字段转换为顶级字段
<a name="pyspark-unnest-example"></a>

此代码示例使用 `unnest` 方法将 `DynamicFrame` 中的所有嵌套字段展平为顶级字段。

**示例数据集**

该示例通过以下架构使用了名为 `mapped_medicare` 的 `DynamicFrame`。请注意，`Address` 字段是唯一包含嵌套数据的字段。

```
root
|-- Average Total Payments: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address: struct
|    |-- Zip.Code: string
|    |-- City: string
|    |-- Array: array
|    |    |-- element: string
|    |-- State: string
|    |-- Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
```

**示例代码**

```
# Example: Use unnest to unnest nested
# objects in a DynamicFrame

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Unnest all nested fields
unnested = mapped_medicare.unnest()
unnested.printSchema()
```

#### Output
<a name="unnest-example-output"></a>

```
root
|-- Average Total Payments: string
|-- Average Covered Charges: string
|-- DRG Definition: string
|-- Average Medicare Payments: string
|-- Hospital Referral Region Description: string
|-- Address.Zip.Code: string
|-- Address.City: string
|-- Address.Array: array
|    |-- element: string
|-- Address.State: string
|-- Address.Street: string
|-- Provider Id: string
|-- Total Discharges: string
|-- Provider Name: string
```

## unnest\$1ddb\$1json
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-unnest_ddb_json"></a>

解除 `DynamicFrame` 中的嵌套列，其具体位于 DynamoDB JSON 结构中，并返回一个新的非嵌套 `DynamicFrame`。属于结构类型数组的列将不会被解除嵌套。请注意，这是一种特定类型的非嵌套转换，其行为与常规 `unnest` 转换不同，并且要求数据已存在于 DynamoDB JSON 结构中。有关更多信息，请参阅 [DynamoDB JSON](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DataExport.Output.html#DataExport.Output.Data)。

**`unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)`**
+ `transformation_ctx` – 用于标识状态信息的唯一字符串 (可选)。
+ `info` – 与此转换的错误报告关联的字符串 (可选)。
+ `stageThreshold` – 此转换过程中遇到的将导致过程出错的错误数 (可选，默认为零，表示此过程应该不会出错)。
+ `totalThreshold` – 此转换之前及转换过程中遇到的将导致过程出错的错误数 (可选，默认为零，表示此过程应该不会出错)。

例如，使用 DynamoDB JSON 结构读取导出的架构可能如下所示：

```
root
|-- Item: struct
|    |-- ColA: struct
|    |    |-- S: string
|    |-- ColB: struct
|    |    |-- S: string
|    |-- ColC: struct
|    |    |-- N: string
|    |-- ColD: struct
|    |    |-- L: array
|    |    |    |-- element: null
```

`unnest_ddb_json()` 转换会将此转换为：

```
root
|-- ColA: string
|-- ColB: string
|-- ColC: string
|-- ColD: array    
|    |-- element: null
```

以下代码示例演示了如何使用 AWS Glue DynamoDB 导出连接器、调用 DynamoDB JSON 解除嵌套命令，以及打印分区数量：

```
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context= GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

dynamicFrame = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "<test_source>",
        "dynamodb.s3.bucket": "<bucket name>",
        "dynamodb.s3.prefix": "<bucket prefix>",
        "dynamodb.s3.bucketOwner": "<account_id>",
    }
)
unnested = dynamicFrame.unnest_ddb_json()
print(unnested.getNumPartitions())

job.commit()
```

## write
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-write"></a>

**`write(connection_type, connection_options, format, format_options, accumulator_size)`**

从此 `DynamicFrame` 的 [GlueContext 类](aws-glue-api-crawler-pyspark-extensions-glue-context.md) 获得指定连接类型的 [DataSink(object)](aws-glue-api-crawler-pyspark-extensions-types.md#aws-glue-api-crawler-pyspark-extensions-types-awsglue-data-sink) 并将其用于格式化和写入此 `DynamicFrame` 的内容。返回按指定进行格式化和写入的新的 `DynamicFrame`。
+ `connection_type` – 要使用的连接类型。有效值包括 `s3`、`mysql`、`postgresql`、`redshift`、`sqlserver` 和 `oracle`。
+ `connection_options` – 要使用的连接选项（可选）。对于 `connection_type` 的 `s3`，将会定义 Amazon S3 路径。

  ```
  connection_options = {"path": "s3://aws-glue-target/temp"}
  ```

  对于 JDBC 连接，必须定义多个属性。请注意，数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。
**警告**  
不建议在脚本中存储密码。请考虑使用 `boto3` 从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。

  ```
  connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"} 
  ```
+ `format` – 格式规范（可选）。这用于 Amazon Simple Storage Service（Amazon S3）或支持多种格式的 AWS Glue 连接。有关支持的格式，请参阅 [AWS Glue for Spark 中的输入和输出的数据格式选项](aws-glue-programming-etl-format.md)。
+ `format_options` – 指定格式的格式选项。有关支持的格式，请参阅 [AWS Glue for Spark 中的输入和输出的数据格式选项](aws-glue-programming-etl-format.md)。
+ `accumulator_size` - 要使用的可累积大小（以字节为单位）（可选)。

##  – 错误 –
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-_errors"></a>
+ [assertErrorThreshold](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-assertErrorThreshold)
+ [errorsAsDynamicFrame](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-errorsAsDynamicFrame)
+ [errorsCount](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-errorsCount)
+ [stageErrorsCount](#aws-glue-api-crawler-pyspark-extensions-dynamic-frame-stageErrorsCount)

## assertErrorThreshold
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-assertErrorThreshold"></a>

`assertErrorThreshold( )` – 创建此 `DynamicFrame` 的转换中的错误的资产。从底层 `DataFrame` 返回 `Exception`。

## errorsAsDynamicFrame
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-errorsAsDynamicFrame"></a>

`errorsAsDynamicFrame( )` – 返回其中包含嵌套的错误记录的 `DynamicFrame`。

### 示例：使用 errorsAsDynamicFrame 查看错误记录
<a name="pyspark-errorsAsDynamicFrame-example"></a>

以下代码示例展示了如何使用 `errorsAsDynamicFrame` 方法查看 `DynamicFrame` 的错误记录。

**示例数据集**

该示例使用以下数据集，您可以将以下数据集作为 JSON 上传到 Amazon S3。请注意，第二条记录的格式有误。当您使用 SparkSQL 时，格式错误的数据通常会中断文件解析。但是，`DynamicFrame` 会识别格式错误的问题，并将格式错误的行转换为您能单独处理的错误记录。

```
{"id": 1, "name": "george", "surname": "washington", "height": 178}
{"id": 2, "name": "benjamin", "surname": "franklin", 
{"id": 3, "name": "alexander", "surname": "hamilton", "height": 171}
{"id": 4, "name": "john", "surname": "jay", "height": 190}
```

**示例代码**

```
# Example: Use errorsAsDynamicFrame to view error records.
# Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Create errors DynamicFrame, view schema
errors = glueContext.create_dynamic_frame.from_options(
    "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json"
)
print("Schema of errors DynamicFrame:")
errors.printSchema()

# Show that errors only contains valid entries from the dataset
print("errors contains only valid records from the input dataset (2 of 4 records)")
errors.toDF().show()

# View errors
print("Errors count:", str(errors.errorsCount()))
print("Errors:")
errors.errorsAsDynamicFrame().toDF().show()

# View error fields and error data
error_record = errors.errorsAsDynamicFrame().toDF().head()

error_fields = error_record["error"]
print("Error fields: ")
print(error_fields.asDict().keys())

print("\nError record data:")
for key in error_fields.asDict().keys():
    print("\n", key, ": ", str(error_fields[key]))
```

#### Output
<a name="errorsAsDynamicFrame-example-output"></a>

```
Schema of errors DynamicFrame:
root
|-- id: int
|-- name: string
|-- surname: string
|-- height: int

errors contains only valid records from the input dataset (2 of 4 records)
+---+------+----------+------+
| id|  name|   surname|height|
+---+------+----------+------+
|  1|george|washington|   178|
|  4|  john|       jay|   190|
+---+------+----------+------+

Errors count: 1
Errors:
+--------------------+
|               error|
+--------------------+
|[[  File "/tmp/20...|
+--------------------+

Error fields: 
dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord'])

Error record data:

 callsite :  Row(site='  File "/tmp/2060612586885849088", line 549, in <module>\n    sys.exit(main())\n  File "/tmp/2060612586885849088", line 523, in main\n    response = handler(content)\n  File "/tmp/2060612586885849088", line 197, in execute_request\n    result = node.execute()\n  File "/tmp/2060612586885849088", line 103, in execute\n    exec(code, global_dict)\n  File "<stdin>", line 10, in <module>\n  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n    format_options, transformation_ctx, push_down_predicate, **kwargs)\n  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n    source.setFormat(format, **format_options)\n', info='')

 msg :  error in jackson reader

 stackTrace :  com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name
 at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650)
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740)
	at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57)
	at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57)
	at scala.collection.Iterator$$anon$9.next(Iterator.scala:162)
	at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599)
	at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120)
	at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116)
	at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209)
	at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202)
	at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116)
	at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109)
	at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247)
	at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


 input :  

 bytesread :  252

 source :  

 dynamicRecord :  Row(id=2, name='benjamin', surname='franklin')
```

## DynamicFrame 综合示例
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-comprehensive-examples"></a>

以下示例演示了在基本 Glue 目录场景之外创建和使用 DynamicFrames 的各种方法。

### 使用 SQL SELECT 查询从 PostgreSQL 加载
<a name="dynamicframe-postgresql-example"></a>

此示例显示了如何使用自定义 SQL SELECT 查询从 PostgreSQL 数据库加载数据：

```
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

# Load specific data from PostgreSQL with custom query
postgres_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "url": "jdbc:postgresql://your-postgres-host:5432/your-database",
        "user": "your-username",
        "password": "your-password",
        "dbtable": "(SELECT customer_id, customer_name, email FROM customers WHERE active = true) AS filtered_customers"
    }
)
```

### 加载特定列避免全表扫描
<a name="dynamicframe-column-selection-example"></a>

此示例演示了如何从大型数据库表中仅加载特定列：

```
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

# Load only specific columns from a large table
selected_columns_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "url": "jdbc:mysql://your-mysql-host:3306/your-database",
        "user": "your-username", 
        "password": "your-password",
        "dbtable": "(SELECT order_id, customer_id FROM large_orders_table) AS selected_data"
    }
)

# Alternative approach using column selection in query
efficient_load_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "url": "jdbc:postgresql://your-postgres-host:5432/your-database",
        "user": "your-username",
        "password": "your-password", 
        "query": "SELECT product_id, product_name FROM products WHERE category = 'electronics'"
    }
)
```

### 通过 JDBC 连接进行行级筛选
<a name="dynamicframe-row-filtering-example"></a>

此示例显示了如何使用行级筛选从数据库表中仅加载特定行：

```
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)

# Load filtered rows using WHERE clause
filtered_rows_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "url": "jdbc:postgresql://your-postgres-host:5432/your-database",
        "user": "your-username",
        "password": "your-password",
        "dbtable": "(SELECT * FROM transactions WHERE transaction_date >= '2024-01-01' AND amount > 100) AS recent_large_transactions"
    }
)

# Using partitionColumn for parallel loading with filtering
partitioned_load_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "url": "jdbc:mysql://your-mysql-host:3306/your-database",
        "user": "your-username",
        "password": "your-password",
        "dbtable": "sales_data",
        "partitionColumn": "sale_date",
        "lowerBound": "2024-01-01",
        "upperBound": "2024-12-31",
        "numPartitions": "10"
    }
)
```

### 从内存中的 Python 数据创建 DynamicFrame
<a name="dynamicframe-in-memory-example"></a>

此示例演示了如何从 Python 列表、元组或字典创建 DynamicFrame：

```
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import Row

sc = SparkContext()
glueContext = GlueContext(sc)

# Method 1: From list of tuples
data_tuples = [
    ("John", "Doe", 30, "Engineer"),
    ("Jane", "Smith", 25, "Designer"), 
    ("Bob", "Johnson", 35, "Manager")
]

# Convert to RDD of Rows
rdd = sc.parallelize([Row(first_name=row[0], last_name=row[1], age=row[2], job=row[3]) for row in data_tuples])
df = glueContext.spark_session.createDataFrame(rdd)
dyf_from_tuples = DynamicFrame.fromDF(df, glueContext, "employees_from_tuples")

# Method 2: From list of dictionaries
data_dicts = [
    {"product_id": 1, "product_name": "Laptop", "price": 999.99, "category": "Electronics"},
    {"product_id": 2, "product_name": "Book", "price": 19.99, "category": "Education"},
    {"product_id": 3, "product_name": "Chair", "price": 149.99, "category": "Furniture"}
]

df_from_dicts = glueContext.spark_session.createDataFrame(data_dicts)
dyf_from_dicts = DynamicFrame.fromDF(df_from_dicts, glueContext, "products_from_dicts")

# Method 3: From nested data structures
nested_data = [
    {
        "customer_id": 1,
        "customer_info": {
            "name": "Alice Brown",
            "email": "alice@example.com"
        },
        "orders": [
            {"order_id": 101, "amount": 250.00},
            {"order_id": 102, "amount": 175.50}
        ]
    }
]

df_nested = glueContext.spark_session.createDataFrame(nested_data)
dyf_nested = DynamicFrame.fromDF(df_nested, glueContext, "customers_with_orders")
```

### 大型数据集性能优化
<a name="dynamicframe-performance-tips"></a>

处理大型数据集时，请考虑以下性能优化技术：

```
# Use partitioning for parallel reads
large_table_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "url": "jdbc:postgresql://your-postgres-host:5432/your-database",
        "user": "your-username",
        "password": "your-password",
        "dbtable": "large_table",
        "partitionColumn": "id",
        "lowerBound": "1",
        "upperBound": "1000000", 
        "numPartitions": "20"
    }
)

# Use pushdown predicates to filter at source
filtered_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "url": "jdbc:mysql://your-mysql-host:3306/your-database",
        "user": "your-username",
        "password": "your-password",
        "dbtable": "transactions"
    },
    push_down_predicate="transaction_date >= '2024-01-01'"
)
```

## errorsCount
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-errorsCount"></a>

`errorsCount( )` – 返回 `DynamicFrame` 中的错误总数。

## stageErrorsCount
<a name="aws-glue-api-crawler-pyspark-extensions-dynamic-frame-stageErrorsCount"></a>

`stageErrorsCount` – 返回生成此 `DynamicFrame` 的过程中发生的错误数。