数据管道'MSSQL -> Kafka -> CH'不工作。

问题描述 投票:1回答:1

我在Kafka中创建了一个JDBC连接到SQL Server,数据成功加载到主题中。

/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-testsql-topic --from-beginning

我如何在Clickhouse中读取这个主题?

我用Kafka引擎创建了一个新的表(在这个主题上)并实现了视图,但不成功。

CH脚本。

CREATE TABLE default.test_topic (Id UInt32, Name FixedString(100)) 
ENGINE = Kafka 
SETTINGS 
    kafka_broker_list = localhost:9092, 
    kafka_topic_list = my-testsql-topic, 
    kafka_group_name = test-consumer-group, 
    kafka_format = JSONEachRow, 
    kafka_skip_broken_messages = 99999 

CREATE TABLE default.test_topic_hist (Id UInt32, Name FixedString(100)) 
ENGINE = MergeTree ORDER BY Id SETTINGS index_granularity = 8192 

CREATE MATERIALIZED VIEW default.load_test_topic_hist TO default.test_topic_hist (Id UInt32, Name FixedString(100)) AS 
SELECT Id, Name FROM default.test_topic

描述组。

GROUP                TOPIC              PARTITION    CURRENT-OFFSET   LOG-END-OFFSET  LAG 
test-consumer-group  my-testsql-topic   0            -                0               - 

clickhouse-server.log。

2020.05.21 12:07:35.704680 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already subscribed to topics: [ my-testsql-topic ] 
2020.05.21 12:07:35.704697 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already assigned to : [ my-testsql-topic[0:#] ]    
2020.05.21 12:22:36.898540 [ 11946 ] {} <Trace> StorageKafka (test_topic): Stalled 
2020.05.21 12:22:36.898729 [ 11946 ] {} <Trace> StorageKafka (test_topic): Polled offset INVALID (topic: my-testsql-topic, partition: 0) 
2020.05.21 12:22:36.898741 [ 11946 ] {} <Trace> StorageKafka (test_topic): Nothing to commit. 
2020.05.21 12:22:36.899433 [ 11946 ] {} <Trace> StorageKafka (test_topic): Committed offset INVALID (topic: my-testsql-topic, partition: 0) 
2020.05.21 12:22:36.899504 [ 11946 ] {} <Trace> StorageKafka (test_topic): Execution took 501 ms.
sql-server jdbc apache-kafka clickhouse
1个回答
1
投票

在jdbc配置中需要添加。

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
© www.soinside.com 2019 - 2024. All rights reserved.