org.apache.activemq.transport.InactivityIOException:无法发送,通道已经失败

问题描述 投票:0回答:4

我正在使用apache的activemq进行排队。当向队列写入数据时,我们开始经常看到以下异常:

Caused by: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: 
    at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:282)
    at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:271)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1366)

我不知道是什么原因导致了这个问题——或者坦率地说,我什至不知道从哪里开始调试导致这个问题的原因。

这是队列设置代码:

    camelContext = new DefaultCamelContext();
    camelContext.setErrorHandlerBuilder(new LoggingErrorHandlerBuilder());
    camelContext.getShutdownStrategy().setTimeout(SHUTDOWN_TIMEOUT_SECONDS);

    routePolicy = new RoutePolicy();
    routePolicy.setCamelContext(camelContext);

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(queueUri);
    // use a pooled connection factory between the module and the queue
    pooledConnectionFactory = new PooledConnectionFactory(connectionFactory);

    // how many connections should there be in the session pool?
    pooledConnectionFactory.setMaxConnections(this.maxConnections);
    pooledConnectionFactory.setMaximumActiveSessionPerConnection(this.maxActiveSessionPerConnection);
    pooledConnectionFactory.setCreateConnectionOnStartup(true);
    pooledConnectionFactory.setBlockIfSessionPoolIsFull(false);

    JmsConfiguration jmsConfiguration = new JmsConfiguration(pooledConnectionFactory);
    jmsConfiguration.setDeliveryPersistent(false); // do not store a copy of the messages on the queue

    ActiveMQComponent activeMQComponent = ActiveMQComponent.activeMQComponent(queueUri);
    activeMQComponent.setConfiguration(jmsConfiguration);
    camelContext.addComponent("activemq", activeMQComponent);
    Component activemq = camelContext.getComponent("activemq");

    // register endpoints for queues and topics
    Endpoint queueEndpoint = activemq.createEndpoint("activemq:queue:polaris.*");
    Endpoint topicEndpoint = activemq.createEndpoint("activemq:topic:polaris.*");
    producerTemplate = camelContext.createProducerTemplate();

    camelContext.start();
    queueEndpoint.start();
    topicEndpoint.start();

就像我说的,该错误并不建议任何调试方向,而且在我可以确定我的配置设置不正确的情况下,100% 的情况下都不会发生这种情况。

java apache-camel activemq-classic
4个回答
4
投票
  • 管理端口:61616(默认)
  • 服务端口:8161(默认)

将您的代理 URL 端口更改为 61616 并运行

参考这个


3
投票

最近我遇到了同样的问题。我发现了这个https://issues.apache.org/jira/browse/AMQ-6600

当类路径中缺少其中一个 jar 时,Apache ActiveMQ 客户端会抛出 InactivityIOException。就我而言,它是 hawtbuf-1.11.jar。当我将此 jar 添加到类路径时,它开始正常工作,没有错误。


1
投票

检查是否有非 Jms 客户端 ping 您的 JMS 代理。这可能是外部监控工具、负载平衡工具(例如 keepalived)或其他工具。


0
投票

我正在使用 Artemis 2.33.0 并编写了一个生成短信的应用程序。这些是我需要的最小罐子才能工作:

  • jakarta.jms-api-2.0.3.jar
  • activemq-client-5.18.3.jar
  • slf4j-api-2.0.11.jar
  • hawtbuf-1.11.jar

我从 artemis 安装的 lib 文件夹中获取了 jar。 删除 slf4j-api-2.0.11.jar 后,应用程序可以编译,但在运行时抛出异常。 删除 hawtbuf-1.11.jar 后,应用程序可以编译,但在运行时抛出异常:“...javax.jms.JMSException:无法发送,通道已失败:tcp://127.0.0.1:61616...”

这是代码:

package firstartemis;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class ArtemisProducer {

private static final Logger LOG = Logger.getLogger(ArtemisProducer.class.getName());

private void go() throws NamingException, JMSException {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    
    ActiveMQConnection connection = (org.apache.activemq.ActiveMQConnection) connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    ActiveMQQueue destination = (org.apache.activemq.command.ActiveMQQueue) session.createQueue("qname");
    MessageProducer producer = (MessageProducer) session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    for (int i = 0; i < 120; i++) {
        String text = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(Calendar.getInstance().getTime());
        TextMessage message = (TextMessage) session.createTextMessage(text);
        producer.send(message);
        LOG.info("mesage sent");
        try {
            Thread.sleep(500);
        } catch (InterruptedException ex) {
            LOG.info("interrupted.");
        }
    }
    
    session.close();
    connection.close();
}

public static void main(String[] args) {
    try {
        new ArtemisProducer().go();
    } catch (Exception ex) {
        Logger.getLogger(ArtemisProducer.class.getName()).log(Level.SEVERE, null, ex);
    }
}

}

© www.soinside.com 2019 - 2024. All rights reserved.