我正在通过 azure 数据湖中的 dataverse 从 D365 crm 加载数据。我已经使用
append only
功能配置了突触链接,这也将保留历史数据。
使用自动加载器并使用以下代码从 adls 读取数据。
df = (
spark.readStream
.option("delimiter", ",")
.option("quote", '"')
.option("mode", "permissive")
.option("lineSep", "\r\n")
.option("multiLine", "true")
.format("cloudFiles")
.option("cloudFiles.format", source_format)
.option("cloudFiles.schemaLocation", checkpoint_directory)
.option("header", "false")
.option("escape", '"')
.schema(schema)
.load(data_source)
)
接下来我尝试对
df
进行一些转换,以便仅保留基于最新 modifiedon
列的唯一 ID(无重复)。
from pyspark.sql.functions import max
df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")
agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("max_modifiedon"))
agg = agg.select("id", "modifiedon")
final =df.join(agg, on=["id", "modifiedon"], how="inner")
然后尝试加载到增量表(使用azure databricks)
final.writeStream.format("delta").option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")
查询表格后,我仍然可以看到重复的 ID,并且
max(modifedon)
未被选中。
我哪里出错了?
您可以使用
window
函数并聚合它。
使用下面的代码。
df = df.dropDuplicates().withColumn("modifiedon",to_timestamp("modifiedon","dd/MM/yyyy")).withWatermark("modifiedon", "1 day")
agg = df.groupBy(window("modifiedon","10 day"),"id").agg(max("modifiedon").alias("modifiedon")).select("id","modifiedon")
final =df.join(agg, on=["id","modifiedon"], how="inner")
这里我使用的是10天的window,你根据你的流媒体数据来使用。 我建议您使用超过 7 天的窗口。
输出:
final.writeStream.format("delta").option("checkpointLocation", "/checkpoint_directory/").trigger(availableNow=True).start("/out_delta/")
spark.read.format("delta").load("/out_delta/").show()