GraalVM 使用 Protobuf 编译的 Spring Cloud Stream Kafka 版本无法序列化器

问题描述 投票:0回答:1

我有一个带有 Kafka 和 Protobuf 的 Spring Cloud Stream 项目,常规版本工作正常。

使用 Graalvm 编译后,无法生成 Protobuf 消息,消费者工作正常。

我的项目 yaml 是:

spring.cloud.stream:
    instanceCount: 1
    default-binder: kafka
    default:
        producer.useNativeEncoding: true
        consumer.useNativeEncoding: true
    kafka:
        binder:
            auto-create-topics: false
            brokers: ${spring.kafka.properties.bootstrap.servers}
            consumer-properties:
                schema.registry.url: ${spring.kafka.properties.schema.registry.url}
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
                specific.protobuf.value.type: proto.dataFusion.VehicleInfo
                auto.offset.reset: latest
                auto.commit.enable: false
                ack.mode: manual_immediate
                isolation.level: read_committed
                max.poll.records: 150
                fetch.min.bytes: 1048576 # 1MB
                fetch.max.wait.ms: 1000
            producer-properties:
                schema.registry.url: ${spring.kafka.properties.schema.registry.url}
                value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
                enable.idempotence: false

我遇到了一个类似的消费者问题,在设置 Specific.protobuf.value.type 属性后已修复。

如果生产者的 value.serializer 设置为 String 它在编译版本中工作正常,但在 Protobuf Serializer 中失败

但是对于制作人无法修复,我遇到了错误:

2024-01-25T08:50:05.062-03:00 ERROR 8025 --- [sp-sc-cfc] [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (cfc_plates_to_check_muniz):
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:684) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionInfoForProducer(KafkaTopicProvisioner.java:600) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:422) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:168) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:310) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:102) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:353) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:294) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:311) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:315) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:115) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[sp-sc-cfc:4.1.0]
    at [email protected]/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[sp-sc-cfc:4.1.0]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:284) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:467) ~[sp-sc-cfc:6.1.1]
    at [email protected]/java.lang.Iterable.forEach(Iterable.java:75) ~[sp-sc-cfc:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:256) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:201) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:965) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619) ~[sp-sc-cfc:6.1.1]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:753) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:455) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:323) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1342) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1331) ~[sp-sc-cfc:3.2.0]
    at io.mobi7.sc.cfc.CFCServiceApplication.main(CFCServiceApplication.java:23) ~[sp-sc-cfc:na]
Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic cfc_plates_to_check_muniz
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:657) ~[sp-sc-cfc:4.1.0]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[sp-sc-cfc:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209) ~[sp-sc-cfc:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:622) ~[sp-sc-cfc:4.1.0]
    ... 28 common frames omitted


2024-01-25T08:50:05.064-03:00 ERROR 8025 --- [sp-sc-cfc] [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'sp-sc-cfc.platesToCheck-out-0'., failedMessage=GenericMessage [payload=plate: "SIXXXXX"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:228)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:210)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:501)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:356)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:285)
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:249)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:151)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:206)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:481)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:467)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
    at [email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at [email protected]/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at [email protected]/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at [email protected]/java.lang.Thread.run(Thread.java:840)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=plate: "SIE7D4K"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 63 more

我尝试为产品设置一个特定的.protobuf.value.type。

我希望制作人能够像解释器那样以编译版本工作

spring-cloud-stream spring-cloud-stream-binder-kafka spring-cloud-stream-binder s
1个回答
0
投票

这可能是因为我们缺少

io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.
的本机提示,例如,请参阅 Spring for Apache Kafka 中 JsonSerializer
hint
。您可以尝试在应用程序中添加
KafkaProtobufSerializer
的提示。如果有效,请通过向 Oracle 的 graalvm-reachability-metadata 存储库发送 PR 来将提示添加到那里。具体来说,这就是需要添加提示的地方。这样,并非所有应用程序都不必在应用程序中添加此自定义提示。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.