我正在编写一个面向消息的中间件,其中订阅者和发布者通过中间类 MessageBroker 中的队列进行通信。如果主题队列未满,发布者应将消息放入其中;如果队列不为空,订阅者应获取订阅主题的消息。当我尝试将 wait() NotifyAll() 包含在订阅者的 receiveMessage() 方法中时,问题就出现了。对于发布者,它可以正常工作,没有任何问题,但是对于订阅者,我遇到的问题是它们没有从等待状态中检索,因此它们什么都不做。
发布方式:
public synchronized void sendMessage() {
BlockingQueue<Message> topicQueue = mb.getQueue(topic);
if (topicQueue != null) {
try {
// Überprüfen, ob Platz in der Queue ist, bevor Nachricht gesendet wird
while (topicQueue.remainingCapacity() == 0) {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Message m = topic.generateMessage();
mb.publish(m);
if (!gotActive) {
mb.increasePublisherCounter(1);
gotActive = true;
}
System.out.println(m.getContent() + " wurde zur Queue hinzugefügt");
notifyAll(); // Alle Threads benachrichtigen
}
}
订阅方式:
public synchronized void receiveMessage() {
for (Topic topic : topics) {
BlockingQueue<Message> queue = mb.getQueue(topic);
synchronized (queue) {
try {
// Warten, bis die Warteschlange nicht mehr leer ist
while (queue.isEmpty()) {
queue.wait(); // Warte auf Benachrichtigung, wenn die Warteschlange leer ist
}
// Nachricht aus der Warteschlange holen
Message message = queue.peek();
if (message != null) {
System.out.println(name + " hat Nachricht erhalten: " + message.getContent());
incrementProcessedCounter(topic);
if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
queue.remove();
resetProcessedCounter(topic);
queue.notifyAll(); // Benachrichtige andere wartende Threads
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
我必须对这些方法进行哪些更改才能使这些方法起作用?感谢您的帮助。 如果您需要,我还可以添加更多代码:)
当我尝试在订阅者的 receiveMessage() 方法中包含 wait() NotifyAll() 时。对于发布者,它可以正常工作,没有任何问题,但是对于订阅者,我遇到的问题是它们没有从等待状态中检索,因此它们什么也不做。
我对您的代码有很多疑问,但第一条评论是您不必等待/通知自己。
BlockingQueue
的好处之一是它们完全同步。 调用 queue.take()
将阻塞,直到有消息要消费,而 queue.put()
将等待,直到队列中有空间放置它。
public void sendMessage() {
BlockingQueue<Message> topicQueue = mb.getQueue(topic);
if (topicQueue != null) {
Message m = topic.generateMessage();
topicQueue.put(m);
// ... counters and the like?
}
}
和
public void receiveMessage() {
for (Topic topic : topics) {
BlockingQueue<Message> queue = mb.getQueue(topic);
Message message = queue.take();
// ... work with the message
}
}
至于为什么你的代码不起作用,很难说。 以下是一些评论:
topicQueue
中的 sendMessage()
中。peek()
和 remove()
可以通过 poll()
来完成,它可以完成这两件事,如果队列中没有消息则返回 null
。 再次take()
等待。InterruptedException
使用好的图案。 你至少应该在 catch 块中做一个 Thread.currentThread().interrupt();
但你也应该返回或跳出循环或其他东西。希望这里有帮助。