Spring Cloud Stream允许用户使用Spring Integration开发和运行消息传递微服务,并在本地或云中或甚至在Spring Cloud Data Flow上运行它们。只需添加@EnableBinding并将您的应用程序作为Spring Boot应用程序(单个应用程序上下文)运行。您只需要连接到总线的物理代理,如果类路径上有相关的总线实现,则这是自动的。
有Spring Cloud Stream的http绑定器吗?
我有一个服务需要建模为流应用程序(该服务目前为相同的用例提供同步和异步 api )。我正在探索是否可以对实际处理进行建模...
如何动态为流绑定器提供组值,以在使用 Spring Cloud Stream 时实现灵活性。我怎样才能实现这个目标?
Spring Cloud Stream Rabbit Binder 动态组名称
使用Spring Cloud Stream时,我需要动态地将组值赋予绑定器,我该怎么做?
我们可以使用 PostgreSQL 而不是默认的 dynamo 数据库来进行检查点和锁定,以防使用 Binder 方法从 Kinesis 消费数据吗
我有一个 Spring Boot 应用程序,它使用 Amazon Kinesis 来使用数据并将其保存到 PostgreSQL。 由于我已经在我的应用程序中使用一个数据库(PostgreSQL),我想避免使用另一个数据库
我正在为其动态创建云流和消费者,因此无法正确定义消费者,因为有关类型的信息丢失了。 在下面的示例中,使用 @Bean 注释定义的消费者...
Amazon Kinesis Stream 名称未按照提供的 application.yml 文件获取
我有一个使用 Amazon Kinesis 来消费数据的 Spring Boot 应用程序。 它使用了错误的流名称 我需要使用的实际名称是:“test-test.tst.v1” 但它使用:“
Spring Cloud Stream 多个 Kafka 集群配置
在我的项目中,我需要连接到两个不同的Kafka代理。 我的 application.yaml 看起来有点像这样: 春天: 云: 功能: 定义:orderCreatedListener;orderProcessedLi...
我试图弄清楚在使用 Spring Cloud Stream 应用程序构建生产者时是否有办法配置 PublishConfirm 行为。我知道建立生产者有两种方法 -
Spring云流项目,Leyton版本忽略kafka DLQ配置
我用Spring Cloud Streaming做了一些实验,其中kafka是数据源。 需要配置 dlq 而我的配置不支持它。 所以我打开了开源示例,它...
Spring 流项目,Leyton 版本忽略 kafka DLQ
我用Spring Cloud Streaming做了一些实验,其中kafka是数据源。 需要配置 dlq 而我的配置不支持它。 所以我打开了开源示例,它...
Kafka Streams 主题保留问题:动态主题配置和未发布的数据
我在与 Spring Cloud Stream 集成的 Kafka Streams 应用程序方面遇到问题。问题围绕着一个特定的主题,它的大小不断增加并且从不释放数据:
Spring Cloud azure kafka 与多个 eventhub 命名空间绑定
了解如何使用 eventhub kafka 配置 Spring Cloud 以在同一命名空间中发送和接收消息,如下所示 https://github.com/MicrosoftDocs/azure-dev-docs/blob/master/articles/java/spr...
Spring云流批处理生产者抛出`SerializationException`
我正在尝试使用文档中描述的批量生成器。 我创建了以下供应商: @豆 公共供应商>> fetch() { return () -> List.of...
如何处理 org.apache.kafka.common.errors.SslAuthenticationException: SSL 握手失败
我的 Kafka 客户端具有以下配置: 春天: 云: 配置: 启用:假 溪流: 卡夫卡: 活页夹: 经纪人:本地主机:9092 zkNodes:本地主机...
我需要轮询消息并将其发布到 RabbitMQ。设置了100个store,每个store可以轮询5条以上消息并将其发布到RabbitMQ。每分钟进行一次轮询。下面是
调度程序没有频道“unknown.channel.name”的订阅者
我正在尝试使用 spring 绑定来运行发布者和订阅者。 确保目的地(交换)是在rabbit-mq本地创建的。 发布者(app.yml) 春天: 云: 溪流: ...
如何使用Spring Cloud Stream中的Kafka Streams绑定器将同一主题绑定到多个函数?
使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @豆 公共函数,KStream 使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @Bean public Function<KStream<String, Double>, KStream<String, Double>> sqrt() { return numbers -> numbers.mapValues(Math::sqrt); } @Bean public Consumer<KStream<String, Double>> log() { return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value)); } 其中 sqrt() 输出数字的平方根,然后用 log() 记录。 application.yaml因此看起来像这样: spring: cloud: stream: function: bindings: sqrt-in-0: numbers sqrt-out-0: sqrt-numbers log-in-0: sqrt-numbers kafka: streams: bindings: sqrt: consumer: application-id: sqrtApplicationId log: consumer: application-id: logApplicationId 启动应用程序时,出现以下错误: The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled. Action: Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true 当然,现在将 definition-overriding 设置为 true 并不是一个正确的解决方案,并且它会失败并显示 IllegalStateException。 我该如何解决这个问题? 问题的重现可以在这里找到:https://github.com/cedric-schaller/dltawareprocessor-type-error 假设您有两个名为 numbers 和 sqrt-numbers 的 Kafka 主题,则以下配置应该有效。 spring: cloud: stream: bindings: sqrt-in-0: destination: numbers sqrt-out-0: destination: sqrt-numbers log-in-0: destination: sqrt-numbers kafka: streams: bindings: sqrt-in-0: consumer: application-id: sqrtApplicationId log-in-0: consumer: application-id: logApplicationId 您可以使用 spring.cloud.stream.function.bindings.. 覆盖默认绑定名称。例如,如果您想将绑定名称从 sqrt-in-0 更改为 input,您可以像 spring.cloud.stream.function.bindings.sqrt-in0-0: input 那样进行操作。不过,您仍然需要在覆盖的绑定上设置 destination(通过 spring.cloud.stream.bindings.input.destination)。 您遇到的特定异常是因为您试图重用已创建的绑定名称 - sqrt-numbers。
具有两个输入主题的函数路由似乎破坏了 KafkaBinderMetrics
根据 https://stackoverflow.com/a/66871632/22992363,我正在配置一个功能路由器,它接受来自两个 Kafka 主题的输入。这是我的配置的精简副本: 春天: 云:
我有一个简单的案例,我需要使用 Spring Cloud Stream 3.2.4 版本来实现。 我有一个预定的消息生产者: 公共类 MySourceOfMessagesImpl { 私人随机随机...
Spring Cloud Stream不会针对指定配置自动反序列化kafka消息
我使用 spring-cloud-stream-binder-kafka 和 spring-cloud-stream 以功能方式配置 kafka 流。 我的云依赖项来自 我使用 spring-cloud-stream-binder-kafka 和 spring-cloud-stream 以功能方式配置 kafka 流。 我的云依赖项来自 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> 主要依赖项是 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId> </dependency> 我的yml配置: kafka-conf: # server: localhost:9101 server: localhost:9092 group: test-streams consume-topic-report-details: report_details_topic report-details-dlq: report_details_dlq produce-topic-report-details: report_details_topic_redirect schema: http://localhost:8081 spring: application: name: ${kafka-conf.group}-streaming cloud: function: definition: reportDetails stream: bindings: reportDetails-in-0: contentType: application/*+avro destination: ${kafka-conf.consume-topic-report-details} group: ${kafka-conf.group}-streaming reportDetails-out-0: contentType: application/*+avro destination: ${kafka-conf.produce-topic-report-details} kafka: streams: binder: deserialization-exception-handler: sendToDlq configuration: commit.interval.ms: 100 default: key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde bindings: reportDetails-in-0: consumer: dlqName: ${kafka-conf.report-details-dlq} binder: brokers: ${kafka-conf.server} schemaRegistryClient: endpoint: ${kafka-conf.schema} 问题详情 使用io.confluent.kafka.serializers.KafkaAvroSerializer序列化的消费消息。 我希望我的服务自动对流中的每条消息使用 io.confluent.kafka.serializers.KafkaAvroDeserializer,但我的 yml 配置中似乎忽略了某些内容。 结果我的蒸汽失败了 @Bean Function<ReportDetails, ReportDetails> reportDetails() { return data -> { log.info("input reportDetails: {}", data); return data; }; } 有例外 Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.vl.model.avro.ReportDetails ([B is in module java.base of loader 'bootstrap'; com.vl.model.avro.ReportDetails is in unnamed module of loader 'app') at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:958) 另一方面,我尝试直接反序列化它(它有效)。 @Bean Function<byte[], byte[]> filterAbsence() { return dto -> { SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-server:8081", 5); try (KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient)) { AbsenceDto obj = deserializer.deserialize("report_details_topic", dto); log.info("received message: {}", obj); } log.info("received message: {}", dto); return dto; }; } 我怀疑我的 DQL 配置也不正确。 当我的消费者失败时,我希望消息将被重定向到 DLQ,但其中一个是空的。 问题: 如何修复我的 yml 配置以使反序列化自动工作? 如何修复我的 DQL 配置或获得确认(如果配置正确)? 您检查过应用程序日志吗? StreamsConfig 已在启动时登录,因此您可以验证您的配置是否已被拾取,看起来没有。 我不是 spring 人,也不是 yml 专家,但你为什么使用: default: key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde 这些配置被称为 default.key.serde 和 default.value.serde 所以不应该是 default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde 类似于你设置的commit.interal.ms?您可以验证它是否检测到提交间隔更改吗?