在 artemis 中使用 AMQP 时,有一个注释表明您希望稍后传递消息,称为
x-opt-delivery-time
https://activemq.apache.org/components/artemis/documentation/latest/amqp .html#amqp-调度-消息-传递.
这在创建消息时工作正常,但当您使用修改后的处置框架设置它时,它似乎不起作用。
我创建了这个快速程序来演示这个问题
package main
import (
"context"
"log"
"time"
"github.com/Azure/go-amqp"
)
func main() {
// create connection
opts := &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("admin", "admin"),
}
conn, err := amqp.Dial(context.TODO(), "amqp://0.0.0.0:13001", opts)
if err != nil {
panic(err)
}
// create session
session, err := conn.NewSession(context.TODO(), nil)
if err != nil {
panic(err)
}
// create a new sender
sender, err := session.NewSender(context.TODO(), "test.queue", nil)
if err != nil {
panic(err)
}
// create a new receiver
receiver, err := session.NewReceiver(context.TODO(), "test.queue", nil)
if err != nil {
panic(err)
}
// send test message
msg := amqp.NewMessage([]byte{1})
msg.Annotations = make(amqp.Annotations)
msg.Annotations["x-opt-delivery-time"] = time.Now().Add(time.Second * 10).UnixMilli()
log.Printf("sending msg")
err = sender.Send(context.TODO(), msg, nil)
if err != nil {
panic(err)
}
// receive the test message
msg, err = receiver.Receive(context.TODO(), nil)
if err != nil {
panic(err)
}
log.Printf("received initial msg")
annotations := msg.Annotations
if annotations == nil {
annotations = make(amqp.Annotations)
}
annotations["x-opt-delivery-time"] = time.Now().Add(time.Second * 10).UnixMilli()
err = receiver.ModifyMessage(context.TODO(), msg, &amqp.ModifyMessageOptions{
DeliveryFailed: true,
UndeliverableHere: false,
Annotations: annotations,
})
if err != nil {
panic(err)
}
// receive the modified test message
msg, err = receiver.Receive(context.TODO(), nil)
if err != nil {
panic(err)
}
log.Printf("received modified msg")
receiver.AcceptMessage(context.TODO(), msg)
}
它正确地将初始消息的传递延迟了 10 秒,但在使用修改消息配置时,它不会延迟消息的重新传递。
这是我运行脚本时的输出
2024/07/28 21:02:34 sending msg
2024/07/28 21:02:44 received initial msg
2024/07/28 21:02:44 received modified msg
知道这是否是 Artemis 中的一个错误,或者当您希望在特定时间重新发送消息时是否有其他方法可以实现此功能?
Apache ActiveMQ Artemis 会忽略修改后的传递状态下的消息注释,请参阅 https://github.com/apache/activemq-artemis/blob/2.36.0/artemis-protocols/artemis-amqp-protocol/src/main/ java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java#L415
创建问题来请求此新功能