你好,我试图使用Alpakka文档中显示的Producer api,我能够使用Transactional source消耗记录,并且创建了Producer,但无法将消息放到topic中,无法使用Alpakka中的Transactional.Sink向topic生产,但我看到idempotent producer已经启用,我看到日志显示它进入了逻辑,但它没有向myTopic生产事件。
[info] o.a.k.c.p.KafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c,transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c] 启用了幂等生产者。
请你帮我理解为什么它可能不会产生消息到主题。
我在本地使用docker运行我的代码。
以下是我的代码
``` Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.map(consumerRecord => {
handleBusiness(consumerRecord)
.flatMap(res => Source.single(res)
.runWith(Transactional.sink(producerSettings,
UUID.randomUUID().toString)))
})
}
source.runWith(Sink.ignore)
And my handleBusiness logics looks like below:
```
private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]] = {
(conversion of consumedMessage ) map { message =>
ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)
}
}```
我也可以用一个Flow来实现,但是交易源需要有一个SinkFlow,如下所示。
Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.mapAsync(5) { msg => business(msg)}
.via(Transactional.flow(producersettings, transactions-id))