{
"sark.kryoserializer.buffer.max": "2047m",
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.minExecutors": "1",
"spark.dynamicAllocation.maxExecutors": "100",
"spark.dynamicAllocation.initialExecutors": "50",
"spark.executor.memory": "16g",
"spark.driver.memory": "16g",
"spark.executor.cores": "4",
"spark.driver.cores": "4",
"spark.sql.shuffle.partitions": "750",
"spark.sql.catalogImplementation": "hive",
"spark.driver.maxResultSize": "4g"
}
有时还不够。而且问题总是带有.collect()我对此过滤数据:
data_collected = data_filtered.rdd.map(lambda row: row.asDict()).collect()
我需要这样的数据(或者我不知道其他方式),因为我正在循环使用每行创建字典的数据。我尝试了其他一些方法,例如使用foreach/tolocaliterator(也许是错误的),但是我遇到了问题,我认为,因为当我。 thanks!
用来使用
json.loads()
。 收集不是大数据工具。 这是“我的电脑工具上的”。
如果您需要在每个行上调用一个函数,那么大数据工具是
用户定义的函数。(udf)。 表演不会很好,但是嘿,至少您完成了工作。 (它的表现不佳,因为它一次将数据汇回数据,并且一次是python解释器一行。)我不确定为什么您要为每一行创建一个字典。 您可以通过Select语句访问每一行,那么为什么要创建字典? 也许考虑使用SPARK/PYSPARK工具而不是Python工具探索其他选项。
使用
collect()
总是一个明智的选择。 collect()
对不同节点上的所有行都加载到驾驶员的内存中。当处理大量数据时,这可能是特别危险的(您在主内存中拟合了大量),从而导致内存过载。
如果您只需要一个子集,请使用