Suppose I want to use a Kafka source for my Flink SQL job... managed by Aiven.
How do I get the key of the messages?
The problem:
I produce messages onto the source topic with a bit of Java, and the Kafka messages look like this (I use Avro for the message value and a plain string as the key):
{
key: "key12", << string
topic: "mytopic",
value: { ... content ... } << avro
}
Everything seems to be serialized correctly. For example, when using the Aiven console, I can see the keys of the messages in topic mytopic.
I expected Flink to put them into my other topic "mysink"... it works... but the keys are all empty!
More details:
My source definition looks like this:
CREATE TABLE my_source (
name STRING,
the_kafka_key STRING
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'mytopic',
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = '...',
'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'value.avro-confluent.basic-auth.user-info' = '...'
)
My sink looks like this:
CREATE TABLE my_sink (
name STRING,
key STRING
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'scan.startup.mode' = 'earliest-offset',
'topic' = 'mysink',
'key.format' = 'raw',
'key.fields' = 'key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = '...',
'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'value.avro-confluent.basic-auth.user-info' = '...'
)
My Flink SQL statement (is that the right term?) looks like this:
INSERT INTO mysink
SELECT
mytopic.name AS name,
mytopic.the_kafka_key AS key
FROM mytopic
When I look at my source topic, I can see the keys. When I look at my sink topic, I cannot see the keys! They are all empty! The messages themselves are decoded correctly, though...
What am I missing? What can I check to find out what is going wrong?
The code you posted has some naming inconsistencies; perhaps it was copy-pasted from different versions? You select mytopic.name AS name, but your source table is called my_source, your sink table is called my_sink, and you INSERT INTO mysink.
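Assuming the names from your CREATE TABLE statements are the intended ones, I'd expect the statement to look something like this (purely a naming fix; table and column names are taken from your definitions):
INSERT INTO my_sink
SELECT
  name,
  the_kafka_key AS key
FROM my_source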
Apart from that, I think you need to add 'value.fields-include' = 'EXCEPT_KEY' to the table definitions, because your key and value fields have different data types/formats: with the default 'ALL', the key columns are expected to be part of the value format as well, and your Avro value does not contain them, so they come back as NULL. More information here: Overlapping Format Fields. (The definitions adapted to your tables are sketched at the end of this answer.)
Example:
# send 4 lines to topic TEST (kfeed here is shorthand for a kafka-console-producer invocation)
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"3;2:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"abv;gd:1;2;beh"
-- create the source table
CREATE TABLE KafkaTable (
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
`user_id` STRING,
`item_id` STRING,
`user` BIGINT,
`item` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'TEST',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test_key',
'properties.auto.offset.reset' = 'earliest',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'csv',
'key.csv.ignore-parse-errors' = 'true',
'key.csv.field-delimiter' = ';',
'key.fields' = 'user_id;item_id',
'value.format' = 'csv',
'value.csv.ignore-parse-errors' = 'true',
'value.csv.field-delimiter' = ';',
'value.fields-include' = 'ALL'
)
Output 1 (with 'value.fields-include' = 'ALL' as above):
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op | ts | user_id | item_id | user | item | behavior |
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 2024-10-03 15:54:35.668 | 1 | 2 | (NULL) | (NULL) | (NULL) |
| +I | 2024-10-03 16:10:56.387 | 1 | 2 | (NULL) | (NULL) | (NULL) |
| +I | 2024-10-03 16:12:14.891 | 1 | 2 | (NULL) | (NULL) | (NULL) |
| +I | 2024-10-03 16:14:36.171 | 1 | 2 | (NULL) | (NULL) | (NULL) |
Then we set 'value.fields-include' = 'EXCEPT_KEY' and the output is correct:
# key and value are now both shown correctly
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op | ts | user_id | item_id | user | item | behavior |
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 2024-10-03 15:54:35.668 | usrid | itId | 1 | 2 | beh |
| +I | 2024-10-03 16:10:56.387 | usrid | itId | 1 | 2 | beh |
| +I | 2024-10-03 16:12:14.891 | 3 | 2 | 1 | 2 | beh |
| +I | 2024-10-03 16:14:36.171 | abv | gd | 1 | 2 | beh |
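Applied to the tables from your question, a minimal sketch of the adapted definitions could look like this (bootstrap servers and Schema Registry credentials stay as placeholders, the sink key column is renamed to the_kafka_key so the names line up, and 'scan.startup.mode' is dropped from the sink because it is a scan/source option):
CREATE TABLE my_source (
  name STRING,
  the_kafka_key STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '...',
  'topic' = 'mytopic',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',
  -- read the key column from the Kafka key only, not from the Avro value
  'value.fields-include' = 'EXCEPT_KEY',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = '...',
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = '...'
);

CREATE TABLE my_sink (
  name STRING,
  the_kafka_key STRING
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '...',
  'topic' = 'mysink',
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',
  -- write only the non-key columns (here: name) into the Avro value
  'value.fields-include' = 'EXCEPT_KEY',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = '...',
  'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'value.avro-confluent.basic-auth.user-info' = '...'
);

INSERT INTO my_sink
SELECT name, the_kafka_key
FROM my_source;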