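I have a Spring Cloud Stream project that uses Kafka and Protobuf. The regular (JVM) build works fine.
After compiling it with GraalVM, the application cannot produce Protobuf messages, although the consumer works fine.
My project's YAML is: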
spring.cloud.stream:
  instanceCount: 1
  default-binder: kafka
  default:
    producer.useNativeEncoding: true
    consumer.useNativeEncoding: true
  kafka:
    binder:
      auto-create-topics: false
      brokers: ${spring.kafka.properties.bootstrap.servers}
      consumer-properties:
        schema.registry.url: ${spring.kafka.properties.schema.registry.url}
        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value.deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
        specific.protobuf.value.type: proto.dataFusion.VehicleInfo
        auto.offset.reset: latest
        auto.commit.enable: false
        ack.mode: manual_immediate
        isolation.level: read_committed
        max.poll.records: 150
        fetch.min.bytes: 1048576 # 1MB
        fetch.max.wait.ms: 1000
      producer-properties:
        schema.registry.url: ${spring.kafka.properties.schema.registry.url}
        value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
        enable.idempotence: false
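I had a similar problem with the consumer, which was fixed by setting the specific.protobuf.value.type property.
If the producer's value.serializer is set to the String serializer, it works fine in the compiled version, but it fails with the Protobuf serializer.
For the producer I have not been able to fix it. I get this error: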
2024-01-25T08:50:05.062-03:00 ERROR 8025 --- [sp-sc-cfc] [ main] o.s.cloud.stream.binding.BindingService : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (cfc_plates_to_check_muniz):
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:684) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionInfoForProducer(KafkaTopicProvisioner.java:600) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:422) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:168) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:310) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:102) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:353) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:294) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:311) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:315) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:115) ~[sp-sc-cfc:4.1.0]
at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[sp-sc-cfc:4.1.0]
at [email protected]/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[sp-sc-cfc:4.1.0]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:284) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:467) ~[sp-sc-cfc:6.1.1]
at [email protected]/java.lang.Iterable.forEach(Iterable.java:75) ~[sp-sc-cfc:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:256) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:201) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:965) ~[sp-sc-cfc:6.1.1]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619) ~[sp-sc-cfc:6.1.1]
at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:753) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:455) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:323) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1342) ~[sp-sc-cfc:3.2.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1331) ~[sp-sc-cfc:3.2.0]
at io.mobi7.sc.cfc.CFCServiceApplication.main(CFCServiceApplication.java:23) ~[sp-sc-cfc:na]
Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic cfc_plates_to_check_muniz
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:657) ~[sp-sc-cfc:4.1.0]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[sp-sc-cfc:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209) ~[sp-sc-cfc:na]
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:622) ~[sp-sc-cfc:4.1.0]
... 28 common frames omitted
2024-01-25T08:50:05.064-03:00 ERROR 8025 --- [sp-sc-cfc] [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'sp-sc-cfc.platesToCheck-out-0'., failedMessage=GenericMessage [payload=plate: "SIXXXXX"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:228)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:210)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:501)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:356)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:285)
at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:249)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:151)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:206)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:481)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:467)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
at [email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at [email protected]/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at [email protected]/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at [email protected]/java.lang.Thread.run(Thread.java:840)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=plate: "SIE7D4K"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 63 more
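I also tried setting specific.protobuf.value.type for the producer.
I expect the producer to work in the compiled (native) version the same way it does on the JVM.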
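This is probably because we are missing native hints for io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer. For an example, see the hint for JsonSerializer in Spring for Apache Kafka. You can try adding a hint for KafkaProtobufSerializer in your application. If that works, please consider sending a PR to Oracle's graalvm-reachability-metadata repository to add the hint there. Specifically, this is where the hint needs to be added. That way, applications will not each have to add this custom hint themselves.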
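
As a rough illustration of adding such a hint in the application, here is a minimal sketch that assumes Spring Framework 6's RuntimeHintsRegistrar API. The class names ProtobufSerializerHintsConfig and ProtobufSerializerHints are hypothetical, and the choice of INVOKE_PUBLIC_CONSTRUCTORS is an assumption based on Kafka instantiating the serializer reflectively from the value.serializer property. Further types (for example the generated Protobuf message classes) may need hints as well; that is not confirmed here.

```java
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.springframework.aot.hint.MemberCategory;
import org.springframework.aot.hint.RuntimeHints;
import org.springframework.aot.hint.RuntimeHintsRegistrar;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportRuntimeHints;

// Hypothetical configuration class; any @Configuration class
// (or the main application class) could carry the annotation instead.
@Configuration(proxyBeanMethods = false)
@ImportRuntimeHints(ProtobufSerializerHintsConfig.ProtobufSerializerHints.class)
public class ProtobufSerializerHintsConfig {

    // Contributes reflection metadata during AOT processing so that the
    // native image keeps the serializer's constructors reachable.
    static class ProtobufSerializerHints implements RuntimeHintsRegistrar {

        @Override
        public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
            // Assumption: the Kafka client creates the serializer through its
            // public no-arg constructor, so constructor access is what is needed.
            hints.reflection().registerType(
                    KafkaProtobufSerializer.class,
                    MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS);
        }
    }
}
```

If the native build still fails after this, the error message should name the next class that lacks reflection metadata, and it can be registered the same way.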