Kafka source connector encoding issue | Oracle JDBC connector

Problem description

I am trying to connect to an Oracle server using the Kafka Connect JDBC connector. Here is my connector configuration:

CREATE SOURCE CONNECTOR hsr_source_connector2
WITH (
  'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.user' = '{User}',
  'connection.password' = '{PASS}',
  'mode' = 'bulk',
  'query' = 'select * from <Tablename>',
  'topic.prefix' = 'newuser_',
  'characterEncoding' = 'UTF-8',
  'transforms' = 'createKey,extractInt',
  'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'USER_ID',
  'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractInt.field' = 'USER_ID'
);

The topic is created and the data is read as well, but the data arrives as garbage characters.
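
For reference, output in this form is what ksqlDB's PRINT statement shows when inspecting the topic directly (a sketch; the topic name follows from the topic.prefix above):

PRINT 'newuser_' FROM BEGINNING;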

rowtime: 2024/04/18 17:08:51.028 Z, key: VN185082, value: ☺►VN185082►VN185082♠FSC☻☻W
rowtime: 2024/04/18 17:08:51.028 Z, key: MD250626, value: ☺►MD250626►MD250626♠SME☻☻R

Stream definition:

CREATE STREAM hsr_users (
  user_id varchar,
  user_name varchar,
  pswd varchar,
  user_role varchar,
  access_level varchar
) WITH (kafka_topic='newuser_', value_format='JSON', partitions=1);
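
Note that querying this stream against the mis-encoded topic would not help either: the JSON deserializer cannot parse the binary payload, so a push query (sketch below) would likely log deserialization errors rather than return rows:

SELECT * FROM hsr_users EMIT CHANGES;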

Can you suggest how to apply the encoding, and where to find the correct documentation for this? I have been referring to this doc: https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

Thanks in advance!

apache-kafka apache-kafka-connect ksqldb
1 Answer

Got the answer. This can be corrected by adding the following properties:

CREATE SOURCE CONNECTOR hsr_source_connector3
WITH (
  'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.user' = '{User}',
  'connection.password' = '{PASS}',
  'mode' = 'bulk',
  'query' = 'select * from <Tablename>',
  'topic.prefix' = 'newEncode_',
  'characterEncoding' = 'AL32UTF8', -- Updated: Added charset parameter
  'transforms' = 'createKey,extractInt',
  'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
  'transforms.createKey.fields' = 'USER_ID',
  'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.extractInt.field' = 'USER_ID',
  'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'key.converter.schemas.enable' = 'false',
  'value.converter.schemas.enable' = 'false',
  'errors.tolerance' = 'all',
  'errors.log.enable' = 'true',
  'errors.log.include.messages' = 'true'
);

This gives data in the following format:

Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2024/04/19 11:08:19.755 Z, key: "HG185035", value: {"USER_ID":"HG185035","USER_NAME":"HG185035","PSWD":null,"USER_ROLE":"SME","ACCESS_LEVEL":"R"}
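
With clean JSON on the topic, the stream can be redeclared against it and queried normally (a sketch; the stream name hsr_users_json is illustrative):

CREATE STREAM hsr_users_json (
  user_id varchar,
  user_name varchar,
  pswd varchar,
  user_role varchar,
  access_level varchar
) WITH (kafka_topic='newEncode_', value_format='JSON', partitions=1);

SELECT * FROM hsr_users_json EMIT CHANGES;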

Basically, I had missed the key.converter and value.converter properties. I configured the correct properties based on the Confluent documentation: https://www.confluent.io/en-gb/blog/kafka-connect-deep-dive-converters-serialization-explained/
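
For context: when a connector configuration omits key.converter and value.converter, the Connect worker's defaults apply. On many Confluent Platform installs those defaults are the Avro converter, which would explain the binary-looking values seen earlier. A hypothetical worker configuration that would reproduce the symptom:

# connect-distributed.properties (hypothetical worker defaults)
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
# Converter-level Schema Registry settings (address is illustrative)
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081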
