获取spark.rpc.message.maxsize错误带有大文件

问题描述 投票:0回答:0
启动SparkSession

使用

pd.read_csv(chunksize=CHUNKSIZE)

  • 遍历迭代器中的每个块
  • 对于每个部分:
    将块PD DF转化为火花DF
  • 写入/附加到Azure Blob存储中的Delta表
  • CLEARMEMORY
    • 我在迭代83(处理了4.15亿行)时遇到此错误
    • org.apache.spark.sparkexception:由于阶段失败而流产的工作:序列化任务42776:450是282494392字节,超过最大允许:spark.rpc.message.maxsize(268435456字节)。考虑增加spark.rpc.message.maxsize或使用广播变量用于大值。
    将大块缩小为
  • CHUNKSIZE=5_000_000

CHUNKSIZE=3_000_000

我仍然会遇到相同的错误,但是在以后的迭代中(139或4.17亿行,因此似乎在一定数量的行后发生了):

org.apache.spark.sparkexception:由于阶段失败而流产的工作:序列化任务43293:250是282494392字节,超过最大允许:spark.rpc.message.maxsize(268435456字节)。考虑增加spark.rpc.message.maxsize或使用广播变量用于大值。

我知道直接阅读到Spark DF是理想的选择,但是我无法找到直接读取Azure Blob中的ZIP文件的方法,因此绝对也可以在这方面的建议开放。 我的群集配置:

2工人
Standard_DS3_v2

14 GB内存,4个核心

驱动器

Standard_DS13_v2
56 GB内存,8核

我尝试过的事物:

    设置
  • spark.rpc.message.maxSize
    如使用错误消息所建议的
    SparkSession.builder.config("spark.rpc.message.maxSize", "512")
  • 假设单元在MIB中(似乎是
  • SparkConfigDocs
    所建议的情况)。这也是
    databricks
提出的

SparkSession.builder.config("spark.rpc.message.maxSize", "536870912")
    如果单位在某种程度上是字节
  • 即使当我打印
    spark.conf.get("spark.rpc.message.maxSize")
    时,这些设置仍会给我相同的错误
    使用
    spark.conf.set("spark.rpc.message.maxSize", "512")实例化Spark实例后,这给了我一个错误,说明Spark的实例化后无法更改该参数 ENTIRE CODEBLOCK:
  • def convert_zip_to_delta(snapshot_date: str, start_chunk: int = 0): # File paths zip_file = f"{snapshot_date}.zip" delta_file = f"{snapshot_date}_delta" delta_table_path = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{delta_file}/" spark = ( SparkSession.builder.config("spark.sql.shuffle.partitions", "100") .config("spark.hadoop.fs.azure.retries", "10") .config("spark.rpc.message.maxSize", "536870912") # 512 MiB in bytes .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate() ) spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", BLOB_CREDENTIAL) print("**** spark.rpc.message.maxSize = ", spark.conf.get("spark.rpc.message.maxSize")) if start_chunk == 0: # Delete file if exists print("**** Deleting existing delta table") if fs.exists(f"{CONTAINER_NAME}/{delta_file}"): fs.rm(f"{CONTAINER_NAME}/{delta_file}", recursive=True) chunksize = 3_000_000 with fs.open(f"{CONTAINER_NAME}/{zip_file}", "rb") as file: with zipfile.ZipFile(file, "r") as zip_ref: file_name = zip_ref.namelist()[0] with zip_ref.open(file_name) as csv_file: csv_io = TextIOWrapper(csv_file, "utf-8") headers = pd.read_csv(csv_io, sep="\t", nrows=0).columns.tolist() chunk_iter = pd.read_csv( csv_io, sep="\t", header=None, names=headers, usecols=["col1", "col2", "col3"], dtype=str, # Read all as strings to avoid errors chunksize=chunksize, skiprows=start_chunk*chunksize ) for chunk in tqdm(chunk_iter, desc="Processing chunks"): # Convert pd DataFrame to Spark DataFrame spark_df = spark.createDataFrame(chunk) (spark_df.repartition(8).write .format("delta") .mode("append") .option("mergeSchema", "true") .save(delta_table_path) ) # Clear memory after each iteration spark_df.unpersist(blocking=True) del chunk del spark_df gc.collect()
  • 	
  • 如果这是一次一次操作,为什么不将其拉开临时位置并尝试使用Spark的
  • read.csv
    方法。
    另一个优点是,现在您可以将其分成较小的块。
        
python azure apache-spark databricks azure-databricks
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.