我还在 confluenceinc/confluence-kafka-go 上发布了类似的问题。
我正在使用 confluence-kafka-go 开发一项服务,需要 Kafka 生产者支持以下功能:
我已经阅读了 librdkafka 的 INTRODUCTION.md 并意识到幂等生产者很接近我的需求。然而,队列中的一条或多条消息由于超时而丢失的可能性似乎很小。我需要的是尽可能重试。
目前,我通过同步发送消息来满足这些要求,如下所示:
for {
// Some retry logic here
...
err := sendMessage(p, message)
if err != nil {
continue
}
break
}
func sendMessage(p *kafka.Producer, message *kafka.Message) error {
deliveryChan := make(chan kafka.Event)
err := p.Produce(message, deliveryChan)
if err != nil {
return err
}
e := <-deliveryChan
// Check if the message was delivered; if not, return an error
...
return nil
}
但是,一条一条同步生成消息会严重影响吞吐量。
任何人都可以推荐一种使用 confluence-kafka-go 来满足这些要求而不使其被同步调用阻塞的方法吗?
这可能是不可能的,除非有适当的后端级支持(目前已知的库中似乎没有这种情况)。
基本上,生产者可以进行批次处理,理论上,之前发送的批次可能会失败,而在成功之后发送的下一个批次可能会成功(破坏您的顺序)。在 Java 中,您可以通过 max in-flight request config 来控制它。
因此,这将是全有或全无的情况,但在批次级别上 - 只有在前一个批次成功后,您才会提交另一批次进行生产。
这也意味着您需要仔细注意您的生产者一次仅处理一批 - API 并不完美(因为它需要单个消息,然后决定自行批处理),但您可以例如分叉并增强它。
您不希望发生的情况是您提交时的情况,例如5(大)记录,它们分为 [1, 2, 3] 和 [4, 5] 批次,第一批失败,第二批成功。您可能需要对生产者的内部批处理机工作原理有一些额外的了解(和/或自己增强它)。
说了这么多,为什么不实现业务级别的序列id并在消费者级别进行处理呢?