我在 Spring Boot Web 服务中安排了一项定期重复的任务 (@EnableSceduling)。当该任务触发时,它会调用注册对象的 Runnable/run 方法。在该运行方法中,我需要执行工作,并且在工作完成之前不要退出该运行方法。问题是我有其他线程正在执行此运行线程工作所需的其他工作。所以在运行线程中我有这样的东西:
@Component
public class DoWork implements Runnable {
@override
public void run() {
// Setup clients.
// Call services.
Mono<String> response1 = client1.post();
response1.subscribe(new MyResponseCallback(), new MyErrorCallback());
Mono<String> response2 = client2.post();
response2.subscribe(new MyResponseCallback(), new MyErrorCallback());
Mono<String> responseX = clientX.post();
responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());
while(callbacksWorkCompletedFlag == false) {
Thread.sleep (1000);
}
// Do computation with callback responses.
// After computation is completed, exit run method.
}
}
public class MyResponseCallback implements Consumer<String> {
@override
public void accept (final Sting response) {
// Do work with response.
}
}
public class MyErrorCallback implements Consumer<Throwable> {
@override
public void accept (final Throwable error) {
// Log error.
}
}
在 Java/Spring boot 中是否有更好的方法来做到这一点?
这是使用
CompletableFuture
的示例。它使用 Mono.subscribe
的第三个参数来让未来知道它何时完成。
@Override
public void run() {
Mono<String> response1 = client1.post();
CompletableFuture<?> future1 = new CompletableFuture<>();
response1.subscribe(
new MyResponseCallback(), new MyErrorCallback(),
() -> future1.complete(null));
Mono<String> response2 = client2.post();
CompletableFuture<?> future2 = new CompletableFuture<>();
response2.subscribe(
new MyResponseCallback(), new MyErrorCallback(),
() -> future2.complete(null));
Mono<String> responseX = clientX.post();
CompletableFuture<?> futureX = new CompletableFuture<>();
responseX.subscribe(
new MyResponseCallback(), new MyErrorCallback(),
() -> futureX.complete(null));
CompletableFuture.allOf(future1, future2, futureX).join();
}
这是一个
CountDownLatch
示例:
@Override
public void run() {
CountDownLatch latch = new CountDownLatch(3);
Mono<String> response1 = client1.post();
response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
Mono<String> response2 = client2.post();
response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
Mono<String> responseX = clientX.post();
responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}