I'm new to Databricks, so any help is appreciated :)
I'm using Databricks for an ELT project: I read data from Blob Storage, transform it into a DataFrame, and write it out to another Blob Storage (essentially a Lakehouse architecture).
The problem: I get an error when I try to process a large file (~1 GB) in Databricks. How can I do this more efficiently? I've already tried parallelizing and writing the data out as Parquet first, but I still run into resource limits.
My environment: Azure Databricks, DBR 14.3 (Spark 3.5.0), 4-8 cores.
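For context, the write side I'm aiming for is roughly this (just a sketch; df_transformed and the outgoing container/path are placeholders):
# Hypothetical write step: persist the transformed DataFrame as Parquet
# to the destination Blob Storage
df_transformed.write.mode("overwrite").parquet(
    "abfss://outgoing@{SomeblobName}.dfs.core.windows.net/SampleData/"
)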
My code looks like this:
import json
from azure.storage.blob import BlobServiceClient

# Define your storage account details
storage_account_name = "{SomeblobName}"
storage_account_key = "{SomePassword}"
# Create a connection string
connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"
# Create a BlobServiceClient object
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
# Specify the container and the blob (file) you want to access
# (container names cannot contain path separators, so the folder belongs in the blob name)
container_name = "incoming"
file_name = "SampleData/{SomeName}.json"
# Get a BlobClient object to interact with the blob
blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)
# Download the full content of the blob into driver memory
blob_data = blob_client.download_blob().readall()
# If the file is JSON, load it into a Python dictionary
json_data = json.loads(blob_data)
# Display the data or perform further operations
print(json_data)
This part works.
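That said, readall() pulls the entire 1 GB blob into driver memory at once. A gentler variant I've been considering (a sketch, assuming the driver has enough local disk; the /tmp and dbfs paths are placeholders) streams the download to disk and copies it to DBFS so Spark can do the parsing:
# Stream the blob to a local file on the driver instead of holding it in RAM
local_path = "/tmp/sample_data.json"  # hypothetical scratch path
with open(local_path, "wb") as f:
    blob_client.download_blob().readinto(f)

# Copy to DBFS so executors can see it, then let Spark read it
# (.option("multiLine", True) may be needed if the file is one big JSON document)
dbutils.fs.cp(f"file:{local_path}", "dbfs:/tmp/sample_data.json")
df = spark.read.json("dbfs:/tmp/sample_data.json")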
The second part, where I try to load and process the file:
# COMMAND ----------
from pyspark.sql import SparkSession

# Get the Spark session (Databricks notebooks already provide one as `spark`)
spark = SparkSession.builder.appName("JsonProcessing").getOrCreate()
# `json_data` is a Python dict or list, but spark.read.json expects an RDD of
# JSON *strings*, so serialize it back to JSON before parallelizing
df_root_spark = spark.read.json(spark.sparkContext.parallelize([json.dumps(json_data)]))
# Display the first 10 rows of the DataFrame
display(df_root_spark.limit(10))
I get this error:
java.lang.Exception: Results too large
at com.databricks.backend.daemon.driver.OutputAggregator$.maybeApplyOutputAggregation(OutputAggregator.scala:514)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:335)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:861)
at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1575)
at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:290)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
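Would it be better to skip the SDK download entirely and point Spark straight at the blob, so the 1 GB file never has to fit on the driver? A sketch of what I mean (account, container, and path are the same placeholders as above; the fs.azure.account.key setting is the standard key-based access pattern on Databricks, and abfss:// assumes an ADLS Gen2 account, otherwise the legacy wasbs:// scheme with the .blob.core.windows.net key works the same way):
# Give Spark key-based access to the storage account
spark.conf.set(
    "fs.azure.account.key.{SomeblobName}.dfs.core.windows.net",
    storage_account_key,
)

# Read the JSON directly from Blob Storage; executors split the work,
# and nothing large is collected on the driver
df_root_spark = (
    spark.read
    .option("multiLine", True)  # needed if the file is one JSON document rather than JSON Lines
    .json("abfss://incoming@{SomeblobName}.dfs.core.windows.net/SampleData/{SomeName}.json")
)

# Only display a small sample; rendering huge results is what trips
# "Results too large"
display(df_root_spark.limit(10))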
I'm new to this too, and I'm commenting purely so I can follow the answers to your question. Maybe, if the results really are too large (assuming that isn't a red-herring error), you could try splitting the list of dictionaries into smaller chunks and iterating over them, as in the sketch below? Hope you get some good answers :)
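Something along these lines, maybe (purely a sketch of the chunking idea, assuming json_data is a list of dicts; chunk_size is arbitrary):
import json

chunk_size = 10_000  # arbitrary; tune to what the driver handles comfortably
df = None
for i in range(0, len(json_data), chunk_size):
    chunk = json_data[i:i + chunk_size]
    # serialize each record so spark.read.json gets an RDD of JSON strings
    chunk_df = spark.read.json(
        spark.sparkContext.parallelize([json.dumps(rec) for rec in chunk])
    )
    # merge the per-chunk DataFrames, tolerating schema drift between chunks
    df = chunk_df if df is None else df.unionByName(chunk_df, allowMissingColumns=True)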