Containerized Kafka consumer cannot connect to the Kafka container

Question

I am trying to build a microservice architecture with Docker and Spring Boot. I have a running Kafka container and a running consumer container. There is also a producer with the same problem, but I am confident that once I fix the consumer I can apply the same fix to the producer. The consumer is a standalone Spring module with its own configuration and Dockerfile. The consumer, Kafka, and producer containers are created and run with docker compose. In my docker-compose file I try to point the bootstrap-servers configuration at kafka:9092 instead of localhost:29092, which is the default in the consumer's application.yml. The problem is that the consumer container cannot connect to the Kafka container. In the consumer container I get these logs:

2024-11-08T20:47:21.708Z  INFO 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-order-consumer-group-1, groupId=order-consumer-group] Node -1 disconnected.
2024-11-08T20:47:21.708Z  WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-order-consumer-group-1, groupId=order-consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Node may not be available.

Everything is running on my local machine.

I believe the problem is that the consumer tries to reach the Kafka broker at "(localhost/127.0.0.1:29092)".
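
That address is explained by Kafka's advertised listeners: a client first contacts the bootstrap address, and the broker then hands back the addresses from KAFKA_ADVERTISED_LISTENERS for all further connections. The compose file below already defines two listeners for exactly this split (excerpt of the cp-kafka settings shown later):

```yaml
# Inside the Docker network: containers reach the broker as kafka:9092
# From the host machine:     clients reach it as localhost:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
```

A consumer running inside the compose network must therefore bootstrap against kafka:9092; from inside the container, localhost:29092 points at the consumer container itself, where nothing is listening.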

I tried manually changing the application.yml setting spring.kafka.consumer.bootstrap-servers from localhost:29092 to kafka:9092, but when I ran the consumer I got:

Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

OrderConsumer.java:

package com.Memorium.order_consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.Memorium.order_consumer.payload.Order;

@Slf4j
@Component
public class OrderConsumer {
    @KafkaListener(topics = "order", groupId = "order-consumer-group")
    public void orderConsumer(Order order) {
        log.info("Consumer consume Kafka message -> {}", order);
    }
}

application.yml for the OrderConsumer:

server:
  port: 9002

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: localhost:29092 # we need this to be kafka:9092, but it will not work
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: com.Memorium.order_consumer.payload
        spring.json.value.default.type: com.Memorium.order_consumer.payload.Order

logging:
  level:
    root: INFO
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG
    org.apache.kafka.clients.consumer.ConsumerConfig: DEBUG
    org.apache.kafka.clients.NetworkClient: TRACE

OrderConsumer configuration as printed in the logs:

2024-11-08T20:44:33.029Z  INFO 1 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.include.jmx.reporter = true
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:29092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-order-consumer-group-1
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    enable.metrics.push = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = order-consumer-group
    group.instance.id = null
    group.protocol = classic
    group.remote.assignor = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.max.ms = 1000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 45000
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

OrderProducer.java:

package com.Memorium.order_producer;

import com.Memorium.order_producer.payload.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderProducer {
    public static final String TOPIC = "order";

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    public void sendOrder(Order order){
        Message<Order> message = MessageBuilder
                .withPayload(order)
                .setHeader(KafkaHeaders.TOPIC, "order")
                .build();
        kafkaTemplate.send(message);
        log.info("Producer produced Kafka message -> {}", message);
    }
}

ProducerController.java:

package com.Memorium.order_producer;

import com.Memorium.order_producer.payload.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RequestMapping("/api/order")
@RestController
public class ProducerController {
    @Autowired
    private OrderProducer orderProducer;

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody Order order) {
        orderProducer.sendOrder(order);
        return ResponseEntity.ok(String.format("Order published, %s", order));
    }
}

application.yml for the order producer:

server:
  port: 9003

spring:
  kafka:
    producer:
      acks: -1
      bootstrap-servers: localhost:29092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false # this took hours to track down: without it the consumer keeps trying to load the producer's Order class

logging:
  level:
    root: INFO
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG
    org.apache.kafka.clients.consumer.ConsumerConfig: DEBUG
    org.apache.kafka.clients.NetworkClient: TRACE
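
The add.type.headers comment above deserves a note. By default, spring-kafka's JsonSerializer writes a __TypeId__ header containing the producer's fully qualified class name (here com.Memorium.order_producer.payload.Order), and the consumer's JsonDeserializer tries to instantiate exactly that class, which does not exist in the consumer module. Disabling the header on the producer is one fix; an alternative sketch (not used here) is to keep the header and ignore it on the consumer side, falling back to the configured default type:

```yaml
# Consumer-side alternative: ignore producer type headers and use
# spring.json.value.default.type instead
spring:
  kafka:
    consumer:
      properties:
        spring.json.use.type.headers: false
```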

docker-compose.yml:

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "22181:2181"
    networks:
      - kafka_network

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka_network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8090:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    networks:
      - kafka_network

  producer:
    build:
      context: ../order-producer
    ports:
      - "9003:9003"
    depends_on:
      - kafka
    environment:
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    networks:
      - kafka_network

  consumer:
    build:
      context: ../order-consumer
    ports:
      - "9002:9002"
    depends_on:
      - kafka
    environment:
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    networks:
      - kafka_network

networks:
  kafka_network:
    driver: bridge

If I try to publish through the producer, I get similar messages:

2024-11-08T21:10:21.179Z  WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2024-11-08T21:10:22.085Z  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Node -1 disconnected.

I tried manually changing the application.yml bootstrap-servers to kafka:9092, but then I cannot build the .jar file. I also tried to reach my machine's localhost from inside the consumer container and access Kafka that way, without success.
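
The build failure with kafka:9092 hard-coded is likely the test phase: any test that starts the Spring context runs on the host (or during the image build), where the hostname kafka does not resolve, so the client fails with the "No resolvable bootstrap urls" error quoted earlier. One common workaround, sketched here assuming a Maven multi-stage Dockerfile (the image tags and paths are illustrative, not taken from the question), is to skip tests when packaging:

```dockerfile
# Hypothetical Dockerfile sketch: package the jar without running tests,
# since `kafka` only resolves inside the compose network at runtime
FROM maven:3.9-eclipse-temurin-17 AS build
WORKDIR /app
COPY . .
RUN mvn -q package -DskipTests

FROM eclipse-temurin:17-jre
COPY --from=build /app/target/*.jar app.jar
ENTRYPOINT ["java", "-jar", "/app.jar"]
```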

If I skip containerizing the consumer and producer and just run them in IntelliJ, both work fine, with Kafka still running in its container.

Any detailed explanation would be greatly appreciated; I am still learning Docker.

java docker microservices spring-kafka
1 Answer

After many painful hours, I fixed it by changing application.yml:

server:
  port: 9002

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:29092} 
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: com.Memorium.order_consumer.payload
        spring.json.value.default.type: com.Memorium.order_consumer.payload.Order

The only change needed is the bootstrap-servers line:

bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:29092}
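
Why this works: Spring resolves ${VAR:default} placeholders against environment variables at startup, so inside the container the SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092 value from docker-compose wins, while local runs fall back to localhost:29092. A likely reason the compose env var alone was not enough: via relaxed binding it maps to the general spring.kafka.bootstrap-servers property, while the yml set the more specific spring.kafka.consumer.bootstrap-servers, which takes precedence for the consumer. The resolution rule (first colon splits the variable name from the default, so the default may itself contain colons) can be sketched like this; it is a toy illustration, not Spring's actual implementation:

```java
import java.util.Map;

// Toy re-implementation of ${VAR:default} placeholder resolution;
// Spring's real resolver is far more general.
public class PlaceholderDemo {
    static String resolve(String value, Map<String, String> env) {
        if (!(value.startsWith("${") && value.endsWith("}"))) return value;
        String inner = value.substring(2, value.length() - 1);
        int sep = inner.indexOf(':');                           // first colon only
        String key = sep < 0 ? inner : inner.substring(0, sep);
        String def = sep < 0 ? null : inner.substring(sep + 1); // default may contain ':'
        return env.getOrDefault(key, def);
    }

    public static void main(String[] args) {
        String prop = "${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:29092}";
        // In the container, docker-compose sets the variable:
        System.out.println(resolve(prop, Map.of("SPRING_KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"))); // kafka:9092
        // Running locally in IntelliJ, it is unset and the default applies:
        System.out.println(resolve(prop, Map.of()));            // localhost:29092
    }
}
```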