我有一个Java类,它通过GUI上的特定操作启动与RabbitMQ服务器的连接(使用发布/订阅模式),并监听新事件。
我想添加一个新功能,允许用户设置一个“结束时间”,该时间将阻止我的应用程序监听新事件(停止消耗队列而不关闭队列)。
我试图利用basicCancel方法,但是我找不到一种方法可以使它在预定义的日期工作。在我的Subscribe类中启动一个新线程,在到达给定日期时将调用basicCancel是一个好主意,还是有更好的方法呢?
听新事件
private void listenToEvents(String queueName) {
try {
logger.info(" [*] Waiting for events. Subscribed to : " + queueName);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
TypeOfEvent event = null;
String message = new String(body);
// process the payload
InteractionEventManager eventManager = new InteractionEventManager();
event = eventManager.toCoreMonitorFormatObject(message);
if(event!=null){
String latestEventOpnName = event.getType().getOperationMessage().getOperationName();
if(latestEventOpnName.equals("END_OF_PERIOD"))
event.getMessageArgs().getContext().setTimestamp(++latestEventTimeStamp);
latestEventTimeStamp = event.getMessageArgs().getContext().getTimestamp();
ndaec.receiveTypeOfEventObject(event);
}
}
};
channel.basicConsume(queueName, true, consumer);
//Should I add the basicCancel here?
}
catch (Exception e) {
logger.info("The Monitor could not reach the EventBus. " +e.toString());
}
}
启动连接
public String initiateConnection(Timestamp endTime) {
Properties props = new Properties();
try {
props.load(new FileInputStream(everestHome+ "/monitoring-system/rabbit.properties"));
}catch(IOException e){
e.printStackTrace();
}
RabbitConfigure config = new RabbitConfigure(props,props.getProperty("queuName").trim());
ConnectionFactory factory = new ConnectionFactory();
exchangeTopic = new HashMap<String,String>();
String exchangeMerged = config.getExchange();
logger.info("Exchange=" + exchangeMerged);
String[] couples = exchangeMerged.split(";");
for(String couple : couples)
{
String[] infos = couple.split(":");
if (infos.length == 2)
{
exchangeTopic.put(infos[0], infos[1]);
}
else
{
logger.error("Invalid Exchange Detail: " + couple);
}
}
for(Entry<String, String> entry : exchangeTopic.entrySet()) {
String exchange = entry.getKey();
String topic = entry.getValue();
factory.setHost(config.getHost());
factory.setPort(Integer.parseInt(config.getPort()));
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
try {
connection1= factory.newConnection();
channel = connection1.createChannel();
channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
/*Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", endTime.getTime());*/
channel.queueDeclare(config.getQueue(),false,false,false,null);
channel.queueBind(config.getQueue(),exchange,topic);
logger.info("Connected to RabbitMQ.\n Exchange: " + exchange + " Topic: " + topic +"\n Queue Name is: "+ config.getQueue());
return config.getQueue();
} catch (IOException e) {
logger.error(e.getMessage());
e.printStackTrace();
} catch (TimeoutException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
return null;
}
您可以创建一个延迟的队列,设置离开时间,这样,您要发送给您的消息就会在您要停止使用消费者时立即被完全清除。
然后,您必须将死信交换绑定到一个队列,该队列的使用者将在收到消息后立即停止另一个队列。
[永远不要在使用RabbitMq时使用线程,因为延迟消息,您可以做很多有趣的事情!