Spring Cloud Stream允许用户使用Spring Integration开发和运行消息传递微服务,并在本地或云中或甚至在Spring Cloud Data Flow上运行它们。只需添加@EnableBinding并将您的应用程序作为Spring Boot应用程序(单个应用程序上下文)运行。您只需要连接到总线的物理代理,如果类路径上有相关的总线实现,则这是自动的。
可以使用 Solace Spring Cloud Stream Binder 设置“立即确认”吗?
根据 https://solace.com/blog/inside-a-solace-message-using-header-properties, 我们可以为消息头设置“立即确认”等属性。 相关API可以在
无法使用 Avro(自定义 Serdes)序列化 Spring Cloud Streams
我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。下面是我的配置 @豆 公共双功能 我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。以下是我的配置 @Bean public BiFunction<KStream<String, TestRecord>, KStream<String, TestRecord>, KStream<String, ModifiedTestPayload>> process() { return (walletCreated, placementCreated) -> placementCreated .selectKey((key, val) -> val.getClientId()) .join(walletCreated.selectKey((s, testPayload) -> testPayload.getClientId()), (ts, ts1) -> new ModifiedTestPayload(ts.getClientId()), JoinWindows.ofTimeDifferenceWithNoGrace(Duration.of(2, ChronoUnit.MINUTES)), StreamJoined.with(Serdes.String(), CustomSerdes.TestRecord(), CustomSerdes.TestRecord())); } 两个主题都将输入流以产生相同结构的数据。以下是我定义的自定义 Serdes。 public class CustomSerdes { public static Serde<TestRecord> TestRecord() { return new TestRecordSerde(); } public static class TestRecordSerde extends Serdes.WrapperSerde<TestRecord> { public TestRecordSerde() { super(new TestRecordSerializer(), new TestRecordDeserializer()); } } public static class TestRecordSerializer implements Serializer<TestRecord> { private final KafkaAvroSerializer inner; public TestRecordSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, TestRecord data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class TestRecordDeserializer implements Deserializer<TestRecord> { private final KafkaAvroDeserializer inner; public TestRecordDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public TestRecord deserialize(String topic, byte[] data) { return (TestRecord) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } public static Serde<ModifiedTestPayload> ModifiedTestPayload() { return new ModifiedTestPayloadSerde(); } public static class ModifiedTestPayloadSerde extends Serdes.WrapperSerde<ModifiedTestPayload> { public ModifiedTestPayloadSerde() { super(new ModifiedTestPayloadSerializer(), new ModifiedTestPayloadDeserializer()); } } public static class ModifiedTestPayloadSerializer implements Serializer<ModifiedTestPayload> { private final KafkaAvroSerializer inner; public ModifiedTestPayloadSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, ModifiedTestPayload data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class ModifiedTestPayloadDeserializer implements Deserializer<ModifiedTestPayload> { private final KafkaAvroDeserializer inner; public ModifiedTestPayloadDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public ModifiedTestPayload deserialize(String topic, byte[] data) { return (ModifiedTestPayload) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } } 和我的 application.yml 下面 spring: kafka: streams: application-id: test-app cloud: function: definition: process stream: kafka: streams: bindings: process-in-0: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-in-1: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-out-0: producer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: ai.wownettests.kf.kafkastreamtestone.CustomSerdes$ModifiedTestPayloadSerde configuration: schema.registry.url: http://localhost:8081 binder: brokers: localhost:9092 configuration: schema.registry.url: http://localhost:8081 specific.avro.reader: true default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde bindings: process-in-0: destination: testkf.placements.created process-in-1: destination: testkf.wallet.created process-out-0: destination: testkf.client.profile_completed logging: level: root: info 当我启动我的应用程序时,我收到如下错误(我有一些关于该应用程序尝试处理的 kafka 主题的数据) org.apache.kafka.streams.errors.StreamsException: Unable to serialize record. ProducerRecord(topic=[test-app-KSTREAM-KEY-SELECT-0000000003-repartition], partition=[null], timestamp=[1706526131230] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:231) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na] Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient 通过创建自定义 serde 的 bean 修复了这个问题 @Bean public Serde<TestRecord> testRecordSerde() { Map<String, ?> config = Collections.singletonMap("schema.registry.url", "http://localhost:8081"); var r = CustomSerdes.TestRecord(); r.configure(config, false); return r; }
GraalVM 使用 Protobuf 编译的 Spring Cloud Stream Kafka 版本无法序列化器
我有一个带有Kafka和Protobuf的Spring Cloud Stream项目,常规版本工作正常。 使用 Graalvm 编译后,无法生成 Protobuf 消息,消费者工作正常。 我的项目...
升级到 Spring Cloud Stream Binder for Kinesis 4.0.0 后遇到批处理错误
使用 Spring Cloud Stream Binder for Kinesis 版本 4.0.0 升级批处理处理器后,我们遇到以下异常。请注意,我们一直在使用 2.2.0 版本的活页夹...
Kafka 版本:3.0.1- 重复创建 kafka 管理客户端 - 内存泄漏
我们有 springboot 应用程序,它从单个主题消费并生成多个主题的记录。 最近将此应用程序升级到了 Sprinboot-2.6.7 以及 gradle 项目中相应的其他依赖项....
如何重写Spring Cloud Stream的rabbitConnectionFactory?
我有一个使用 Spring Cloud Stream 和自定义 Rabbit 绑定器的应用程序。现在我必须向活页夹添加一些代理配置,但我不知道如何配置活页夹。绑定...
Spring云流kafka流binder——函数式编程模型中的@Transactional注解
如果在事件处理过程中发生异常,我无法执行数据库事务回滚。我正在使用 Spring Cloud Stream 和 Kafka Stream Binder - 以及函数式编程模型......
我有一个微服务,它使用自定义绑定器 CustomBinderA 和声明性方法。在代码中,你可以找到几个带注解的类,如@EnableBinding、@Input、@StreamListen...
使用 Spring Cloud Function 测试 Spring Cloud Stream 时未映射通道
我正在将当前使用基于注释的编程模型的现有流处理代码转换为使用 Spring Cloud Function。使用测试活页夹描述...
KStreams 在 Spring Cloud Stream 中运行速度非常慢
我已经在Spring boot中使用SpringCloudStreams实现了KStreams。 我已准备好从具有 20 个分区的主题写入具有相同分区数的主题。我有 2 个 Pod 正在运行。 平均...
如何将KPL/KCL与spring-cloud-stream-binder-aws-kinesis lib完美结合使用?
我决定在 4.0.1 版本上使用 spring-cloud-stream-binder-aws-kinesis lib,但我对如何使用它实现 KPL/KCL 有一些疑问。我目前正在遵循本指南,了解如何使用...
如何停止 org.apache.kafka.common.errors.SslAuthenticationException:SSL 握手失败
我的 Kafka 客户端具有以下配置: 春天: 云: 配置: 启用:假 溪流: 卡夫卡: 活页夹: 经纪人:本地主机:9092 zkNodes:本地主机...
从 Kafka 到 Rabbit 的 Spring Cloud Stream
我正在尝试使用Kafka消息密钥作为rabbitmq消息的路由密钥将消息从kafka的主题发送到多个rabbitMQ队列。 管道如下所示:kafka 的主题 -> spring cloud
将 org.springframework.cloud.stream.messaging 的 Spring Cloud Stream v2 迁移到 Spring Cloud Stream v4
简介 我目前正在使用 Spring-cloud-stream v2 的 Sink 接口:https://javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/index.html 我已经编写了单元测试......
如何在Spring Cloud Kafka上配置安全协议SASL_SSL?
我有这个application.yml配置: 春天: 云: 溪流: 绑定: kafka演示主题: 目的地:kafka_demo_topic 卡夫卡: 活页夹:
假设它有缺陷或设计错误......但只是想仔细检查我是否在这里遗漏了任何东西。 春季版本:spring-cloud-stream-binder-kafka-reactive:4.1.0 Spring 文档说,在此
Spring Boot 应用程序在我的 application.yml 中使用外部属性
我有一个模块,可以在应用程序 yaml 中启用执行器重启端点并添加一些云流事件。该模块应该通过 m...
我正在使用 Spring Boot 和 Kafka。迁移到 Spring Boot 和其他版本的最新版本时,我遇到了以下问题:@StreamListener 已弃用并从库中删除。我有一个主题...
如何使用在测试容器中运行的RabbitMQ代理将消息发送到Spring Cloud Stream创建的队列?
我想为我的包和标签方法(管道组成)创建一个集成测试,它基于Spring Cloud Function。我知道配置的输入绑定正在等待来自...的消息
根据此处的讨论,@sobychacko 建议创建一个新问题,链接到该讨论。那么,有人可以回答该帖子中发布的最后一个问题吗? 但在...