Rabbitmq 客户端陷入循环 golang

问题描述 投票:0回答:1

最近遇到了很多问题,因为我试图观察我的秘密上发生的事件,我修改了我的 go 脚本以实例化 RabbitMQ。添加了一个 EventWatcher 和一个 go routine 来解决这个问题。但是接下来发生的事情是,在“RabbitMQ 客户端已连接”之后,我的客户端似乎完全卡住了 “初始化通道” “换频道乐趣” 日志,什么都没有发生。

我在另一个内部为观看事件部分交互了一个 go 例程,这是有道理的,因为它是不同的线程。

希望你们中的一些人能提供帮助。

// Client stucture for managing a client connection
type Client struct {
    connection      *amqp.Connection
    channel         *amqp.Channel
    done            chan bool
    notifyConnClose chan *amqp.Error
    notifyChanClose chan *amqp.Error
    isReady         bool
    isReadyChan     chan bool
    secretWatcher   *fsnotify.Watcher
    credsPath       string
}

// StartClientConnection initialize all needed instance for be prepared to send messages
func StartClientConnection() (*Client, error) {
    ilog.Info("Start to launch RabbitMQ client connection...")

    client := &Client{
        done:        make(chan bool),
        isReadyChan: make(chan bool),
    }

    client, err := NewRabbitmqClient(os.Getenv("RABBITMQ_CREDS"))
    if err != nil {
        ilog.Fatal(logs.NewCommonError("RabbitMQ Event Watcher failed to init: %v", err))
    }

    go client.connect()

    ilog.Info("RabbitMQ client connection started")
    return client, nil
}

func NewRabbitmqClient(credsPath string) (*Client, error) {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        return nil, fmt.Errorf("Init RabbitMQ client : failed to init watcher, %v", err)
    }
    err = watcher.Add(credsPath)
    if err != nil {
        return nil, fmt.Errorf("Init RabbitMQ client : Failed to Watch file, %v", err)
    }
    ilog.Info("RABBITMQ EVENT WATCHER STARTED")
    rmq := Client{secretWatcher: watcher, credsPath: credsPath}
    return &rmq, nil
}

func (rmq *Client) connect() {
    for {
        rmq.isReady = false
        rabbitMQServerCredentials := GetRabbitMQServerConfiguration()
        rabbitMQUserCredentials := GetRabbitMQUserConfiguration()

        uri := formatRabbitMqURIServerconnection(rabbitMQUserCredentials, rabbitMQServerCredentials)

        _, err := rmq.connectClient(uri)
        fmt.Println(uri)
        go rmq.handleClientReconnect()

        if err != nil {
            ilog.Errorf("Failed to initialize RabbitMQ client connection. Retrying...:%v", err)
            select {
            case <-rmq.done:
                return

            case <-time.After(time.Duration(5 * time.Second)):
                continue
            }
        }

        if done := rmq.handleChannelReconnect(rmq.connection); done {
            break
        }
    }
    return
}

// Watcher
func (rmq *Client) handleClientReconnect() {
    for {
        select {
        case event, ok := <-rmq.secretWatcher.Events:
            if !ok {
                return
            }
            if event.Has(fsnotify.Remove) || event.Has(fsnotify.Write) {
                ilog.Info(event.Name)
            }
        case err, ok := <-rmq.secretWatcher.Errors:
            if !ok {
                return
            }
            ilog.Errorf("Error in watcher: %v", err)
        }
    }
}

// handleChannelReconnect - will wait for a channel error and then continuously attempt to re-initialize both channels
func (client *Client) handleChannelReconnect(conn *amqp.Connection) bool {
    for {
        client.isReady = false

        err := client.initializeChannel(conn)

        if err != nil {
            ilog.Error("Failed to initialize RabbitMQ channel. Retrying...")
            select {
            case <-client.done:
                return true
            case <-time.After(time.Duration(5 * time.Second)):
            }
            continue
        }

        select {
        case <-client.done:
            return true
        case <-client.notifyConnClose:
            ilog.Error("Connection closed. Reconnecting RabbitMQ client...")
            client.isReadyChan <- false
            return false
        case <-client.notifyChanClose:
            ilog.Error("Channel closed. Reconnecting RabbitMQ channel...")
            client.isReadyChan <- false
        }
    }
}

// connect will create a new AMQP connection
func (client *Client) connectClient(addr string) (*amqp.Connection, error) {
    conn, err := amqp.Dial(addr)

    if err != nil {
        return nil, err
    }

    client.changeConnection(conn)
    ilog.Info("RabbitMQ client connected")
    return conn, nil
}

// init will initialize channel & declare queue
func (client *Client) initializeChannel(conn *amqp.Connection) error {
    ilog.Info("Init Channel")
    ch, err := conn.Channel()

    if err != nil {
        return err
    }

    err = ch.Confirm(false)

    if err != nil {
        return err
    }

    client.changeChannel(ch)

    client.isReady = true
    client.isReadyChan <- true

    ilog.Info("RabbitMQ channel connected")
    return nil
}

// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (client *Client) changeChannel(channel *amqp.Channel) {
    ilog.Info("Change Channel func")
    client.channel = channel
    client.notifyChanClose = make(chan *amqp.Error, 1)
    client.channel.NotifyClose(client.notifyChanClose)
}

// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {
    client.connection = connection
    client.notifyConnClose = make(chan *amqp.Error, 1)
    client.connection.NotifyClose(client.notifyConnClose)
}

// Consume will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (client *Client) Consume(queueName string) <-chan amqp.Delivery {

    var err error
    var messages <-chan amqp.Delivery

    ilog.Info(logs.NewConsumerInfoLog("Start to launch a connection to a new queue", QueueName))

    for {
        if !client.isReady {
            ilog.Error(logs.NewConsumerErrorLog("Cannot listen message on queue, retry...", QueueName, err))
            return nil
        }

        messages, err = client.channel.Consume(
            queueName,
            "",
            true,
            false,
            false,
            false,
            nil,
        )

        if err == nil {
            ilog.Info(logs.NewConsumerInfoLog("RabbitMQ will listen on a new queue", QueueName))
            return messages
        }

        ilog.Error(logs.NewConsumerErrorLog("Cannot listen message on queue, retry...", QueueName, err))

        time.Sleep(reconnectConsumerDelay)

    }

}

// ShutdownClientConnection close socket connection
func (client *Client) ShutdownClientConnection() {
    ilog.Info("Start to shutdown rabbitmq instance...")

    if !client.isReady {
        ilog.Info("RabbitMQ instance already closed")
        return
    }

    close(client.done)
    close(client.isReadyChan)

    err := client.channel.Close()
    if err != nil {
        ilog.Error(logs.NewCommonError("Cannot close channel connection", err))
    }
    err = client.connection.Close()
    if err != nil {
        ilog.Error(logs.NewCommonError("Cannot close client connection", err))
    }
    client.isReady = false
    ilog.Info("Rabbitmq instance closed")
}
go rabbitmq
1个回答
0
投票

在您的代码中,您有一个连接到 RabbitMQ 以监视事件的程序。您使用通道和 goroutines 来监听这些事件。但是,连接到RabbitMQ后程序似乎卡住了,什么也没有发生。

原因是您的 goroutine 之一 handleClientReconnect() 正在阻止主 goroutine 继续。这是因为该函数正在监听一个从未关闭的通道上的事件。

要解决此问题,您需要确保通道在不再需要时关闭。你可以通过在函数中添加一个 defer 语句来实现,它会在函数退出时自动关闭通道。

此外,您应该在 handleClientReconnect() 中的 for 循环中添加一个条件,用于检查程序是否正在关闭。如果是,那么该函数应该退出。这将防止程序卡住并允许它按预期进行。

func (rmq *Client) handleClientReconnect() {
    defer rmq.secretWatcher.Close()

    for {
        select {
        case event, ok := <-rmq.secretWatcher.Events:
            if !ok {
                return
            }
            if event.Has(fsnotify.Remove) || event.Has(fsnotify.Write) {
                ilog.Info(event.Name)
            }
        case err, ok := <-rmq.secretWatcher.Errors:
            if !ok {
                return
            }
            ilog.Errorf("Error in watcher: %v", err)
        case <-rmq.done:
            return
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.