Wait() 和 NotifyAll() 在程序中不起作用

问题描述 投票:0回答:1

我正在编写一个面向消息的中间件,其中订阅者和发布者通过中间类 MessageBroker 中的队列进行通信。如果主题队列未满,发布者应将消息放入其中;如果队列不为空,订阅者应获取订阅主题的消息。当我尝试将 wait() NotifyAll() 包含在订阅者的 receiveMessage() 方法中时,问题就出现了。对于发布者,它可以正常工作,没有任何问题,但是对于订阅者,我遇到的问题是它们没有从等待状态中检索,因此它们什么都不做。

发布方式:

public synchronized void sendMessage() {
        BlockingQueue<Message> topicQueue = mb.getQueue(topic);
        if (topicQueue != null) {
            try {
                // Überprüfen, ob Platz in der Queue ist, bevor Nachricht gesendet wird
                while (topicQueue.remainingCapacity() == 0) {
                    wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Message m = topic.generateMessage();
            mb.publish(m);
            if (!gotActive) {
                mb.increasePublisherCounter(1);
                gotActive = true;
            }
            System.out.println(m.getContent() + " wurde zur Queue hinzugefügt");
            notifyAll(); // Alle Threads benachrichtigen
        }
    }

订阅方式:

public synchronized void receiveMessage() {
        for (Topic topic : topics) {
            BlockingQueue<Message> queue = mb.getQueue(topic);
            synchronized (queue) {
                try {
                    // Warten, bis die Warteschlange nicht mehr leer ist
                    while (queue.isEmpty()) {
                        queue.wait(); // Warte auf Benachrichtigung, wenn die Warteschlange leer ist
                    }

                    // Nachricht aus der Warteschlange holen
                    Message message = queue.peek();
                    if (message != null) {
                        System.out.println(name + " hat Nachricht erhalten: " + message.getContent());
                        incrementProcessedCounter(topic);
                        if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
                            queue.remove();
                            resetProcessedCounter(topic);
                            queue.notifyAll(); // Benachrichtige andere wartende Threads
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

我必须对这些方法进行哪些更改才能使这些方法起作用?感谢您的帮助。 如果您需要,我还可以添加更多代码:)

java multithreading synchronization wait notify
1个回答
0
投票

当我尝试在订阅者的 receiveMessage() 方法中包含 wait() NotifyAll() 时。对于发布者,它可以正常工作,没有任何问题,但是对于订阅者,我遇到的问题是它们没有从等待状态中检索,因此它们什么也不做。

我对您的代码有很多疑问,但第一条评论是您不必等待/通知自己。

BlockingQueue
的好处之一是它们完全同步。 调用
queue.take()
将阻塞,直到有消息要消费,而
queue.put()
将等待,直到队列中有空间放置它。

public void sendMessage() {
    BlockingQueue<Message> topicQueue = mb.getQueue(topic);
    if (topicQueue != null) {
        Message m = topic.generateMessage();
        topicQueue.put(m);
        // ... counters and the like?
    }
}

public void receiveMessage() {
    for (Topic topic : topics) {
        BlockingQueue<Message> queue = mb.getQueue(topic);
        Message message = queue.take();
        // ... work with the message
     }
}

至于为什么你的代码不起作用,很难说。 以下是一些评论:

  • 我没有看到您将消息放入
    topicQueue
    中的
    sendMessage()
    中。
  • 要非常小心同步的内容。 如果发布者和订阅者正在等待另一方通知,则发布者和订阅者都需要在同一个对象实例上同步。
  • 请注意计数器和活动布尔值,如果它们由不同线程写入/读取,它们会正确同步。
  • A
    peek()
    remove()
    可以通过
    poll()
    来完成,它可以完成这两件事,如果队列中没有消息则返回
    null
    。 再次
    take()
    等待。
  • 通过
    InterruptedException
    使用好的图案。 你至少应该在 catch 块中做一个
    Thread.currentThread().interrupt();
    但你也应该返回或跳出循环或其他东西。

希望这里有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.