How do I point the Confluent RabbitMQ source connector at CloudKarafka brokers?


I am using the Kafka Connect Docker image confluentinc/cp-kafka-connect:5.4.0-beta1 from Confluent.

I want to set up a RabbitMQ source connector as described in this article:

https://rmoff.net/2020/01/08/streaming-messages-from-rabbitmq-into-kafka-with-kafka-connect/

However, instead of a local Kafka broker, I want to use brokers that I set up on https://www.cloudkarafka.com/.

In docker-compose.yml, I successfully established the connection from the Connect container to the CloudKarafka brokers with the following settings:

kafka-connect-01:
    image: confluentinc/cp-kafka-connect:5.4.0-beta1
    container_name: kafka-connect-01
    hostname: kafka-connect-01
    ports:
      - 8083:8083
    environment:
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "my-cloudkafka-brokers:9094"
      CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_SASL_MECHANISM: "SCRAM-SHA-256"
      CONNECT_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="hidden";'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-01'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: username-c-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: username-c-configs
      CONNECT_OFFSET_STORAGE_TOPIC: username-c-offsets
      CONNECT_STATUS_STORAGE_TOPIC: username-c-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      #CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
      #CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.storage.StringConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'DEBUG'
      #CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=DEBUG,org.reflections=ERROR,org.eclipse.jetty.server=DEBUG'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars'
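(Note that the `CONNECT_SECURITY_PROTOCOL` / `CONNECT_SASL_*` variables above configure only the worker's own clients, i.e. the ones that maintain the config, offset, and status topics. The producers that Connect spawns for source tasks read `producer.`-prefixed settings instead, so without those they attempt a plaintext handshake against the SASL_SSL listener. A sketch of the additional environment variables, assuming the same CloudKarafka credentials as above:)

```yaml
      # Producer-prefixed copies of the worker security settings; without
      # these, source-task producers connect without SASL/SSL.
      CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
      CONNECT_PRODUCER_SASL_MECHANISM: "SCRAM-SHA-256"
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="hidden";'
```

The cp-kafka-connect image translates `CONNECT_PRODUCER_*` variables into `producer.*` entries in the worker properties file; sink connectors would need the matching `CONNECT_CONSUMER_*` variables.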

The connection problem:

[source-rabbitmq-00|task-0] [Producer clientId=connector-producer-source-rabbitmq-00-0] Connection with cloudkafka-broker disconnected (org.apache.kafka.common.network.Selector:607)
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:330)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)

This exception comes from the RabbitMQ source connector when it tries to publish messages to the CloudKarafka brokers (the brokers' DNS names are deliberately redacted). I hit the same problem with both of the following RabbitMQ connector configurations:

Configuration 1:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-rabbitmq-01/config \
    -d '{
        "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "kafka.topic" : "username-myrabbitsinktopic",
        "rabbitmq.queue" : "test-queue-01",
        "rabbitmq.username": "user",
        "rabbitmq.password": "pw",
        "rabbitmq.host": "rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.virtual.host": "/",
        "confluent.license":"",
        "confluent.topic.bootstrap.servers":"cloudkafka-brokers:9094",
    "confluent.topic.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"userpassword\";",
        "confluent.topic.security.protocol":"SASL_SSL",
        "confluent.topic.sasl.mechanism":"SCRAM-SHA-256",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "offset.flush.timeout.ms":1200000000
 }'

Configuration 2:

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/source-rabbitmq-00/config \
    -d '{
        "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
        "kafka.topic" : "username-myrabbitmqsourcetopic",
        "rabbitmq.queue" : "test-queue-01",
        "rabbitmq.username": "user",
        "rabbitmq.password": "pw",
        "rabbitmq.host": "rabbitmq",
        "rabbitmq.port": "5672",
        "rabbitmq.virtual.host": "/",
        "bootstrap.servers":"cloudkafkabroker:9094",
    "sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";",
        "security.protocol":"SASL_SSL",
        "sasl.mechanism":"SCRAM-SHA-256",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "offset.flush.timeout.ms":1200000000
 }'
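(For what it's worth: the `confluent.topic.*` keys in configuration 1 only secure the client that writes the license topic, and the bare `bootstrap.servers` / `sasl.*` keys in configuration 2 are not recognized producer settings, so neither affects the producer that writes `kafka.topic`. Since Apache Kafka 2.3 (KIP-458), per-connector client overrides can be passed with a `producer.override.` prefix, provided the worker is started with `connector.client.config.override.policy=All`. A hedged sketch of the extra connector-config keys, with placeholder broker and credentials:)

```json
{
    "producer.override.bootstrap.servers": "cloudkafka-brokers:9094",
    "producer.override.security.protocol": "SASL_SSL",
    "producer.override.sasl.mechanism": "SCRAM-SHA-256",
    "producer.override.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
}
```

Alternatively, setting the same values at the worker level (`producer.security.protocol`, and so on) applies them to every connector without needing an override policy.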

Any advice on how to configure this RabbitMQ connector to use the brokers provided by CloudKarafka would be much appreciated. Thank you.


apache-kafka rabbitmq apache-kafka-connect
1 Answer

I successfully connected RabbitMQ to CloudKafka using the following configuration:
