是否有一种有效的方法来生成 Kafka 消息,同时严格保留消息的顺序并确保使用 confluence-kafka-go 不会丢失消息?

问题描述 投票:0回答:1

我还在 confluenceinc/confluence-kafka-go 上发布了类似的问题。

我正在使用 confluence-kafka-go 开发一项服务,需要 Kafka 生产者支持以下功能:

  • 至少一次投递,不允许消息丢失
  • 消息不能重新排序
  • 无限次重试

有没有有效的方法来实现这些?

我已经阅读了 librdkafkaINTRODUCTION.md 并意识到幂等生产者很接近我的需求。但是,队列中的一条或多条消息由于超时而丢失的可能性似乎很小。我需要的是尽可能重试。

目前,我通过同步发送消息来满足这些要求,如下所示:

for {
    // Some retry logic here
    ...

    err := sendMessage(p, message)
    if err != nil {
        continue
    }
    break
}

func sendMessage(p *kafka.Producer, message *kafka.Message) error {
    deliveryChan := make(chan kafka.Event)
    err := p.Produce(message, deliveryChan)
    if err != nil {
        return err
    }
    e := <-deliveryChan
    // Check if the message was delivered; if not, return an error
    ...
    return nil
}

但是,一条一条同步生成消息会严重影响吞吐量。

有人可以提供建议,以更有效的方式使用 confluence-kafka-go 来满足这些要求吗?

go apache-kafka confluent-kafka-go
1个回答
0
投票

这可能是不可能的,除非有适当的后端级支持(目前已知的库中似乎没有这种情况)。

基本上,生产者可以进行批次处理,理论上,之前发送的批次可能会失败,而在成功之后发送的下一个批次可能会成功(破坏您的顺序)。在 Java 中,您可以通过 max in-flight request config 来控制它。

因此,这将是全有或全无的情况,但在批次级别上 - 只有在前一个批次成功后,您才会提交另一批次进行生产。

© www.soinside.com 2019 - 2024. All rights reserved.