我在我的一个 go 服务中实现了以下 RPC 客户端,遵循官方 教程
使用方法很简单,
func DoSomethingWithRpc() {
...
rpcResponse, err := rmq_helpers.SendRpcMessage(
ts.publisher, // Just a struct that keep conn details
data,
)
// Do something with `rpcResponse`
...
}
这就是SendRpcMessage函数。问题是这只有效一次,如果我第二次触发
DoSomethingWithRpc()
,它就会超时(由于 SendRpcMessage()
中的超时逻辑,否则只是继续挂起它们阻挡胎面)
但是,在第二个
SendRpcMessage
调用中,我可以看到我的 RPC 服务器接收到该事件,并且它获得了正确的 CorrelationID
和 ReplyTo
队列值。我什至可以看到 RMQ 队列中的回复消息(从 rmq 仪表板)
func SendRpcMessage(
client *rmq.Client,
body []byte,
) (res *amqp.Delivery, err error) {
ch := client.GetChannel()
if ch == nil {
err = fmt.Errorf("Channel is nil")
return
}
replyQueue, err := client.GetChannel().QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
if err != nil {
fmt.Println("Error declaring queue: ", err)
return nil, err
}
msgs, err := client.GetChannel().Consume(
replyQueue.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
fmt.Println("Error consuming messages: ", err)
return nil, err
}
corrId := fmt.Sprintf("%d", time.Now().UnixNano())
fmt.Println("Correlation ID: ", corrId, "ReplyTo: ", replyQueue.Name)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = client.GetChannel().PublishWithContext(ctx,
"",
client.GetQueueName(),
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
ReplyTo: replyQueue.Name,
CorrelationId: corrId,
},
)
if err != nil {
return nil, err
}
timeout := time.After(5 * time.Second)
for {
select {
case d := <-msgs:
if d.CorrelationId == corrId {
// client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
res = &d
return
}
case <-timeout:
err = fmt.Errorf("Timeout")
return
}
}
}
我在这里做错了什么?任何帮助将非常感激。
您没有重新初始化超时。尝试更换这个:
timeout := time.After(5 * time.Second) // <--- move this below
for {
select {
case d := <-msgs:
if d.CorrelationId == corrId {
// client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
res = &d
return
}
case <-timeout:
err = fmt.Errorf("Timeout")
return
}
}
这样:
for {
select {
case d := <-msgs:
if d.CorrelationId == corrId {
// client.GetChannel().QueueDelete(replyQueue.Name, false, false, false)
res = &d
return
}
case <-time.After(5 * time.Second):
err = fmt.Errorf("Timeout")
return
}
}