最近遇到了很多问题,因为我试图观察我的秘密上发生的事件,我修改了我的 go 脚本以实例化 RabbitMQ。添加了一个 EventWatcher 和一个 go routine 来解决这个问题。但是接下来发生的事情是,在“RabbitMQ 客户端已连接”之后,我的客户端似乎完全卡住了 “初始化通道” “换频道乐趣” 日志,什么都没有发生。
我在另一个内部为观看事件部分交互了一个 go 例程,这是有道理的,因为它是不同的线程。
希望你们中的一些人能提供帮助。
// Client stucture for managing a client connection
type Client struct {
connection *amqp.Connection
channel *amqp.Channel
done chan bool
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
isReady bool
isReadyChan chan bool
secretWatcher *fsnotify.Watcher
credsPath string
}
// StartClientConnection initialize all needed instance for be prepared to send messages
func StartClientConnection() (*Client, error) {
ilog.Info("Start to launch RabbitMQ client connection...")
client := &Client{
done: make(chan bool),
isReadyChan: make(chan bool),
}
client, err := NewRabbitmqClient(os.Getenv("RABBITMQ_CREDS"))
if err != nil {
ilog.Fatal(logs.NewCommonError("RabbitMQ Event Watcher failed to init: %v", err))
}
go client.connect()
ilog.Info("RabbitMQ client connection started")
return client, nil
}
func NewRabbitmqClient(credsPath string) (*Client, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, fmt.Errorf("Init RabbitMQ client : failed to init watcher, %v", err)
}
err = watcher.Add(credsPath)
if err != nil {
return nil, fmt.Errorf("Init RabbitMQ client : Failed to Watch file, %v", err)
}
ilog.Info("RABBITMQ EVENT WATCHER STARTED")
rmq := Client{secretWatcher: watcher, credsPath: credsPath}
return &rmq, nil
}
func (rmq *Client) connect() {
for {
rmq.isReady = false
rabbitMQServerCredentials := GetRabbitMQServerConfiguration()
rabbitMQUserCredentials := GetRabbitMQUserConfiguration()
uri := formatRabbitMqURIServerconnection(rabbitMQUserCredentials, rabbitMQServerCredentials)
_, err := rmq.connectClient(uri)
fmt.Println(uri)
go rmq.handleClientReconnect()
if err != nil {
ilog.Errorf("Failed to initialize RabbitMQ client connection. Retrying...:%v", err)
select {
case <-rmq.done:
return
case <-time.After(time.Duration(5 * time.Second)):
continue
}
}
if done := rmq.handleChannelReconnect(rmq.connection); done {
break
}
}
return
}
// Watcher
func (rmq *Client) handleClientReconnect() {
for {
select {
case event, ok := <-rmq.secretWatcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Write) {
ilog.Info(event.Name)
}
case err, ok := <-rmq.secretWatcher.Errors:
if !ok {
return
}
ilog.Errorf("Error in watcher: %v", err)
}
}
}
// handleChannelReconnect - will wait for a channel error and then continuously attempt to re-initialize both channels
func (client *Client) handleChannelReconnect(conn *amqp.Connection) bool {
for {
client.isReady = false
err := client.initializeChannel(conn)
if err != nil {
ilog.Error("Failed to initialize RabbitMQ channel. Retrying...")
select {
case <-client.done:
return true
case <-time.After(time.Duration(5 * time.Second)):
}
continue
}
select {
case <-client.done:
return true
case <-client.notifyConnClose:
ilog.Error("Connection closed. Reconnecting RabbitMQ client...")
client.isReadyChan <- false
return false
case <-client.notifyChanClose:
ilog.Error("Channel closed. Reconnecting RabbitMQ channel...")
client.isReadyChan <- false
}
}
}
// connect will create a new AMQP connection
func (client *Client) connectClient(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)
if err != nil {
return nil, err
}
client.changeConnection(conn)
ilog.Info("RabbitMQ client connected")
return conn, nil
}
// init will initialize channel & declare queue
func (client *Client) initializeChannel(conn *amqp.Connection) error {
ilog.Info("Init Channel")
ch, err := conn.Channel()
if err != nil {
return err
}
err = ch.Confirm(false)
if err != nil {
return err
}
client.changeChannel(ch)
client.isReady = true
client.isReadyChan <- true
ilog.Info("RabbitMQ channel connected")
return nil
}
// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (client *Client) changeChannel(channel *amqp.Channel) {
ilog.Info("Change Channel func")
client.channel = channel
client.notifyChanClose = make(chan *amqp.Error, 1)
client.channel.NotifyClose(client.notifyChanClose)
}
// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {
client.connection = connection
client.notifyConnClose = make(chan *amqp.Error, 1)
client.connection.NotifyClose(client.notifyConnClose)
}
// Consume will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (client *Client) Consume(queueName string) <-chan amqp.Delivery {
var err error
var messages <-chan amqp.Delivery
ilog.Info(logs.NewConsumerInfoLog("Start to launch a connection to a new queue", QueueName))
for {
if !client.isReady {
ilog.Error(logs.NewConsumerErrorLog("Cannot listen message on queue, retry...", QueueName, err))
return nil
}
messages, err = client.channel.Consume(
queueName,
"",
true,
false,
false,
false,
nil,
)
if err == nil {
ilog.Info(logs.NewConsumerInfoLog("RabbitMQ will listen on a new queue", QueueName))
return messages
}
ilog.Error(logs.NewConsumerErrorLog("Cannot listen message on queue, retry...", QueueName, err))
time.Sleep(reconnectConsumerDelay)
}
}
// ShutdownClientConnection close socket connection
func (client *Client) ShutdownClientConnection() {
ilog.Info("Start to shutdown rabbitmq instance...")
if !client.isReady {
ilog.Info("RabbitMQ instance already closed")
return
}
close(client.done)
close(client.isReadyChan)
err := client.channel.Close()
if err != nil {
ilog.Error(logs.NewCommonError("Cannot close channel connection", err))
}
err = client.connection.Close()
if err != nil {
ilog.Error(logs.NewCommonError("Cannot close client connection", err))
}
client.isReady = false
ilog.Info("Rabbitmq instance closed")
}
在您的代码中,您有一个连接到 RabbitMQ 以监视事件的程序。您使用通道和 goroutines 来监听这些事件。但是,连接到RabbitMQ后程序似乎卡住了,什么也没有发生。
原因是您的 goroutine 之一 handleClientReconnect() 正在阻止主 goroutine 继续。这是因为该函数正在监听一个从未关闭的通道上的事件。
要解决此问题,您需要确保通道在不再需要时关闭。你可以通过在函数中添加一个 defer 语句来实现,它会在函数退出时自动关闭通道。
此外,您应该在 handleClientReconnect() 中的 for 循环中添加一个条件,用于检查程序是否正在关闭。如果是,那么该函数应该退出。这将防止程序卡住并允许它按预期进行。
func (rmq *Client) handleClientReconnect() {
defer rmq.secretWatcher.Close()
for {
select {
case event, ok := <-rmq.secretWatcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Write) {
ilog.Info(event.Name)
}
case err, ok := <-rmq.secretWatcher.Errors:
if !ok {
return
}
ilog.Errorf("Error in watcher: %v", err)
case <-rmq.done:
return
}
}
}