在Alpakka中使用Transactional.Sink无法向Kafka主题生成消息,但我看到idempotent producer被启用了。

问题描述 投票:0回答:1

你好,我试图使用Alpakka文档中显示的Producer api,我能够使用Transactional source消耗记录,并且创建了Producer,但无法将消息放到topic中,无法使用Alpakka中的Transactional.Sink向topic生产,但我看到idempotent producer已经启用,我看到日志显示它进入了逻辑,但它没有向myTopic生产事件。

[info] o.a.k.c.p.KafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c,transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c] 启用了幂等生产者。

请你帮我理解为什么它可能不会产生消息到主题。

我在本地使用docker运行我的代码。

以下是我的代码

``` Transactional.source(consumerSettings,
            Subscriptions.topics(topicNames))
            .mapMaterializedValue(innerControl = _)
            .map(consumerRecord => {

              handleBusiness(consumerRecord)
                .flatMap(res => Source.single(res)
                       .runWith(Transactional.sink(producerSettings, 
                                           UUID.randomUUID().toString)))

            })

        }
          source.runWith(Sink.ignore)

And my handleBusiness logics looks like below:

   ```
 private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]]  = {

       (conversion of consumedMessage ) map { message =>

            ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)

     }


 }```


apache-kafka apache-kafka-streams kafka-producer-api alpakka exactly-once
1个回答
0
投票

我也可以用一个Flow来实现,但是交易源需要有一个SinkFlow,如下所示。

Transactional.source(consumerSettings,
                      Subscriptions.topics(topicNames))
                      .mapMaterializedValue(innerControl = _)
                      .mapAsync(5) { msg => business(msg)}
                      .via(Transactional.flow(producersettings, transactions-id))
© www.soinside.com 2019 - 2024. All rights reserved.