我在从本地运行的 Spring 应用程序连接到 Kafka 集群时遇到问题。应用程序成功将 localhost 解析为 127.0.0.1,但随后立即断开连接,并且不再发生进一步的通信。以下是相关日志和我的设置详细信息。
日志
INFO [AdminClient clientId=authentication-service-admin-0] Completed connection to node -1. Fetching API versions.
DEBUG [AdminClient clientId=authentication-service-admin-0] Initiating API versions fetch from node -1.
DEBUG [AdminClient clientId=authentication-service-admin-0] Sending API_VERSIONS request with header ...
DEBUG [AdminClient clientId=authentication-service-admin-0] Connection with localhost/127.0.0.1 (channelId=-1) disconnected
docker-compose
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
networks:
- microservices-net
kafka-1:
image: confluentinc/cp-kafka:latest
container_name: kafka-1
ports:
- '19092:19092'
- '9092:9092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
KAFKA_LOG4J_LOGGERS: "kafka.server=DEBUG,kafka.network=TRACE"
depends_on:
- zookeeper
networks:
- microservices-net
kafka-2:
image: confluentinc/cp-kafka:latest
container_name: kafka-2
ports:
- '29092:29092'
- '9093:9093'
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-2:29092,LISTENER_EXTERNAL://localhost:9093
KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-2:29092,LISTENER_EXTERNAL://localhost:9093
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
depends_on:
- zookeeper
networks:
- microservices-net
Spring Kafka 配置(application.yml):
spring:
kafka:
producer:
bootstrap-servers: localhost:9092, localhost:9093
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
acks: all
Kafka-2 容器日志
INFO [ControllerEventThread controllerId=2] Starting (kafka.controller.ControllerEventManager$ControllerEventThread)
INFO [GroupCoordinator 2]: Starting up. (kafka.coordinator.group.GroupCoordinator)
INFO [TransactionCoordinator id=2] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
INFO [TxnMarkerSenderThread-2]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
DEBUG [Controller id=2] Broker 1 was elected as controller instead of broker 2 (kafka.controller.KafkaController)
ERROR org.apache.kafka.common.errors.ControllerMovedException: Controller moved to another broker. Aborting controller startup procedure
INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Enabling request processing. (kafka.network.SocketServer)
INFO Awaiting socket connections on kafka-2:29092. (kafka.network.DataPlaneAcceptor)
INFO Awaiting socket connections on localhost:9093. (kafka.network.DataPlaneAcceptor)
INFO Kafka version: 7.7.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
INFO [zk-broker-2-to-controller-alter-partition-channel-manager]: Recorded new ZK controller, from now on will use node kafka-1:19092 (id: 1 rack: null) (kafka.server.NodeToControllerRequestThread)
我使用 Test-NetConnection 检查了连接,并确认 Kafka 端口(9092 和 9093)均可访问。我期望 Spring 应用程序与 Kafka 建立稳定的连接,但连接短暂建立,然后立即断开,如日志所示。
我通过将 KAFKA_LISTENERS 配置中的主机从 localhost 更改为 0.0.0.0 解决了该问题。之后一切开始正常工作。
kafka-1:
environment:
KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-1:19092,LISTENER_EXTERNAL://localhost:9092
kafka-2:
environment:
KAFKA_LISTENERS: LISTENER_INTERNAL://kafka-2:29092,LISTENER_EXTERNAL://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: LISTENER_INTERNAL://kafka-2:29092,LISTENER_EXTERNAL://localhost:9093
此配置允许客户端正确连接。