我想使用春季云流kafka活页夹。我使用以下docker-compose文件启动了2个kafka代理和1个zookeeper。
version: '3'
services:
kafka-0:
image: confluentinc/cp-kafka:5.2.1
container_name: kafka-0
restart: always
ports:
- "9094:9092"
expose:
- "9094"
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-0:9094
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
- KAFKA_ADVERTISED_HOST_NAME=kafka-0
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
kafka-1:
image: confluentinc/cp-kafka:5.2.1
container_name: kafka-1
restart: always
ports:
- "9095:9093"
expose:
- "9095"
environment:
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-1:9095
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2182
- KAFKA_ADVERTISED_HOST_NAME=kafka-1
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
depends_on:
- zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:5.3.1
container_name: zookeeper
ports:
- "2182:2181"
expose:
- "2182"
environment:
- ZOOKEEPER_CLIENT_PORT=2182
在我的spring boot应用程序中,我的属性文件是
spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9094,127.0.0.1:9095
我的春季云流源类如下
@EnableScheduling
@EnableBinding(Source.class)
public class UsageDetailSender {
@Autowired
private Source source;
private String[] users = {"Glenn", "Sabby", "Mark", "Janne", "Ilaya"};
@Scheduled(fixedDelay = 1000)
public void sendEvents() {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
}
}
从AdminClientConfig值,看来我能够连接到2个kafka代理。
bootstrap.servers = [127.0.0.1:9094, 127.0.0.1:9095]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
但是,我收到此错误。我该如何解决?
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
2020-04-15 20:32:20.303 INFO 10384 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
2020-04-15 20:32:20.314 ERROR 10384 --- [ main] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:290) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:137) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:78) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:193) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:151) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:268) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:243) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindOutputs(BindableProxyFactory.java:287) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:163) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) ~[spring-boot-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at io.spring.dataflow.sample.usagedetailsender.UsageDetailSenderApplication.main(UsageDetailSenderApplication.java:10) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131) ~[idea_rt.jar:na]
Caused by: java.util.concurrent.TimeoutException: null
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274) ~[kafka-clients-2.0.1.jar:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:323) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:299) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:281) ~[spring-cloud-stream-binder-kafka-core-2.1.2.RELEASE.jar:2.1.2.RELEASE]
... 32 common frames omitted
您需要localhost:...
上的监听器。
请参见here。