我正在尝试使用 Flink 来消费 Debezium 生成的更改事件日志。 JSON 是这样的:
{
"schema":{
},
"payload":{
"before":null,
"after":{
"team_config_id":3800,
"team_config_team_id":"team22bcb26e-499a-41e6-8746-b7d980e79e04",
"team_config_sfdc_account_id":null,
"team_config_sfdc_account_url":null,
"team_config_business_type":5,
"team_config_dpsa_status":0,
"team_config_desc":null,
"team_config_company_id":null,
"team_config_hm_count_stages":null,
"team_config_assign_credits_times":null,
"team_config_real_renew_date":null,
"team_config_action_date":null,
"team_config_last_action_date":null,
"team_config_business_tier_notification":"{}",
"team_config_create_date":1670724933000,
"team_config_update_date":1670724933000,
"team_config_rediscovery_tier":0,
"team_config_rediscovery_tier_notification":"{}",
"team_config_sfdc_industry":null,
"team_config_sfdc_market_segment":null,
"team_config_unterminated_note_id":0
},
"source":{
},
"op":"c",
"ts_ms":1670724933149,
"transaction":null
}
}
我尝试了两种方法来声明输入模式。
第一种方式是直接解析JSON数据:
create table team_config_source (
`payload` ROW <
`after` ROW <
...
team_config_create_date timestamp(3),
team_config_update_date timestamp(3),
...
>
>
) WITH (
'connector' = 'kafka',
...
'format' = 'json'
)
但是 Flink 会抛出由
org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail to deserialize at field: team_config_create_date
引起的错误 java.time.format.DateTimeParseException: Text '1670724933000' could not be parsed at index 0
。 Flink不支持这种格式的时间戳吗?
我还尝试了另一种方法,使用内置的 Debezium 格式:
create table team_config_source (
team_config_create_id int,
...
) WITH (
'connector' = 'kafka',
...
'format' = 'debezium-json'
)
但是 Flink 又出现了由
java.io.IOException: Corrupt Debezium JSON message
引起的另一个错误 java.lang.NullPointerException
。我发现有人说更新事件不应该将 null
作为 before
值,但此消息是一个创建事件。
有人可以帮忙检查我的DDL吗?
我是 Flink 专家,但 Flink 中的 TIMESTAMP 不是 Epoch 时间,它是 datetime 格式。
在这种情况下,您可以定义如下表格:
team_config_create_bigint BIGINT,
team_config_update_bigint BIGINT,
...
team_config_create_date as TO_TIMESTAMP(FROM_UNIXTIME(team_config_create_bigint)),
team_config_update_date as TO_TIMESTAMP(FROM_UNIXTIME(team_config_update_bigint))
这篇文章可能会有所帮助,它建议将
'debezium-json.schema-include' = 'true'
添加到连接器WITH 语句中。