

 从补丁 198 开始，Amazon Redshift 将不再支持创建新的 Python UDF。现有的 Python UDF 将继续正常运行至 2026 年 6 月 30 日。有关更多信息，请参阅[博客文章](https://aws.amazon.com/blogs/big-data/amazon-redshift-python-user-defined-functions-will-reach-end-of-support-after-june-30-2026/)。

# 适用于 Apache Spark 的 Amazon Redshift 集成
<a name="spark-redshift-connector"></a>

 [Apache Spark](https://aws.amazon.com/emr/features/spark/) 是一个分布式处理框架和编程模型，可帮助您进行机器学习、流处理或图形分析。Spark 与 Apache Hadoop 类似，也是一款常用于大数据工作负载的开源、分布式处理系统。Spark 具有优化的有向无环图（DAG）执行引擎，可主动在内存中缓存数据。这可以提高性能，特别适合某些算法和交互式查询。

 此集成为您提供了 Spark 连接器，您可以将其用于构建 Apache Spark 应用程序，这些应用程序在 Amazon Redshift 和 Amazon Redshift Serverless 中读取和写入数据。这些应用程序不会影响应用程序性能或数据的事务一致性。此集成自动包括在 [Amazon EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/) 和 [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/) 中，因此您可以立即运行 Apache Spark 作业，在数据摄取和转换管道过程中访问数据并将其加载到 Amazon Redshift 中。

目前，可以使用 Spark 的 3.3.x、3.4.x、3.5.x 和 4.0.0 版本进行此集成。

 此集成提供以下内容：
+  AWS Identity and Access Management（IAM）身份验证 有关更多信息，请参阅 [Amazon Redshift 中的 Identity and Access Management](https://docs.aws.amazon.com/redshift/latest/mgmt/redshift-iam-authentication-access-control.html)。
+ 利用谓词和查询下推来提高性能。
+  Amazon Redshift 数据类型。
+ 与 Amazon Redshift 和 Amazon Redshift Serverless 的连接。

## 使用 Spark 连接器时的注意事项和限制
<a name="spark-redshift-connector-considerations"></a>
+  tempdir URI 指向 Amazon S3 位置。此临时目录不会自动清理，可能会增加额外的成本。我们建议使用《Amazon Simple Storage Service 用户指南》**中的 [Amazon S3 生命周期策略](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html)，定义 Amazon S3 存储桶的保留规则。
+  原定设置情况下，如果 S3 桶和 Redshift 集群位于不同的 AWS 区域，则 Amazon S3 和 Redshift 之间的复制不起作用。要使用单独的 AWS 区域，请将 `tempdir_region` 参数设置为对 `tempdir` 使用的 S3 存储桶的区域。
+ 如果使用 `tempformat` 参数写入 Parquet 数据，则在 S3 和 Redshift 之间进行跨区域写入。
+ 我们建议使用 [Amazon S3 服务器端加密](https://docs.aws.amazon.com/AmazonS3/latest/userguide/serv-side-encryption.html)以加密使用的 Amazon S3 存储桶。
+ 我们建议[阻止对 Amazon S3 存储桶的公有访问](https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html)。
+  我们建议不要公开访问 Amazon Redshift 集群。
+  我们建议启用 [Amazon Redshift 审核日志记录](https://docs.aws.amazon.com/redshift/latest/mgmt/db-auditing.html)。
+  我们建议启用 [Amazon Redshift 静态加密](https://docs.aws.amazon.com/redshift/latest/mgmt/security-server-side-encryption.html)。
+  我们建议您为从 Spark on Amazon EMR 到 Amazon Redshift 的 JDBC 连接启用 SSL。
+ 我们建议使用参数 `aws_iam_role` 为 Amazon Redshift 身份验证参数传递 IAM 角色。

# 使用 Spark 连接器进行身份验证
<a name="redshift-spark-connector-authentication"></a>

下图描述了 Amazon S3、Amazon Redshift、Spark 驱动程序和 Spark 执行程序之间的身份验证。

![\[这是 Spark 连接器身份验证的示意图。\]](http://docs.aws.amazon.com/zh_cn/redshift/latest/mgmt/images/spark-connector-authentication.png)


## Redshift 和 Spark 之间的身份验证
<a name="redshift-spark-authentication"></a>

 您可以使用 Amazon Redshift 提供的 JDBC 驱动程序版本 2.x 驱动程序，通过指定登录凭证，使用 Spark 连接器连接到 Amazon Redshift。要使用 IAM，[请将您的 JDBC url 配置为使用 IAM 身份验证](https://docs.aws.amazon.com/redshift/latest/mgmt/generating-iam-credentials-configure-jdbc-odbc.html)。要从 Amazon EMR 或 AWS Glue 连接到 Redshift 集群，确保您的 IAM 角色具有检索临时 IAM 凭证所必需的权限。以下列表描述了您的 IAM 角色检索凭证和运行 Amazon S3 操作所需的所有权限。
+ [ Redshift:GetClusterCredentials](https://docs.aws.amazon.com/redshift/latest/APIReference/API_GetClusterCredentials.html)（适用于预置的 Redshift 集群）
+ [ Redshift:DescribeClusters](https://docs.aws.amazon.com/redshift/latest/APIReference/API_DescribeClusters.html)（适用于预置的 Redshift 集群）
+ [Redshift:GetWorkgroup](https://docs.aws.amazon.com/redshift-serverless/latest/APIReference/API_GetWorkgroup.html)（适用于 Amazon Redshift Serverless 工作组）
+ [ Redshift:GetCredentials](https://docs.aws.amazon.com/redshift-serverless/latest/APIReference/API_GetCredentials.html)（适用于 Amazon Redshift Serverless 工作组）
+ [s3:ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListBuckets.html)
+ [ s3:GetBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_GetBucket.html)
+ [ s3:GetObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html)
+ [ s3:PutObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html)
+ [ s3:GetBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetBucketLifecycleConfiguration.html)

 有关 GetClusterCredentials 的更多信息，请参阅 [GetClusterCredentials 的 IAM 策略](https://docs.aws.amazon.com/redshift/latest/mgmt/redshift-iam-access-control-identity-based.html#redshift-policy-resources.getclustercredentials-resources)。

您还必须确保 Amazon Redshift 可以在 `COPY` 和 `UNLOAD` 操作期间担任 IAM 角色。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "redshift.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
```

------

如果您使用的是最新的 JDBC 驱动程序，则驱动程序将自动管理从 Amazon Redshift 自签名证书到 ACM 证书的转换。但是，您必须[为 JDBC url 指定 SSL 选项](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-configuration-options.html#jdbc20-ssl-option)。

 以下是有关如何指定 JDBC 驱动程序 URL 和 `aws_iam_role` 以连接到 Amazon Redshift 的示例。

```
df.write \
  .format("io.github.spark_redshift_community.spark.redshift ") \
  .option("url", "jdbc:redshift:iam://<the-rest-of-the-connection-string>") \
  .option("dbtable", "<your-table-name>") \
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
  .option("aws_iam_role", "<your-aws-role-arn>") \
  .mode("error") \
  .save()
```

## Amazon S3 和 Spark 之间的身份验证
<a name="spark-s3-authentication"></a>

 如果您使用 IAM 角色在 Spark 和 Amazon S3 之间进行身份验证，则使用以下方法之一：
+ AWS SDK for Java 会自动尝试使用由 DefaultAWSCredentialsProviderChain 类实施的默认凭证提供程序链来查找 AWS 凭证。有关更多信息，请参阅[使用默认凭证提供程序链](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default)。
+ 您可以通过 [Hadoop 配置属性](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md)指定 AWS 密钥。例如，如果您的 `tempdir` 配置指向 `s3n://` 文件系统，请在 Hadoop XML 配置文件中设置 `fs.s3n.awsAccessKeyId` 和 `fs.s3n.awsSecretAccessKey` 属性或调用 `sc.hadoopConfiguration.set()` 以更改 Spark 的全局 Hadoop 配置。

例如，如果您使用的是 s3n 文件系统，则添加：

```
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
```

对于 s3a 文件系统，请添加：

```
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_ACCESS_KEY")
```

如果您使用的是 Python，则使用以下操作：

```
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
```
+ 在 `tempdir` URL 中对身份验证密钥进行编码。例如，URI `s3n://ACCESSKEY:SECRETKEY@bucket/path/to/temp/dir` 对密钥对（`ACCESSKEY`，`SECRETKEY`）进行编码。

## Redshift 和 Amazon S3 之间的身份验证
<a name="redshift-s3-authentication"></a>

 如果您在查询中使用 COPY 和 UNLOAD 命令，则还必须向 Amazon S3 授予访问 Amazon Redshift 的权限，这样才能代表您运行查询。为此，请先[授权 Amazon Redshift 访问其他 AWS 服务](https://docs.aws.amazon.com/redshift/latest/mgmt/authorizing-redshift-service.html)，然后[使用 IAM 角色授权 COPY 和 UNLOAD 操作](https://docs.aws.amazon.com/redshift/latest/mgmt/copy-unload-iam-role.html)。

作为最佳实践，我们建议将权限策略附加到 IAM 角色，然后根据需要将其分配给用户和组。有关更多信息，请参阅 [Amazon Redshift 中的 Identity and Access Management](https://docs.aws.amazon.com/redshift/latest/mgmt/redshift-iam-authentication-access-control.html)。

## 与 AWS Secrets Manager 集成
<a name="redshift-secrets-manager-authentication"></a>

您可以从 AWS Secrets Manager 中存储的密钥检索您的 Redshift 用户名和密码凭证。要自动提供 Redshift 凭证，请使用 `secret.id` 参数。有关如何创建 Redshift 凭证密钥的更多信息，请参阅[创建 AWS Secrets Manager 数据库密钥](https://docs.aws.amazon.com/secretsmanager/latest/userguide/create_database_secret.html)。

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/redshift/latest/mgmt/redshift-spark-connector-authentication.html)

**注意**  
 致谢：本文档包含 [Apache Software Foundation](http://www.apache.org/) 根据 [Apache 2.0 许可证](https://www.apache.org/licenses/LICENSE-2.0)的许可而开发的示例代码和语言。

# 通过下推提高性能
<a name="spark-redshift-connector-pushdown"></a>

 Spark 连接器自动应用谓词和查询下推来优化性能。有了这种支持就意味着，如果您在查询中使用支持的函数，Spark 连接器会将该函数转换成 SQL 查询，并在 Amazon Redshift 中运行该查询。这种优化会减少需要检索的数据，因此 Apache Spark 可以处理更少的数据并获得更好的性能。默认情况下，自动激活下推。要停用它，请将 `autopushdown` 设置为 false。

```
import sqlContext.implicits._val 
 sample= sqlContext.read
    .format("io.github.spark_redshift_community.spark.redshift")
    .option("url",jdbcURL )
    .option("tempdir", tempS3Dir)
    .option("dbtable", "event")
    .option("autopushdown", "false")
    .load()
```

 下推支持以下函数。如果您使用不在此列表中的函数，Spark 连接器将在 Spark 中（而不是在 Amazon Redshift 中）执行此函数，从而导致性能未优化。有关 Spark 中函数的完整列表，请参阅[内置函数](https://spark.apache.org/docs/latest/api/sql/index.html)。
+ 聚合函数
  + avg
  + count
  + max
  + min
  + sum
  + stddev\$1samp
  + stddev\$1pop
  + var\$1samp
  + var\$1pop
+ 布尔运算符
  + in
  + isnull
  + isnotnull
  + contains
  + endswith
  + startswith
+ 逻辑运算符
  + and
  + or
  + not (or \$1)
+ 数学函数
  + \$1
  + -
  + \$1
  + /
  + - (unary)
  + abs
  + acos
  + asin
  + atan
  + ceil
  + cos
  + EXP
  + floor
  + greatest
  + least
  + log10
  + pi
  + pow
  + round
  + sin
  + sqrt
  + tan
+ 其他函数
  + cast
  + coalesce
  + decimal
  + if
  + in
+ 关系运算符
  + \$1=
  + =
  + >
  + >=
  + <
  + <=
+ 字符串函数
  + ascii
  + lpad
  + rpad
  + 翻译
  + upper
  + lower
  + length
  + trim
  + ltrim
  + rtrim
  + like
  + substring
  + concat
+ 日期和时间函数
  + add\$1months
  + date
  + date\$1add
  + date\$1sub
  + date\$1trunc
  + timestamp
  + trunc
+ 数学运算
  + CheckOverflow
  + PromotePrecision
+ 关系运算
  + Aliases（例如，AS）
  + CaseWhen
  + Distinct
  + InSet
  + 联接和交叉联接
  + Limits
  + Unions，union all
  + ScalarSubquery
  + Sorts（升序和降序）
  + UnscaledValue

# 其他配置选项
<a name="spark-redshift-connector-other-config"></a>

在本页中，您可以找到可为 Amazon Redshift Spark 连接器指定的选项的说明。

## 字符串列的最大大小
<a name="spark-redshift-connector-other-config-max-size"></a>

Redshift 在创建表时将字符串列创建为文本列，它们存储为 VARCHAR(256)。如果您想要支持更大大小的列，则可以使用 maxlength 来指定字符串列的最大长度。下面是说明如何指定 `maxlength` 的示例。

```
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}
```

## 列类型
<a name="spark-redshift-connector-other-config-column-type"></a>

要设置列类型，请使用 `redshift_type` 字段。

```
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}
```

## 列上的压缩编码
<a name="spark-redshift-connector-other-config-compression-encoding"></a>

 要对列使用特定的压缩编码，请使用编码字段。有关支持的压缩编码的完整列表，请参阅[压缩编码](https://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html)。

## 列的描述
<a name="spark-redshift-connector-other-config-description"></a>

要设置描述，请使用 `description` 字段。

## Redshift 和 Amazon S3 之间的身份验证
<a name="spark-redshift-connector-other-config-unload-as-text"></a>

 默认情况下，结果将以 Parquet 格式转存到 Amazon S3。要以竖线分隔的文本文件转存结果，请指定以下选项。

```
.option("unload_s3_format", "TEXT")
```

## 下推语句
<a name="spark-redshift-connector-other-config-lazy-pushdown"></a>

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/redshift/latest/mgmt/spark-redshift-connector-other-config.html)

## 连接器参数
<a name="spark-redshift-connector-other-config-spark-parameters"></a>

Spark SQL 中的参数映射或 `OPTIONS` 支持以下设置。

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/redshift/latest/mgmt/spark-redshift-connector-other-config.html)

**注意**  
 致谢：本文档包含 [Apache Software Foundation](http://www.apache.org/) 根据 [Apache 2.0 许可证](https://www.apache.org/licenses/LICENSE-2.0)的许可而开发的示例代码和语言。

# 支持的数据类型
<a name="spark-redshift-connector-data-types"></a>

Spark 连接器支持 Amazon Redshift 中的以下数据类型。有关 Amazon Redshift 中支持的数据类型的完整列表，请参阅[数据类型](https://docs.aws.amazon.com//redshift/latest/dg/c_Supported_data_types.html)。如果某个数据类型不在下表中，则 Spark 连接器不支持该数据类型。

[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/zh_cn/redshift/latest/mgmt/spark-redshift-connector-data-types.html)

## 复杂的数据类型
<a name="spark-redshift-connector-complex-data-types"></a>

 您可以使用 spark 连接器在 Redshift SUPER 数据类型列中读写 Spark 复杂数据类型，如 `ArrayType`、`MapType` 和 `StructType`。如果您在读取操作期间提供架构，则该列中的数据将在 Spark 中转换为相应的复杂类型，包括任何嵌套类型。此外，如果启用 `autopushdown`，嵌套属性、映射值和数组索引的投影将下推到 Redshift，这样，当只访问一部分数据时，就不再需要卸载整个嵌套数据结构。

从连接器写入 DataFrame 时，任何类型为 `MapType`（使用 `StringType`）、`StructType` 或 `ArrayType` 的列都会写入 Redshift SUPER 数据类型列。在写入这些嵌套数据结构时，`tempformat` 参数必须为类型 `CSV`、`CSV GZIP` 或 `PARQUET`。使用 `AVRO` 将导致异常。写入一个键类型不是 `StringType` 的 `MapType` 数据结构也会导致异常。

### StructType
<a name="spark-redshift-connector-complex-data-types-examples-structtype"></a>

以下示例演示如何使用包含结构的 SUPER 数据类型创建表

```
create table contains_super (a super);
```

然后，您可以使用连接器，使用下面示例中的类似架构，从表中的 SUPER 列 `a` 查询 `StringType` 字段 `hello`。

```
import org.apache.spark.sql.types._

val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)

val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil)

val helloDF = sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", jdbcURL )
.option("tempdir", tempS3Dir)
.option("dbtable", "contains_super")
.schema(schema)
.load().selectExpr("a.hello")
```

以下示例演示如何向列 `a` 写入结构。

```
import org.apache.spark.sql.types._
import org.apache.spark.sql._

val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)

val schema = StructType(StructField("a", StructType(StructField("hello", StringType) ::Nil)) :: Nil)
val data = sc.parallelize(Seq(Row(Row("world"))))
val mydf = sqlContext.createDataFrame(data, schema)

mydf.write.format("io.github.spark_redshift_community.spark.redshift").
option("url", jdbcUrl).
option("dbtable", tableName).
option("tempdir", tempS3Dir).
option("tempformat", "CSV").
mode(SaveMode.Append).save
```

### MapType
<a name="spark-redshift-connector-complex-data-types-examples-maptype"></a>

如果您更喜欢使用 `MapType` 来表示数据，那么您可以在架构中使用 `MapType` 数据结构，并检索映射中与键对应的值。请注意，`MapType` 数据结构中的所有键都必须是 String 类型，并且所有值都必须是相同的类型，例如 int。

以下示例演示如何获取列 `a` 中键 `hello` 的值。

```
import org.apache.spark.sql.types._

val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)

val schema = StructType(StructField("a", MapType(StringType, IntegerType))::Nil)

val helloDF = sqlContext.read
    .format("io.github.spark_redshift_community.spark.redshift")
    .option("url", jdbcURL )
    .option("tempdir", tempS3Dir)
    .option("dbtable", "contains_super")
    .schema(schema)
    .load().selectExpr("a['hello']")
```

### ArrayType
<a name="spark-redshift-connector-complex-data-types-examples-arraytype"></a>

如果该列包含数组而不是结构，则可以使用连接器查询数组中的第一个元素。

```
import org.apache.spark.sql.types._

val sc = // existing SparkContext
val sqlContext = new SQLContext(sc)

val schema = StructType(StructField("a", ArrayType(IntegerType)):: Nil)

val helloDF = sqlContext.read
    .format("io.github.spark_redshift_community.spark.redshift")
    .option("url", jdbcURL )
    .option("tempdir", tempS3Dir)
    .option("dbtable", "contains_super")
    .schema(schema)
    .load().selectExpr("a[0]")
```

### 限制
<a name="spark-redshift-connector-complex-data-types-limitations"></a>

通过 spark 连接器使用复杂数据类型有以下限制：
+ 所有嵌套的结构字段名称和映射键必须为小写。如果查询带有大写字母的复杂字段名称，可以尝试省略架构，并使用 `from_json` spark 函数在本地转换返回的字符串来作为解决方法。
+ 在读取或写入操作中使用的任何映射字段都必须只有 `StringType` 键。
+ 只有 `CSV`、`CSV GZIP` 和 `PARQUET ` 是支持将复杂类型写入 Redshift 的临时格式值。尝试使用 `AVRO ` 会引发异常。