我正在使用 PyFlink,并遇到了一个问题,尽管我的 SQL 查询已经过测试并且在 SQL 中正常工作,但我的最终输出包含重复的记录。我的设置包括一个接收 CalledNumber 的 Kafka 消费者,我尝试使用从 CSV 文件读取的静态数据集加入该流。 这是我的设置:
我首先尝试作为一个大查询执行
with cte as(
select
SetupTime
,CallingNumber
,CalledNumber
,UniqueRecordID
,Zone
,CAST( Code as DOUBLE) as Code
,Rate
,sd.FileName as FileName
from streaming_data sd
left join static_data st
ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
)
,
LongestMatch as(
select
SetupTime
,CallingNumber
,CalledNumber
,max(Code) as Code
,UniqueRecordID
,FileName
from cte
group by
SetupTime
,CallingNumber
,CalledNumber
,UniqueRecordID
,FileName
)
select
CalledNumber
,Zone
,CAST(st.Code AS VARCHAR(30)) AS Code
,CAST(Rate AS VARCHAR(30)) AS Rate
,SetupTime
,UniqueRecordID
,FileName
from LongestMatch lm
left join static_data st on lm.Code= CAST (st.Code as DOUBLE)
我还尝试使用表 API 将查询拆分为一个查询和多个表
static_data = ts_env.from_path("static_data")
static_data_renamed = static_data.select(
expr.col("Zone").alias("static_zone"),
expr.col("Code"),
expr.col("Rate")
)
cte_query= """
select
SetupTime
,CallingNumber
,CalledNumber
,UniqueRecordID
,CAST( Code as DOUBLE) as Code
,sd.FileName as FileName
from streaming_data sd
left join static_data st
ON sd.CalledNumber LIKE CONCAT(st.Code, '%')
"""
cte = ts_env.sql_query(cte_query)
ts_env.create_temporary_view("cte", cte)
# Group by 'Zone' and calculate the maximum 'Code' for each group
max_code_by_zone = (
cte.group_by(
expr.col("CalledNumber")
,expr.col("SetupTime")
,expr.col("UniqueRecordID")
,expr.col("FileName")
)
.select(
expr.col("Code").max.alias("max_code")
,expr.col("CalledNumber")
,expr.col("SetupTime")
,expr.col("UniqueRecordID")
,expr.col("FileName")
)
)
#Join the tables on the specified condition
joined_table = (
static_data_renamed
.join(max_code_by_zone)
.where( expr.col("Code").cast(DataTypes.DOUBLE()) == expr.col("max_code"))
)
# # # Select specific columns from the joined table
final_result = (
joined_table
.select(
expr.col("CalledNumber"),
expr.col("static_zone").alias("Zone"),
expr.col("Code").cast(DataTypes.STRING()).alias("Code"),
expr.col("Rate").cast(DataTypes.STRING()).alias("Rate"),
expr.col("SetupTime"),
expr.col("UniqueRecordID"),
expr.col("FileName")
)
)
final_result.execute().print()
这里是示例结果
| +I | xxxxxxxxxxx | 236 | 0.31771425 | 2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| -U | xxxxxxxxxxx | 236 | 0.31771425 | 2024-08-09T14:17:50.927606 | ef15016b-8a9e-430a-acf4-cb0
| +I | zzzzzzzzzz | 266 | 0.4332153 | 2024-08-09T14:17:50.927606 | b08f3a3f-4f68-4915-a7c8-7c2
| +U | yyyyyyyyyyyy | 23350 | 0.16603965 | 2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| +I | yyyyyyyyyyyy | 233 | 0.21922425 | 2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
| -U | yyyyyyyyyyyy | 233 | 0.21922425 | 2024-08-09T14:17:50.927606 | 31acf9de-405d-4af9-a255-324
我还尝试应用 ROW_NUMBER() 来排除并获取最终结果。
使用 KAFKA 进行窗口发射最终时是否有类似的方法。
还有第二种方法,但我在第一步失败了,当尝试将 steupTime 定义为模式的 Types.SQL_TIMESTAMP() 和 DataTypes.TIMESTAMP_LTZ() 时,第一次反序列化时总会出现问题 第一个错误
java.time.format.DateTimeParseException: Text '2024-08-09 14:27:20.968' could not be parsed at index 10
第二个错误
Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.Instant (java.sql.Timestamp is in module java.sql of loader 'platform';
我使用翻滚窗口解决了这个问题,遵循文档 它解决了问题,我不知道是否有其他方法。