我有文件列表,我对它们执行以下操作
import org.apache.spark.sql.*;
for(File file :files){
df = spark.read.csv(file)
df = df.withColumn("last_update_date",from_unixtime(unix_timestamp(from_utc_timestamp(current_timestamp(), "PST"))))
Thread.sleep(500)
}
Finaldf = Finaldf.union(df)
然后我会根据我添加的列进行重复数据删除。我开始知道,由于 Spark 的惰性评估,所有文件都具有相同的
last_update_date
值,最终我从最终的 DF 中删除了错误的重复项。
我已尝试以下方法,但有时仍然会遇到问题
df.cache()
如上所述,在尝试上述 2 个解决方案后,我发现该问题的频率降低了,但我想完全消除它。任何解决方案将不胜感激。
在 Spark 的惰性评估中,您使用的是
current_timestamp()
,在这种情况下,当最终触发操作时,所有行都会获得相同的时间戳值,这就是您面临问题的原因。 RealSkeptic 建议使用 int 值而不是 current_timestamp。
如果您想确保每个文件都有唯一的时间戳,那么:
import org.apache.spark.sql.*;
// Add python lib of time
import time
// assuming the files in array formate
Finaldf = spark.createDataFrame([], schema)
for (file in files){
df = spark.read.csv(file)
// Get current time in milliseconds at first using time lib
current_time_in_millisec = int(time.mktime(Date().timetuple()) * 1000)
// Your line of code
//df = df.withColumn("last_update_date",from_unixtime(unix_timestamp(from_utc_timestamp(current_timestamp(), "PST"))))
// New way of this: Here used lit() function to add static value at first then add the current timestamp
df = df.withColumn("last_update_date", from_unixtime(lit(current_time_in_millisec / 1000).cast('long')))
Finaldf = Finaldf.union(df)
time.sleep(0.5)
//Thread.sleep(500)
}
//Finaldf = Finaldf.union(df)
我评论了你的代码并并排编写了改进版本。