我有一个已成功连接并使用来自 RabbitMQ 的消息的代码。 但过了一段时间,当这个问题发生时,消费者无法接收消息,无论其连接如何。
package rabbitmq
import (
"context"
"fmt"
"os"
"runtime"
"time"
"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
type RabbitMQ struct {
conn *amqp.Connection
queues map[string]amqp.Queue
connString string
rabbitCloseError chan *amqp.Error
recoveryConsumer []RecoveryConsumer
// ch *amqp.Channel
// exchange_name string
}
type RecoveryConsumer struct {
queueName string
routingKey string
handler func(d amqp.Delivery)
concurrency int8
}
type (
Delivery = amqp.Delivery
)
func (r *RabbitMQ) IfExist(queueName string) bool {
for _, item := range r.recoveryConsumer {
if item.queueName == queueName {
return false
}
}
return true
}
func (r *RabbitMQ) RecoverConsumers() {
for _, i := range r.recoveryConsumer {
go r.StartConsumer(i.queueName, i.routingKey, i.handler, int(i.concurrency))
log.Infof("Consumer for %v successfully recovered", i.queueName)
}
}
func (r *RabbitMQ) Reconnector() {
for { //nolint
select {
case err := <-r.rabbitCloseError:
log.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
log.Debug("Reconnecting after connection closed")
sentry.CaptureException(fmt.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
r.connection()
r.RecoverConsumers()
}
}
}
func (r *RabbitMQ) Connect(host string, user string, pass string, virthost string) {
r.connString = "amqp://" + user + ":" + pass + "@" + host + "/"
if virthost != "/" || len(virthost) > 0 {
r.connString += virthost
}
r.connection()
go r.Reconnector()
}
func (r *RabbitMQ) connection() {
if r.conn != nil {
if !r.conn.IsClosed() {
return
} else {
log.Info("Reconnecting to RabbitMQ...")
}
}
var err error
r.conn, err = amqp.Dial(r.connString)
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
}
r.conn.Config.Heartbeat = 5 * time.Second
r.queues = make(map[string]amqp.Queue)
r.rabbitCloseError = make(chan *amqp.Error)
r.conn.NotifyClose(r.rabbitCloseError)
log.Debug("[RabbitMQ] Successfully connected to RabbitMQ")
log.Infof("Number of Active Thread/Goroutine %v", runtime.NumGoroutine())
}
func (r *RabbitMQ) CreateChannel() *amqp.Channel {
ch, err := r.conn.Channel()
if err != nil {
log.Error(err)
return nil
}
return ch
}
func (r *RabbitMQ) QueueAttach(ch *amqp.Channel, name string) {
q, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("%s: %s", "Failed to declare a queue", err)
}
r.queues[name] = q
// r.ch.ExchangeDeclare()
}
func (r *RabbitMQ) TempQueueAttach(ch *amqp.Channel, name string) {
_, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
ch.Close()
log.Fatalf("%s: %s", "Failed to declare a temporary queue", err)
sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
}
}
func (r *RabbitMQ) Publish(ch *amqp.Channel, queue string, body []byte) {
span := sentry.StartSpan(context.TODO(), "publish message")
defer span.Finish()
err := ch.Publish(
"", // exchange
r.queues[queue].Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: map[string]interface{}{},
ContentType: "application/json",
ContentEncoding: "",
DeliveryMode: amqp.Persistent,
Priority: 0,
CorrelationId: "",
ReplyTo: "",
Expiration: "",
MessageId: "",
Timestamp: time.Now().UTC(),
Type: "",
UserId: "",
AppId: "",
Body: body,
})
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed to publish a message", err)
}
log.Debugf("Send message: %s", string(body))
}
func (r *RabbitMQ) StartConsumer(queueName string, routingKey string, handler func(d amqp.Delivery), concurrency int) {
// prefetch 4x as many messages as we can handle at once
ok := r.IfExist(queueName)
if ok {
r.recoveryConsumer = append(r.recoveryConsumer, RecoveryConsumer{
queueName: queueName,
routingKey: routingKey,
handler: handler,
concurrency: int8(concurrency),
})
}
ch, err := r.conn.Channel()
if err != nil {
log.Error(err)
}
prefetchCount := concurrency * 1
err = ch.Qos(prefetchCount, 0, false)
if err != nil {
sentry.CaptureException(err)
log.Errorf("%s: %s", "Failed QOS", err)
}
r.QueueAttach(ch, queueName)
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed consume message", err)
sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
os.Exit(1)
}
go func() {
for msg := range msgs {
handler(msg)
}
}()
}
func (r *RabbitMQ) WaitMessage(ch *amqp.Channel, queueName string, timeout time.Duration) []byte {
st := time.Now()
for time.Since(st).Seconds() < 1 {
msg, ok, err := ch.Get(queueName, true)
if err != nil {
log.Errorf("Can't consume queue. Error: %s", err.Error())
sentry.CaptureException(err)
return nil
}
if ok {
return msg.Body
}
time.Sleep(50 * time.Millisecond)
}
return nil
}
这可能是什么原因? 我知道它应该在 Rabbit 端,但客户端库无法显示任何错误......
因为开始工作后,消费持续聆听,工作成功。
我唯一可以建议你尝试的是使用心跳。这将检测连接是否由于网络故障等原因而中断。
您可以在这里查看:https://www.rabbitmq.com/heartbeats.html.
我对此不太确定,自从我使用它以来已经很长时间了,但是如果你在接收消息位周围放置一个 try catch,当连接断开时它可能会出现在 catch 中。
希望这对您解决问题有一点帮助。
我也有类似的问题:
我发现Consume方法中设置的自动确认标志和消息接收器中的手动确认有问题。
也许这会对某人有所帮助。
我想如果没有的话,下一步将是使用wireshark调试AMQP协议:)