使用 AMQP 注释的 ActiveMQ Artemis 重新交付延迟

问题描述 投票:0回答:1

在 artemis 中使用 AMQP 时,有一个注释表明您希望稍后传递消息,称为

x-opt-delivery-time
https://activemq.apache.org/components/artemis/documentation/latest/amqp .html#amqp-调度-消息-传递.

这在创建消息时工作正常,但当您使用修改后的处置框架设置它时,它似乎不起作用。

我创建了这个快速程序来演示这个问题

package main

import (
    "context"
    "log"
    "time"

    "github.com/Azure/go-amqp"
)

func main() {
    // create connection
    opts := &amqp.ConnOptions{
        SASLType: amqp.SASLTypePlain("admin", "admin"),
    }
    conn, err := amqp.Dial(context.TODO(), "amqp://0.0.0.0:13001", opts)
    if err != nil {
        panic(err)
    }
    // create session
    session, err := conn.NewSession(context.TODO(), nil)
    if err != nil {
        panic(err)
    }
    // create a new sender
    sender, err := session.NewSender(context.TODO(), "test.queue", nil)
    if err != nil {
        panic(err)
    }
    // create a new receiver
    receiver, err := session.NewReceiver(context.TODO(), "test.queue", nil)
    if err != nil {
        panic(err)
    }

    // send test message
    msg := amqp.NewMessage([]byte{1})
    msg.Annotations = make(amqp.Annotations)
    msg.Annotations["x-opt-delivery-time"] = time.Now().Add(time.Second * 10).UnixMilli()
    log.Printf("sending msg")
    err = sender.Send(context.TODO(), msg, nil)
    if err != nil {
        panic(err)
    }

    // receive the test message
    msg, err = receiver.Receive(context.TODO(), nil)
    if err != nil {
        panic(err)
    }
    log.Printf("received initial msg")
    annotations := msg.Annotations
    if annotations == nil {
        annotations = make(amqp.Annotations)
    }
    annotations["x-opt-delivery-time"] = time.Now().Add(time.Second * 10).UnixMilli()
    err = receiver.ModifyMessage(context.TODO(), msg, &amqp.ModifyMessageOptions{
        DeliveryFailed:    true,
        UndeliverableHere: false,
        Annotations:       annotations,
    })
    if err != nil {
        panic(err)
    }

    // receive the modified test message
    msg, err = receiver.Receive(context.TODO(), nil)
    if err != nil {
        panic(err)
    }
    log.Printf("received modified msg")
    receiver.AcceptMessage(context.TODO(), msg)
}

它正确地将初始消息的传递延迟了 10 秒,但在使用修改消息配置时,它不会延迟消息的重新传递。

这是我运行脚本时的输出

2024/07/28 21:02:34 sending msg
2024/07/28 21:02:44 received initial msg
2024/07/28 21:02:44 received modified msg

知道这是否是 Artemis 中的一个错误,或者当您希望在特定时间重新发送消息时是否有其他方法可以实现此功能?

amqp activemq-artemis
1个回答
© www.soinside.com 2019 - 2024. All rights reserved.