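I am using the Kafka Connect Docker image confluentinc/cp-kafka-connect:5.4.0-beta1 from Confluent.
I want to set up the RabbitMQ source connector as described in this article:
https://rmoff.net/2020/01/08/streaming-messages-from-rabbitmq-into-kafka-with-kafka-connect/
However, instead of a local Kafka broker, I want to use the brokers I set up at https://www.cloudkarafka.com/.
In docker-compose.yml, I successfully established a connection from the Connect container to the CloudKarafka brokers with the following settings: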
  kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.4.0-beta1
    container_name: kafka-connect-01
    hostname: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "my-cloudkafka-brokers:9094"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_MECHANISM: "SCRAM-SHA-256"
      CONNECT_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="hidden";'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: username-c-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: username-c-configs
      CONNECT_OFFSET_STORAGE_TOPIC: username-c-offsets
      CONNECT_STATUS_STORAGE_TOPIC: username-c-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      #CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      #CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.storage.StringConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'DEBUG'
      #CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=DEBUG,org.reflections=ERROR,org.eclipse.jetty.server=DEBUG'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars'
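One thing worth checking in the environment block above: in Kafka Connect, top-level worker settings such as security.protocol configure the worker's own clients (used for the config, offset, and status topics), while the producers created for source tasks read worker properties prefixed with producer. (and sink-task consumers read consumer.). The following is a minimal Python sketch, not a confirmed fix, that derives the matching CONNECT_PRODUCER_* and CONNECT_CONSUMER_* entries from the three security settings already present. The helper names prefixed_env and render_yaml_lines are hypothetical, the credentials are the same placeholders as above, and the sketch assumes the Confluent image convention of upper-casing a property and replacing dots with underscores.

```python
# Security settings already used by the worker (placeholder credentials).
BASE_SECURITY = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        'username="username" password="hidden";'
    ),
}


def prefixed_env(settings, role):
    """Build CONNECT_<ROLE>_* environment entries for one client role.

    role is "producer" (source tasks) or "consumer" (sink tasks).
    Assumes the image maps CONNECT_A_B_C to the worker property a.b.c.
    """
    env = {}
    for key, value in settings.items():
        prop = f"{role}.{key}"  # e.g. producer.security.protocol
        env["CONNECT_" + prop.replace(".", "_").upper()] = value
    return env


def render_yaml_lines(env, indent=6):
    """Format entries as single-quoted YAML lines for the environment block."""
    pad = " " * indent
    return [f"{pad}{name}: '{value}'" for name, value in env.items()]


if __name__ == "__main__":
    for role in ("producer", "consumer"):
        for line in render_yaml_lines(prefixed_env(BASE_SECURITY, role)):
            print(line)
```

The printed lines can be pasted under environment: next to the existing CONNECT_SECURITY_PROTOCOL entries. Whether this resolves a given disconnect depends on the broker setup, so treat it as a starting point.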
The connection problem:
[source-rabbitmq-00|task-0] [Producer clientId=connector-producer-source-rabbitmq-00-0] Connection with cloudkafka-broker disconnected (org.apache.kafka.common.network.Selector:607)
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:330)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)
occurs in the RabbitMQ source connector when it tries to publish messages to the CloudKarafka broker (the broker's DNS name is deliberately hidden). I get the same problem with both of the following RabbitMQ connector configurations:
Configuration 1:
curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-rabbitmq-01/config \
    -d '{
        "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "kafka.topic" : "username-myrabbitsinktopic",
        "rabbitmq.queue" : "test-queue-01",
        "rabbitmq.username": "user",
        "rabbitmq.password": "pw",
        "rabbitmq.host": "rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.virtual.host": "/",
        "confluent.license":"",
        "confluent.topic.bootstrap.servers":"cloudkafka-brokers:9094",
        "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"userpassword\";",
        "confluent.topic.security.protocol":"SASL_SSL",
        "confluent.topic.sasl.mechanism":"SCRAM-SHA-256",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "offset.flush.timeout.ms":1200000000
    }'
Configuration 2:
curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/source-rabbitmq-00/config \
    -d '{
        "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "kafka.topic" : "username-myrabbitmqsourcetopic",
        "rabbitmq.queue" : "test-queue-01",
        "rabbitmq.username": "user",
        "rabbitmq.password": "pw",
        "rabbitmq.host": "rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.virtual.host": "/",
        "bootstrap.servers":"cloudkafkabroker:9094",
        "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";",
        "security.protocol":"SASL_SSL",
        "sasl.mechanism":"SCRAM-SHA-256",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "offset.flush.timeout.ms":1200000000
    }'
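A note on configuration 2: unprefixed keys such as security.protocol in a connector config are not handed to the task's producer. Since Kafka 2.3, per-connector producer settings use the producer.override. prefix, and the worker only accepts them if connector.client.config.override.policy permits it (for example All). The Python sketch below shows how such a config could be assembled; the helper name with_producer_overrides is hypothetical, the credentials are placeholders, and this is one possible approach rather than a verified solution for this setup.

```python
import json

# Connector settings taken from configuration 2, without the unprefixed
# bootstrap/security keys.
BASE_CONNECTOR_CONFIG = {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "kafka.topic": "username-myrabbitmqsourcetopic",
    "rabbitmq.queue": "test-queue-01",
    "rabbitmq.username": "user",
    "rabbitmq.password": "pw",
    "rabbitmq.host": "rabbitmq",
    "rabbitmq.port": "5672",
    "rabbitmq.virtual.host": "/",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
}

# Producer security settings (placeholder credentials).
PRODUCER_SECURITY = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        'username="username" password="password";'
    ),
}


def with_producer_overrides(connector_config, producer_settings):
    """Return a copy of connector_config with producer.override.* keys added."""
    merged = dict(connector_config)
    for key, value in producer_settings.items():
        merged[f"producer.override.{key}"] = value
    return merged


if __name__ == "__main__":
    config = with_producer_overrides(BASE_CONNECTOR_CONFIG, PRODUCER_SECURITY)
    # The JSON printed here is the body for the PUT request shown above.
    print(json.dumps(config, indent=2))
```

The resulting JSON can be sent with the same curl PUT call to http://localhost:8083/connectors/source-rabbitmq-00/config. On the worker side this assumes an extra environment entry such as CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: 'All'; without it the override keys are expected to be rejected.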
Please advise on how to configure this RabbitMQ connector to use the brokers provided by CloudKarafka. Thanks.
I have successfully connected RabbitMQ to CloudKarafka using the following configuration: