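I am trying to implement a microservices architecture using Docker and Spring Boot. I have a running Kafka container and a running consumer container. There is also a producer with the same problem, but I am sure that once the consumer is fixed I can apply the same fix to the producer.
The consumer is a standalone Spring module with its own configuration and Dockerfile. The consumer, Kafka, and producer containers are all created and run with Docker Compose. In my docker-compose file I try to point the bootstrap-servers setting at kafka:9092 instead of localhost:29092, which is the default in the consumer's application.yml.
The problem is that the consumer container cannot connect to the Kafka container. In the consumer container I get these logs: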
2024-11-08 12:47:21 2024-11-08T20:47:21.708Z INFO 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-order-consumer-group-1, groupId=order-consumer-group] Node -1 disconnected.
2024-11-08 12:47:21 2024-11-08T20:47:21.708Z WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-order-consumer-group-1, groupId=order-consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Node may not be available.
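Everything is running on my local machine.
I believe the problem is that the consumer is trying to connect to the Kafka server at "(localhost/127.0.0.1:29092)".
I tried manually changing spring.kafka.consumer.bootstrap-servers in application.yml from localhost:29092 to kafka:9092, but when I ran the consumer I got: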
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
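A likely reading of this exception (an assumption, since the post does not say where the consumer was started): the Kafka client raises it when none of the hosts listed in bootstrap.servers can be resolved by DNS. The name kafka is a Compose service name, so it only resolves inside the Compose network; starting the consumer on the host or from the IDE with kafka:9092 would fail this way. The minimal sketch below uses a hypothetical helper class, BootstrapHostCheck, which is not part of the project, to check whether the host part of a bootstrap entry resolves from wherever it is run.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical diagnostic helper; not part of the original project. */
final class BootstrapHostCheck {

    /** Extracts the host from a "host:port" entry, e.g. "kafka:9092" -> "kafka". */
    static String hostOf(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        return colon >= 0 ? hostAndPort.substring(0, colon) : hostAndPort;
    }

    /** Returns true if the host of the given entry can be resolved from this machine or container. */
    static boolean isResolvable(String hostAndPort) {
        try {
            InetAddress.getByName(hostOf(hostAndPort));
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The two addresses used in this post; the result depends on where this runs.
        for (String server : new String[] {"localhost:29092", "kafka:9092"}) {
            System.out.println(server + " resolvable: " + isResolvable(server));
        }
    }
}
```

Run on the host, kafka:9092 would normally be reported as not resolvable; run inside a container attached to kafka_network, it should resolve.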
OrderConsumer.java file:
package com.Memorium.order_consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.Memorium.order_consumer.payload.Order;

@Slf4j
@Component
public class OrderConsumer {

    @KafkaListener(topics = "order", groupId = "order-consumer-group")
    public void orderConsumer(Order order) {
        log.info("Consumer consume Kafka message -> {}", order);
    }
}
application.yml for the consumer:
server:
  port: 9002

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: localhost:29092 # we need this to be kafka:9092, but it will not work
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: com.Memorium.order_consumer.payload
        spring.json.value.default.type: com.Memorium.order_consumer.payload.Order

logging:
  level:
    root: INFO
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG
    org.apache.kafka.clients.consumer.ConsumerConfig: DEBUG
    org.apache.kafka.clients.NetworkClient: TRACE
Consumer configuration as printed in the logs:
2024-11-08 12:44:33 2024-11-08T20:44:33.029Z INFO 1 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
2024-11-08 12:44:33 allow.auto.create.topics = true
2024-11-08 12:44:33 auto.commit.interval.ms = 5000
2024-11-08 12:44:33 auto.include.jmx.reporter = true
2024-11-08 12:44:33 auto.offset.reset = earliest
2024-11-08 12:44:33 bootstrap.servers = [localhost:29092]
2024-11-08 12:44:33 check.crcs = true
2024-11-08 12:44:33 client.dns.lookup = use_all_dns_ips
2024-11-08 12:44:33 client.id = consumer-order-consumer-group-1
2024-11-08 12:44:33 client.rack =
2024-11-08 12:44:33 connections.max.idle.ms = 540000
2024-11-08 12:44:33 default.api.timeout.ms = 60000
2024-11-08 12:44:33 enable.auto.commit = false
2024-11-08 12:44:33 enable.metrics.push = true
2024-11-08 12:44:33 exclude.internal.topics = true
2024-11-08 12:44:33 fetch.max.bytes = 52428800
2024-11-08 12:44:33 fetch.max.wait.ms = 500
2024-11-08 12:44:33 fetch.min.bytes = 1
2024-11-08 12:44:33 group.id = order-consumer-group
2024-11-08 12:44:33 group.instance.id = null
2024-11-08 12:44:33 group.protocol = classic
2024-11-08 12:44:33 group.remote.assignor = null
2024-11-08 12:44:33 heartbeat.interval.ms = 3000
2024-11-08 12:44:33 interceptor.classes = []
2024-11-08 12:44:33 internal.leave.group.on.close = true
2024-11-08 12:44:33 internal.throw.on.fetch.stable.offset.unsupported = false
2024-11-08 12:44:33 isolation.level = read_uncommitted
2024-11-08 12:44:33 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2024-11-08 12:44:33 max.partition.fetch.bytes = 1048576
2024-11-08 12:44:33 max.poll.interval.ms = 300000
2024-11-08 12:44:33 max.poll.records = 500
2024-11-08 12:44:33 metadata.max.age.ms = 300000
2024-11-08 12:44:33 metric.reporters = []
2024-11-08 12:44:33 metrics.num.samples = 2
2024-11-08 12:44:33 metrics.recording.level = INFO
2024-11-08 12:44:33 metrics.sample.window.ms = 30000
2024-11-08 12:44:33 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
2024-11-08 12:44:33 receive.buffer.bytes = 65536
2024-11-08 12:44:33 reconnect.backoff.max.ms = 1000
2024-11-08 12:44:33 reconnect.backoff.ms = 50
2024-11-08 12:44:33 request.timeout.ms = 30000
2024-11-08 12:44:33 retry.backoff.max.ms = 1000
2024-11-08 12:44:33 retry.backoff.ms = 100
2024-11-08 12:44:33 sasl.client.callback.handler.class = null
2024-11-08 12:44:33 sasl.jaas.config = null
2024-11-08 12:44:33 sasl.kerberos.kinit.cmd = /usr/bin/kinit
2024-11-08 12:44:33 sasl.kerberos.min.time.before.relogin = 60000
2024-11-08 12:44:33 sasl.kerberos.service.name = null
2024-11-08 12:44:33 sasl.kerberos.ticket.renew.jitter = 0.05
2024-11-08 12:44:33 sasl.kerberos.ticket.renew.window.factor = 0.8
2024-11-08 12:44:33 sasl.login.callback.handler.class = null
2024-11-08 12:44:33 sasl.login.class = null
2024-11-08 12:44:33 sasl.login.connect.timeout.ms = null
2024-11-08 12:44:33 sasl.login.read.timeout.ms = null
2024-11-08 12:44:33 sasl.login.refresh.buffer.seconds = 300
2024-11-08 12:44:33 sasl.login.refresh.min.period.seconds = 60
2024-11-08 12:44:33 sasl.login.refresh.window.factor = 0.8
2024-11-08 12:44:33 sasl.login.refresh.window.jitter = 0.05
2024-11-08 12:44:33 sasl.login.retry.backoff.max.ms = 10000
2024-11-08 12:44:33 sasl.login.retry.backoff.ms = 100
2024-11-08 12:44:33 sasl.mechanism = GSSAPI
2024-11-08 12:44:33 sasl.oauthbearer.clock.skew.seconds = 30
2024-11-08 12:44:33 sasl.oauthbearer.expected.audience = null
2024-11-08 12:44:33 sasl.oauthbearer.expected.issuer = null
2024-11-08 12:44:33 sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
2024-11-08 12:44:33 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
2024-11-08 12:44:33 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
2024-11-08 12:44:33 sasl.oauthbearer.jwks.endpoint.url = null
2024-11-08 12:44:33 sasl.oauthbearer.scope.claim.name = scope
2024-11-08 12:44:33 sasl.oauthbearer.sub.claim.name = sub
2024-11-08 12:44:33 sasl.oauthbearer.token.endpoint.url = null
2024-11-08 12:44:33 security.protocol = PLAINTEXT
2024-11-08 12:44:33 security.providers = null
2024-11-08 12:44:33 send.buffer.bytes = 131072
2024-11-08 12:44:33 session.timeout.ms = 45000
2024-11-08 12:44:33 socket.connection.setup.timeout.max.ms = 30000
2024-11-08 12:44:33 socket.connection.setup.timeout.ms = 10000
2024-11-08 12:44:33 ssl.cipher.suites = null
2024-11-08 12:44:33 ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
2024-11-08 12:44:33 ssl.endpoint.identification.algorithm = https
2024-11-08 12:44:33 ssl.engine.factory.class = null
2024-11-08 12:44:33 ssl.key.password = null
2024-11-08 12:44:33 ssl.keymanager.algorithm = SunX509
2024-11-08 12:44:33 ssl.keystore.certificate.chain = null
2024-11-08 12:44:33 ssl.keystore.key = null
2024-11-08 12:44:33 ssl.keystore.location = null
2024-11-08 12:44:33 ssl.keystore.password = null
2024-11-08 12:44:33 ssl.keystore.type = JKS
2024-11-08 12:44:33 ssl.protocol = TLSv1.3
2024-11-08 12:44:33 ssl.provider = null
2024-11-08 12:44:33 ssl.secure.random.implementation = null
2024-11-08 12:44:33 ssl.trustmanager.algorithm = PKIX
2024-11-08 12:44:33 ssl.truststore.certificates = null
2024-11-08 12:44:33 ssl.truststore.location = null
2024-11-08 12:44:33 ssl.truststore.password = null
2024-11-08 12:44:33 ssl.truststore.type = JKS
2024-11-08 12:44:33 value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
OrderProducer.java file:
package com.Memorium.order_producer;

import com.Memorium.order_producer.payload.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class OrderProducer {

    public static final String TOPIC = "order";

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    public void sendOrder(Order order) {
        Message<Order> message = MessageBuilder
                .withPayload(order)
                .setHeader(KafkaHeaders.TOPIC, "order")
                .build();
        kafkaTemplate.send(message);
        log.info("Producer produced Kafka message -> {}", message);
    }
}
ProducerController.java file:
package com.Memorium.order_producer;

import com.Memorium.order_producer.payload.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RequestMapping("/api/order")
@RestController
public class ProducerController {

    @Autowired
    private OrderProducer orderProducer;

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody Order order) {
        orderProducer.sendOrder(order);
        return ResponseEntity.ok(String.format("Order published, %s", order));
    }
}
application.yml for the order producer:
server:
  port: 9003

spring:
  kafka:
    producer:
      acks: -1
      bootstrap-servers: localhost:29092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false # GOD DAM THIS TOOK ME HOURS TO FIX, this is why consumer keeps trying to access the producer's order class

logging:
  level:
    root: INFO
    org.springframework.kafka: DEBUG
    org.apache.kafka: DEBUG
    org.apache.kafka.clients.consumer.ConsumerConfig: DEBUG
    org.apache.kafka.clients.NetworkClient: TRACE
docker-compose:
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "22181:2181"
    networks:
      - kafka_network

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      - kafka_network

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8090:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    networks:
      - kafka_network

  producer:
    build:
      context: ../order-producer
    ports:
      - "9003:9003"
    depends_on:
      - kafka
    environment:
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    networks:
      - kafka_network

  consumer:
    build:
      context: ../order-consumer
    ports:
      - "9002:9002"
    depends_on:
      - kafka
    environment:
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
    networks:
      - kafka_network

networks:
  kafka_network:
    driver: bridge
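For readers untangling the two advertised listeners in this file: PLAINTEXT advertises kafka:9092, which is meant for clients on kafka_network, while PLAINTEXT_HOST advertises localhost:29092, which is meant for clients on the host through the published port. Inside the consumer or producer container, localhost is that container itself, so localhost:29092 cannot reach the broker from there. The sketch below only parses the listener string from the Compose file to make that mapping explicit; AdvertisedListenersSketch is a hypothetical class name used for illustration and is not part of Kafka or of this project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration; parses a KAFKA_ADVERTISED_LISTENERS value. */
final class AdvertisedListenersSketch {

    /** Maps each listener name to the host:port it advertises to clients. */
    static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> byName = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            // Each entry looks like NAME://host:port
            String[] parts = entry.trim().split("://", 2);
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    public static void main(String[] args) {
        Map<String, String> listeners =
                parse("PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092");
        // Address for clients inside the Compose network (consumer, producer, kafka-ui).
        System.out.println("from containers: " + listeners.get("PLAINTEXT"));
        // Address for clients started on the host, for example from the IDE.
        System.out.println("from the host:   " + listeners.get("PLAINTEXT_HOST"));
    }
}
```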
If I try to publish through the producer, I get similar messages:
2024-11-08T21:10:21.179Z WARN 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2024-11-08T21:10:22.085Z INFO 1 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Node -1 disconnected.
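I tried manually changing bootstrap-servers in application.yml to kafka:9092, but if I do that I cannot build the .jar file. I also tried to see whether I could reach my local machine's localhost from the consumer container and access Kafka that way, but had no success.
If I do not containerize the consumer and producer and just run them from IntelliJ, they both work fine. Kafka still runs in a container in that setup.
Any detailed explanation would be greatly appreciated; I am still learning Docker.
After many painful hours, I fixed it by changing application.yml: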
server:
  port: 9002

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:29092}
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: com.Memorium.order_consumer.payload
        spring.json.value.default.type: com.Memorium.order_consumer.payload.Order
In short, just change bootstrap-servers to:
bootstrap-servers: ${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:29092}
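As far as I understand it, the reason this works is the following. The environment variable SPRING_KAFKA_BOOTSTRAP_SERVERS binds to spring.kafka.bootstrap-servers, whereas application.yml sets the more specific spring.kafka.consumer.bootstrap-servers, and the consumer-specific value takes precedence in Spring Boot. So the variable from docker-compose was set but never used for the consumer. The placeholder makes the consumer property read the variable explicitly and fall back to localhost:29092 when it is absent. The sketch below is a simplified model of that lookup, not Spring's actual implementation; PlaceholderSketch and resolve are hypothetical names.

```java
import java.util.Map;

/** Simplified, hypothetical model of a ${NAME:default} placeholder lookup. */
final class PlaceholderSketch {

    /** Returns the value of NAME from env if present, otherwise the default after the first ':'. */
    static String resolve(String placeholder, Map<String, String> env) {
        if (!placeholder.startsWith("${") || !placeholder.endsWith("}")) {
            return placeholder; // plain literal, nothing to resolve
        }
        String body = placeholder.substring(2, placeholder.length() - 1);
        // Split on the first ':' only, so the default "localhost:29092" stays intact.
        int sep = body.indexOf(':');
        String name = sep >= 0 ? body.substring(0, sep) : body;
        String fallback = sep >= 0 ? body.substring(sep + 1) : null;
        return env.getOrDefault(name, fallback);
    }

    public static void main(String[] args) {
        String value = "${SPRING_KAFKA_BOOTSTRAP_SERVERS:localhost:29092}";
        // Running from the IDE: the variable is not set, so the default applies.
        System.out.println(resolve(value, Map.of()));
        // Running under Docker Compose: the variable from the compose file wins.
        System.out.println(resolve(value, Map.of("SPRING_KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")));
    }
}
```

The same placeholder should work for the producer's bootstrap-servers entry. An alternative that I have not tested here would be to keep application.yml unchanged and set SPRING_KAFKA_CONSUMER_BOOTSTRAP_SERVERS (and SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS for the producer) in docker-compose instead.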