Rabbit消费者已连接,但一段时间后无法接收队列中的消息

问题描述 投票:0回答:2

我有一个已成功连接并使用来自 RabbitMQ 的消息的代码。 但过了一段时间,当这个问题发生时,消费者无法接收消息,无论其连接如何。

package rabbitmq

import (
    "context"
    "fmt"
    "os"
    "runtime"
    "time"

    "github.com/getsentry/sentry-go"
    log "github.com/sirupsen/logrus"
    "github.com/streadway/amqp"
)

type RabbitMQ struct {
    conn             *amqp.Connection
    queues           map[string]amqp.Queue
    connString       string
    rabbitCloseError chan *amqp.Error
    recoveryConsumer []RecoveryConsumer
    // ch         *amqp.Channel
    // exchange_name string
}

type RecoveryConsumer struct {
    queueName   string
    routingKey  string
    handler     func(d amqp.Delivery)
    concurrency int8
}

type (
    Delivery = amqp.Delivery
)

func (r *RabbitMQ) IfExist(queueName string) bool {
    for _, item := range r.recoveryConsumer {
        if item.queueName == queueName {
            return false
        }
    }
    return true
}

func (r *RabbitMQ) RecoverConsumers() {
    for _, i := range r.recoveryConsumer {
        go r.StartConsumer(i.queueName, i.routingKey, i.handler, int(i.concurrency))
        log.Infof("Consumer for %v successfully recovered", i.queueName)
    }
}

func (r *RabbitMQ) Reconnector() {
    for { //nolint
        select {
        case err := <-r.rabbitCloseError:
            log.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
            log.Debug("Reconnecting after connection closed")
            sentry.CaptureException(fmt.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
            r.connection()
            r.RecoverConsumers()
        }
    }
}

func (r *RabbitMQ) Connect(host string, user string, pass string, virthost string) {
    r.connString = "amqp://" + user + ":" + pass + "@" + host + "/"
    if virthost != "/" || len(virthost) > 0 {
        r.connString += virthost
    }
    r.connection()
    go r.Reconnector()
}

func (r *RabbitMQ) connection() {
    if r.conn != nil {
        if !r.conn.IsClosed() {
            return
        } else {
            log.Info("Reconnecting to RabbitMQ...")
        }
    }

    var err error
    r.conn, err = amqp.Dial(r.connString)
    if err != nil {
        sentry.CaptureException(err)
        log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
    }
    r.conn.Config.Heartbeat = 5 * time.Second
    r.queues = make(map[string]amqp.Queue)

    r.rabbitCloseError = make(chan *amqp.Error)
    r.conn.NotifyClose(r.rabbitCloseError)
    log.Debug("[RabbitMQ] Successfully connected to RabbitMQ")
    log.Infof("Number of Active Thread/Goroutine %v", runtime.NumGoroutine())
}

func (r *RabbitMQ) CreateChannel() *amqp.Channel {
    ch, err := r.conn.Channel()
    if err != nil {
        log.Error(err)
        return nil
    }
    return ch
}

func (r *RabbitMQ) QueueAttach(ch *amqp.Channel, name string) {
    q, err := ch.QueueDeclare(
        name,  // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        log.Fatalf("%s: %s", "Failed to declare a queue", err)
    }
    r.queues[name] = q
    // r.ch.ExchangeDeclare()
}

func (r *RabbitMQ) TempQueueAttach(ch *amqp.Channel, name string) {
    _, err := ch.QueueDeclare(
        name,  // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        ch.Close()
        log.Fatalf("%s: %s", "Failed to declare a temporary queue", err)
        sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
    }
}

func (r *RabbitMQ) Publish(ch *amqp.Channel, queue string, body []byte) {
    span := sentry.StartSpan(context.TODO(), "publish message")
    defer span.Finish()
    err := ch.Publish(
        "",                   // exchange
        r.queues[queue].Name, // routing key
        false,                // mandatory
        false,                // immediate
        amqp.Publishing{
            Headers:         map[string]interface{}{},
            ContentType:     "application/json",
            ContentEncoding: "",
            DeliveryMode:    amqp.Persistent,
            Priority:        0,
            CorrelationId:   "",
            ReplyTo:         "",
            Expiration:      "",
            MessageId:       "",
            Timestamp:       time.Now().UTC(),
            Type:            "",
            UserId:          "",
            AppId:           "",
            Body:            body,
        })
    if err != nil {
        sentry.CaptureException(err)
        log.Fatalf("%s: %s", "Failed to publish a message", err)
    }

    log.Debugf("Send message: %s", string(body))
}

func (r *RabbitMQ) StartConsumer(queueName string, routingKey string, handler func(d amqp.Delivery), concurrency int) {
    // prefetch 4x as many messages as we can handle at once
    ok := r.IfExist(queueName)
    if ok {
        r.recoveryConsumer = append(r.recoveryConsumer, RecoveryConsumer{
            queueName:   queueName,
            routingKey:  routingKey,
            handler:     handler,
            concurrency: int8(concurrency),
        })
    }

    ch, err := r.conn.Channel()
    if err != nil {
        log.Error(err)
    }
    prefetchCount := concurrency * 1
    err = ch.Qos(prefetchCount, 0, false)
    if err != nil {
        sentry.CaptureException(err)
        log.Errorf("%s: %s", "Failed QOS", err)
    }
    r.QueueAttach(ch, queueName)

    msgs, err := ch.Consume(
        queueName, // queue
        "",        // consumer
        true,      // auto-ack
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
    if err != nil {
        sentry.CaptureException(err)
        log.Fatalf("%s: %s", "Failed consume message", err)
        sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
        os.Exit(1)
    }

    go func() {
        for msg := range msgs {
            handler(msg)
        }
    }()
}

func (r *RabbitMQ) WaitMessage(ch *amqp.Channel, queueName string, timeout time.Duration) []byte {
    st := time.Now()
    for time.Since(st).Seconds() < 1 {
        msg, ok, err := ch.Get(queueName, true)
        if err != nil {
            log.Errorf("Can't consume queue. Error: %s", err.Error())
            sentry.CaptureException(err)
            return nil
        }
        if ok {
            return msg.Body
        }
        time.Sleep(50 * time.Millisecond)
    }
    return nil
}

这可能是什么原因? 我知道它应该在 Rabbit 端,但客户端库无法显示任何错误......

因为开始工作后,消费持续聆听,工作成功。

go rabbitmq message-queue
2个回答
0
投票

我唯一可以建议你尝试的是使用心跳。这将检测连接是否由于网络故障等原因而中断。

您可以在这里查看:https://www.rabbitmq.com/heartbeats.html.

我对此不太确定,自从我使用它以来已经很长时间了,但是如果你在接收消息位周围放置一个 try catch,当连接断开时它可能会出现在 catch 中。

希望这对您解决问题有一点帮助。


0
投票

我也有类似的问题:

  1. 如果启动时队列中存在消息连接从队列接收消息。 如果启动时队列中没有消息:连接收到单个消息发送到队列(启动后发送)
  2. 过了一会儿,连接停止接收新消息

我发现Consume方法中设置的自动确认标志和消息接收器中的手动确认有问题。

也许这会对某人有所帮助。

我想如果没有的话,下一步将是使用wireshark调试AMQP协议:)

© www.soinside.com 2019 - 2024. All rights reserved.