自动加载器过滤重复项不起作用

问题描述 投票:0回答:1

我正在通过 azure 数据湖中的 dataverse 从 D365 crm 加载数据。我已经使用

append only
功能配置了突触链接,这也将保留历史数据。

使用自动加载器并使用以下代码从 adls 读取数据。

df = (
    spark.readStream
    .option("delimiter", ",")
    .option("quote", '"')
    .option("mode", "permissive")
    .option("lineSep", "\r\n")
    .option("multiLine", "true")
    .format("cloudFiles")
    .option("cloudFiles.format", source_format)
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "false")
    .option("escape", '"')
    .schema(schema)
    .load(data_source)
)

接下来我尝试对

df
进行一些转换,以便仅保留基于最新
modifiedon
列的唯一 ID(无重复)。

from pyspark.sql.functions import max
df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")
agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("max_modifiedon"))
agg = agg.select("id", "modifiedon")
final =df.join(agg, on=["id", "modifiedon"], how="inner")

然后尝试加载到增量表(使用azure databricks)

final.writeStream.format("delta").option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")

查询表格后,我仍然可以看到重复的 ID,并且

max(modifedon)
未被选中。

我哪里出错了?

azure pyspark azure-databricks autoload
1个回答
0
投票

您可以使用

window
函数并聚合它。 使用下面的代码。

df = df.dropDuplicates().withColumn("modifiedon",to_timestamp("modifiedon","dd/MM/yyyy")).withWatermark("modifiedon", "1 day")
agg = df.groupBy(window("modifiedon","10 day"),"id").agg(max("modifiedon").alias("modifiedon")).select("id","modifiedon")
final =df.join(agg, on=["id","modifiedon"], how="inner")

这里我使用的是10天的window,你根据你的流媒体数据来使用。 我建议您使用超过 7 天的窗口。

enter image description here

输出:

final.writeStream.format("delta").option("checkpointLocation", "/checkpoint_directory/").trigger(availableNow=True).start("/out_delta/")
spark.read.format("delta").load("/out_delta/").show()

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.