What is the best way to compare two CSV files (millions of rows) that share the same schema, keyed on a primary-key column, and print out the differences? For example,
CSV1 (original):
Id  name    zip
1   name1   07112
2   name2   07234
3   name3   10290

CSV2 (modified):
Id  name    zip
1   name1   07112
2   name21  07234
4   name4   10290
Comparing the modified file CSV2 against the original CSV1, the expected output is:
Id  name    zip    change
2   name21  07234  Modified
3   name3   10290  Deleted
4   name4   10290  Added
I am new to Spark SQL. My plan is to load the data into Hive tables and then run Spark SQL to identify the changes.
1) Is there any row-level mechanism for detecting that a row has been modified, rather than comparing the values in every column? 2) Is there a better way to do this with Spark or other HDFS tools?
Thanks for your feedback.
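For reference, getting two such CSV files into Spark for querying is straightforward. A minimal sketch, assuming Spark 2.x and a Hive-enabled session (the spark-shell's built-in spark works); the file paths and table names are only placeholders:

val csv1 = spark.read
  .option("header", "true")       // first line holds the column names
  .csv("hdfs:///data/csv1.csv")   // columns stay strings, so zip keeps its leading zeros
val csv2 = spark.read
  .option("header", "true")
  .csv("hdfs:///data/csv2.csv")

// Query them directly as temp views ...
csv1.createOrReplaceTempView("csv1")
csv2.createOrReplaceTempView("csv2")

// ... or persist them as Hive tables first.
csv1.write.mode("overwrite").saveAsTable("csv1_orig")
csv2.write.mode("overwrite").saveAsTable("csv2_new")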
There are plenty of ways to do this; here is one that can be done in parallel:
import org.apache.spark.sql.functions._
import spark.implicits._

val origDF = sc.parallelize(Seq(
  ("1", "a", "b"),
  ("2", "c", "d"),
  ("3", "e", "f")
)).toDF("k", "v1", "v2")

val newDF = sc.parallelize(Seq(
  ("1", "a", "b"),
  ("2", "c2", "d"),
  ("4", "g", "h")
)).toDF("k", "v1", "v2")

// Rows in the original that are missing from the new file:
// deleted rows, plus the old versions of modified rows.
val df1 = origDF.except(newDF)
//df1.show(false)

// Rows in the new file that are missing from the original:
// added rows, plus the new versions of modified rows.
val df2 = newDF.except(origDF)
//df2.show(false)

// Rows that appear in neither df1 nor df2 are unchanged.
// A key present in both df1 and df2 means the row was modified.
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

val df3 = spark.sql("""
  SELECT df1.k, df1.v1, df1.v2, "deleted" AS operation
  FROM df1
  WHERE NOT EXISTS (SELECT df2.k
                    FROM df2
                    WHERE df2.k = df1.k)
  UNION
  SELECT df2.k, df2.v1, df2.v2, "added" AS operation
  FROM df2
  WHERE NOT EXISTS (SELECT df1.k
                    FROM df1
                    WHERE df1.k = df2.k)
  UNION
  SELECT df2.k, df2.v1, df2.v2, "modified" AS operation
  FROM df2
  WHERE EXISTS (SELECT df1.k
                FROM df1
                WHERE df1.k = df2.k)
""")

df3.show(false)
This yields:
+---+---+---+---------+
|k |v1 |v2 |operation|
+---+---+---+---------+
|4 |g |h |added |
|2 |c2 |d |modified |
|3 |e |f |deleted |
+---+---+---+---------+
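The same three-way classification can also be expressed with DataFrame anti-/semi-joins on df1 and df2 instead of the temp views and SQL; a sketch of that equivalent formulation:

import org.apache.spark.sql.functions.lit

// Keys only in df1 were deleted, keys only in df2 were added,
// and keys present in both were modified.
val deleted  = df1.join(df2, Seq("k"), "left_anti").withColumn("operation", lit("deleted"))
val added    = df2.join(df1, Seq("k"), "left_anti").withColumn("operation", lit("added"))
val modified = df2.join(df1, Seq("k"), "left_semi").withColumn("operation", lit("modified"))

added.union(modified).union(deleted).show(false)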
Not that hard, though there is no standard utility for it.
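As for question 1: Spark has no built-in change tracking per row, but hashing the non-key columns into a single fingerprint lets you find modified rows without listing every column in the comparison. A sketch against the toy DataFrames above; sha2 and concat_ws are built-in Spark SQL functions, and the "||" separator and 256-bit width are arbitrary choices:

import org.apache.spark.sql.functions.{col, concat_ws, sha2}

// One fingerprint per row over all non-key columns.
// Note: concat_ws skips nulls, so add explicit null handling if that matters for your data.
val valueCols = origDF.columns.filterNot(_ == "k").map(col)

val origHashed = origDF.withColumn("row_hash", sha2(concat_ws("||", valueCols: _*), 256))
val newHashed  = newDF.withColumn("row_hash", sha2(concat_ws("||", valueCols: _*), 256))

// Keys present in both files whose fingerprints differ are the modified rows.
val modifiedByHash = newHashed.alias("n")
  .join(origHashed.alias("o"), Seq("k"))
  .filter(col("n.row_hash") =!= col("o.row_hash"))
  .select(col("k"), col("n.v1"), col("n.v2"))

modifiedByHash.show(false)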