

# 在 AWS Glue 中使用 CSV 格式
<a name="aws-glue-programming-etl-format-csv-home"></a>

AWS Glue 从源中检索数据，并将数据写入以各种数据格式存储和传输的目标。如果您的数据以 CSV 数据格式存储或传输，本文档将向您介绍供您使用 AWS Glue 中的数据的可用功能。

 AWS Glue 支持逗号分隔值（CSV）格式。此格式是一种基于行的最小数据格式。CSV 通常不严格遵守某一标准，但您可以参考 [RFC 4180](https://tools.ietf.org/html/rfc4180) 和 [RFC 7111](https://tools.ietf.org/html/rfc7111) 了解更多信息。

您可以使用 AWS Glue 从 Amazon S3 和流式传输源读取 CSV，以及将 CSV 写入 Amazon S3。您可以读取并写入包含 S3 中的 CSV 文件的 `bzip` 和 `gzip` 存档。请在 [S3 连接参数](aws-glue-programming-etl-connect-s3-home.md#aws-glue-programming-etl-connect-s3) 上而非本页中讨论的配置中配置压缩行为。

下表显示了哪些常用 AWS Glue 功能支持 CSV 格式选项。


| 读取 | 写入 | 流式处理读取 | 对小文件进行分组 | 作业书签 | 
| --- | --- | --- | --- | --- | 
| 支持 | 支持 | 支持 | 支持 | 支持 | 

## 示例：从 S3 读取 CSV 文件或文件夹
<a name="aws-glue-programming-etl-format-csv-read"></a>

 **先决条件：**您将需要至您想要读取的 CSV 文件或文件夹的 S3 路径（`s3path`）。

 **配置：**在函数选项中，请指定 `format="csv"`。在您的 `connection_options` 中，请使用 `paths` 键指定 `s3path`。您可以在 `connection_options` 中配置读取器与 S3 的交互方式。有关详细信息，请参阅 AWS Glue 中 ETL 的连接类型和选项：[S3 连接参数](aws-glue-programming-etl-connect-s3-home.md#aws-glue-programming-etl-connect-s3)。您可以配置读取器如何解释 `format_options` 中的 CSV 文件。有关详细信息，请参阅 [CSV 配置参考](#aws-glue-programming-etl-format-csv-reference)。

以下 AWS Glue ETL 脚本显示了从 S3 读取 CSV 文件或文件夹的过程。

 我们提供自定义的 CSV 读取器，其中包含通过 `optimizePerformance` 配置键针对常见工作流进行的性能优化。要确定此读取器是否适合您的工作负载，请参阅 [使用向量化 SIMD CSV 读取器优化读取性能](#aws-glue-programming-etl-format-simd-csv-reader)。

------
#### [ Python ]

在本示例中，使用 [create\_dynamic\_frame.from\_options](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_options) 方法。

```
# Example: Read CSV from S3
# For show, we handle a CSV with a header row.  Set the withHeader option.
# Consider whether optimizePerformance is right for your workflow.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

dynamicFrame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://{{s3path}}"]},
    format="csv",
    format_options={
        "withHeader": True,
        # "optimizePerformance": True,
    },
)
```

您还可以使用脚本（`pyspark.sql.DataFrame`）中的 DataFrames。

```
dataFrame = spark.read\
    .format("csv")\
    .option("header", "true")\
    .load("s3://{{s3path}}")
```

------
#### [ Scala ]

在本示例中，使用 [getSourceWithFormat](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getSourceWithFormat) 操作。

```
// Example: Read CSV from S3
// For show, we handle a CSV with a header row.  Set the withHeader option.
// Consider whether optimizePerformance is right for your workflow.

import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    
    val dynamicFrame = glueContext.getSourceWithFormat(
      formatOptions=JsonOptions("""{"withHeader": true}"""),
      connectionType="s3",
      format="csv",
      options=JsonOptions("""{"paths": ["s3://{{s3path}}"], "recurse": true}""")
    ).getDynamicFrame()
  }
}
```

您还可以使用脚本（`org.apache.spark.sql.DataFrame`）中的 DataFrames。

```
val dataFrame = spark.read
  .option("header","true")
  .format("csv")
  .load("s3://{{s3path}}“)
```

------

## 示例：将 CSV 文件和文件夹写入 S3
<a name="aws-glue-programming-etl-format-csv-write"></a>

 **先决条件：**您将需要一个初始化的 DataFrame（`dataFrame`）或 DynamicFrame（`dynamicFrame`）。您还需要预期 S3 输出路径 `s3path`。

 **配置：**在函数选项中，请指定 `format="csv"`。在您的 `connection_options` 中，请使用 `paths` 键指定 `s3path`。您可以在 `connection_options` 中配置编写器与 S3 的交互方式。有关详细信息，请参阅 AWS Glue 中 ETL 的连接类型和选项：[S3 连接参数](aws-glue-programming-etl-connect-s3-home.md#aws-glue-programming-etl-connect-s3)。您可以配置自己的操作在 `format_options` 中写入文件的内容的方式。有关详细信息，请参阅 [CSV 配置参考](#aws-glue-programming-etl-format-csv-reference)。以下 AWS Glue ETL 脚本显示了将 CSV 文件和文件夹写入 S3 的过程。

------
#### [ Python ]

在本示例中，使用 [write\_dynamic\_frame.from\_options](aws-glue-api-crawler-pyspark-extensions-glue-context.md#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_options) 方法。

```
# Example: Write CSV to S3
# For show, customize how we write string type values.  Set quoteChar to -1 so our values are not quoted.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

glueContext.write_dynamic_frame.from_options(
    frame=dynamicFrame,
    connection_type="s3",
    connection_options={"path": "s3://{{s3path}}"},
    format="csv",
    format_options={
        "quoteChar": -1,
    },
)
```

您还可以使用脚本（`pyspark.sql.DataFrame`）中的 DataFrames。

```
dataFrame.write\
    .format("csv")\
    .option("quote", None)\
    .mode("append")\
    .save("s3://{{s3path}}")
```

------
#### [ Scala ]

在本示例中，请使用 [getSinkWithFormat](glue-etl-scala-apis-glue-gluecontext.md#glue-etl-scala-apis-glue-gluecontext-defs-getSinkWithFormat) 方法。

```
// Example: Write CSV to S3
// For show, customize how we write string type values. Set quoteChar to -1 so our values are not quoted.

import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    
    glueContext.getSinkWithFormat(
        connectionType="s3",
        options=JsonOptions("""{"path": "s3://{{s3path}}"}"""),
        format="csv"
    ).writeDynamicFrame(dynamicFrame)
  }
}
```

您还可以使用脚本（`org.apache.spark.sql.DataFrame`）中的 DataFrames。

```
dataFrame.write
    .format("csv")
    .option("quote", null)
    .mode("Append")
    .save("s3://{{s3path}}")
```

------

## CSV 配置参考
<a name="aws-glue-programming-etl-format-csv-reference"></a>

您可以在 AWS Glue 库指定 `format="csv"` 的任何位置使用以下 `format_options`：
+ `separator` – 指定分隔符。默认值为逗号，但也可以指定任何其他字符。
  + **类型：**文本，**默认值：**`","`
+ `escaper` – 指定要用于转义的字符。此选项仅在读取 CSV 文件而非写入时使用。如果启用，则按原样使用紧跟其后的字符，一小组已知的转义符（`\n`、`\r`、`\t` 和 `\0`）除外。
  + **类型：**文本，**默认值：**无
+ `quoteChar` – 指定要用于引用的字符。默认值为双引号。将这设置为 `-1` 可完全关闭引用。
  + **类型：**文本，**默认值：**`'"'`
+ `multiLine` – 指定单个记录能否跨越多行。当字段包含带引号的换行符时，会出现此选项。如果有记录跨越多个行，您必须将此选项设置为 `True`。启用 `multiLine` 可能会降低性能，因为它在解析时需要更加谨慎的文件拆分。
  + **类型：**布尔值，**默认值：**`false`
+ `withHeader` – 指定是否将第一行视为标头。可以在 `DynamicFrameReader` 类中使用此选项。
  + **类型：**布尔值，**默认值：**`false`
+ `writeHeader` – 指定是否将标头写入输出。可以在 `DynamicFrameWriter` 类中使用此选项。
  + **类型：**布尔值，**默认值：**`true`
+ `skipFirst` – 指定是否跳过第一个数据行。
  + **类型：**布尔值，**默认值：**`false`
+ `optimizePerformance` – 指定是否使用高级 SIMD CSV 读取器以及基于 Apache Arrow 的列式内存格式。仅适用于 AWS Glue 3.0\+。
  + **类型：**布尔值，**默认值：**`false`
+ `strictCheckForQuoting` - 在编写 CSV 时，Glue 可能会在其解释为字符串的值中添加引号。这样做是为了防止写出的内容出现模棱两可之处。为了节省决定写入什么的时间，Glue 可能会在某些不需要引号的情况下进行引用。启用严格检查将执行更密集的计算，并且只有在绝对必要时才会引用。仅适用于 AWS Glue 3.0\+。
  + **类型：**布尔值，**默认值：**`false`

## 使用向量化 SIMD CSV 读取器优化读取性能
<a name="aws-glue-programming-etl-format-simd-csv-reader"></a>

AWS Glue 3.0 版添加了经过优化的 CSV 读取器，与基于行的 CSV 读取器相比，它可以显著提高整体任务性能。

 优化的读取器：
+ 使用 CPU SIMD 指令从磁盘读取
+ 立即以列格式（Apache Arrow）将记录写入内存 
+ 将记录分成几批

这样可以节省日后对记录进行批处理或转换为列格式时的处理时间。例如，更改架构或按列检索数据时。

要使用优化的读取器，请将在 `format_options` 或表属性中将 `"optimizePerformance"` 设置为 `true`。

```
glueContext.create_dynamic_frame.from_options(
    frame = datasource1,
    connection_type = "s3", 
    connection_options = {"paths": ["s3://s3path"]}, 
    format = "csv", 
    format_options={
        "optimizePerformance": True, 
        "separator": ","
        }, 
    transformation_ctx = "datasink2")
```

**矢量化 CSV 读取器的限制**  
请注意向量化 CSV 读取器的以下限制：
+ 它不支持 `multiLine` 和 `escaper` 格式选项。它使用默认双引号字符 `'"'` 的 `escaper`。设置这些选项后，AWS Glue 会自动回退使用基于行的 CSV 读取器。
+ 它不支持创建具有 [ChoiceType](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-types.html#aws-glue-api-crawler-pyspark-extensions-types-awsglue-choicetype) 的 DynamicFrame。
+ 它不支持创建具有[错误记录](https://docs.aws.amazon.com/glue/latest/dg/glue-etl-scala-apis-glue-dynamicframe-class.html#glue-etl-scala-apis-glue-dynamicframe-class-defs-errorsAsDynamicFrame)的 DynamicFrame。
+ 它不支持读取带多字节字符（如日语或中文字符）的 CSV 文件。