我有一个Ids
类型的int64
数组这是我试图发布的我的Nsq消息。
nsqMsg := st{
Action : "insert",
Ids : Ids
GID : Gids
}
msg, err := json.Marshal(nsqMsg)
if err != nil {
log.Println(err)
return err
}
err = nsqProducer.Publish(TOPIC-NAME, msg)
if err != nil {
log.Println(err)
return err
}
在我的消费者中,我逐个获取每个Id并从我的数据存储区中获取基于我的Id的信息。
因此,在获取时,如果我的CreateObject方法返回错误,则可能出现这种情况,因此我通过重新排列msg(这是错误)来处理该情况,因此可以重试它。
for i := 0; i < len(data.Ids); i++ {
Object, err := X.CreateObject(data.Ids[i)
if err != nil {
requeueMsgData = append(requeueMsgData, data.Ids[i])
continue
}
DataList = append(DataList, Object)
}
if len(requeueMsgData) > 0 {
msg, err := json.Marshal(requeueMsgData)
if err != nil {
log.Println(err)
return err
}
message.Body = msg
message.Requeue(60 * time.Second)
log.Println("error while creating Object", err)
return n
}
那么,这是正确的做法吗?他们有这种情况的任何缺点吗?再次发布它会更好吗?
某些队列(如Kafka)支持确认,在消费者实际确认成功收到该项目之前,队列中的所有项目都不会从队列中删除。
此模型的优点是,如果消费者在消费后但在确认之前死亡,则该项目将自动重新排队。模型的缺点是在这种情况下项目可能会丢失。
确认模型的风险是项目现在可以被双重消耗。如果消费者尝试具有副作用的消费(例如递增计数器或改变数据库)但不承认,则重试可能无法创建所需的结果。 (请注意,通过nsq文档阅读,即使您不重新排队数据也无法保证重试,因此无论如何您的代码都可能必须采取防御措施)。
如果您想更深入地了解这一点,您应该查看“完全一次”与“最多一次”处理的主题。
通过nsq文档阅读,看起来不支持确认,因此如果您有义务使用nsq,这可能是您的最佳选择。
与dolan所说的一致,有几种情况你可以遇到:
因为nsq可以(并且大多数def将:p)多次传递消息CreateObjects
可能/应该是idempotent以处理这种情况。
此外,重新传递是一种重要的安全机制,原始消息不应该在所有单个ID或已确认创建或成功重新排队之前完成,这可确保不会丢失任何数据。
IMO处理它的方式看起来非常好,但最重要的考虑因素是IMO在接收重复消息的环境中处理正确性/数据完整性。
另一种选择可能是批处理Requeue,以便它尝试生成失败的id的单个输出消息,这可以在任何给定时间减少队列中的消息数:
考虑一个包含3个ID的消息:
消息ID:[id1,id2,id3]
id1成功创建,id2和id3失败:
程序可以尝试所有操作并发出单个重新排队消息,id2,id3。
但也要与此进行权衡。