Reading data and writing it back to the same table [UNSUPPORTED_OVERWRITE.TABLE]


I can't get my PySpark application to write an updated DataFrame back to persistent Parquet storage.

I'm trying to read a stored table and write it back

I followed the usual steps: create a Spark session, read/write a DataFrame, and update the DataFrame in question. When I do this, I get the error:

AnalysisException: [UNSUPPORTED_OVERWRITE.TABLE] Can't overwrite the target that is also being read from.

What I'm trying to do:

First run:
  1. Get: fetch the data
  2. Write: write the data to table TABLE in parquet format

Later run:
  1. Get: fetch the new data
  2. Read: read table TABLE
  3. SCD1: apply SCD1 to update my data
  4. Write: write the data back to table TABLE
  5. Error: [UNSUPPORTED_OVERWRITE.TABLE]

I think I'm misunderstanding something... Can anyone point me in the right direction or suggest a way to fix this? Any help or guidance would be much appreciated!

What I've already tried

I've tried various approaches:

  1. community.cloudera.com > how to read from and write to the same parquet file at the same time
  2. community.cloudera.com > unable to overwrite a path that is also being read from
  3. stackoverflow.com > read and write back to the same S3 location:
     cache() and count() df1, then del(df), then write df1
     (a sketch of this cache-and-materialize idea follows this list)
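Here is a minimal sketch of that cache-and-materialize idea, using the same variables (spark, F, path, database_name, table_name) as the repro code further down. Note that cache() is lazy, so count() is what actually materializes the data; whether this is enough to get past the analysis-time overwrite check may depend on the Spark version.

# cache the derived DataFrame and force materialization before overwriting
df = spark.table(f"{database_name}.{table_name}")
df1 = df.withColumn("test", F.lit(1)).cache()
df1.count()   # materialize the cached data
del df        # drop the reference that still reads the table

df1.write.mode("overwrite") \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")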

There is also an approach that manages the files manually, but that looks crazy (a sketch follows this list):

  1. stackoverflow.com > unable to overwrite parquet hive table in pyspark
  2. stackoverflow.com > Pyspark read all files and write them back to the same file after transformation
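A minimal sketch of that idea, again with the variables from the repro code below: break the read/write cycle by first writing the result to a separate staging location, then reading the staged copy back and overwriting the original table from it. The staging path name here is made up for illustration.

staging_path = "/data/user_table_staging"   # hypothetical temporary location

# write the transformed data somewhere else first
df1.write.mode("overwrite").parquet(staging_path)

# re-read the staged copy (its plan no longer references the target table)
staged = spark.read.parquet(staging_path)
staged.write.mode("overwrite") \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")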

Here is simplified code that reproduces the problem:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Create a Spark session with a connection to my Dockerized Spark cluster
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Declare some useful variables
path          = "/data/user_table"
database_name = "DB_test"
table_name    = "test_users"

# create a small dataframe and store it
df_ = spark.createDataFrame(
        [(0, "Tom"),
         (0, "Alice")],
        ["age", "name"])

# write df
df_.write.mode('overwrite') \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")

# read the dataframe back from storage 
df = spark.table(f"{database_name}.{table_name}")

# add a derived column
df1 = df.withColumn('test', F.lit(1))

# save back to the same table -> this raises UNSUPPORTED_OVERWRITE.TABLE
df1.write.mode('overwrite') \
    .option("path", path) \
    .saveAsTable(f"{database_name}.{table_name}")

I'm using a Dockerized Spark cluster, defined as follows:

name: spark-cluster
services:
  spark-master:
    image: apache/spark:3.5.1
    container_name: spark-master
    hostname: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_WEBUI_PORT=8080
      - SPARK_MASTER_PORT=7077
      - SPARK_SUBMIT_OPTIONS=--packages io.delta:delta-spark_2.12:3.2.1,io.delta:delta-storage_2.12:3.2.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
      - SPARK_MASTER_HOST=spark-master
    ports:
      - 8080:8080
      - 7077:7077
    networks:
      - spark-network
    volumes:
      - ./data:/data:rw
    entrypoint:
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-master.sh && tail -f /dev/null"

  spark-connect:
    image: apache/spark:3.5.1
    container_name: spark-connect
    hostname: spark-connect
    ports:
      - "4040:4040"
      - "15002:15002"
    networks:
      - spark-network
    depends_on:
      - spark-master
    volumes:
      - ./jars/spark-connect_2.12-3.5.1.jar:/opt/spark/jars/spark-connect_2.12-3.5.1.jar
      - ./jars/delta-spark_2.12-3.2.0.jar:/opt/spark/jars/delta-spark_2.12-3.2.0.jar
      - ./jars/delta-storage-3.2.0.jar:/opt/spark/jars/delta-storage-3.2.0.jar
      - ./data:/data:rw
    command:
      - "bash"
      - "-c"
      - '/opt/spark/sbin/start-connect-server.sh --jars /opt/spark/jars/spark-connect_2.12-3.5.1.jar,/opt/spark/jars/delta-spark_2.12-3.2.0.jar,delta-storage-3.2.0.jar --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" && tail -f /dev/null'
  spark-worker:
    image: apache/spark:3.5.1
    container_name: spark-worker
    hostname: spark-worker
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=8
      - SPARK_WORKER_MEMORY=16G
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - 8081:8081
    networks:
      - spark-network
    volumes:
      - ./data:/data:rw
    depends_on:
      - spark-master
    entrypoint:
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-worker.sh spark://spark-master:7077 && tail -f /dev/null"

networks:
  spark-network:

I got the jar files from:


apache-spark pyspark parquet
1 Answer

Using the delta format made it work:

df_.write.format('delta') \
   .mode('overwrite') \
   .option("path", path) \
   .saveAsTable(f"{database_name}.{table_name}")

df = spark.table(f"{database_name}.{table_name}").limit(1)
df.write.format("delta").mode("overwrite").saveAsTable(f"{database_name}.{table_name}")
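Since the question mentions SCD1, a minimal sketch of an SCD1-style upsert with the DeltaTable API, instead of a full overwrite, could look like this. It assumes the table has already been created with the delta format as above; the incoming DataFrame df_new and the "name" join key are assumptions based on the sample data, and DeltaTable support through the Spark Connect client may depend on the delta-spark version.

from delta.tables import DeltaTable

# SCD1-style upsert: update rows that match on the key, insert new ones.
# "name" as the business key is an assumption based on the sample data.
target = DeltaTable.forName(spark, f"{database_name}.{table_name}")

(target.alias("t")
    .merge(df_new.alias("s"), "t.name = s.name")  # df_new = hypothetical incoming batch
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())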