我正在使用 Spark Structured Streaming,特别是 Databricks Autoloader,来处理 S3 存储桶中的数百万条小记录,并将它们索引到 Delta Lake 表中。大约有 30 亿个文件需要处理 500TB 的信息。我想分享我的轶事。我注意到默认的随机播放大小似乎无法很好地扩展——在处理大量单个文件时,它会导致过多的上下文切换和性能瓶颈。
这是我的示例,请诚实地说明细节:我有 3 个“r4.2xlarge”实例。这使得执行器上有 8 个核心,相当于 16 个线程。我们正在从 binaryFiles 流处理程序运行 FileScanRDD。每条记录大约有 128KB 大,对于这个小样本,我们在 Spark Structured Streaming 中一次仅消耗几百万条记录,摄取目标是 3 亿条。
根据我的经验,将
spark.sql.shuffle.partitions
配置从 200(默认)调整是提高性能的一种方法。当我设法将其核心数量降低到 3 倍时,我观察到处理速度要好得多。我认为这个问题是任务饱和,以及集群默认情况下的集群上下文切换过多。
但是尝试更改此默认值很困难。有一个问题。
spark.sql.shuffle.partitions
设置保留在流的检查点中,这意味着即使在更改它之后,流仍然使用旧值进行状态聚合,除非我创建新的检查点。很少有消息来源证实了这一点,首先,当我尝试调整 Scan Binary File Stage、Spark Docs 和 Databricks 社区论坛帖子的大小时,我对 Spark UI 中长时间运行的任务的 numPartitions 测量结果令人沮丧地没有从 200 变化.
那么,鉴于 shuffle 大小一旦设置就无法更改,我如何确定流作业的最佳 Spark.sql.shuffle.partitions ?
我是否应该坚持默认的 200 个分区,尤其是在使用更多内核时?我担心较小的 shuffle 大小可能会限制可扩展性,并且在扩展集群时无法充分利用资源。相反,我担心默认设置可能不适合我的工作负载,并且需要更昂贵、更大的集群。调整此参数以平衡最小化上下文切换和保持生产工作负载的可扩展性的最佳实践是什么?
再多的猜测也无法提供最佳答案,唯一的方法就是测量。
小文件会增加调度开销,同时处理速度更快。您可以在下次开始时使用不超过核心数 3 倍的任务,或者使用更大的集群。通常更大的集群不是问题,因为它需要的时间更少,所以最终花费的钱应该是相同的。
您可以将环境变量
SPARK_WORKER_CORES
设置为 2 倍核心,使 Spark 在同一核心上运行多个任务,在另一个任务等待 I/O 时利用更多 CPU。请参阅文档中的部分:https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts