FlinkSQL: accessing the Kafka message key

Problem description

Suppose I want to use a Kafka source for my Flink SQL... managed by Aiven.

How do I get the key of the messages?

The problem

I produce the messages on the source topic with a bit of Java; the Kafka messages look like this (I use avro for the message value and a plain string as the key):

{
  key: "key12",               << string
  topic: "mytopic",
  value: { ... content ... }  << avro
}

Everything seems to be serialized correctly. For example, in the Aiven console I can see the keys of the messages in the topic mytopic.

I expect Flink to put it into my other topic "mysink"... and it works... but the keys are all empty!

More details

My source definition looks like this:

CREATE TABLE my_source (
    name STRING,
    the_kafka_key STRING 
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'mytopic',
    'key.format' = 'raw',
    'key.fields' = 'the_kafka_key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '...', 
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = '...'
)

My sink looks like this:

CREATE TABLE my_sink (
    name STRING,
    key STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'mysink',
    'key.format' = 'raw',
    'key.fields' = 'key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '...',
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = '...'
)

My Flink SQL statement (is that the right term?) looks like this:

INSERT INTO mysink
SELECT
    mytopic.name AS name,
    mytopic.the_kafka_key AS key
FROM mytopic

When I look at my source topic: I can see the keys. When I look at my sink: the keys are all empty! The messages themselves are decoded correctly, though...

What am I missing? What can I do to figure out what's going wrong?

apache-flink avro flink-sql aiven
1 Answer

There are some naming issues in the code you posted; possibly just a copy-paste from different versions? You write mytopic.name AS name and INSERT INTO mysink, but your source table is named my_source and your sink table is named my_sink.
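With the names aligned to your CREATE TABLE statements, the statement would look roughly like this (a sketch; note that key is a reserved keyword in Flink SQL, so it is safest to quote it with backticks):

INSERT INTO my_sink
SELECT
    name,
    the_kafka_key AS `key`
FROM my_source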

Beyond that, I think you need to add 'value.fields-include' = 'EXCEPT_KEY' to your table definitions, because your key and value use different formats (raw vs. avro-confluent). With the default 'value.fields-include' = 'ALL', Flink expects the value format to also carry the key columns, which your Avro value does not, so they come back empty. More info here: Overlapping Format Fields
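Applied to your definitions, a minimal sketch of the source table (your connection placeholders kept as-is; the sink table needs the same option):

CREATE TABLE my_source (
    name STRING,
    the_kafka_key STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'scan.startup.mode' = 'earliest-offset',
    'topic' = 'mytopic',
    'key.format' = 'raw',
    'key.fields' = 'the_kafka_key',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '...',
    'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
    'value.avro-confluent.basic-auth.user-info' = '...',
    'value.fields-include' = 'EXCEPT_KEY'  -- the added option
)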

Example:

# sending 4 lines to topic TEST
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"usrid;itId:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"3;2:1;2;beh"
kfeed TEST --property "parse.key=true" --property "key.separator=:" <<<"abv;gd:1;2;beh"
-- creating the source table
CREATE TABLE KafkaTable (
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  `user_id` STRING,
  `item_id` STRING,
  `user` BIGINT,
  `item` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'TEST',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test_key',
  'properties.auto.offset.reset' = 'earliest',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'csv',
  'key.csv.ignore-parse-errors' = 'true',
  'key.csv.field-delimiter' = ';',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'csv',
  'value.csv.ignore-parse-errors' = 'true',
  'value.csv.field-delimiter' = ';',
  'value.fields-include' = 'ALL'
)

Output 1, with 'value.fields-include' = 'ALL' (the key columns come back NULL):


+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op |                      ts |                        user_id |                        item_id |                 user |                 item |                       behavior |
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 2024-10-03 15:54:35.668 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |
| +I | 2024-10-03 16:10:56.387 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |
| +I | 2024-10-03 16:12:14.891 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |
| +I | 2024-10-03 16:14:36.171 |                              1 |                              2 |               (NULL) |               (NULL) |                         (NULL) |

Then we set 'value.fields-include' = 'EXCEPT_KEY'.
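One way to apply this without re-creating the table is Flink SQL's ALTER TABLE ... SET, which changes connector options in place; a minimal sketch, assuming the table above is still registered in the session:

-- switch the value format to exclude the key columns
ALTER TABLE KafkaTable SET ('value.fields-include' = 'EXCEPT_KEY')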

Now the output is correct:

# key and value are both shown correctly

+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op |                      ts |                        user_id |                        item_id |                 user |                 item |                       behavior |
+----+-------------------------+--------------------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 2024-10-03 15:54:35.668 |                          usrid |                           itId |                    1 |                    2 |                            beh |
| +I | 2024-10-03 16:10:56.387 |                          usrid |                           itId |                    1 |                    2 |                            beh |
| +I | 2024-10-03 16:12:14.891 |                              3 |                              2 |                    1 |                    2 |                            beh |
| +I | 2024-10-03 16:14:36.171 |                            abv |                             gd |                    1 |                    2 |                            beh |
