Spring Cloud Stream允许用户使用Spring Integration开发和运行消息传递微服务,并在本地或云中或甚至在Spring Cloud Data Flow上运行它们。只需添加@EnableBinding并将您的应用程序作为Spring Boot应用程序(单个应用程序上下文)运行。您只需要连接到总线的物理代理,如果类路径上有相关的总线实现,则这是自动的。
适用于 Azure 服务总线的 Spring Cloud Stream Binder 破坏了测试
添加依赖项 com.azure.spring spring-cloud-azure-stream-binder-servicebus 添加依赖项 <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency> 突然使我的 RestTemplate 客户端测试的 some 失败,并出现以下错误: java.lang.AssertionError: JSON path "$" Expected: (a collection containing <152> and a collection containing <153>) but: a collection containing <152> was String "<UnmodifiableRandomAccessList><item>152</item><item>153</item></UnmodifiableRandomAccessList>" 或 Expected :application/json Actual :application/xml;charset=UTF-8 我的测试如下: @Autowired private RestTemplate restTemplate; private MockRestServiceServer mockServer; @BeforeEach public void init() { mockServer = MockRestServiceServer.createServer(restTemplate); } public void test() throws Exception { mockServer.expect(requestTo(url)) .andExpect(method(HttpMethod.POST)) .andExpect(jsonPath("$", hasItems(152, 153))) .andRespond(withStatus(HttpStatus.OK) .contentType(MediaType.APPLICATION_JSON) .body("{}")); // actual code removed for brevity ResultDTO result = client.makeRestCall(Set.of(1L)); // beneath makes a RestTemplate call to the url assertNotNull(result); } 奇怪的是,测试类中只有部分测试失败,即该类有 8 个测试,其中 4 个测试失败,且没有明显的原因。我看到的是,我删除了 Azure 依赖项,测试通过了。有谁知道这种依赖会带来什么可能破坏我的代码? 似乎这是已知问题。问题在于 Azure 依赖项引入了 jackson-dataformat-xml 依赖项,它破坏了其余控制器响应的格式(使它们成为 XML 而不是默认的 JSON)。解决该问题的简单方法是排除依赖项: <exclusion> <artifactId>jackson-dataformat-xml</artifactId> <groupId>com.fasterxml.jackson.dataformat</groupId> </exclusion> 该线程提到依赖项已变为可选,但我找不到发生这种情况的版本。 您必须通过添加扩展来使您的 spring-cloud-azure-stream-binder-servicebus 依赖项如下: <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> <exclusions> <exclusion> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> </exclusion> </exclusions> </dependency>
如何与Spring Cloud Streams供应商发送消息/数据?
当特定表记录中有插入时,我尝试在kafka上发送消息。 我认为申请的这一部分将被视为供应商/生产商。 我有以下c...
创建名为“functionBindingRegistrar”的 bean 时出错
我的应用程序无法启动,出现以下异常: org.springframework.beans.factory.BeanCreationException:创建名称为“functionBindingRegistrar”的bean时出错 组织。
在使用通用类型的事件负载时获取 java.lang.ClassCastException
我有 Spring Boot 3.1.1 kotlin 应用程序,它使用 Spring Cloud 2022.0.4 来消费 Kafka 事件。 消费者功能: @豆 fun ipConsumer(): 消费者 我有 Spring Boot 3.1.1 kotlin 应用程序,它使用 Spring Cloud 2022.0.4 来消费 Kafka 事件。 消费功能: @Bean fun ipConsumer(): Consumer<Message<PayloadWithEmbeddedHeaders<InstructionalPlanCreatedEvent>>> { return Consumer { if (it.headers[PublisherKafka.EVENT_TYPE_HEADER_NAME] == InstructionalPlanCreatedEvent::class.java.simpleName) { process(ip = it.payload.body) } } } 通用类型类: data class PayloadWithEmbeddedHeaders<T>(val headers: Map<String, Any> = emptyMap(), val body: T) 活动类别: data class InstructionalPlanCreatedEvent( val id: UUID, val organisationId: UUID, val organisationName: String? = null, val curriculumId: Long, val curriculumName: String? = null, val subjectId: Long, val subjectName: String? = null, val gradeId: Long, val gradeName: String? = null, val academicYearId: Long, val academicYear: String? = null ) 在我的消费者函数中,当访问 it.payload.body 时,它会抛出 ClassCastException,因为它以 LinkedHashMap 的形式出现。 此外,它并不是在活页夹测试中失败,而是在实际的 kafka 事件传入时失败。 这个问题已通过 this commit 解决。 您可以明确地将其添加为依赖项吗? <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-function-context</artifactId> <version>4.0.6-SNAPSHOT</version> </dependency>
我有一个流“dest”,我想要两个消费者,并且两个流应该消耗相同的消息。 但是,如果一个流消耗了该消息,那么该消息就会丢失并且不会发送到...
未解析的参考:来源 - org.springframework.cloud.stream.messaging.source
由于存在漏洞,我正在更新一些项目库,更新后我无法进行此导入工作: 导入 org.springframework.cloud.stream.messaging.Source (“来源”我...
Spring Cloud Stream / Spring Boot 3 中的 test-binder 发生了什么?
我找不到任何相关文档,但是 Spring Cloud Stream / Spring Boot 3 的测试绑定程序发生了什么? 此方法可以很好地处理 2021 bom 导入,但现在出现此错误:Cou...
RabbitMQ 超级流 |使用 RabbitMQ 的 Spring Cloud Stream 消费者组
我有 2 个服务(服务 A 和服务 B)需要使用相同的数据。我可以通过使用队列类型作为流来实现这一点。但同时,在自动缩放期间我不想要服务 A1 和 s...
在函数中使用SendToDlqAndContinue而不使用处理器spring-cloud-stream-binder-kafka-streams
当我在函数中处理的消息不遵循某些逻辑时,我正在尝试向 DLQ 发送消息。 对于某些上下文,我在 application.yaml 中有此配置 我的功能看起来像...
我正在尝试实现 Turbine AMQP 来整合从多个服务到 Hystrix Dashboard 的所有流。 所以我在 gradle 文件中添加了几个依赖项,之后我无法...
我正在使用 AWS Kinesis 和 Spring Cloud 从数据事件中心读取数据。部署结束,日志如下: 2023-09-26 14:07:30.476 信息 [,,] 9331 --- [esis-consumer-1] a.i.k.
在 Spring Cloud Stream 中全局启用 RabbitMQ 仲裁队列
是否有可能将每个要配置的队列定义为仲裁队列而不是经典队列? spring.cloud.stream.rabbit.bindings..consumer.quorum.enabled=true 我...
带有RabbitMQ绑定器的Spring云流抛出java.lang.IllegalStateException:消息体太大
我正在使用 Spring Cloud Stream 4.0.4 和 Rabbitmq Binder 4.0.4 来处理 Spring Boot 3.1.3 应用程序中的消息。我也在使用rabbitMQ 3.10.0。问题是,当
由于配置中存在 @EnableBinding 注释,功能绑定被禁用
我将 Spring Boot 升级到 2.3.0.RELEASE,Spring Boot 应用程序卡在最后一行并且永远不会启动。感谢任何帮助,谢谢! 09:26 23:57:52.276 [主要] [警告] ...
SpringCloudStream 函数.SpringBoot 3 的定义
将 spring 从 2.x 升级到 3.x 后,配置变量的解析发生了某种变化。 在 SpringBoot 2.x.x 中,变量 spring.cloud.stream.function.definition 可以由 Spring 解析...
KafkaStream 在代理滚动升级时达到 ERROR 状态
我正在使用 Kafka 2.8.1 (AWS MSK)。我观察到,每当 AWS 完成一些滚动升级时,我的 Kafka 流应用程序都会达到错误状态,并且在尝试时不断收到以下异常...
Spring Cloud Stream 应用程序在测试时未连接到 EmbeddedKafka
尝试测试基本的 Spring Cloud Stream 应用程序,EmbeddedKafka 代理在运行测试时永远不会收到已发布的消息。 这是我当前代码的 github 存储库:https://gith...
我们正在使用 spring cloud dataflow 2.10.3,在 kubernetes 集群中使用 helm 部署。 我不确定这是一个错误还是做错了什么,但我们已经看到它发生在所有类型的流中,
Spring Cloud Stream Kafka Binder 中的消费者和生产者并发配置?
spring.cloud.stream.binding.myInputProcess.consumer.concurrency:3 我的应用程序中有这样的配置,它从 kafka 获取输入并处理它,然后放回另一个 kafka。 那么,我...
Spring Cloud Streams 中错误处理的不同方法的示例很少,并且通过文档部分提供的少数示例似乎也不起作用。 我有一个测试仓库...