RMQ RPC 仅返回一次响应

问题描述 投票:0回答:1

我在我的一个 go 服务中实现了以下 RPC 客户端,遵循官方 教程

使用方法很简单,

func DoSomethingWithRpc() {
 ...
 rpcResponse, err := rmq_helpers.SendRpcMessage(
    ts.publisher, // Just a struct that keep conn details
    data,
 )
 // Do something with `rpcResponse`
 ...
}

这就是SendRpcMessage函数。问题是这只有效一次,如果我第二次触发

DoSomethingWithRpc()
,它就会超时(由于
SendRpcMessage()
中的超时逻辑,否则只是继续挂起它们阻挡胎面)

但是,在第二个

SendRpcMessage
调用中,我可以看到我的 RPC 服务器接收到该事件,并且它获得了正确的
CorrelationID
ReplyTo
队列值。我什至可以看到 RMQ 队列中的回复消息(从 rmq 仪表板)

func SendRpcMessage(
    client *rmq.Client,
    body []byte,
) (res *amqp.Delivery, err error) {
    ch := client.GetChannel()

    if ch == nil {
        err = fmt.Errorf("Channel is nil")
        return
    }

    replyQueue, err := client.GetChannel().QueueDeclare(
        "",
        false,
        false,
        true,
        false,
        nil,
    )

    if err != nil {
        fmt.Println("Error declaring queue: ", err)
        return nil, err
    }

    msgs, err := client.GetChannel().Consume(
        replyQueue.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )

    if err != nil {
        fmt.Println("Error consuming messages: ", err)
        return nil, err
    }

    corrId := fmt.Sprintf("%d", time.Now().UnixNano())
    fmt.Println("Correlation ID: ", corrId, "ReplyTo: ", replyQueue.Name)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err = client.GetChannel().PublishWithContext(ctx,
        "",
        client.GetQueueName(),
        false,
        false,
        amqp.Publishing{
            ContentType:   "application/json",
            Body:          body,
            ReplyTo:       replyQueue.Name,
            CorrelationId: corrId,
        },
    )

    if err != nil {
        return nil, err
    }

    timeout := time.After(5 * time.Second)

    for {
        select {
        case d := <-msgs:
            if d.CorrelationId == corrId {
                // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
                res = &d
                return  
            }
        case <-timeout:
            err = fmt.Errorf("Timeout")
            return
        }

    }
}

我在这里做错了什么?任何帮助将非常感激。

go rabbitmq rpc
1个回答
0
投票

您没有重新初始化超时。尝试更换这个:

    timeout := time.After(5 * time.Second)   // <--- move this below

    for {
        select {
        case d := <-msgs:
            if d.CorrelationId == corrId {
                // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
                res = &d
                return  
            }
        case <-timeout:
            err = fmt.Errorf("Timeout")
            return
        }

    }

这样:

    for {
        select {
        case d := <-msgs:
            if d.CorrelationId == corrId {
                // client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
                res = &d
                return  
            }
        case <-time.After(5 * time.Second):
            err = fmt.Errorf("Timeout")
            return
        }
    }
© www.soinside.com 2019 - 2024. All rights reserved.