在我的 Java Spring Boot 3 应用程序中,我有一个简单的 Oracle AQ
JmsListener
:
import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
@Component
@Log4j2
public class SomeJmsListener {
@JmsListener(
id = "id_listener_1",
destination = "wmstk_queue",
containerFactory = "jmsListenerContainerFactory"
)
public void listenMessage(Message message) throws Exception {
if (message instanceof ObjectMessage objectMessage) {
MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();
WebClient.create()
.post()
.uri("some_url")
.bodyValue(messageInfo)
.retrieve()
.toEntity(String.class)
.block();
}
}
}
可以使用
JmsListenerEndpointRegistry
在应用程序执行的某个时刻停止此侦听器,如下所示:
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Service;
@Service
@Log4j2
@RequiredArgsConstructor
public class JmsListenerControlService {
private final JmsListenerEndpointRegistry registry;
public void stop(String listenerId) {
var container = registry.getListenerContainer(listenerId);
if (container.isRunning()) {
container.stop();
}
}
}
要实现的目标是,如果侦听器容器停止,则必须
listenMessage
的执行必须立即停止(如果它正在运行)并且必须回滚JMS Session
。
我现在找到的唯一解决方案就是观察容器并手动中断
listenMessage
的线程:
import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
@Log4j2
public class SomeJmsListener {
@Autowired
private JmsListenerEndpointRegistry registry;
private ScheduledExecutorService scheduledExecutorService;
@JmsListener(
id = "id_listener_1",
destination = "wmstk_queue",
containerFactory = "jmsListenerContainerFactory"
)
public void listenMessage(Message message) throws Exception {
observeContainer(Thread.currentThread());
if (message instanceof ObjectMessage objectMessage) {
MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();
WebClient.create()
.post()
.uri("some_url")
.bodyValue(messageInfo)
.retrieve()
.toEntity(String.class)
.block();
}
stopObservation();
}
/**
* Scheduled job observes jms listener and if it is not running - interrupts its thread
*/
private void observeContainer (Thread thread){
String listenerId = "id_listener_1";
var listenerContainer = registry.getListenerContainer(listenerId);
log.debug("Start observation for listener container with id: {}", listenerId);
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(
() -> {
if (!listenerContainer.isRunning()) {
stopObservation();
thread.interrupt(); // interrupt the thread to prevent further execution of listenMessage
} else {
log.debug("Listener active, id: {}", listenerId);
}
},
10, // 10ms initial delay
100, // execute each 100ms
TimeUnit.MILLISECONDS
);
}
private void stopObservation () {
log.info("Stop observation for listener container with id: {}", "wmstk-listener-endpoint-1");
if (scheduledExecutorService != null) {
scheduledExecutorService.close();
scheduledExecutorService = null;
}
}
}
中断后,消息保留在队列中,并且处理按预期中断,但在日志的最后,我看到此异常,我不确定这是否值得担心:
org.springframework.jms.listener.adapter.ListenerExecutionFailedException: 侦听器方法“listenMessage 抛出 java.lang.Exception”
错误 [er-endpoint-1-1] o.s.j.l.DefaultMessageListenerContainer : 应用程序异常被回滚错误覆盖
原因:java.io.InterruptedIOException: 套接字读取中断: oracle.jakarta.jms.AQjmsException:IO错误:套接字读取中断
所以,这个异常是通过回滚发生的,即在:
package oracle.jakarta.jms;
class AQjmsSession {
synchronized void forceRollback() throws JMSException {
Connection db_conn = null;
db_conn = this.getDBConnection();
try {
db_conn.rollback(); // here exception occurs
this.setConsistency(true);
} catch (SQLException var3) {
throw new AQjmsException(var3);
}
this.restartConsumers();
}
}
尽管异常表明回滚发生了异常,但如果消息保留在队列中并且可供读取,是否值得担心此异常? 也许还有其他方法可以顺利回滚并中断方法的执行?
我可以看到设计有很多问题
手动中断侦听器线程可能会导致不可预测的行为。 Spring JMS 侦听器容器管理线程生命周期。如果需要停止消息处理,正确的方法是停止容器本身而不是中断线程。您正在干预线程池管理 您可以从这里检查 JmsListenerEndpointRegistry https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/jms/config/JmsListenerEndpointRegistry.html 您可能依赖 jmsListener 容器配置和错误处理程序 JmsListener也实现了SmartLifeCycle接口
顺便说一句,您不必为每条消息创建一个 WebClient。定义一个bean并注入它