Using pd.read_csv(chunksize=CHUNKSIZE) with CHUNKSIZE=5_000_000, and again with CHUNKSIZE=3_000_000, I still hit the same error, just at a later iteration (around chunk 139, i.e. roughly 417 million rows, so it seems to happen after a certain number of rows):

org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 43293:250 was 282494392 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

I know that reading directly into a Spark DataFrame would be ideal, but I have not found a way to read a ZIP file sitting in Azure Blob storage directly with Spark, so I am definitely open to suggestions on that front as well.
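To put the numbers from the error message and the chunking into perspective, here is the arithmetic on the values quoted above (pure unit conversion, nothing measured):

# Values taken from the error message and the chunked read described above.
limit_bytes = 268_435_456   # max allowed by spark.rpc.message.maxSize
task_bytes = 282_494_392    # size of the serialized task that failed

print(limit_bytes / 1024**2)   # 256.0  -> the limit corresponds to 256 MiB
print(task_bytes / 1024**2)    # ~269.4 -> the failing task is ~269 MiB
print(139 * 3_000_000)         # 417000000 -> ~417 million rows before the failure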
My cluster configuration:

2 workers: Standard_DS3_v2 (14 GB memory, 4 cores each)
Driver: Standard_DS13_v2 (56 GB memory, 8 cores)
Things I have tried:

Setting spark.rpc.message.maxSize as suggested by the error message, via
SparkSession.builder.config("spark.rpc.message.maxSize", "512")
(which is also what Databricks suggests), as well as the value expressed in bytes:
SparkSession.builder.config("spark.rpc.message.maxSize", "536870912")
Even when spark.conf.get("spark.rpc.message.maxSize") shows the new value, these settings still give me the same error.

Calling
spark.conf.set("spark.rpc.message.maxSize", "512")
after the Spark session has been instantiated; this gives me an error saying that the parameter cannot be changed once Spark has been instantiated.
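For reference, a condensed, standalone sketch of those two attempts (simplified from the full builder chain further down; note that the Spark configuration docs list spark.rpc.message.maxSize in MiB, so the two values are not equivalent):

from pyspark.sql import SparkSession

# Sketch of the configuration attempts described above (simplified builder chain).
# Per the Spark configuration docs, spark.rpc.message.maxSize is given in MiB
# (default 128), so "512" and "536870912" mean very different things.
spark = (
    SparkSession.builder
    .config("spark.rpc.message.maxSize", "512")  # the value suggested by Databricks
    # .config("spark.rpc.message.maxSize", "536870912")  # the bytes-style value I also tried
    .getOrCreate()
)
print(spark.conf.get("spark.rpc.message.maxSize"))  # shows the configured value

# Changing it after the session exists fails, as described above:
# spark.conf.set("spark.rpc.message.maxSize", "512")  # error: cannot be modified after startup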
Entire code block:
import gc
import zipfile
from io import TextIOWrapper

import pandas as pd
from pyspark.sql import SparkSession
from tqdm import tqdm

# Assumed to be defined elsewhere in my script:
#   CONTAINER_NAME, STORAGE_ACCOUNT_NAME, BLOB_CREDENTIAL  (module-level constants)
#   fs  (an fsspec filesystem for the storage account, e.g. adlfs.AzureBlobFileSystem)


def convert_zip_to_delta(snapshot_date: str, start_chunk: int = 0):
    # File paths
    zip_file = f"{snapshot_date}.zip"
    delta_file = f"{snapshot_date}_delta"
    delta_table_path = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{delta_file}/"

    spark = (
        SparkSession.builder.config("spark.sql.shuffle.partitions", "100")
        .config("spark.hadoop.fs.azure.retries", "10")
        .config("spark.rpc.message.maxSize", "536870912")  # 512 MiB in bytes
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )
    spark.conf.set(f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", BLOB_CREDENTIAL)
    print("**** spark.rpc.message.maxSize = ", spark.conf.get("spark.rpc.message.maxSize"))

    if start_chunk == 0:
        # Delete the delta table if it already exists
        print("**** Deleting existing delta table")
        if fs.exists(f"{CONTAINER_NAME}/{delta_file}"):
            fs.rm(f"{CONTAINER_NAME}/{delta_file}", recursive=True)

    chunksize = 3_000_000
    with fs.open(f"{CONTAINER_NAME}/{zip_file}", "rb") as file:
        with zipfile.ZipFile(file, "r") as zip_ref:
            file_name = zip_ref.namelist()[0]
            with zip_ref.open(file_name) as csv_file:
                csv_io = TextIOWrapper(csv_file, "utf-8")
                # Read the header row first, then stream the rest in chunks
                headers = pd.read_csv(csv_io, sep="\t", nrows=0).columns.tolist()
                chunk_iter = pd.read_csv(
                    csv_io,
                    sep="\t",
                    header=None,
                    names=headers,
                    usecols=["col1", "col2", "col3"],
                    dtype=str,  # Read all as strings to avoid errors
                    chunksize=chunksize,
                    skiprows=start_chunk * chunksize,
                )
                for chunk in tqdm(chunk_iter, desc="Processing chunks"):
                    # Convert pd DataFrame to Spark DataFrame
                    spark_df = spark.createDataFrame(chunk)
                    (
                        spark_df.repartition(8)
                        .write.format("delta")
                        .mode("append")
                        .option("mergeSchema", "true")
                        .save(delta_table_path)
                    )
                    # Clear memory after each iteration
                    spark_df.unpersist(blocking=True)
                    del chunk
                    del spark_df
                    gc.collect()
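And this is roughly how I call the function (the snapshot date below is just a placeholder; start_chunk is my attempt at resuming after a failed run):

# Hypothetical invocation: build the Delta table from scratch for one snapshot.
convert_zip_to_delta("2023-01-31", start_chunk=0)

# Hypothetical resume: skip ahead to roughly where the previous run failed
# (139 chunks of 3,000,000 rows, i.e. ~417 million rows in).
convert_zip_to_delta("2023-01-31", start_chunk=139)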