Flink 无法反序列化 Debezium 生成的 JSON

问题描述 投票:0回答:2

我正在尝试使用 Flink 来消费 Debezium 生成的更改事件日志。 JSON 是这样的:

{
    "schema":{

    },
    "payload":{
        "before":null,
        "after":{
            "team_config_id":3800,
            "team_config_team_id":"team22bcb26e-499a-41e6-8746-b7d980e79e04",
            "team_config_sfdc_account_id":null,
            "team_config_sfdc_account_url":null,
            "team_config_business_type":5,
            "team_config_dpsa_status":0,
            "team_config_desc":null,
            "team_config_company_id":null,
            "team_config_hm_count_stages":null,
            "team_config_assign_credits_times":null,
            "team_config_real_renew_date":null,
            "team_config_action_date":null,
            "team_config_last_action_date":null,
            "team_config_business_tier_notification":"{}",
            "team_config_create_date":1670724933000,
            "team_config_update_date":1670724933000,
            "team_config_rediscovery_tier":0,
            "team_config_rediscovery_tier_notification":"{}",
            "team_config_sfdc_industry":null,
            "team_config_sfdc_market_segment":null,
            "team_config_unterminated_note_id":0
        },
        "source":{

        },
        "op":"c",
        "ts_ms":1670724933149,
        "transaction":null
    }
}

我尝试了两种方法来声明输入模式。

第一种方式是直接解析JSON数据:

create table team_config_source (
      `payload` ROW <
        `after` ROW <
          ...
          team_config_create_date timestamp(3),
          team_config_update_date timestamp(3),
          ...
        >
      >
    ) WITH (
    'connector' = 'kafka',
    ...
    'format' = 'json'
    )

但是 Flink 会抛出由

org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail to deserialize at field: team_config_create_date
引起的错误
java.time.format.DateTimeParseException: Text '1670724933000' could not be parsed at index 0
。 Flink不支持这种格式的时间戳吗?

我还尝试了另一种方法,使用内置的 Debezium 格式:

create table team_config_source (
      team_config_create_id int,
      ...
    ) WITH (
    'connector' = 'kafka',
    ...
    'format' = 'debezium-json'
    )

但是 Flink 又出现了由

java.io.IOException: Corrupt Debezium JSON message
引起的另一个错误
java.lang.NullPointerException
。我发现有人说更新事件不应该将
null
作为
before
值,但此消息是一个创建事件。

有人可以帮忙检查我的DDL吗?

apache-flink debezium flink-sql pyflink
2个回答
1
投票

我是 Flink 专家,但 Flink 中的 TIMESTAMP 不是 Epoch 时间,它是 datetime 格式

在这种情况下,您可以定义如下表格:

team_config_create_bigint BIGINT,
team_config_update_bigint BIGINT,
...
team_config_create_date as TO_TIMESTAMP(FROM_UNIXTIME(team_config_create_bigint)),
team_config_update_date as TO_TIMESTAMP(FROM_UNIXTIME(team_config_update_bigint))

0
投票

这篇文章可能会有所帮助,它建议将

'debezium-json.schema-include' = 'true'
添加到连接器WITH 语句中。

© www.soinside.com 2019 - 2024. All rights reserved.