我有一个接收异步消息的多线程应用程序。如何等到收到消息(或超时)?是否有更好的方法使用 wait() 和 notify() 来做到这一点?
我试着用带有原子引用的原始线程睡眠来做到这一点。
class SignalReceiver{
String message;
Boolean messageReceived; // used AtomicBoolean
void receive(String message){
this.message = message;
messageReceived = true; // set message flag
}
void waitTillMessageReceived(long timeout){
if(!messageReceived){ // message could be received before
while(!messageReceived){
Thread.sleep(100);
// wait only till timeout
}
}
messageReceived = false; // reset message flag
}
}
这是一个带有等待/通知的例子
公共类 SignalReceiver{ 字符串消息;
public static void main(String[] args) {
final SignalReceiver sr=new SignalReceiver();
new Thread(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
}
System.out.println("message sent");
sr.receive("hello");
}).start();;
System.out.println("waiting...");
String mess=sr.waitTillMessageReceived(5000L);
System.out.println("message: "+mess);
}
public synchronized void receive(String message){
this.message = message;
notify();
}
public synchronized String waitTillMessageReceived(long timeout){
try {
wait(timeout);
}
catch (InterruptedException e) {
}
return message;
}
}
这个例子提出了一些问题。例如,它不能正确处理许多同步消息的到达。 正如评论中所建议的,最好使用适当的同步类,如 java.util.concurrent.LinkedBlockingDeque。
BlockingQueue
我遵循了Louis Wasserman和markspace提供的线索,建议使用队列。
BlockingQueue
,它 (a) 是线程安全的,并且 (b) 在添加和删除元素时可以阻塞。我从该接口的 Javadoc 上给出的示例代码开始。
对于
BlockingQueue
接口的实现,我选择了ArrayBlockingQueue
。
类生产和消费随机生成的UUID 值。
这里是生产者类。在无限循环中,
run
方法尝试将 UUID 对象添加到队列中。如果队列当前繁忙,则尝试添加块,并在队列可用时继续。如果被中断,例如被执行者服务关闭,这个run
方法完成,从而结束提交的任务。
package work.basil.example.async;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
public class Producer implements Runnable
{
// Member fields.
private final BlockingQueue < UUID > queue;
// Constructor.
Producer ( BlockingQueue < UUID > q ) { queue = q; }
public void run ( )
{
System.out.println( "`Producer` object is starting its `run` method. " + Instant.now() );
try
{
while ( true )
{
// Sleep, to simulate some lengthy work.
Duration timeToSleep = Duration.ofMillis( ThreadLocalRandom.current().nextInt( 1_000 , 3_000 ) );
try { Thread.sleep( timeToSleep ); } catch ( InterruptedException e ) { break; }
queue.put( produce() ); // Post a new value to our BlockingQueue.
}
}
catch ( InterruptedException e )
{
// Could be interrupted by an executor service closing, or other reasons.
// Simply let this `Runnable` object end its `run` method.
// No code needed here.
}
System.out.println( "`Producer` object is ending its `run` method, after being interrupted. " + Instant.now() );
}
// Logic
UUID produce ( )
{
UUID uuid = UUID.randomUUID();
System.out.println( "Producing UUID = " + uuid + " at " + Instant.now() );
return uuid;
}
}
还有消费阶层。
run
方法请求队列中的下一个可用元素。调用会阻塞,直到元素可用为止。在等待期间,如果执行程序服务关闭等中断发生,run
方法完成,从而结束提交的任务。
package work.basil.example.async;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable
{
private final BlockingQueue < UUID > queue;
Consumer ( BlockingQueue < UUID > q ) { queue = q; }
public void run ( )
{
System.out.println( "`Consumer` object is starting its `run` method. " + Instant.now() );
try
{
while ( true ) { consume( queue.take() ); }
}
catch ( InterruptedException e )
{
// Could be interrupted by an executor service closing, or other reasons.
// Simply let this `Runnable` object end its `run` method.
// No code needed here.
}
System.out.println( "`Consumer` object is ending its `run` method, after being interrupted. " + Instant.now() );
}
void consume ( UUID uuid )
{
System.out.println( "Consuming UUID: " + uuid + " at " + Instant.now() );
}
}
还有一个应用程序类来演示队列的运行情况。
请注意,您有责任在某个时候优雅地关闭您的执行程序服务,至少在您的应用程序执行结束时。否则线程的后备池可能会无限期地运行,就像僵尸🧟u200d♂️。
package work.basil.example.async;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;
public class App
{
public static void main ( String[] args )
{
System.out.println( "INFO - Demo start. " + Instant.now() );
BlockingQueue q = new ArrayBlockingQueue( 10 , true );
Consumer c = new Consumer( q );
Producer p = new Producer( q );
ExecutorService executorService = Executors.newFixedThreadPool( 2 );
executorService.submit( c );
executorService.submit( p );
// Let the background threads do their work for a while.
try { Thread.sleep( Duration.ofSeconds( 8 ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
System.out.println( "INFO - Main thread is interrupting tasks run by executor service." );
executorService.shutdownNow();
try
{
if ( ! executorService.awaitTermination( 30 , TimeUnit.SECONDS ) )
{
System.err.println( "Executor service failed to terminate. " + Instant.now() );
}
}
catch ( InterruptedException e ) { throw new RuntimeException( e ); }
System.out.println( "INFO - Demo end. " + Instant.now() );
}
}
这似乎有效。
INFO - Demo start. 2023-02-28T07:24:03.603169Z
`Consumer` object is starting its `run` method. 2023-02-28T07:24:03.614092Z
`Producer` object is starting its `run` method. 2023-02-28T07:24:03.614092Z
Producing UUID = 5fe7dd8a-9cc1-47b8-93aa-87bc031c2534 at 2023-02-28T07:24:04.855110Z
Consuming UUID: 5fe7dd8a-9cc1-47b8-93aa-87bc031c2534 at 2023-02-28T07:24:04.863520Z
Producing UUID = 97a31391-8c5c-4430-9737-b967a8c63987 at 2023-02-28T07:24:06.678767Z
Consuming UUID: 97a31391-8c5c-4430-9737-b967a8c63987 at 2023-02-28T07:24:06.679680Z
Producing UUID = af485e45-5dd5-4b68-82c8-41e1443c4566 at 2023-02-28T07:24:08.337011Z
Consuming UUID: af485e45-5dd5-4b68-82c8-41e1443c4566 at 2023-02-28T07:24:08.337917Z
Producing UUID = 4853a43c-feb6-41ec-91b0-1520c9f67347 at 2023-02-28T07:24:10.927173Z
Consuming UUID: 4853a43c-feb6-41ec-91b0-1520c9f67347 at 2023-02-28T07:24:10.928794Z
INFO - Main thread is interrupting tasks run by executor service.
`Consumer` object is ending its `run` method, after being interrupted. 2023-02-28T07:24:11.619844Z
`Producer` object is ending its `run` method, after being interrupted. 2023-02-28T07:24:11.619725Z
INFO - Demo end. 2023-02-28T07:24:11.621988Z
警告: 我不是并发问题的专家。使用风险自负。