I am new to Kafka and Kafka Connect.
I want to migrate data from MongoDB to Atlas MongoDB using the MongoDB Kafka Connector. I am running Kafka 2.12-2.3.0 (Kafka 2.3.0 built for Scala 2.12) with mongo-kafka-connect-1.13.0-all.jar as the Kafka Connect plugin.
After starting ZooKeeper and Kafka, I start connect-distributed like this:
cd /opt/kafka_2.12-2.3.0
sudo bash bin/connect-distributed.sh config/connect-distributed.properties & disown
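To confirm the worker is up, I can hit the Connect REST API (port 8083 by default, assuming the REST listener settings in connect-distributed.properties are unchanged):

# sanity check against the Connect REST API
curl http://localhost:8083/
# returns worker metadata, something like {"version":"2.3.0",...}
curl http://localhost:8083/connectors
# returns the names of deployed connectors, [] if none yet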
Once connect-distributed is running, the startup logs print the ProducerConfig and ConsumerConfig values.
Producer config:
[2024-09-25 07:13:37,742] INFO ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 2147483647
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
(org.apache.kafka.clients.producer.ProducerConfig:347)
Consumer config:
[2024-09-25 07:13:37,750] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-cluster
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig:347)
What I want is to raise the ProducerConfig default of max.request.size = 1048576 to something like max.request.size = 10485760, and likewise raise the ConsumerConfig default of max.partition.fetch.bytes = 1048576 to max.partition.fetch.bytes = 10485760.
My question is where to find the properties files that hold these values. Here is the list of configs in /opt/kafka_2.12-2.3.0/config:
butitoy@my-server:/opt/kafka_2.12-2.3.0$ ls -l config/
total 68
-rw-r--r-- 1 root root 906 Jun 19 2019 connect-console-sink.properties
-rw-r--r-- 1 root root 909 Jun 19 2019 connect-console-source.properties
-rw-r--r-- 1 root root 5482 Sep 25 02:36 connect-distributed.properties
-rw-r--r-- 1 root root 883 Jun 19 2019 connect-file-sink.properties
-rw-r--r-- 1 root root 881 Jun 19 2019 connect-file-source.properties
-rw-r--r-- 1 root root 1552 Jun 19 2019 connect-log4j.properties
-rw-r--r-- 1 root root 2290 Sep 24 04:03 connect-standalone.properties
-rw-r--r-- 1 root root 1344 Sep 25 05:33 consumer.properties
-rw-r--r-- 1 root root 4727 Jun 19 2019 log4j.properties
-rw-r--r-- 1 root root 1933 Sep 25 05:33 producer.properties
-rw-r--r-- 1 root root 6925 Sep 24 15:24 server.properties
-rw-r--r-- 1 root root 1032 Jun 19 2019 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Jun 19 2019 trogdor.conf
-rw-r--r-- 1 root root 1023 Jun 19 2019 zookeeper.properties
butitoy@my-server:/opt/kafka_2.12-2.3.0$
I tried adding max.request.size to producer.properties and server.properties, and max.partition.fetch.bytes to consumer.properties, and even to connect-distributed.properties, but the changes are not reflected in the logged ProducerConfig and ConsumerConfig values.
How do I change these defaults, and which config/properties file do I need to edit?
Kafka Connect does not read server.properties, consumer.properties, or producer.properties.
If you need to set client configs, you do it in the connect-distributed.properties file, as described in the documentation.
The producer used by Kafka source tasks and the consumer used by Kafka sink tasks accept the same client parameters, but each must be prefixed: producer. for the source-side producer and consumer. for the sink-side consumer.
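Applied to the two values in the question, that means adding lines like the following to connect-distributed.properties and restarting the worker (a minimal sketch; the 10485760 values are simply the targets from the question):

# config/connect-distributed.properties

# producer used by source tasks (writing into Kafka)
producer.max.request.size=10485760

# consumer used by sink tasks (reading from Kafka)
consumer.max.partition.fetch.bytes=10485760

After the restart, the new values should show up in the same ProducerConfig and ConsumerConfig blocks logged at startup.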
Alternatively, at the individual connector level: starting with Kafka 2.3.0, client configs can be overridden per connector using the producer.override. and consumer.override. prefixes.
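A sketch of what that could look like, with two caveats: the worker must first be configured to allow overrides (the worker property connector.client.config.override.policy defaults to None in 2.3.0, so it has to be set to All), and the connector name mongo-source plus the connection.uri below are hypothetical placeholders:

# worker side, in connect-distributed.properties (restart required):
connector.client.config.override.policy=All

# connector side, submitted through the Connect REST API;
# "mongo-source" and the connection.uri are placeholders:
curl -X PUT http://localhost:8083/connectors/mongo-source/config \
  -H 'Content-Type: application/json' \
  -d '{
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "producer.override.max.request.size": "10485760"
  }'

For the sink side of the migration (writing into Atlas), the analogous override on the MongoSinkConnector config would be consumer.override.max.partition.fetch.bytes.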