如何立即停止Spring JmsListener的执行?

问题描述 投票:0回答:1

在我的 Java Spring Boot 3 应用程序中,我有一个简单的 Oracle AQ

JmsListener

import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

@Component
@Log4j2
public class SomeJmsListener {

    @JmsListener(
            id = "id_listener_1",
            destination = "wmstk_queue",
            containerFactory = "jmsListenerContainerFactory"
    )
    public void listenMessage(Message message) throws Exception {
        if (message instanceof ObjectMessage objectMessage) {
            MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();
            
            WebClient.create()
                    .post()
                    .uri("some_url")
                    .bodyValue(messageInfo)
                    .retrieve()
                    .toEntity(String.class)
                    .block();
        }
    }
}

可以使用

JmsListenerEndpointRegistry
在应用程序执行的某个时刻停止此侦听器,如下所示:

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Service;

@Service
@Log4j2
@RequiredArgsConstructor
public class JmsListenerControlService {

    private final JmsListenerEndpointRegistry registry;
    
    
    public void stop(String listenerId) {
        var container = registry.getListenerContainer(listenerId);
        
        if (container.isRunning()) {
            container.stop();
        }
    }
}

要实现的目标是,如果侦听器容器停止,则必须

listenMessage
的执行必须立即停止(如果它正在运行)并且必须回滚JMS
Session

我现在找到的唯一解决方案就是观察容器并手动中断

listenMessage
的线程:

import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
@Log4j2
public class SomeJmsListener {

    @Autowired
    private JmsListenerEndpointRegistry registry;

    private ScheduledExecutorService scheduledExecutorService;

    @JmsListener(
            id = "id_listener_1",
            destination = "wmstk_queue",
            containerFactory = "jmsListenerContainerFactory"
    )
    public void listenMessage(Message message) throws Exception {
        observeContainer(Thread.currentThread());

        if (message instanceof ObjectMessage objectMessage) {
            MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();

            WebClient.create()
                    .post()
                    .uri("some_url")
                    .bodyValue(messageInfo)
                    .retrieve()
                    .toEntity(String.class)
                    .block();
        }
        stopObservation();
    }

    /**
     * Scheduled job observes jms listener and if it is not running - interrupts its thread
     */
    private void observeContainer (Thread thread){
        String listenerId = "id_listener_1";
        var listenerContainer = registry.getListenerContainer(listenerId);

        log.debug("Start observation for listener container with id: {}", listenerId);

        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

        scheduledExecutorService.scheduleAtFixedRate(
                () -> {
                    if (!listenerContainer.isRunning()) {
                        stopObservation();
                        thread.interrupt(); // interrupt the thread to prevent further execution of listenMessage
                    } else {
                        log.debug("Listener active, id: {}", listenerId);
                    }
                },
                10, // 10ms initial delay
                100, // execute each 100ms
                TimeUnit.MILLISECONDS
        );
    }

    private void stopObservation () {
        log.info("Stop observation for listener container with id: {}", "wmstk-listener-endpoint-1");
        if (scheduledExecutorService != null) {
            scheduledExecutorService.close();
            scheduledExecutorService = null;
        }
    }
}

中断后,消息保留在队列中,并且处理按预期中断,但在日志的最后,我看到此异常,我不确定这是否值得担心:

org.springframework.jms.listener.adapter.ListenerExecutionFailedException: 侦听器方法“listenMessage 抛出 java.lang.Exception”

错误 [er-endpoint-1-1] o.s.j.l.DefaultMessageListenerContainer : 应用程序异常被回滚错误覆盖

原因:java.io.InterruptedIOException: 套接字读取中断: oracle.jakarta.jms.AQjmsException:IO错误:套接字读取中断

所以,这个异常是通过回滚发生的,即在:

package oracle.jakarta.jms;

class AQjmsSession {

     synchronized void forceRollback() throws JMSException {
        Connection db_conn = null;
        db_conn = this.getDBConnection();

        try {
            db_conn.rollback(); // here exception occurs
            this.setConsistency(true);
        } catch (SQLException var3) {
            throw new AQjmsException(var3);
        }

        this.restartConsumers();
    }
}

尽管异常表明回滚发生了异常,但如果消息保留在队列中并且可供读取,是否值得担心此异常? 也许还有其他方法可以顺利回滚并中断方法的执行?

java sql spring spring-boot jms
1个回答
0
投票

我可以看到设计有很多问题

手动中断侦听器线程可能会导致不可预测的行为。 Spring JMS 侦听器容器管理线程生命周期。如果需要停止消息处理,正确的方法是停止容器本身而不是中断线程。您正在干预线程池管理 您可以从这里检查 JmsListenerEndpointRegistry https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/jms/config/JmsListenerEndpointRegistry.html 您可能依赖 jmsListener 容器配置和错误处理程序 JmsListener也实现了SmartLifeCycle接口

顺便说一句,您不必为每条消息创建一个 WebClient。定义一个bean并注入它

© www.soinside.com 2019 - 2024. All rights reserved.