大家!感谢您尝试提供帮助,我会很简短。
为了学习 Kafka,我正在尝试以下操作:
我读过有关 Kafka 的内容,并在 YouTube 上观看了 Confluence 的视频。
我仍然无法在使用同步或异步 Kafka 生产者之间做出自信的决定。
这就是我需要你帮助的地方。在继续之前,请允许我提供一些代码:
func SomeGinHandler(c *gin.Context) {
// assume we extracted data from request's JSON
// data is stuffed into someValue byte array
/* what comes is a very sensitive spot,
because Publish() may place data on the queue,
but still somehow fail; now we have "dirty" data in the queue
and I see no way to remove it from queue at this moment
*/
err = kafkaProducer.Publish(c.Request.Context(), []byte(someKey), someValue))
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
// the point is that above Publish() call should not block but also be reliable
// I may not lose message from this HTTP handler
// reading through Kafka so far, I fear this is not possible
// I guess I must make a tradeoff, but what is the correct choice??
// stick with synchronous producer, with ack = waitALL ??
// or maybe async producer can somehow work here ?? how ??
c.JSON(http.StatusAccepted, gin.H{"message" : "request accepted"})
}
正如我在代码注释中所描述的,我希望 Kafka 发布代码立即返回,这样 REST 处理程序就不会被阻塞。
我将使用 Kafka 消费者组在单独的微服务中处理数据。
我可能无法承受消息丢失,在阅读了异步生产者之后,我发现它只是“触发并忘记”消息,因此消息丢失是可能的。
显然,我必须做出权衡,那么考虑到 REST 端点将承受重负载,您在上述场景中建议采用什么方法?
关于图书馆,我倾向于 franz-go,但 sarama 似乎也不错。我想避免对 CGO 的依赖,但如果需要的话 confluence 的库也可以。如果您有其他图书馆的想法,我也会考虑在内。我面临的问题本质上是设计问题,所以我不指望图书馆会神奇地解决它。
我建议使用带有“等待所有”ack 的同步生产者(这实际上是为代理配置的最小同步副本)来满足您的要求,以保证消息传递到 Kafka。这将使您能够通知客户端正确的状态代码以及生成 Kafka 时出现的任何错误。
此外,让您的消息幂等地使用,以防产生重复的消息。