静态数据加入Kafka Stream时如何避免PyFlink中出现重复记录?

问题描述 投票:0回答:1

我正在使用 PyFlink,并遇到了一个问题,尽管我的 SQL 查询已经过测试并且在 SQL 中正常工作,但我的最终输出包含重复的记录。我的设置包括一个接收 CalledNumber 的 Kafka 消费者,我尝试使用从 CSV 文件读取的静态数据集加入该流。 这是我的设置:

  • 静态数据:从 CSV 文件中读取,并注册为名为 static_data fields(zone,code,rate) 的表
  • 流数据:从Kafka消费,注册为名为streaming_data的表。(主要对 CalledNumber 和 uniqueRecordId 感兴趣)
  • SQL查询:根据条件在streaming_data和static_data之间执行左连接。

我首先尝试作为一个大查询执行


 with cte as(
        select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,Zone
        ,CAST( Code as DOUBLE) as Code
        ,Rate
        ,sd.FileName as FileName
        from streaming_data sd
        left join static_data st
        ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
        )
        ,
        LongestMatch as(
        select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,max(Code) as Code
        ,UniqueRecordID
        ,FileName
        from cte
        group by
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,FileName
        )
        select 
        CalledNumber
        ,Zone
        ,CAST(st.Code AS VARCHAR(30)) AS Code
        ,CAST(Rate AS VARCHAR(30)) AS Rate
        ,SetupTime
        ,UniqueRecordID
        ,FileName
        from LongestMatch lm
        left join static_data st on lm.Code= CAST (st.Code as DOUBLE)

我还尝试使用表 API 将查询拆分为一个查询和多个表

static_data = ts_env.from_path("static_data")
    static_data_renamed = static_data.select(
        expr.col("Zone").alias("static_zone"),
        expr.col("Code"),
        expr.col("Rate")
     
    )


    cte_query= """
         select 
        SetupTime
        ,CallingNumber
        ,CalledNumber
        ,UniqueRecordID
        ,CAST( Code as DOUBLE) as Code
        ,sd.FileName as FileName        
        from streaming_data sd
        left join static_data st
        ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
    """
    cte = ts_env.sql_query(cte_query)
    ts_env.create_temporary_view("cte", cte)
  
    # Group by 'Zone' and calculate the maximum 'Code' for each group
    max_code_by_zone = (
        cte.group_by(
            expr.col("CalledNumber")
            ,expr.col("SetupTime")
            ,expr.col("UniqueRecordID")
            ,expr.col("FileName")
            )
        .select(
            expr.col("Code").max.alias("max_code")
            ,expr.col("CalledNumber")
            ,expr.col("SetupTime")
            ,expr.col("UniqueRecordID")
            ,expr.col("FileName")
            )
    )
    
   
    #Join the tables on the specified condition
    joined_table = (
        static_data_renamed
        .join(max_code_by_zone)
        .where( expr.col("Code").cast(DataTypes.DOUBLE()) == expr.col("max_code"))
    )

    # # # Select specific columns from the joined table
    final_result = (
        joined_table
        .select(
            expr.col("CalledNumber"),
            expr.col("static_zone").alias("Zone"),
            expr.col("Code").cast(DataTypes.STRING()).alias("Code"),
            expr.col("Rate").cast(DataTypes.STRING()).alias("Rate"),
            expr.col("SetupTime"),
            expr.col("UniqueRecordID"),
            expr.col("FileName")
        )
    )

    final_result.execute().print()

这里是示例结果

| +I |                    xxxxxxxxxxx |               236 |     0.31771425 |     2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| -U |                    xxxxxxxxxxx |               236 |     0.31771425 |     2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| +I |                     zzzzzzzzzz |               266 |      0.4332153 |     2024-08-09T14:17:50.927606 | b08f3a3f-4f68-4915-a7c8-7c2
| +U |                   yyyyyyyyyyyy |             23350 |     0.16603965 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| +I |                   yyyyyyyyyyyy |               233 |     0.21922425 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| -U |                   yyyyyyyyyyyy |               233 |     0.21922425 |     2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324

我还尝试应用 ROW_NUMBER() 来排除并获取最终结果。

使用 KAFKA 进行窗口发射最终时是否有类似的方法。

还有第二种方法,但我在第一步失败了,当尝试将 steupTime 定义为模式的 Types.SQL_TIMESTAMP() 和 DataTypes.TIMESTAMP_LTZ() 时,第一次反序列化时总会出现问题 第一个错误

 java.time.format.DateTimeParseException: Text '2024-08-09 14:27:20.968' could not be parsed at index 10

第二个错误

Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.Instant (java.sql.Timestamp is in module java.sql of loader 'platform'; 
apache-flink streaming etl pyflink
1个回答
0
投票

我使用翻滚窗口解决了这个问题,遵循文档 它解决了问题,我不知道是否有其他方法。

© www.soinside.com 2019 - 2024. All rights reserved.