

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将 Delta Lake OSS 与 EMR Serverless 结合使用
<a name="using-delta-lake"></a>

## Amazon EMR 6.9.0 及更高版本
<a name="using-delta-lake-6.9.0"></a>

**注意**  
Amazon EMR 7.0.0 及更高版本使用 Delta Lake 3.0，该版本将 `delta-core.jar` 文件更名为 `delta-spark.jar`。如果您使用 Amazon EMR 7.0.0 或更高版本，请务必在配置中指定 `delta-spark.jar`。

Amazon EMR 6.9.0 及更高版本包含 Delta Lake，因此无需再自行打包 Delta Lake 或为 EMR Serverless 作业提供 `--packages` 标志。

1. 提交 EMR Serverless 作业时，请确保具有以下配置属性，并在 `sparkSubmitParameters` 字段中包含以下参数。

   ```
   --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar
       --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
       --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
   ```

1. 创建本地 `delta_sample.py` 以测试创建和读取 Delta 表。

   ```
   # delta_sample.py
       from pyspark.sql import SparkSession
       
       import uuid
       
       url = "s3://amzn-s3-demo-bucket/delta-lake/output/%s/" % str(uuid.uuid4())
       spark = SparkSession.builder.appName("DeltaSample").getOrCreate()
       
       ## creates a Delta table and outputs to target S3 bucket
       spark.range(5).write.format("delta").save(url)
       
       ## reads a Delta table and outputs to target S3 bucket
       spark.read.format("delta").load(url).show
   ```

1. 使用 AWS CLI，将`delta_sample.py`文件上传到您的 Amazon S3 存储桶。然后使用 `start-job-run` 命令向现有的 EMR Serverless 应用程序提交作业。

   ```
   aws s3 cp delta_sample.py s3://amzn-s3-demo-bucket/code/
       
       aws emr-serverless start-job-run \
           --application-id application-id \
           --execution-role-arn job-role-arn \
           --name emr-delta \
           --job-driver '{
               "sparkSubmit": {
                   "entryPoint": "s3://amzn-s3-demo-bucket/code/delta_sample.py",
                   "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
               }
           }'
   ```

要将 Python 库与 Delta Lake 结合使用，请通过[将其打包为依赖项](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python-libraries.html)或[将其用作自定义映像](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-custom-images.html)来添加 `delta-core` 库。

或者，您可以使用 `SparkContext.addPyFile` 从 `delta-core` JAR 文件添加 Python 库：

```
import glob
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])
```

## Amazon EMR 6.8.0 及更低版本
<a name="using-delta-lake-6.8.0"></a>

如果您使用的是 Amazon EMR 6.8.0 或更低版本，请按照以下步骤将 Delta Lake OSS 与 EMR Serverless 应用程序结合使用。

1. 要在您的 Amazon EMR Serverless 应用程序上构建与 Spark 版本兼容的 [Delta Lak](https://delta.io/) e 开源版本，请导航到 [Delta GitHub](https://github.com/delta-io/delta) 并按照说明进行操作。

1. 将三角洲湖库上传到您的 Amazon S3 存储桶中 AWS 账户。

1. 在应用程序配置中提交 EMR Serverless 作业时，请包含存储桶中现有的 Delta Lake JAR 文件。

   ```
   --conf spark.jars=s3://amzn-s3-demo-bucket/jars/delta-core_2.12-1.1.0.jar
   ```

1. 为确保您可以对 Delta 表进行读取和写入，请运行示例 PySpark测试。

   ```
   from pyspark import SparkConf, SparkContext
       from pyspark.sql import HiveContext, SparkSession
       
       import uuid
       
       conf = SparkConf()
       sc = SparkContext(conf=conf)
       sqlContext = HiveContext(sc)
       
       url = "s3://amzn-s3-demo-bucket/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4())
       
       ## creates a Delta table and outputs to target S3 bucket
       session.range(5).write.format("delta").save(url)
       
       ## reads a Delta table and outputs to target S3 bucket
       session.read.format("delta").load(url).show
   ```