NSQ重新排队半消息

问题描述 投票:1回答:2

我有一个Ids类型的int64数组这是我试图发布的我的Nsq消息。

nsqMsg := st{
    Action  :     "insert",
    Ids     :     Ids
    GID     :     Gids
}

msg, err := json.Marshal(nsqMsg)
if err != nil {
    log.Println(err)
    return err
}
err = nsqProducer.Publish(TOPIC-NAME, msg)
if err != nil {
    log.Println(err)
    return err
}

在我的消费者中,我逐个获取每个Id并从我的数据存储区中获取基于我的Id的信息。

因此,在获取时,如果我的CreateObject方法返回错误,则可能出现这种情况,因此我通过重新排列msg(这是错误)来处理该情况,因此可以重试它。

for i := 0; i < len(data.Ids); i++ {

    Object, err := X.CreateObject(data.Ids[i)
        if err != nil {
            requeueMsgData = append(requeueMsgData, data.Ids[i])
            continue
        }
        DataList = append(DataList, Object)
    }

    if len(requeueMsgData) > 0 {
        msg, err := json.Marshal(requeueMsgData)
        if err != nil {
            log.Println(err)
            return err
        }
        message.Body = msg
        message.Requeue(60 * time.Second)
        log.Println("error while creating Object", err)
        return n
}

那么,这是正确的做法吗?他们有这种情况的任何缺点吗?再次发布它会更好吗?

go nsq
2个回答
1
投票

某些队列(如Kafka)支持确认,在消费者实际确认成功收到该项目之前,队列中的所有项目都不会从队列中删除。

此模型的优点是,如果消费者在消费后但在确认之前死亡,则该项目将自动重新排队。模型的缺点是在这种情况下项目可能会丢失。

确认模型的风险是项目现在可以被双重消耗。如果消费者尝试具有副作用的消费(例如递增计数器或改变数据库)但不承认,则重试可能无法创建所需的结果。 (请注意,通过nsq文档阅读,即使您不重新排队数据也无法保证重试,因此无论如何您的代码都可能必须采取防御措施)。

如果您想更深入地了解这一点,您应该查看“完全一次”与“最多一次”处理的主题。

通过nsq文档阅读,看起来不支持确认,因此如果您有义务使用nsq,这可能是您的最佳选择。


1
投票

与dolan所说的一致,有几种情况你可以遇到:

  • 主消息心跳/租约超时,您再次收到所有ID(来自原始消息)。 NSQ提供"at least once"语义。
  • 任何单个消息的重新排队超时且永远不会完成(回退到主IDS)

因为nsq可以(并且大多数def将:p)多次传递消息CreateObjects可能/应该是idempotent以处理这种情况。

此外,重新传递是一种重要的安全机制,原始消息不应该在所有单个ID或已确认创建或成功重新排队之前完成,这可确保不会丢失任何数据。


IMO处理它的方式看起来非常好,但最重要的考虑因素是IMO在接收重复消息的环境中处理正确性/数据完整性。


另一种选择可能是批处理Requeue,以便它尝试生成失败的id的单个输出消息,这可以在任何给定时间减少队列中的消息数:

考虑一个包含3个ID的消息:

消息ID:[id1,id2,id3]

id1成功创建,id2和id3失败:

程序可以尝试所有操作并发出单个重新排队消息,id2,id3。

但也要与此进行权衡。

© www.soinside.com 2019 - 2024. All rights reserved.