我有一个 springboot 应用程序,它将 kafka 事件发送到具有特定 avro 架构的主题。演示代码如下:
@Configuration
@Slf4j
public class KafkaListener {
@Bean
public Function<List<DummyClass>, List<DummyClass>> acceptEvent() {
return messages -> {
List<DummyClass> output = new ArrayList<>();
messages.forEach(msg -> {
// do work and add items to the output field
});
// some more work
return output;
};
}
}
并且
DummyClass
类是由 avro 模式生成的类(.avsc 文件在 resources folder
中定义并由 avro 插件生成)。
这是一个相当标准的设置。现在,我希望能够将事件发送到 2 个主题,而不是像我现在使用上面的代码那样将事件发送到单个主题。
在网上搜索后,我找到了一个spring文档,说明了使用元组的示例。所以我尝试了一下并实现了除了助焊剂部分之外的所有内容(因为我不想要它)。新代码最终如下:
@Configuration
@Slf4j
public class KafkaListener {
@Bean
public Function<List<DummyClass>, Tuple2<List<DummyClass>,List<DummyClass>>> acceptEvent() {
return messages -> {
List<DummyClass> output1 = new ArrayList<>();
List<DummyClass> output2 = new ArrayList<>();
messages.forEach(msg -> {
// do work and add items to the output1 and output2 fields
});
// some more work
return Tuples.of(output1,output2);
};
}
}
但是,这似乎不起作用,因为我收到以下异常:
java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments
我怀疑这是因为我没有使用助焊剂。 这个答案表明可能使用了错误的导入,但我验证了我使用的所有内容都来自正确的导入。
稍微不同的实现遇到了本文中指定的类似问题。但是,此人通过将包含输出参数的自定义对象作为字段传递来解决了该问题。我尝试执行相同的操作,但运行应用程序会导致以下异常:
Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
至此,我现在已经用尽了所有选择。我的问题如下: 如何实现将事件写入多个主题?如果我的实现之一就是这样,我做错了什么?还有其他事情可以尝试吗? - 我见过 KStream 据说可以做我想做的事情,但我似乎找不到好的代码示例。
经过一些来回,我发现streamBridge是实现这一目标的最佳选择。
@Configuration
@Slf4j
@AllArgsContstractor
public class KafkaListener {
private final StreamBridge streamBridge;
@Bean
public Function<List<DummyClass>, List<DummyClass>> acceptEvent() {
return messages -> {
List<DummyClass> output = new ArrayList<>();
messages.forEach(msg -> {
// do work and add items to the output field you want to be sent to topic 1
if(condition) {
Message<DummyClass> msgToSendViaStreamBridge = createMsg(); // method to create an event for streamBridge to send to the topic 2
streamBridge.send("acceptEvnet-out-1", msgToSendViaStreamBridge);
}
});
// some more work
return output;
};
}
}
所以现在我的
Function
将向现有主题(由 acceptEvnet-out-0
文件中的 application.yml
绑定定义)发送事件,并将其放入 output
列表中。我们将事件发送到 acceptEvnet-out-1
绑定中定义的不同主题。这可用于将事件发送到任意数量的主题。