I have a project where I read CSV files from Amazon S3 and do the data processing with Spark. I then create a DataFrame and load it into a PostgreSQL database via JDBC. However, I am getting constraint violation errors because the customernumber column in the customerlocations table is unique.
What I need is for the Spark job to read the CSV files, process them, and append the data to the PostgreSQL table. If a duplicate key shows up in the customernumber column, I want it to be ignored and only the non-duplicate records inserted, without the Spark job failing.
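One idea I have is to pre-filter the DataFrame against the keys that already exist in PostgreSQL before writing. This is just a minimal sketch of what I mean, assuming spark, jdbcurl, properties_local and customerlocationDF are the same objects used in the write further below:

existing_keys = spark.read \
    .format("jdbc") \
    .option("url", jdbcurl) \
    .option("dbtable", "(SELECT customernumber FROM customerlocations) AS existing_keys") \
    .option("user", properties_local['user']) \
    .option("password", properties_local['password']) \
    .option("driver", properties_local['driver']) \
    .load()

# drop duplicates inside this batch, then drop rows whose key is already in the table
dedupedDF = customerlocationDF \
    .dropDuplicates(["customernumber"]) \
    .join(existing_keys, on="customernumber", how="left_anti")

But this reads every existing key back from PostgreSQL and is not atomic, so a concurrent insert between the read and the write could still hit the constraint, which is why I would rather handle the duplicates at insert time.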
CREATE TABLE IF NOT EXISTS Customerlocations(
LOCATIONID INT,
CUSTOMERNUMBER INT,
CUSTOMERCOUNTRY VARCHAR(50),
CUSTOMERSTATE VARCHAR(50),
CUSTOMERDISTRICT VARCHAR(50),
CUSTOMERCITY VARCHAR(50),
SERIALNO VARCHAR(50),
CORPID VARCHAR(50),
INDATE TIMESTAMP,
CUSTOMERMOB2 BIGINT,
DISTRIID INT,
unique (CUSTOMERNUMBER)
);
That is my table structure, and I have a Spark DataFrame with the same columns. I want to write this DataFrame through JDBC, but whenever a customernumber violation occurs I want to skip that row and insert the remaining ones, so I wrote this:
customerlocationDF.write \
.format("jdbc") \
.option("url", jdbcurl) \
.option("dbtable", "customerlocations") \
.option("user", properties_local['user']) \
.option("password", properties_local['password']) \
.option("batchsize", "5000") \
.option("rewriteBatchedStatements", "true") \
.option("driver", properties_local['driver']) \
.option("numPartitions", "10") \
.option("saveMode", "Append") \
.option("batchsize", "1000") \
.option("truncate", "false") \
.option("ignoreDuplicateKeys", "true") \
.option("upsertBatchSize", "1000") \
.option("upsertKeyCols", "customernumber") \
.option("rewriteBatchedStatements", "true") \
.option("insertStatement", "INSERT INTO customerlocations (LOCATIONID, CUSTOMERNUMBER, CUSTOMERCOUNTRY, CUSTOMERSTATE, CUSTOMERDISTRICT, CUSTOMERCITY, SERIALNO, CORPID, INDATE, CUSTOMERMOB2, DISTRIID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (CUSTOMERNUMBER) DO NOTHING") \
.mode("append") \
.save()
It throws this error:
ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO customerlocations ("locationid","customernumber","customercountry","customerstate","customerdistrict","customercity","serialno","corpid","indate","customermob2","distriid") VALUES (22183512,26327538,'INDIA','MAHARASHTRA','PARBHANI','PARBHANI','L-B6BC2AF58C',NULL,'2023-01-01 05:32:07+05:30'::timestamp,NULL,0) was aborted: ERROR: duplicate key value violates unique constraint "customerlocations_customernumber_key"
Detail: Key (customernumber)=(26327538) already exists. Call getNextException to see other errors in the batch.
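The only workaround I can think of is to bypass the built-in JDBC writer and insert each partition with psycopg2, letting PostgreSQL's ON CONFLICT (customernumber) DO NOTHING skip the duplicates. A rough sketch of what I mean (the connection settings are placeholders, not my real config, and it assumes the DataFrame columns are in the same order as the column list in the INSERT):

import psycopg2
from psycopg2.extras import execute_values

INSERT_SQL = """
    INSERT INTO customerlocations
        (locationid, customernumber, customercountry, customerstate, customerdistrict,
         customercity, serialno, corpid, indate, customermob2, distriid)
    VALUES %s
    ON CONFLICT (customernumber) DO NOTHING
"""

def insert_partition(rows):
    # one connection per partition; host/dbname/user/password are placeholders
    conn = psycopg2.connect(host="<host>", dbname="<db>",
                            user="<user>", password="<password>")
    try:
        with conn.cursor() as cur:
            # Row is a tuple subclass, so tuple(row) yields the values in column order
            execute_values(cur, INSERT_SQL, [tuple(row) for row in rows], page_size=1000)
        conn.commit()
    finally:
        conn.close()

customerlocationDF.foreachPartition(insert_partition)

I am not sure this is the right direction though, since it means giving up the JDBC writer's batching options and managing connections myself.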
Can anyone suggest a solution or workaround for this problem?