I am trying to connect to an Oracle server using the Kafka Connect JDBC connector. Here is my connector configuration:
CREATE SOURCE CONNECTOR hsr_source_connector2
WITH (
'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.user' = '{User}',
'connection.password' = '{PASS}',
'mode' = 'bulk',
'query' = 'select * from <Tablename>',
'topic.prefix' = 'newuser_',
'characterEncoding' = 'UTF-8',
'transforms' = 'createKey,extractInt',
'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.createKey.fields' = 'USER_ID',
'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractInt.field' = 'USER_ID'
);
The topic gets created and data is read from it, but the data comes through as garbage characters:
rowtime: 2024/04/18 17:08:51.028 Z, key: VN185082, value: ☺►VN185082►VN185082♠FSC☻☻W
rowtime: 2024/04/18 17:08:51.028 Z, key: MD250626, value: ☺►MD250626►MD250626♠SME☻☻R
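The output above looks like ksqlDB PRINT output; for reference, the raw records on the topic can be inspected from the ksqlDB CLI like this (a minimal sketch, using the topic name from the config above):

-- Inspect the raw records landing on the source topic
PRINT 'newuser_' FROM BEGINNING LIMIT 2;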
Stream definition:
CREATE STREAM hsr_users
(
user_id varchar,
user_name varchar,
pswd varchar,
user_role varchar,
access_level varchar
) WITH (kafka_topic='newuser_', value_format='JSON', partitions=1);
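Querying this stream while the topic values are not valid JSON will not return usable rows; a minimal sketch of checking what actually deserializes (assuming a ksqlDB CLI session):

-- Push query against the stream; records that fail JSON deserialization
-- are skipped and reported in the ksqlDB processing log
SELECT * FROM hsr_users EMIT CHANGES LIMIT 5;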
Could you suggest how to apply the correct encoding, and point me to the right documentation for this? I referred to this document: https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
Thanks in advance!
Got the answer: this can be corrected by adding the following properties:
CREATE SOURCE CONNECTOR hsr_source_connector3
WITH (
'connection.url' = 'jdbc:oracle:thin:@//HOSTIP:PORT/Service',
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.user' = '{User}',
'connection.password' = '{PASS}',
'mode' = 'bulk',
'query' = 'select * from <Tablename>',
'topic.prefix' = 'newEncode_',
'characterEncoding' = 'AL32UTF8', -- Updated: Added charset parameter
'transforms' = 'createKey,extractInt',
'transforms.createKey.type' = 'org.apache.kafka.connect.transforms.ValueToKey',
'transforms.createKey.fields' = 'USER_ID',
'transforms.extractInt.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
'transforms.extractInt.field' = 'USER_ID',
'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable' = 'false',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'errors.tolerance' = 'all',
'errors.log.enable' = 'true',
'errors.log.include.messages' = 'true'
);
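After recreating the connector, its state can be verified from ksqlDB as well; a minimal sketch:

-- Confirm the connector is RUNNING and its tasks report no errors
SHOW CONNECTORS;
DESCRIBE CONNECTOR hsr_source_connector3;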
This gives data in the following format:
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2024/04/19 11:08:19.755 Z, key: "HG185035", value: {"USER_ID":"HG185035","USER_NAME":"HG185035","PSWD":null,"USER_ROLE":"SME","ACCESS_LEVEL":"R"}
Basically, I had missed the key.converter and value.converter properties. I configured the correct properties based on the Confluent documentation: https://www.confluent.io/en-gb/blog/kafka-connect-deep-dive-converters-serialization-explained/
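Since the corrected connector writes to a new topic ('newEncode_'), the stream definition has to point at that topic too. A minimal sketch reusing the columns from above (the stream name hsr_users_json is just an illustration):

CREATE STREAM hsr_users_json
(
  user_id varchar,
  user_name varchar,
  pswd varchar,
  user_role varchar,
  access_level varchar
) WITH (kafka_topic='newEncode_', value_format='JSON', partitions=1);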