能否使用confluence-kafka-go异步有序、无损地生成Kafka消息? [已关闭]

问题描述 投票:0回答:1

我还在 confluenceinc/confluence-kafka-go 上发布了类似的问题。

我正在使用 confluence-kafka-go 开发一项服务,需要 Kafka 生产者支持以下功能:

  • 至少一次投递,不允许消息丢失
  • 消息不能重新排序
  • 无限次重试

我已经阅读了 librdkafkaINTRODUCTION.md 并意识到幂等生产者很接近我的需求。然而,队列中的一条或多条消息由于超时而丢失的可能性似乎很小。我需要的是尽可能重试。

目前,我通过同步发送消息来满足这些要求,如下所示:

for {
    // Some retry logic here
    ...

    err := sendMessage(p, message)
    if err != nil {
        continue
    }
    break
}

func sendMessage(p *kafka.Producer, message *kafka.Message) error {
    deliveryChan := make(chan kafka.Event)
    err := p.Produce(message, deliveryChan)
    if err != nil {
        return err
    }
    e := <-deliveryChan
    // Check if the message was delivered; if not, return an error
    ...
    return nil
}

但是,一条一条同步生成消息会严重影响吞吐量。

任何人都可以推荐一种使用 confluence-kafka-go 来满足这些要求而不使其被同步调用阻塞的方法吗?

go apache-kafka confluent-kafka-go
1个回答
0
投票

这可能是不可能的,除非有适当的后端级支持(目前已知的库中似乎没有这种情况)。

基本上,生产者可以进行批次处理,理论上,之前发送的批次可能会失败,而在成功之后发送的下一个批次可能会成功(破坏您的顺序)。在 Java 中,您可以通过 max in-flight request config 来控制它。

因此,这将是全有或全无的情况,但在批次级别上 - 只有在前一个批次成功后,您才会提交另一批次进行生产。

这也意味着您需要仔细注意您的生产者一次仅处理一批 - API 并不完美(因为它需要单个消息,然后决定自行批处理),但您可以例如分叉并增强它。

您不希望发生的情况是您提交时的情况,例如5(大)记录,它们分为 [1, 2, 3] 和 [4, 5] 批次,第一批失败,第二批成功。您可能需要对生产者的内部批处理机工作原理有一些额外的了解(和/或自己增强它)。


说了这么多,为什么不实现业务级别的序列id并在消费者级别进行处理呢?

© www.soinside.com 2019 - 2024. All rights reserved.