我还在 confluenceinc/confluence-kafka-go 上发布了类似的问题。
我正在使用 confluence-kafka-go 开发一项服务,需要 Kafka 生产者支持以下功能:
有没有有效的方法来实现这些?
我已经阅读了 librdkafka 的 INTRODUCTION.md 并意识到幂等生产者很接近我的需求。但是,队列中的一条或多条消息由于超时而丢失的可能性似乎很小。我需要的是尽可能重试。
目前,我通过同步发送消息来满足这些要求,如下所示:
for {
// Some retry logic here
...
err := sendMessage(p, message)
if err != nil {
continue
}
break
}
func sendMessage(p *kafka.Producer, message *kafka.Message) error {
deliveryChan := make(chan kafka.Event)
err := p.Produce(message, deliveryChan)
if err != nil {
return err
}
e := <-deliveryChan
// Check if the message was delivered; if not, return an error
...
return nil
}
但是,一条一条同步生成消息会严重影响吞吐量。
有人可以提供建议,以更有效的方式使用 confluence-kafka-go 来满足这些要求吗?
这可能是不可能的,除非有适当的后端级支持(目前已知的库中似乎没有这种情况)。
基本上,生产者可以进行批次处理,理论上,之前发送的批次可能会失败,而在成功之后发送的下一个批次可能会成功(破坏您的顺序)。在 Java 中,您可以通过 max in-flight request config 来控制它。
因此,这将是全有或全无的情况,但在批次级别上 - 只有在前一个批次成功后,您才会提交另一批次进行生产。