I have a ~40GB dataset (~80M records, only 2 columns, text) and I run various counts over it. The job runs fine on an r5a.4xlarge instance on AWS and returns results in about 3 minutes. However, when I switch to a larger instance, r5a.12xlarge, the same code fails with a "Too many open files" error. I tried several different configurations in the Spark session, none of which helped. I also raised the Linux open-files limit to 4096, with no change. Below are the code and the first part of the error.
spark = (SparkSession
    .builder
    .appName('Project_name')
    .config('spark.executor.memory', "42G")  # Tried 19G to 60G
    .config('spark.executor.instances', "4")  # Tried 1 to 5
    .config('spark.executor.cores', "4")  # Tried 1 to 5
    .config("spark.dynamicAllocation.enabled", "true")  # Also tried without dynamic allocation
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "5")
    .config('spark.driver.memory', "42G")  # Tried 19G to 60G
    .config('spark.driver.maxResultSize', '10G')  # Tried 1G to 10G
    .config('spark.worker.cleanup.enabled', 'True')
    .config("spark.local.dir", "/tmp/spark-temp")
    .getOrCreate())
Error:
>>> data.select(f.countDistinct("column_name")).show()
Py4JJavaError: An error occurred while calling o315.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 5.0 failed 1 times, most recent failure: Lost task 20.0 in stage 5.0 (TID 64, localhost, executor driver): java.io.FileNotFoundException: /tmp/spark-temp/blockmgr-c2f18891-a868-42ba-9075-dc145faaa4c4/16/temp_shuffle_f9c96d48-336d-423a-9edd-dcb9af5705a7 (Too many open files)
Any ideas?
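(Aside: a minimal diagnostic sketch for checking the open-files limit that the Python driver process actually sees, since a ulimit set in one shell or session does not necessarily apply to the process that launches Spark. It uses only the standard-library resource module and is not part of the original post.)

import resource

# Soft/hard file-descriptor limits for the current (driver) process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"RLIMIT_NOFILE: soft={soft}, hard={hard}")

# The soft limit can be raised up to the hard limit without extra privileges.
if soft < hard:
    resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))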
Because the file is large, Spark splits it into 292 partitions when reading it (292 × 128MB ≈ 40GB). By default, spark.sql.shuffle.partitions = 200, so you just need to increase this setting to a value larger than the number of input partitions. You can also cache the file in memory to improve performance; a short sketch for checking the partition count and caching the data follows the final example below.
spark = (SparkSession
    .builder
    .appName('Project_name')
    .config('spark.executor.memory', "20G")
    .config('spark.driver.memory', "20G")
    .config('spark.driver.maxResultSize', '10G')
    .config('spark.sql.shuffle.partitions', 300)  # Increase SQL shuffle partitions above the input partition count
    .config('spark.worker.cleanup.enabled', 'True')
    .config("spark.local.dir", "/tmp/spark-temp")
    .getOrCreate())
>>> data.select(f.countDistinct("column_name")).show() # Result in ~2min
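A minimal sketch of the partition check and caching mentioned above, assuming the data is loaded into a DataFrame named data and pyspark.sql.functions is imported as f, as in the question; the load path and options are placeholders for whatever the original job uses:

import pyspark.sql.functions as f

# Hypothetical load; substitute the real path, separator and schema options.
data = spark.read.csv("/path/to/data", sep="\t", header=True)

# Number of input partitions Spark created for the ~40GB file (expected around 292).
print(data.rdd.getNumPartitions())

# Cache the DataFrame so repeated counts reuse the in-memory copy.
data.cache()
data.count()  # materialize the cache

data.select(f.countDistinct("column_name")).show()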