Standalone PySpark error: Too many open files

Problem description

I have ~40 GB of data (~80 million records, only 2 columns, text) and I run a count distinct on it. I can run it successfully on an r5a.4xlarge instance on AWS; it takes about 3 minutes to return the result. However, when I switch to a larger instance, r5a.12xlarge, the same code fails with a "Too Many Open Files" error. I tried several different configurations in the Spark session, none of which helped. I also raised the Linux open-files limit to 4096, with no change (a sketch of how that limit can be checked from the driver follows right below). After that come the code and the first part of the error.
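
A minimal sketch, not part of the original code, of checking and raising the per-process file-descriptor limit that the driver actually sees, assuming a Linux host:

import resource

# Current per-process file-descriptor limits as seen by this Python process.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open files: soft={soft}, hard={hard}")

# Raise the soft limit as far as the hard limit allows; going beyond the
# hard limit requires root (e.g. a change in /etc/security/limits.conf).
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))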

from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .appName('Project_name')
    .config('spark.executor.memory', "42G")              # Tried 19G to 60G
    .config('spark.executor.instances', "4")             # Tried 1 to 5
    .config('spark.executor.cores', "4")                 # Tried 1 to 5
    .config("spark.dynamicAllocation.enabled", "true")   # Also tried without dynamic allocation
    .config("spark.dynamicAllocation.minExecutors", "1")
    .config("spark.dynamicAllocation.maxExecutors", "5")
    .config('spark.driver.memory', "42G")                # Tried 19G to 60G
    .config('spark.driver.maxResultSize', '10G')         # Tried 1G to 10G
    .config('spark.worker.cleanup.enabled', 'True')
    .config("spark.local.dir", "/tmp/spark-temp")
    .getOrCreate())
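
One way to confirm which of these settings the running session actually picked up, since some of them may be silently ignored in local or standalone mode, is to dump the runtime config; a minimal sketch, assuming the session above has been created:

# Settings the running SparkSession actually uses; options that were not
# applied will show their defaults (or be absent) here.
for key, value in sorted(spark.sparkContext.getConf().getAll()):
    print(key, "=", value)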

Error:

>>> data.select(f.countDistinct("column_name")).show()

Py4JJavaError: An error occurred while calling o315.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 5.0 failed 1 times, most recent failure: Lost task 20.0 in stage 5.0 (TID 64, localhost, executor driver): java.io.FileNotFoundException: /tmp/spark-temp/blockmgr-c2f18891-a868-42ba-9075-dc145faaa4c4/16/temp_shuffle_f9c96d48-336d-423a-9edd-dcb9af5705a7 (Too many open files)

Any ideas?

pyspark bigdata
1 Answer

Because it is a large file, Spark will create about 292 partitions (292 × 128 MB ≈ 40 GB) for it when reading. By default Spark uses spark.sql.shuffle.partitions = 200, so you just need to increase this number to something larger than the number of input partitions. In addition, you can cache the file in memory to improve performance (a sketch of the caching step follows the final snippet). Both partition counts can be verified directly; see the sketch just below.
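
A minimal sketch for verifying both numbers, assuming data is the DataFrame from the question:

# Input partitions created when reading the ~40 GB file (roughly file size /
# 128 MB with the default spark.sql.files.maxPartitionBytes).
print(data.rdd.getNumPartitions())

# Shuffle partitions used by the countDistinct aggregation (200 by default).
print(spark.conf.get("spark.sql.shuffle.partitions"))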

spark = (SparkSession
    .builder
    .appName('Project_name')
    .config('spark.executor.memory', "20G") 
    .config('spark.driver.memory', "20G") 
    .config('spark.driver.maxResultSize', '10G') 
    .config('spark.sql.shuffle.partitions', 300) # Increasing SQL shuffle partitions
    .config('spark.worker.cleanup.enabled', 'True')
    .config("spark.local.dir", "/tmp/spark-temp")
    .getOrCreate())

>>> data.select(f.countDistinct("column_name")).show() # Result in ~2min
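
The caching step suggested above is not shown in the snippet; a minimal sketch of what it could look like, assuming the data fits in executor memory:

from pyspark.sql import functions as f

# Cache the DataFrame so repeated aggregations avoid re-reading the ~40 GB
# input from disk; count() materializes the cache once.
data = data.cache()
data.count()

data.select(f.countDistinct("column_name")).show()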