我用 Spark 编写了一个自定义 RDD 提供程序,并在 SQL 查询下测试其执行时间。我不确定该提供程序的内部机制是否与该问题相关,但基本上它用于从本地和远程文件读取数据。
问题:使用
distinct
关键字运行查询(无论是否用于聚合查询)都会对查询的性能产生可怕的影响。
示例:
查询:
select sourceip, sourceport, destinationport from table where destinationport=80
查询:
select distinct sourceip, sourceport, destinationport from events where destinationport=80
此结果适用于非常少量的数据,当我尝试在中等大小的数据集上运行它时,Spark 会因“打开文件太多”而崩溃。
日志:
/tmp/spark-a47f859b-2a1f-4466-8333-0bf40c3968eb/executor-9c98264a-23a8-49b8-ab6f-ddf7349e0155/blockmgr-b73fc639-8705-4956-8652-e7300b35 527a/3f/temp_shuffle_b4afe57f-9db1-4653-91c0- 22d207933748(打开文件太多)
从长远来看,使用
distinct
会用 5 分钟的数据破坏集群,而如果不使用 distinct
,服务器会成功运行 2 天的查询。
有什么想法可能会导致这个问题吗?
附注我使用以下方法验证了打开文件的数量及其限制:
打开文件:
lsof | wc -l
导致~1.4Mcat /proc/sys/fs/file-max
结果为 9-42M(取决于机器 - 主服务器有 9M)
以下两个步骤可帮助您调试问题:
1)distinct() 肯定是在跨分区进行洗牌。要了解更多发生的情况,请在 RDD 上运行 .toDebugString ?.
2)您可以使用 Spark UI 检查您的作业是否正确并行化?
还要检查您的分区是否倾斜,这里是进一步阅读的链接
当然,如果你有不同的查询,你的查询将会变慢。
select sourceip, sourceport, destinationport from table where destinationport=80
这是一个简单的选择过滤器查询。所以不需要任何洗牌。 Spark 将执行谓词下推并仅使用过滤器操作给出结果
鉴于
select distinct sourceip, sourceport, destinationport from events where destinationport=80
在这里你有独特的。这本质上是要求 Spark 进行随机播放。它会先过滤掉结果,然后计算每个结果的哈希值,然后进行reduce操作以删除重复的结果(相同哈希多行归结为一行)
现在 shuffle 是一个相对昂贵的操作,因为它需要通过网络发送数据,因此第二个查询将比前一个查询慢得多