Location of the ProducerConfig and ConsumerConfig properties files when running connect-distributed.sh

Problem description

I am new to Kafka and Kafka Connect.

I want to migrate data from MongoDB to MongoDB Atlas using the MongoDB Kafka Connector. I am running Kafka 2.12-2.3.0 and using mongo-kafka-connect-1.13.0-all.jar as the Kafka Connect plugin.

After starting ZooKeeper and Kafka, I run connect-distributed like this:

cd /opt/kafka_2.12-2.3.0
sudo bash bin/connect-distributed.sh config/connect-distributed.properties &disown

After connect-distributed starts, the logs show the ProducerConfig and ConsumerConfig values.

Producer config

[2024-09-25 07:13:37,742] INFO ProducerConfig values: 
  acks = all
  batch.size = 16384
  bootstrap.servers = [localhost:9092]
  buffer.memory = 33554432
  client.dns.lookup = default
  client.id = 
  compression.type = none
  connections.max.idle.ms = 540000
  delivery.timeout.ms = 2147483647
  enable.idempotence = false
  interceptor.classes = []
  key.serializer = class org.apache.kafka.common.serialization.StringSerializer
  linger.ms = 0
  max.block.ms = 60000
  max.in.flight.requests.per.connection = 1
  max.request.size = 1048576
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
  receive.buffer.bytes = 32768
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 30000
  retries = 2147483647
  retry.backoff.ms = 100
  sasl.client.callback.handler.class = null
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.login.callback.handler.class = null
  sasl.login.class = null
  sasl.login.refresh.buffer.seconds = 300
  sasl.login.refresh.min.period.seconds = 60
  sasl.login.refresh.window.factor = 0.8
  sasl.login.refresh.window.jitter = 0.05
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  send.buffer.bytes = 131072
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  ssl.endpoint.identification.algorithm = https
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLS
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  transaction.timeout.ms = 60000
  transactional.id = null
  value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:347)

Consumer config

[2024-09-25 07:13:37,750] INFO ConsumerConfig values: 
  allow.auto.create.topics = true
  auto.commit.interval.ms = 5000
  auto.offset.reset = earliest
  bootstrap.servers = [localhost:9092]
  check.crcs = true
  client.dns.lookup = default
  client.id = 
  client.rack = 
  connections.max.idle.ms = 540000
  default.api.timeout.ms = 60000
  enable.auto.commit = false
  exclude.internal.topics = true
  fetch.max.bytes = 52428800
  fetch.max.wait.ms = 500
  fetch.min.bytes = 1
  group.id = connect-cluster
  group.instance.id = null
  heartbeat.interval.ms = 3000
  interceptor.classes = []
  internal.leave.group.on.close = true
  isolation.level = read_uncommitted
  key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  max.partition.fetch.bytes = 1048576
  max.poll.interval.ms = 300000
  max.poll.records = 500
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  receive.buffer.bytes = 65536
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 30000
  retry.backoff.ms = 100
  sasl.client.callback.handler.class = null
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.login.callback.handler.class = null
  sasl.login.class = null
  sasl.login.refresh.buffer.seconds = 300
  sasl.login.refresh.min.period.seconds = 60
  sasl.login.refresh.window.factor = 0.8
  sasl.login.refresh.window.jitter = 0.05
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  send.buffer.bytes = 131072
  session.timeout.ms = 10000
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
  ssl.endpoint.identification.algorithm = https
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLS
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:347)

What I want to achieve is to change the default of the ProducerConfig's max.request.size = 1048576 and raise it to something like max.request.size = 10485760, and likewise raise the ConsumerConfig's max.partition.fetch.bytes = 1048576 to something like max.partition.fetch.bytes = 10485760.

My question is where to find the properties files that control these values.

Here is the list of config files in /opt/kafka_2.12-2.3.0/config:

butitoy@my-server:/opt/kafka_2.12-2.3.0$ ls -l config/
total 68
-rw-r--r-- 1 root root  906 Jun 19  2019 connect-console-sink.properties
-rw-r--r-- 1 root root  909 Jun 19  2019 connect-console-source.properties
-rw-r--r-- 1 root root 5482 Sep 25 02:36 connect-distributed.properties
-rw-r--r-- 1 root root  883 Jun 19  2019 connect-file-sink.properties
-rw-r--r-- 1 root root  881 Jun 19  2019 connect-file-source.properties
-rw-r--r-- 1 root root 1552 Jun 19  2019 connect-log4j.properties
-rw-r--r-- 1 root root 2290 Sep 24 04:03 connect-standalone.properties
-rw-r--r-- 1 root root 1344 Sep 25 05:33 consumer.properties
-rw-r--r-- 1 root root 4727 Jun 19  2019 log4j.properties
-rw-r--r-- 1 root root 1933 Sep 25 05:33 producer.properties
-rw-r--r-- 1 root root 6925 Sep 24 15:24 server.properties
-rw-r--r-- 1 root root 1032 Jun 19  2019 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Jun 19  2019 trogdor.conf
-rw-r--r-- 1 root root 1023 Jun 19  2019 zookeeper.properties
butitoy@my-server:/opt/kafka_2.12-2.3.0$ 

I tried adding max.request.size to server.properties and producer.properties, and max.partition.fetch.bytes to consumer.properties. Even when I add them to connect-distributed.properties, the changes are not reflected in the ProducerConfig and ConsumerConfig output.

How do I change these default values, and which config/properties files do I need to edit?

apache-kafka apache-kafka-connect
1 Answer

Kafka Connect does not read the server, consumer, or producer properties files.

If you need to set client configurations, you do so in the connect-distributed.properties file, as described in the documentation.

For the configuration of the producers used by Kafka source tasks and the consumers used by Kafka sink tasks, the same parameters can be used, but they need to be prefixed with producer. and consumer. respectively.
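
For example, to raise the two values from the question at the worker level, something like the following could be added to config/connect-distributed.properties (a minimal sketch showing only the two keys from the question; the Connect worker has to be restarted for the changes to take effect):

# worker-level client settings in config/connect-distributed.properties
# applied to the producers created for source tasks
producer.max.request.size=10485760
# applied to the consumers created for sink tasks
consumer.max.partition.fetch.bytes=10485760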

Or, at the level of an individual connector:

Starting with 2.3.0, client configuration overrides can be configured individually per connector by using the prefixes producer.override. and consumer.override.

https://kafka.apache.org/documentation/#connect_running
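
As a sketch of the per-connector approach, the override can be included in the connector's config when it is created through the Connect REST API. The connector name, connection URI, database, and collection below are placeholders, and note that in 2.3.0 the worker's connector.client.config.override.policy defaults to None, so it most likely needs to be set to All in connect-distributed.properties before such overrides are accepted:

# hypothetical MongoDB source connector with a per-connector producer override
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mongo-source",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "connection.uri": "mongodb://localhost:27017",
    "database": "mydb",
    "collection": "mycollection",
    "producer.override.max.request.size": "10485760"
  }
}'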
