I created a JDBC connector in Kafka for SQL Server, and the data is loaded into the topic successfully:
/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-testsql-topic --from-beginning
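How can I read this topic in ClickHouse?
I created a new table with the Kafka engine (on this topic) and a materialized view on top of it, but it does not work.
ClickHouse script: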
CREATE TABLE default.test_topic (Id UInt32, Name FixedString(100))
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'my-testsql-topic',
    kafka_group_name = 'test-consumer-group',
    kafka_format = 'JSONEachRow',
    kafka_skip_broken_messages = 99999;

CREATE TABLE default.test_topic_hist (Id UInt32, Name FixedString(100))
ENGINE = MergeTree ORDER BY Id SETTINGS index_granularity = 8192;

CREATE MATERIALIZED VIEW default.load_test_topic_hist TO default.test_topic_hist (Id UInt32, Name FixedString(100)) AS
SELECT Id, Name FROM default.test_topic;
Consumer group description:
GROUP                TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test-consumer-group  my-testsql-topic  0          -               0               -
clickhouse-server.log:
2020.05.21 12:07:35.704680 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already subscribed to topics: [ my-testsql-topic ]
2020.05.21 12:07:35.704697 [ 11942 ] {} <Trace> StorageKafka (test_topic): Already assigned to : [ my-testsql-topic[0:#] ]
2020.05.21 12:22:36.898540 [ 11946 ] {} <Trace> StorageKafka (test_topic): Stalled
2020.05.21 12:22:36.898729 [ 11946 ] {} <Trace> StorageKafka (test_topic): Polled offset INVALID (topic: my-testsql-topic, partition: 0)
2020.05.21 12:22:36.898741 [ 11946 ] {} <Trace> StorageKafka (test_topic): Nothing to commit.
2020.05.21 12:22:36.899433 [ 11946 ] {} <Trace> StorageKafka (test_topic): Committed offset INVALID (topic: my-testsql-topic, partition: 0)
2020.05.21 12:22:36.899504 [ 11946 ] {} <Trace> StorageKafka (test_topic): Execution took 501 ms.
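The JDBC connector configuration needs the following settings, so that the connector writes plain JSON records without a schema envelope: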
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
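
A likely reason these settings matter, assuming the connector was otherwise using JsonConverter with schemas enabled (its default): each message is then wrapped in an object with "schema" and "payload" keys, so the Id and Name columns that JSONEachRow looks for are not at the top level. The sketch below only illustrates the two message shapes; the sample row and the helper name top_level_columns are made up for illustration and are not taken from the original setup.

```python
import json

# Hypothetical sample record; the values are invented for illustration.
row = {"Id": 1, "Name": "example"}

# Shape produced by JsonConverter when schemas.enable=true:
# the actual record is nested under "payload".
with_schema = json.dumps({
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int32", "optional": False, "field": "Id"},
            {"type": "string", "optional": True, "field": "Name"},
        ],
        "optional": False,
    },
    "payload": row,
})

# Shape produced when schemas.enable=false: just the record itself.
without_schema = json.dumps(row)


def top_level_columns(message, expected=("Id", "Name")):
    """Return the expected column names found at the top level of a JSON message."""
    obj = json.loads(message)
    return [name for name in expected if name in obj]


print(top_level_columns(with_schema))     # []
print(top_level_columns(without_schema))  # ['Id', 'Name']
```

With the flat shape, each message is a single JSON object whose keys match the column names of the Kafka engine table, which is what JSONEachRow expects.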