我写了这个类来读写RabbitMQ频道我删除了一些行以缩短代码。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class RabbitMQ {
/**
* Creates a new RabbitMQ Channel binding to localhost
* @param channel The Queue name the Channel binds to
* @return The newly created Channel
* @throws IOException
* @throws TimeoutException
*/
private static Channel createChannel(String channel) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection(); Channel newChannel = connection.createChannel()) {
newChannel.queueDeclare(channel, true, false, false, null);
return newChannel;
}
}
/**
* Writes to a Queue
* @param channel The name of the queue
* @param message The message to send
* @throws IOException
* @throws TimeoutException
*/
public static void write(String channel, String message) throws IOException, TimeoutException {
Channel channelOut = createChannel(channel);
channelOut.basicPublish("", message, null, message.getBytes());
}
}
为了测试我称之为的功能:
new Thread(() -> {
try {
RabbitMQ.write("test", "testmessage");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}).start();
执行控制台时会抛出此异常:
Exception in thread "Thread-0" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685)
at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:197)
at de.flow.command.rabbitmq.RabbitMQ.write(RabbitMQ.java:52)
[RabbitMQ.java:52
指的是newChannel.queueDeclare(channel, true, false, false, null);
当我列出所有带有./rabbitmqctl.bat list_queues
的活动队列时,将列出按代码创建的队列。
我不明白为什么它在创建时会关闭而没有任何错误。
我通过在createChannel-Method中删除“ try”解决了该问题
private static Channel createChannel(String channel) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel newChannel = connection.createChannel();
newChannel.queueDeclare(channel, false, false, false, null);
return newChannel;
}