Kafka JDBC Sink连接器,批量插入值

问题描述 投票:1回答:1

我每秒收到许多消息(通过http协议)(50000-100000),并希望将它们保存到PostgreSql。我决定为此目的使用Kafka JDBC Sink。

消息是通过一条记录而不是成批保存到数据库的。我想以500-1000条记录的大小批量在PostgreSQL中插入记录。

我发现了有关此问题的一些答案:How to use batch.size?

我尝试在配置中使用相关选项,但似乎它们没有任何作用。

我的Kafka JDBC Sink PostgreSql配置(etc/kafka-connect-jdbc/postgres.properties):

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3

# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs

connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false

insert.mode=insert
connection.user=postgres
table.name.format=${topic}

connection.password=pwd

batch.size=500
# based on 500*3000byte message size
fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000

我还在connect-distributed.properties中添加了选项:

consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500

尽管每个分区每秒可获取1000条以上的记录,但记录会被一个保存到PostgreSQL。

编辑:使用者选项已添加到其他具有正确名称的文件中

我还在etc/schema-registry/connect-avro-standalone.properties中添加了选项:

# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000
jdbc apache-kafka apache-kafka-connect
1个回答
0
投票

我意识到我误解了文档。记录通过一条记录插入数据库中。在一个事务中插入的记录数取决于batch.sizeconsumer.max.poll.records。我希望以其他方式实现批量插入。我希望有机会插入这样的记录:

INSERT INTO table1 (First, Last)
VALUES
    ('Fred', 'Smith'),
    ('John', 'Smith'),
    ('Michael', 'Smith'),
    ('Robert', 'Smith');

但是似乎不可能。

© www.soinside.com 2019 - 2024. All rights reserved.