我正在用kafka spring集成Java DSL做一个poc 我正在从数据库(DB)读取一行并将该行作为消息发送到 Kafka 主题。请找到下面的代码。 代码正在编译,我可以从数据库中获取记录,但我没有在主题中看到任何消息。
@Configuration
public class KafkaProduceConfig {
@Bean
public IntegrationFlow pollingAdapterFlow(EntityManagerFactory entityManagerFactory, MyTransformer transformer) {
return IntegrationFlow
.from(Jpa.inboundAdapter(entityManagerFactory).entityClass(MyRecord.class),
e -> e.poller(p -> p.cron("*/1 * * * * *").maxMessagesPerPoll(1).transactional())
.autoStartup(true))
.log(message -> "Polled DB Records from KafkaProduceConfig : " + message.getPayload())
.split()
.log(message -> "Record after split : " + message.getPayload())
.enrichHeaders(hrdSpec ->hrdSpec.headerExpression("myRecord", "payload",true))
.transform(transformer,"getCustomeRecord")
.enrichHeaders(hrdSpec ->hrdSpec.headerExpression("customeRecord","payload",true))
.log(message -> "Transformed Record : " + message.getPayload() +",topic :" +message.getHeaders().get("topic"))
.channel("sendToKafka")
.get();
}
@Bean
public IntegrationFlow outboundChannelAdapterFlow() {
return IntegrationFlow.from("sendToKafka")
.log(message -> "outboundChannelAdapterFlow received payload : " + message.getPayload() +",topic :"
+message.getHeaders().get("topic")+"key :"+message.getHeaders().get("key"))
.handle(m->Kafka.outboundChannelAdapter(producerFactory()).topic(m.getHeaders().get("topic").toString())
.messageKey(m.getHeaders().get("key").toString())
// .headerMapper(mapper())
.partitionId((Integer) m.getHeaders().get("partitionId")))
.get();
}
public ProducerFactory<Integer, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
}
消息应发布到 Kafka 主题。
配置
.handle(m->Kafka.outboundChannelAdapter(producerFactory())
不正确。该 lambda 创建一个新的 MessageHandler
,其主体只是在新消息到达时使用该 Kafka
工厂。此代码只是不处理此消息。
您必须研究一个
handle()
变体,其中您提供的是工厂提供的 MessageHandler
,而不是 lambda 的新产品。
所以,像这样:
.handle(Kafka.outboundChannelAdapter(producerFactory())
.topic(m -> m.getHeaders().get("topic").toString())
.messageKey(m -> m.getHeaders().get("key").toString())
// .headerMapper(mapper())
.partitionId(m -> (Integer) m.getHeaders().get("partitionId")))
这样,
MessageHandler
将在配置阶段创建。在运行时,将根据请求消息调用其 handleMessage()
方法。所有这些选项现在都是在运行时调用的 lambda。
附注请编辑您的问题以获得更易读的代码片段。