我正在对来自 Teradata 数据库的数据执行增量加载并将其存储为 parquet 文件。由于 Teradata 中的表包含数十亿行,因此我希望我的 PySpark 脚本能够比较哈希值。
Teradata 表: 来自 Teradata 的示例表
当前存储的 Parquet 文件: 数据存储在镶木地板文件中
我的 PySpark 脚本使用 JDBC 读取连接来调用 teradata:
tdDF = return spark.read \
.format("jdbc") \
.option("driver", "com.teradata.jdbc.TeraDriver") \
.option("url", "jdbc:teradata://someip/DATABASE=somedb,MAYBENULL=ON") \
.option("dbtable", "(SELECT * FROM somedb.table)tmp")
读取镶木地板的 Spark 脚本:
myDF = spark.read.parquet("myParquet")
myDF.createOrReplaceTempView("myDF")
spark.sql("select * from myDF").show()
我怎样才能:
您想要插入新行,或者,如果存在具有标识信息的行,则更新它们。这称为“更新插入”,或者在 teradata 中称为“合并”。 这取决于允许更改哪些列以及哪些列使行成为“新”。
在你的例子中你有:
terradata
Name Account Product
------+--------+---------
Sam 1234 Speakers
Jane 1256 Earphones
Janet 3214 Laptop
Billy 5678 HardDisk
parquet
Name Account Product
------+--------+---------
Sam 1234 Speakers
Jane 1256 Earphones
因此,如果任何名称、帐户组合应该是唯一的,则数据库表应该为其定义唯一的键。
这样,数据库将不允许插入具有相同唯一键的另一行,但允许您更新它。
因此,按照
这个示例,使用示例数据,您的 sql 命令将如下所示:
UPDATE somedb.table SET product = 'Speakers' WHERE name = 'Sam' AND account = 1234 ELSE INSERT INTO somedb.table(name, account, product) VALUES('Sam',1234,'Speakers');
UPDATE somedb.table SET product = 'Earphones' WHERE name = 'Jane' AND account = 1256 ELSE INSERT INTO somedb.table(name, account, product) VALUES('Jane',1256,'Earphones');
UPDATE somedb.table SET product = 'Laptop' WHERE name = 'Janet' AND account = 3214 ELSE INSERT INTO somedb.table(name, account, product) VALUES('Janet',3214,'Laptop');
UPDATE somedb.table SET product = 'HardDisk' WHERE name = 'Billy' AND account = 5678 ELSE INSERT INTO somedb.table(name, account, product) VALUES('Billy',5678,'HardDisk');
但这是一种非常简单的方法,效果可能会很差。
Google 搜索“teradata 批量上传”可找到以下链接: