Error reading a large JSON file with Databricks PySpark - how to handle large JSON files

Votes: 0 | Answers: 1

I'm new to Databricks and would appreciate any help :)

I'm using Databricks for an ELT project: reading data from Blob Storage, transforming it into a DataFrame, and writing it out to another Blob Storage container (essentially a lakehouse architecture).

The problem: when I try to process a large file (about 1 GB) in Databricks, I get an error. How can I do this more efficiently? I have already tried parallelizing the data and writing it out to Parquet first, but I still run into resource limits.

My environment: Azure Databricks, cluster compute DBR 14.3 (Spark 3.5.0), 4-8 cores.

My code looks like this:

# Imports needed for this snippet
import json
from azure.storage.blob import BlobServiceClient

# Define your storage account details
storage_account_name = "{SomeblobName}"
storage_account_key = "{SomePassword}"

# Create a connection string
connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"

# Create a BlobServiceClient object
blob_service_client = BlobServiceClient.from_connection_string(connection_string)

# Specify the container and blob (file) you want to access.
# Container names cannot contain path separators; any "folder" prefix
# belongs in the blob name instead.
container_name = "incoming"
file_name = "SampleData/{SomeName}.json"

# Get a BlobClient object to interact with the blob
blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)

# Download the full content of the blob.
# Note: readall() pulls the entire file into driver memory at once.
blob_data = blob_client.download_blob().readall()

# If the file is JSON, load it into a Python dictionary
json_data = json.loads(blob_data)

# Display the data or perform further operations
print(json_data)

This part works.
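It does, however, pull the entire file into driver memory via readall(). For comparison, here is a minimal sketch of letting Spark read the blob directly from storage, reusing the variables above (the abfss:// scheme assumes an ADLS Gen2 account with hierarchical namespace; for a plain Blob Storage account, use wasbs:// with blob.core.windows.net instead):

# Make the account key available to Spark's Azure filesystem client
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key,
)

# Build the path from the container and blob names defined earlier
path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_name}"

# multiLine is needed when the file is one big JSON document; omit it for
# newline-delimited JSON (JSON Lines), which Spark can split across tasks.
df = spark.read.option("multiLine", "true").json(path)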

Here is the second part, where I try to load and process the file:

# COMMAND ----------

# Initialize the Spark session (on Databricks, `spark` already exists,
# so getOrCreate() simply returns the active session)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JsonProcessing").getOrCreate()

# Assuming `json_data` is a dictionary or list, parallelize it and read it as JSON.
# Note: wrapping it in a one-element list yields an RDD with a single huge record,
# which has to be serialized from the driver to the cluster in one piece.
df_root_spark = spark.read.json(spark.sparkContext.parallelize([json_data]))

# Display the DataFrame
display(df_root_spark.limit(10))

I get the error:

java.lang.Exception: Results too large
    at com.databricks.backend.daemon.driver.OutputAggregator$.maybeApplyOutputAggregation(OutputAggregator.scala:514)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:335)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:861)
    at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1575)
    at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:290)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
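Note that the exception is raised by the notebook's output aggregation (OutputAggregator), not by the read itself: if the whole 1 GB document parses into a single very wide row, even display(df_root_spark.limit(10)) can exceed the result-size limit. A small sketch of inspecting the DataFrame without rendering entire rows ("someField" is a hypothetical placeholder for one of your real top-level keys):

# Printing the schema is cheap and shows what the JSON parsed into
df_root_spark.printSchema()

# Select a single field so each displayed row stays small;
# "someField" is a placeholder, not a real column name
df_root_spark.select("someField").show(10, truncate=True)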
Tags: json, databricks, azure-databricks, data-engineering
1 Answer
0 votes

I'm new too, and I'm commenting mostly so I can follow the answers to your question. But if the results really are too large (assuming that error isn't a red herring), maybe you should try splitting the list of dictionaries into smaller chunks and iterating over them (see the sketch below)? Hope you get some good answers :)
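A rough sketch of that chunking idea, assuming json_data is a large list of dictionaries; chunk_size and the output path are made-up values for illustration:

chunk_size = 10_000  # made-up value; tune to what the driver can comfortably hold

for i in range(0, len(json_data), chunk_size):
    chunk = json_data[i : i + chunk_size]
    # Schema is inferred from the dictionaries in each chunk
    df_chunk = spark.createDataFrame(chunk)
    # Append each chunk to the same Parquet output
    df_chunk.write.mode("append").parquet("/mnt/output/sample_data")

Per-chunk schema inference can drift if the dictionaries are heterogeneous, so passing an explicit schema to createDataFrame would be safer.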
