在RabbitMQ中给定日期后从特定队列退订

问题描述 投票:1回答:1

我有一个Java类,它通过GUI上的特定操作启动与RabbitMQ服务器的连接(使用发布/订阅模式),并监听新事件。

我想添加一个新功能,允许用户设置一个“结束时间”,该时间将阻止我的应用程序监听新事件(停止消耗队列而不关闭队列)。

我试图利用basicCancel方法,但是我找不到一种方法可以使它在预定义的日期工作。在我的Subscribe类中启动一个新线程,在到达给定日期时将调用basicCancel是一个好主意,还是有更好的方法呢?

听新事件

    private void listenToEvents(String queueName) {
        try {
              logger.info(" [*] Waiting for events. Subscribed to : " + queueName);

              Consumer consumer = new DefaultConsumer(channel) {

                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                                             AMQP.BasicProperties properties, byte[] body) throws IOException {

                    TypeOfEvent event = null;

                    String message = new String(body);


                    // process the payload
                    InteractionEventManager eventManager = new InteractionEventManager();
                    event = eventManager.toCoreMonitorFormatObject(message);


                    if(event!=null){    
                        String latestEventOpnName = event.getType().getOperationMessage().getOperationName();

                        if(latestEventOpnName.equals("END_OF_PERIOD"))
                            event.getMessageArgs().getContext().setTimestamp(++latestEventTimeStamp);            

                        latestEventTimeStamp = event.getMessageArgs().getContext().getTimestamp();                                    
                        ndaec.receiveTypeOfEventObject(event);                  
                    }
                  }
                };

                channel.basicConsume(queueName, true, consumer);   
             //Should I add the basicCancel here?
        }
        catch (Exception e) {
            logger.info("The Monitor could not reach the EventBus. " +e.toString());
        }     

    }

启动连接

  public String initiateConnection(Timestamp endTime) {

        Properties props = new Properties();
        try {
            props.load(new FileInputStream(everestHome+ "/monitoring-system/rabbit.properties"));
         }catch(IOException e){
             e.printStackTrace();
        }                       

        RabbitConfigure config = new RabbitConfigure(props,props.getProperty("queuName").trim());

        ConnectionFactory factory = new ConnectionFactory();

        exchangeTopic = new HashMap<String,String>();
        String exchangeMerged = config.getExchange();
        logger.info("Exchange=" + exchangeMerged);
        String[] couples = exchangeMerged.split(";");

        for(String couple : couples)
        {
            String[] infos = couple.split(":");
            if (infos.length == 2)
            {
                exchangeTopic.put(infos[0], infos[1]);
            }
            else
            {
                logger.error("Invalid Exchange Detail: " + couple);
            }
        }

        for(Entry<String, String> entry : exchangeTopic.entrySet()) {

            String exchange = entry.getKey();
            String topic = entry.getValue();

            factory.setHost(config.getHost());
            factory.setPort(Integer.parseInt(config.getPort()));
            factory.setUsername(config.getUsername());
            factory.setPassword(config.getPassword());

            try {
                connection1= factory.newConnection();
                channel = connection1.createChannel();
                channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
                /*Map<String, Object> args = new HashMap<String, Object>();
                args.put("x-expires", endTime.getTime());*/
                channel.queueDeclare(config.getQueue(),false,false,false,null);
                channel.queueBind(config.getQueue(),exchange,topic);            
                logger.info("Connected to RabbitMQ.\n Exchange: " + exchange + " Topic: " + topic +"\n Queue Name is: "+ config.getQueue());
                return config.getQueue();
            } catch (IOException e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            } catch (TimeoutException e) {
                logger.error(e.getMessage());
                e.printStackTrace();
            }
        }
        return null;
    }
java rabbitmq
1个回答
0
投票

您可以创建一个延迟的队列,设置离开时间,这样,您要发送给您的消息就会在您要停止使用消费者时立即被完全清除。

然后,您必须将死信交换绑定到一个队列,该队列的使用者将在收到消息后立即停止另一个队列。

[永远不要在使用RabbitMq时使用线程,因为延迟消息,您可以做很多有趣的事情!

© www.soinside.com 2019 - 2024. All rights reserved.