我有一个从队列接收消息的客户端。我目前有一个实现
MessageListener
的 onMessage()
。
收到消息后,将对其进行进一步处理,然后通过
onMessage()
方法将其保存到数据库中;然后客户端确认收到消息。
只要数据库启动了就没有问题。但如果数据库宕机,客户端将不会确认。
为了满足这一点,我希望客户端按计划的时间间隔向队列发送计划的请求,以获取任何未确认的消息。
事实上,我执行此操作的唯一方法是重新启动客户端,这并不理想。有没有办法触发队列重新发送未确认的消息而不需要重新启动?
我有什么
onMessage()
:
//code to connect to queue
try {
if (DB is available){
//process message
//save required details to DB
msg.acknowledge();
}
else{
//schedule to request same message later from queue
}
} catch (Exception e) {}
我认为标准行为已经在做你想要的事情:如果消息代理使用相同的数据库并且数据库不可用,它将不会接受消息,因此客户端将假脱机它们,直到消息代理再次准备好。
如果它们不共享相同的数据库并且消息代理已打开,则它将对消息进行假脱机处理,并在
onMessage
引发异常时重试。
消息代理将尝试根据其可配置的策略重新发送。
经过更多研究,我偶然发现了 session.recover(),我可以用它来触发重新传递。我看到有 RedeliveryPolicy 类,我可以用它来设置消息重新发送选项。现在我的代码看起来像:
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setBackOffMultiplier((short) 2);
policy.setRedeliveryDelay(30000);
policy.setInitialRedeliveryDelay(60000);
policy.setUseExponentialBackOff(true);
((ActiveMQConnectionFactory)factory).setRedeliveryPolicy(policy);
final Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
...
...
...
..
//inside onMessage()
try {
if (DB is available){
//process message
//save required details to DB
msg.acknowledge();
}
else{
session.recover();
}
} catch (Exception e) {}