I can't get my PySpark application to write an updated DataFrame back to persistent Parquet storage.
I followed the usual steps of creating a Spark session, writing and reading DataFrames, and updating the relevant DataFrame. When I write the update back, I get this error:
AnalysisException: [UNSUPPORTED_OVERWRITE.TABLE] Can't overwrite the target that is also being read from.
What I am trying to do:
| Type | Action 1 | Type | Action 2 (later) |
|---|---|---|---|
| Fetch | fetch the data | Fetch | fetch new data |
| Write | write the data to table TABLE in parquet format | Read | read table TABLE |
| | | SCD1 | apply SCD1 to update my data |
| | | Write | write the data back to table TABLE |
| | | Error | Error: [UNSUPPORTED_OVERWRITE.TABLE] |
I think I'm misunderstanding something... Can anyone point me in the right direction or suggest a way around this? Any help or guidance is much appreciated!
Things I have tried:
- df1.cache() followed by a count() to materialize it
- then del(df)
- then writing df1 (see the sketch just after the full example below)

I also tried a process that manages the files on disk manually, but that looks crazy. Here is a minimal example that reproduces the problem:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Create a Spark session with a connection to my Dockerized Spark cluster
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
# Declare some useful variables
path = "/data/user_table"
database_name = "DB_test"
table_name = "test_users"
# create dataframe and store it
df_ = spark.createDataFrame(
    [(0, "Tom"),
     (0, "Alice")],
    ["age", "name"])
# write df
df_.write.mode('overwrite') \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")
# read the dataframe back from storage
df = spark.table(f"{database_name}.{table_name}")
# make a change to the DataFrame
df1 = df.withColumn('test', F.lit(1))
# save it back -- this is where the error is raised
df1.write.mode('overwrite') \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")
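For completeness, the cache-and-count workaround mentioned above looked roughly like this (a sketch reusing the session, path and table names from the example; it did not avoid the error for me):

# materialize df1, drop the reference to the source DataFrame, then overwrite
df = spark.table(f"{database_name}.{table_name}")
df1 = df.withColumn('test', F.lit(1))
df1.cache()
df1.count()   # force materialization of df1
del df        # drop the Python reference to the DataFrame read from the table
df1.write.mode('overwrite') \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")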
My docker-compose.yml:

name: spark-cluster
services:
  spark-master:
    image: apache/spark:3.5.1
    container_name: spark-master
    hostname: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_WEBUI_PORT=8080
      - SPARK_MASTER_PORT=7077
      - SPARK_SUBMIT_OPTIONS=--packages io.delta:delta-spark_2.12:3.2.1,io.delta:delta-storage_2.12:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
      - SPARK_MASTER_HOST=spark-master
    ports:
      - 8080:8080
      - 7077:7077
    networks:
      - spark-network
    volumes:
      - ./data:/data:rw
    entrypoint:
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-master.sh && tail -f /dev/null"
  spark-connect:
    image: apache/spark:3.5.1
    container_name: spark-connect
    hostname: spark-connect
    ports:
      - "4040:4040"
      - "15002:15002"
    networks:
      - spark-network
    depends_on:
      - spark-master
    volumes:
      - ./jars/spark-connect_2.12-3.5.1.jar:/opt/spark/jars/spark-connect_2.12-3.5.1.jar
      - ./jars/delta-spark_2.12-3.2.0.jar:/opt/spark/jars/delta-spark_2.12-3.2.0.jar
      - ./jars/delta-storage-3.2.0.jar:/opt/spark/jars/delta-storage-3.2.0.jar
      - ./data:/data:rw
    command:
      - "bash"
      - "-c"
      - '/opt/spark/sbin/start-connect-server.sh --jars /opt/spark/jars/spark-connect_2.12-3.5.1.jar,/opt/spark/jars/delta-spark_2.12-3.2.0.jar,delta-storage-3.2.0.jar --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" && tail -f /dev/null'
  spark-worker:
    image: apache/spark:3.5.1
    container_name: spark-worker
    hostname: spark-worker
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=8
      - SPARK_WORKER_MEMORY=16G
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - 8081:8081
    networks:
      - spark-network
    volumes:
      - ./data:/data:rw
    depends_on:
      - spark-master
    entrypoint:
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-worker.sh spark://spark-master:7077 && tail -f /dev/null"
networks:
  spark-network:
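As a sanity check, this is roughly how I verify that the client can reach the Spark Connect server (assuming pyspark==3.5.1 with the connect extra installed locally; the version check and the tiny job are just illustrative):

# minimal connectivity check against the Connect endpoint published on port 15002
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
print(spark.version)    # should report 3.5.1
spark.range(5).show()   # runs a trivial job on the cluster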
I get the jar files from:
Using the delta format got it working:
# write the table in delta format instead of plain parquet
df_.write.format('delta') \
    .mode('overwrite') \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")
# reading the table and overwriting it in the same job now succeeds
df = spark.table(f"{database_name}.{table_name}").limit(1)
df.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.{table_name}")