本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 PySpark 分析模板在已配置的表上运行 PySpark 作业
此过程演示如何使用 AWS Clean Rooms 控制台中的 PySpark 分析模板通过自定义分析规则分析已配置的表。
使用 PySpark 分析模板在已配置的表上运行 PySpark 作业
登录 AWS 管理控制台 并打开 AWS Clean Rooms 控制台,网址为https://console.aws.amazon.com/cleanrooms
-
在左侧导航窗格中,选择协作。
-
选择处于 “您的成员权限” 状态为 “运行作业” 的协作。
-
在分析选项卡的表格部分下,查看表格及其关联的分析规则类型(自定义分析规则)。
-
在 “分析” 部分下,在 “分析” 模式下,选择 “运行分析模板”。
-
从 “ PySpark 分析模板” 下拉列表中选择分析模板。
PySpark 分析模板中的参数将自动填充到定义中。
-
如果分析模板定义了参数,请在 “参数” 下提供参数值:
-
查看每个参数的参数名称和默认值(如果已配置)。
-
为要覆盖的每个参数输入一个值。
注意
如果您未提供值但存在默认值,则将使用默认值。
重要
参数值最多可为 1,000 个字符,并且支持 UTF-8 编码。所有参数值都被视为字符串,并通过上下文对象传递给您的用户脚本。
确保您的用户脚本能够安全地验证和处理参数值。有关安全参数处理的更多信息,请参阅使用 PySpark 分析模板中的参数。
-
-
指定支持的工作器类型和工作人员人数。
使用下表来确定您的用例所需的工作人员类型和人数。
Worker 类型 vCPU 内存(GB) 存储(GB) 工作线程数 洁净室处理单元总数 (CRPU) CR.1X(默认值) 4 30 100 4 8 128 256 CR.4X 16 120 400 4 32 32 256 注意
不同的工作人员类型和人数会产生相关成本。要了解有关定价的更多信息,请参阅AWS Clean Rooms 定价
。 -
指定支持的 Spark 属性。
-
选择 “添加 Spark 属性”。
-
在 Spark 属性对话框中,从下拉列表中选择一个属性名称并输入值。
下表提供了每个属性的定义。
有关 Spark 属性的更多信息,请参阅 Apache Spark 文档中的 Spark 属性
。 注意
您最多可以配置 50 个 Spark 属性。每个属性值最多可包含 500 个字符。
属性名称 说明 默认值 Spark.task.maxFail
控制任务在失败之前可以连续失败多少次。需要一个大于或等于 1 的值。允许的重试次数等于该值减去 1。如果任何尝试成功,则失败计数将重置。不同任务的失败不会累积到这个极限。
4
spark.sql.files.max PartitionBytes
设置从基于文件的源(例如 Parquet、JSON 和 ORC)读取数据时要打包到单个分区的最大字节数。
128MB
spark.hadoop.fs.s3.maxRetries
设置 Amazon S3 文件操作的最大重试次数。
(无)
spark.network.
设置所有网络交互的默认超时时间。如果未配置以下超时设置,则覆盖这些设置:
-
Spark.storage.block ManagerHeartbeatTimeoutMs
-
spark.shuffle.io.connectionT
-
Spark.rpc.askTimeout
-
spark.rpc.lookupTim
120
spark.rdd.com
指定是否使用 spark.io.compression.codec 压缩序列化的 RDD 分区。适用于 Java 和 Scala 中的 StorageLevel.MEMORY _ONLY_SER,或者仅适用于 Python 中的 _ StorageLevel.MEMORY ONLY。减少存储空间,但需要额外的 CPU 处理时间。
false
Spark.shuffle.spill.compress
指定是否使用 spark.io.compression.codec 压缩随机播放数据。
true
shuffle.compress
指定是否压缩地图输出文件。压缩使用 spark.io.compression.codec。
true
spark.shuffle.service.index.cache.si
设置缓存大小限制,除非另有说明,否则以字节为单位。
100m
spark.shuffle.io.maxRetries
为由于异常而失败的提取设置最大重试次数。 IO-related
3
spark.shuffle.io.retryWait
设置两次重试提取之间的等待时间。默认情况下,重试导致的最大延迟为 15 秒,计算方法为 maxRetries * retryWait。
5 秒
spark.shuffle.io.connectionT
如果仍有未处理的提取请求但频道上没有流量,则将 shuffle 服务器和客户端之间建立的连接的超时标记为空闲并关闭。
(Spark.network.timeout 的值)
Spark.driver.max ResultSize
设置每个 Spark 操作的所有分区序列化结果的总大小限制,以字节为单位。应至少为 1M,或者 0 表示无限制。
1g
火花记忆分数
设置用于执行和存储的比例(堆空间-300MB)。该值越低,溢出和缓存数据被驱逐的频率就越高。建议将其保留为默认值。
0.6
Spark.scheduler.mode
设置提交给相同任务之间的调度模式 SparkContext。可以设置为 FAIR 以使用公平共享,而不是一个接一个地排队作业。支持的值:FAIR、FIFO。
FIFO
spark.sql.adaptive.adv PartitionSizeInBytes
当 spark.sql.adaptive.enabled 为真时,设置自适应优化期间洗牌分区的目标大小(以字节为单位)。控制合并小分区或拆分倾斜分区时的分区大小。
(spark.sql.adaptive.shuffle.target PostShuffleInputSize 的值)
Spark.sql.adaptive.auto BroadcastJoinThreshold
设置在联接期间向工作节点广播的最大表大小(以字节为单位)。仅适用于自适应框架。使用与 spark.sql.a BroadcastJoinThreshold uto 相同的默认值。设置为 -1 可禁用广播。
(无)
spark.sql.adaptive.coalesce Partitions.enabled
指定是否基于 spark.sql.adaptive.advisory 合并连续的洗牌分区以优化任务大小。PartitionSizeInBytes 需要 spark.sql.adaptive.enabled 才为真。
true
spark.sql.adaptive.coalesce Partitions.initialPartitionNum
定义合并前随机分区的初始数量。要求 spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalesce 都为真。Partitions.enabled 默认为 spark.sql.shuffle.partitions 的值。
(无)
spark.sql.adaptive.coalesce Partitions.minPartitionSize
设置合并后的随机分区的最小大小,以防止自适应优化期间分区变得太小。
1 MB
spark.sql.adaptive.coalesce Partitions.parallelismFirst
指定在分区合并期间是否根据群集并行度而不是 spark.sql.adaptive. PartitionSizeInBytes advisory 来计算分区大小。生成的分区大小小于配置的目标大小,以最大限度地提高并行度。我们建议在繁忙的群集上将其设置为 false,以通过防止过多的小任务来提高资源利用率。
true
sql.adaptive.enabled
指定是否启用自适应查询执行,以便在查询执行期间根据准确的运行时统计数据重新优化查询计划。
true
sql.adaptive.force OptimizeSkewedJoin
指定是否强制启用, OptimizeSkewedJoin 即使它引入了额外的随机播放。
false
Spark.sql.adaptive.local ShuffleReader.enabled
指定在不需要随机分区时(例如从排序合并联接转换为广播哈希联接之后)是否使用本地随机播放读取器。需要 spark.sql.adaptive.enabled 才为真。
true
spark.sql.adaptive.max ShuffledHashJoinLocalMapThreshold
设置用于构建本地哈希映射的最大分区大小(以字节为单位)。在以下情况下,优先考虑洗牌后的哈希联接而不是排序合并联接:
-
此值等于或超过 spark.sql.adaptive.advisory PartitionSizeInBytes
-
所有分区大小均在此限制范围内
覆盖 spark.sql.join.prefer 设置。SortMergeJoin
0 字节
sql.adaptive.optimize SkewsInRebalancePartitions.enabled
指定是否根据 spark.sql.adaptive.advisory 通过将倾斜的随机分区拆分为较小的分区来优化这些分区。PartitionSizeInBytes需要 spark.sql.adaptive.enabled 才为真。
true
spark.sql.adaptive.rebalan PartitionsSmallPartitionFactor
定义拆分期间合并分区的大小阈值系数。小于该系数乘以 spark.sql.adaptive.advisory 的PartitionSizeInBytes 分区将被合并。
0.2
spark.sql.adaptive.skew Join.enabled
指定是否通过拆分和可选复制倾斜的分区来处理洗牌联接中的数据倾斜。适用于排序合并和洗牌哈希联接。需要 spark.sql.adaptive.enabled 才为真。
true
spark.sql.adaptive.skew Join.skewedPartitionFactor
确定决定分区偏斜的大小系数。当分区的大小超过两个分区时,分区就会出现偏差:
-
该因子乘以分区大小中位数
-
spark.sql.adaptive.skew 的值 Join.skewedPartitionThresholdInBytes
5
spark.sql.adaptive.skew Join.skewedPartitionThresholdInBytes
设置用于识别偏斜分区的大小阈值(以字节为单位)。当分区的大小超过两个分区时,分区就会出现偏差:
-
这个门槛
-
分区大小中值乘以 spark.sql.adaptive.skew Join.skewedPartitionFactor
我们建议将此值设置为大于 spark.sql.adaptive. PartitionSizeInBytes advisory 的值。
256MB
sql.broadcastTimeout
控制广播加入期间广播操作的超时时间(以秒为单位)。
300 秒
spark.sql.cbo.enabled
指定是否为计划统计数据估算启用基于成本的优化 (CBO)。
false
spark.sql.cbo.join Reorder.dp.star.filter
指定是否在基于开销的联接枚举期间应用星型联接过滤器启发式算法。
false
spark.sql.cbo.join Reorder.dp.threshold
设置动态规划算法中允许的最大连接节点数。
12
spark.sql.cbo.join Reorder.enabled
指定是否在基于成本的优化 (CBO) 中启用联接重新排序。
false
spark.sql.cbo.plan Stats.enabled
指定在逻辑计划生成期间是否从目录中提取行数和列统计信息。
false
spark.sql.cbo.star SchemaDetection
指定是否启用基于星型架构检测的联接重新排序。
false
spark.sql.files.max PartitionNum
为基于文件的源(Parquet、JSON 和 ORC)设置拆分文件分区的目标最大数量。当初始计数超过此值时,重新缩放分区。这是建议的目标,而不是保证的上限。
(无)
spark.sql.files.max RecordsPerFile
设置写入单个文件的最大记录数。如果设置为零或负值,则不适用任何限制。
0
spark.sql.files.min PartitionNum
为基于文件的源(Parquet、JSON 和 ORC)设置拆分文件分区的目标最小数量。默认为 spark.sql.leaf。NodeDefaultParallelism这是建议的目标,而不是保证的上限。
(无)
spark.sql.in MemoryColumnarStorage.batchSize
控制列式缓存的批次大小。增加大小可以提高内存利用率和压缩率,但会增加出现内存不足错误的风险。
10000
spark.sql.in MemoryColumnarStorage.compressed
指定是否根据数据统计信息自动为列选择压缩编解码器。
true
spark.sql.in MemoryColumnarStorage.enableVectorizedReader
指定是否为列式缓存启用矢量化读取。
true
Spark.sql.legacy.allog HashOnMapType
指定是否允许对地图类型数据结构进行哈希操作。此传统设置保持了与旧版 Spark 地图类型处理的兼容性。
(无)
Spark.sql.legacy.allog NegativeScaleOfDecimal
指定是否允许在十进制类型定义中使用负比例值。此传统设置保持了与支持负十进制小数位数的旧 Spark 版本的兼容性。
(无)
spark.sql.legacy.cast ComplexTypesToString.enabled
指定是否启用将复杂类型转换为字符串的传统行为。保持与旧版 Spark 的类型转换规则的兼容性。
(无)
spark.sql.legacy.char VarcharAsString
指定是否将 CHAR 和 VARCHAR 类型视为字符串类型。此传统设置提供了与旧版 Spark 的字符串类型处理的兼容性。
(无)
Spark.sql.legacy.create EmptyCollectionUsingStringType
指定是否使用字符串类型元素创建空集合。此传统设置保持了与旧版 Spark 的集合初始化行为的兼容性。
(无)
Spark.sql.legacy.exponent LiteralAsDecimal.enabled
指定是否将指数文字解释为十进制类型。此传统设置保持了与旧版 Spark 的数字文字处理的兼容性。
(无)
spark.sql.legacy.json.allow EmptyString.enabled
指定是否允许在 JSON 处理中使用空字符串。此传统设置保持了与旧版 Spark 的 JSON 解析行为的兼容性。
(无)
spark.sql.legacy.parquet.int96 RebaseModelRead
指定在读取 Parquet 文件时是否使用传统的 INT96 时间戳变基模式。此传统设置保持了与旧版 Spark 的时间戳处理的兼容性。
(无)
spark.sql.legacy.time ParserPolicy
控制时间解析行为以实现向后兼容。此传统设置决定了如何从字符串中解析时间戳和日期。
(无)
Spark.sql.legacy.type Coercion.datetimeToString.enabled
指定在将日期时间值转换为字符串时是否启用传统类型强制行为。保持与旧版 Spark 版本的日期时间转换规则的兼容性。
(无)
spark.sql.max SinglePartitionBytes
设置最大分区大小(以字节为单位)。规划器为较大的分区引入了洗牌操作以提高并行度。
128m
spark.sql.metadatacachettlSeconds
控制元数据缓存的生存时间 (TTL)。适用于分区文件元数据和会话目录缓存。需要:
-
大于零的正值
-
Spark.sql.catalogCatalog实现设置为蜂巢
-
spark.sql.hive.filesource 大于零 PartitionFileCacheSize
-
spark.sql.hive.manage 设置为 true FilesourcePartitions
-1000 毫秒
spark.sql.optimizer.col ProjectAlwaysInline
指定是否折叠相邻的投影和行内表达式,即使它会导致重复。
false
spark.sql.optimizer.dy PartitionPruning.enabled
指定是否为用作联接键的分区列生成谓词。
true
Spark.sql.optimizer.enable CsvExpressionOptimization
指定是否通过从 from_csv 操作中删除不必要的列来优化 SQL 优化器中的 CSV 表达式。
true
Spark.sql.optimizer.enable JsonExpressionOptimization
通过以下方式指定是否优化 SQL 优化器中的 JSON 表达式:
-
从 from_json 操作中删除不必要的列
-
简化 from_json 和 to_json 的组合
-
优化 named_struct 操作
true
spark.sql.Optimizer.ExcludedR
定义要禁用的优化器规则,由逗号分隔的规则名称标识。某些规则不能被禁用,因为它们是正确性所必需的。优化器会记录哪些规则已成功禁用。
(无)
sql.optimizer.runtime.bloom Filter.applicationSideScanSizeThreshold
设置在应用程序端注入 Bloom 过滤器所需的最小聚合扫描大小(以字节为单位)。
10GB
sql.optimizer.runtime.bloom Filter.creationSideThreshold
定义在创建端注入 Bloom 滤镜的最大大小阈值。
10MB
sql.optimizer.runtime.bloom Filter.enabled
指定当随机连接的一侧具有选择性谓词时,是否插入布隆过滤器以减少随机播放数据。
true
sql.optimizer.runtime.bloom Filter.expectedNumItems
定义运行时 Bloom 过滤器中预期项目的默认数量。
1000000
sql.optimizer.runtime.bloom Filter.maxNumBits
设置运行时 Bloom 过滤器中允许的最大位数。
67108864
sql.optimizer.runtime.bloom Filter.maxNumItems
设置运行时 Bloom 过滤器中允许的最大预期项目数。
4000000
sql.optimizer.runtime.bloom Filter.numBits
定义运行时 Bloom 过滤器中使用的默认位数。
8388608
Spark.sql.optimizer.runtime.rowlevel OperationGroupFilter.enabled
指定是否为行级操作启用运行时组筛选。允许数据源:
-
使用数据源筛选器修剪整组数据(例如文件或分区)
-
执行运行时查询以识别匹配的记录
-
丢弃不必要的组以避免昂贵的重写
限制:
-
并非所有表达式都可以转换为数据源筛选器
-
有些表达式需要 Spark 求值(例如子查询)
true
spark.sql.optimizer.run Filter.number.threshold
设置注入的运行时过滤器的总数(非 DPP)。这是为了防止使用过多 Bloom 过滤器的驱动程序 OOM。
10
spark.sql.optimizer.run Filter.semiJoinReduction.enabled
指定当随机连接的一侧具有选择性谓词时,是否插入半联接以减少随机播放数据。
false
spark.sql.parquet.AgregatePus
指定是否将聚合向下推送到 Parquet 进行优化。支持:
-
布尔型、整数、浮点型和日期类型的最小值和最大值
-
所有数据类型的计数
如果任何 Parquet 文件页脚中缺少统计信息,则会抛出异常。
false
Spark.sql.parquet.columnar ReaderBatchSize
控制每个 Parquet 矢量化阅读器批次中的行数。选择一个平衡性能开销和内存使用量的值,以防止出现内存不足错误。
4096
Spark.sql.parquet.enable VectorizedReader
指定是否启用矢量化 Parquet 解码。
true
spark.sql.shuffle.part
设置联接或聚合期间用于数据洗牌的默认分区数。无法在结构化流式查询从同一检查点位置重新启动之间进行修改。
200
sql.sql.shuffled HashJoinFactor
定义用于确定 shuffle 哈希加入资格的乘法系数。当小边数据大小乘以此系数小于大边数据大小时,将选择随机哈希联接。
3
spark.sql.sources.par PartitionDiscovery.threshold
使用基于文件的源(Parquet、JSON 和 ORC)设置驱动端文件列表的最大路径数。如果在分区发现期间超出限制,则使用单独的 Spark 分布式作业列出文件。
32
spark.sql.statistics.histics.h
指定是否在列统计数据计算期间生成等高直方图以提高估计精度。除了基本列统计数据所需的扫描之外,还需要进行额外的表扫描。
false
火花动态 Allocation.executorIdleTimeout
设置启用动态分配后,执行器在移除之前必须处于空闲状态的持续时间。
60s
火花动态 Allocation.schedulerBacklogTimeout
设置启用动态分配后,在请求新的执行者之前必须将待处理任务积压的持续时间。
1s
火花动态 Allocation.sustainedSchedulerBacklogTimeout
与 spark.dynamic 相同Allocation.schedulerBacklogTimeout,但仅用于后续的执行器请求。
(spark.dynamic Allocation.schedulerBacklogTimeout 的值)
spark.scheduler.min RegisteredResourcesRatio
设置计划开始前要等待的已注册资源(注册资源/预期资源总量)的最小比率。指定为 0.0 和 1.0 之间的双精度。无论是否已达到最小资源比例,在计划开始之前等待的最大时间都由 spark.scheduler.max 控制。RegisteredResourcesWaitingTime
0.8
spark.scheduler.max RegisteredResourcesWaitingTime
设置在计划开始之前等待资源注册的最长时间。
30
spark.sql.hive.metastore PartitionPruningFallbackOnException
指定是否回退到从 Hive 元数据仓获取所有分区,并在遇到 MetaException 元数据仓时在 Spark 客户端执行分区修剪。
false
属性名称 说明 默认值 Spark.sql.auto BroadcastJoinThreshold
设置在联接期间向工作节点广播的最大表大小(以字节为单位)。设置为 -1 可禁用广播。
10MB
spark.io.compression.cod
设置用于压缩内部数据(例如 RDD 分区、事件日志、广播变量和 shuffle 输出)的编解码器。支持的值:lz4、snappy、zstd、gzip。
lz4
Spark.sql.session.time
定义会话时区,用于处理字符串文字中的时间戳和 Java 对象转换。接受:
-
Region-based area/city 格式化的 ID(例如 America/Los _Angeles)
-
区域偏移量采用 (+/-) HH、(+/-) 或 (+/-) HH:mm HH:mm:ss 格式(例如 -08 或 + 01:00)
-
UTC 或 Z 作为 + 00:00 的别名
(当地时区的值)
spark.cleanrooms.executor. OverheadFactor
设置执行器总内存中用于确定 spark.executor.memory 和 spark.executor.memoryOffeard 之间分配的比例。指定为介于 0.0 和小于 1.0 之间的双精度。
0.1
Spark.cleanrooms.driver. OverheadFactor
设置用于确定 spark.driver.memory 和 spark.driver.memoryOffed 之间分配的驱动程序总内存的比例。指定为介于 0.0 和小于 1.0 之间的双精度。
0.1
Spark.memory.storageFraction
设置不受驱逐影响的存储内存量,表示为 spark.memory.fraction 预留的区域大小的一小部分。该值越高,可用于执行的工作内存就越少,任务可能会更频繁地溢出到磁盘。建议将其保留为默认值。
0.5
Spark.rpc.askTimeout
设置 RPC 提问操作在超时之前等待的持续时间。
(Spark.network.timeout 的值)
Spark.executor.Heartbeat
设置每个执行者与驱动程序的心跳之间的间隔。Heartbeats 让驱动程序知道执行者还活着,并使用正在进行的任务的指标对其进行更新。spark.executor.HeartbeatInterval 应该大大低于 spark.network.timeout。
10s
spark.stage.max ConsecutiveAttempts
设置中止阶段之前允许的连续阶段尝试次数。
4
spark.task.cpus
设置要为每项任务分配的内核数。
1
shuffle.file.buffer
设置每个 shuffle 文件输出流的内存缓冲区大小,除非另有说明,否则以 KiB 为单位。这些缓冲区减少了在创建中间随机播放文件时进行磁盘搜索和系统调用的次数。
32k
spark.reducer.max SizeInFlight
设置从每个缩减任务中同时获取的最大地图输出大小,除非另有说明,否则以 MiB 为单位。由于每个输出都需要一个缓冲区才能接收它,因此这表示每个 reduce 任务都有固定的内存开销,因此除非有大量内存,否则请保持较小的内存开销。
48 米
-
-
(可选)对于计算付款人,请选择支付工作计算费用的协作成员。
注意
如果协作中只有一个候选人负责任务计算,则默认为该付款人。
-
选择运行。
注意
如果可以接收结果的成员尚未配置作业结果设置,则无法运行作业。
-
继续调整参数并重新运行作业,或者选择 + 按钮在新选项卡中开始新作业。