Storing JSON data with PySpark results in null values in the Delta Lake table

Problem description · 0 votes · 1 answer

I'm running into a problem when trying to store JSON data as a Delta Lake table using PySpark and Delta Lake.

Here is my code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta import *

delta_version = "2.4.0" 

spark = SparkSession.builder \
    .appName("JSONToDeltaLake") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", f"io.delta:delta-core_2.12:{delta_version}") \
    .getOrCreate()

json_data = """
[
    {
        "name": "John Doe",
        "age": 30,
        "city": "New York"
    },
    {
        "name": "Jane Smith",
        "age": 25,
        "city": "Los Angeles"
    }
]
"""

json_path = "example_data.json"
with open(json_path, "w") as file:
    file.write(json_data)

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True)
])

try:
    df = spark.read.schema(schema).json(json_path)
except Exception as e:
    print(f"Error reading JSON file: {e}")
    spark.stop()
    exit(1)

df.printSchema()
df.show()

delta_path = "example_delta_table"
df.write.format("delta").mode("overwrite").save(delta_path)
delta_table = DeltaTable.forPath(spark, delta_path)
delta_df = delta_table.toDF()
delta_df.show()

spark.stop()

This code generates sample JSON data, saves it to a file, reads the JSON with PySpark, and then stores it as a Delta Lake table.

However, when I run the code, only null values end up in the Delta Lake table.

& C:/Users/no2si/AppData/Local/Programs/Python/Python311/python.exe c:/Users/no2si/Documents/MarketReSearch/TodayhomeScrape/deltalpp.py
:: loading settings :: url = jar:file:/C:/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: C:\Users\no2si\.ivy2\cache
The jars for the packages stored in: C:\Users\no2si\.ivy2\jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd;1.0
        confs: [default]
        found io.delta#delta-core_2.12;2.4.0 in central
        found io.delta#delta-storage;2.4.0 in central
        found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 173ms :: artifacts dl 8ms
        :: modules in use:
        io.delta#delta-core_2.12;2.4.0 from central in [default]
        io.delta#delta-storage;2.4.0 from central in [default]
        org.antlr#antlr4-runtime;4.9.3 from central in [default]
        ---------------------------------------------------------------------        
        |                  |            modules            ||   artifacts   |        
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|        
        ---------------------------------------------------------------------        
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |        
        ---------------------------------------------------------------------        
:: retrieving :: org.apache.spark#spark-submit-parent-58d527be-f279-42cc-a057-5a43146af2cd
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/10ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)

+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+

24/06/07 13:58:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+----+----+----+
|name| age|city|
+----+----+----+
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
|null|null|null|
+----+----+----+

24/06/07 13:58:56 WARN SparkEnv: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
        at org.apache.spark.SparkEnv.stop(SparkEnv.scala:108)
        at org.apache.spark.SparkContext.$anonfun$stop$25(SparkContext.scala:2175)   
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1509)        
        at org.apache.spark.SparkContext.stop(SparkContext.scala:2175)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:2081)
        at org.apache.spark.api.java.JavaSparkContext.stop(JavaSparkContext.scala:550)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)        
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)      
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:834)
24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)       
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)      
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)        
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
PS C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape> 24/06/07 13:58:57 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c
java.io.IOException: Failed to delete: C:\Users\no2si\Documents\MarketReSearch\TodayhomeScrape\spark_temp\spark-eb98b831-105b-46fe-8173-be24ef43c323\userFiles-6cf29488-da35-4e58-8d65-1156838ddd7c\org.antlr_antlr4-runtime-4.9.3.jar
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:151)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursivelyUsingJavaIO(JavaUtils.java:134)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:121)
        at org.apache.spark.network.util.JavaUtils.deleteRecursively(JavaUtils.java:94)
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1231)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$4$adapted(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)       
        at org.apache.spark.util.ShutdownHookManager$.$anonfun$new$2(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)      
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)    
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)        
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
SUCCESS: The process with PID 7444 (child process of PID 14792) has been terminated.
SUCCESS: The process with PID 14792 (child process of PID 13788) has been terminated.
SUCCESS: The process with PID 13788 (child process of PID 1864) has been terminated. 

What should I change to fix this? I would appreciate guidance on how to store JSON data in a Delta Lake table correctly.

I would also appreciate any suggestions about the causes of, and fixes for, the warnings and error messages that appear in the log.

Thank you.

I tried verifying that the JSON file was being loaded correctly and that the specified save path was valid.

To check that the JSON file was loaded correctly, I added code that prints the file's contents after it is saved. That confirmed the JSON data had been written to the file as expected.

json apache-spark pyspark delta-lake data-processing
1 Answer

0 votes

Your JSON file spans multiple lines, so you need to read it with the multiline option. Add .option("multiline", "true") when reading the JSON file:

df = spark.read.schema(schema).option("multiline", "true").json(json_path)
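
By default, Spark's JSON source expects JSON Lines (one JSON object per line), so a pretty-printed JSON array like yours is parsed as all-null rows under the supplied schema. Below is a minimal sketch of the corrected read and write, reusing schema, json_path and delta_path from your script:

# Enable multiLine so Spark parses the pretty-printed JSON array
# (Spark option names are case-insensitive, so "multiline" works too).
df = (
    spark.read
    .schema(schema)
    .option("multiLine", "true")
    .json(json_path)
)

df.show()
# Expected with the sample data from the question:
# +----------+---+-----------+
# |      name|age|       city|
# +----------+---+-----------+
# |  John Doe| 30|   New York|
# |Jane Smith| 25|Los Angeles|
# +----------+---+-----------+

# Write the parsed rows to the Delta table as before.
df.write.format("delta").mode("overwrite").save(delta_path)

Alternatively, if you control how the file is produced, write it as JSON Lines (one json.dumps(record) per line); the default reader then parses it without the extra option.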

References:

_corrupt_record error when reading a JSON file into Spark

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html
