我有许多设备(目前有八个,但将来可能会更多),它们带有简单的阻塞 HTTP REST API。进行 GET 调用来给出一个值,API 会响应成功或失败,但始终返回 200。还有另一个 GET 来查找当前设置。因此,典型的顺序是:
http://\<some address\>/devicename/set/10
http://\<some address\>/devicename/get
我有一个 Spring Boot 反应式 API 来并行调用当前使用的这些下游 API
Mono.zip()
。我只有 set()
编码。当我调用反应式 API 时,它会进行所有 set()
调用,并向反应式 API 调用者发回显示结果的响应。
我遇到的问题是:我需要以编程方式等待所有
set()
调用的结果,执行 get()
调用,等待这些响应,然后可选地执行更多轮 set() -> get()
直到所有值都在合规性或重试次数已用完。
我尝试了多种变体:
block()
- 运行时出现 block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
CountDownLatch
位于 subscribe()
内 - 永远不会完成且不会递减锁存器subscribe()
(几种变体)- 仍然不等待所有回复。我觉得我错过了一些微不足道的东西。显然,在反应式 API 中的某个地方,等待正在发生,因为它总是带着所有已完成的
set()
响应返回给调用者。通常,这需要 30 到 40 秒才能完成,这是预期的,也是可以的。
我需要能够在用户代码中执行此操作,并启动提到的其他调用。我用浏览器访问了响应式 API,老实说,即使浏览器调用超时,也没关系,只要工作完成即可。在所有情况下,我确实需要保留对下游的并行调用,因为每个设备都可以同时执行工作。
反应式编程是一种过渡技术,旨在最大限度地提高多线程工作负载的性能。响应式编程令人困惑,难以推理,而且几乎不可能调试。
随着 Java 的 Project Loom 中虚拟线程的创建,对响应式的需求很大程度上消失了。请参阅 Java 21 虚拟线程是否解决了切换到反应式单线程框架的主要原因?,以及 Brian Goetz 的视频。
通过虚拟线程,您可以获得反应式的高性能,而且还可以获得简单代码的简单性,此外,您还可以利用我们现代工具的所有调试和线程跟踪功能。
在您的情况下,您似乎有一堆想要同时运行的“Setter”任务。所有这些任务完成后,您需要运行一堆“Getter”任务来验证所有设置的结果。因此,显然您遇到的情况是,那些“Setter”任务可能会失败,或者他们的工作可能会被其他一些未指定的进程撤消。
ExecutorService
。
如果这些任务涉及阻塞,例如文件 I/O、数据库访问、日志记录、套接字连接、Web 服务调用等,请在 Java 21 及更高版本中使用由
虚拟线程支持的
ExecutorService
实现。
ExecutorService
对象是 AutoCloseable
。这意味着您可以使用方便的 try-with-resources 语法在提交的任务完成后自动关闭 ExecutorService
。
因此,在您的情况下,您可以在 try-with-resources 中使用
ExecutorService
来运行第一组任务。然后
这里有一些示例代码来演示。
首先是一个简单的
Device
类,其中包含一个可变字段,可以由我们的 Setter
任务更改并由我们的 Getter
任务检索。
package work.basil.example.devices;
import java.util.UUID;
public class Device
{
final UUID id;
boolean setting; // Mutable.
public Device ( final UUID aDeviceId , final boolean aSetting )
{
this.id = aDeviceId;
this.setting = aSetting;
}
}
还有
Polling
课程:
Setter
任务完成后,运行一堆 Getter 任务。package work.basil.example.devices;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
public class Polling
{ // Fields
final private Collection < Device > devices;
// Constructor
public Polling ( final Collection < Device > devices )
{
this.devices = devices;
System.out.println ( "devices = " + devices );
// Run Setting tasks
try
(
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
)
{
// Make a Setter task for each device. Submit task to executor service.
devices.stream ( ).map ( Setter :: new ).forEach ( executorService :: submit );
}
// At this point the flow-of-control stops until all submitted tasks are done.
// Run Getting tasks.
List < Future < Boolean > > futures = new ArrayList <> ( );
try
(
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
)
{
// Make a Setter task for each device. Submit task to executor service.
for ( Device device : devices )
{
futures.add ( executorService.submit ( new Getter ( device ) ) );
}
}
// At this point the flow-of-control stops until all submitted tasks are done.
long countDevicesNotSetToTrue = futures.stream ( ).filter ( future -> !future.resultNow ( ) ).count ( );
if ( countDevicesNotSetToTrue > 0 )
{
System.out.println ("Oops, not all set. Try again." );
}
}
}
// Getter task
class Getter implements Callable < Boolean >
{
private final Device device;
public Getter ( final Device aDevice ) { this.device = aDevice; }
@Override
public Boolean call ( )
{
// Sleep to simulate some blocking work, such as network calls, logging, database access, file I/O, etc.
try { Thread.sleep ( Duration.ofSeconds ( ThreadLocalRandom.current ( ).nextInt ( 7 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException ( e ); }
boolean val = this.device.setting;
System.out.println ( "Getting " + val + " for " + this.device.id );
return val;
}
}
// Setter task
class Setter implements Runnable
{
private final Device device;
public Setter ( final Device device ) { this.device = device; }
@Override
public void run ( )
{
// Sleep to simulate some blocking work, such as network calls, logging, database access, file I/O, etc.
try { Thread.sleep ( Duration.ofSeconds ( ThreadLocalRandom.current ( ).nextInt ( 7 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException ( e ); }
// Set the setting, but not truthfully. Simulate the situation implied in the Question where the setting either fails or is later flipped.
this.device.setting = ThreadLocalRandom.current ( ).nextBoolean ( ); // Simulate the situation where setting may fail or may be changed later.
System.out.println ( "Setting device value now for: " + this.device.id + ". Tried to set to `true`` but actually set to: " + this.device.setting );
}
}
最后我们有一个
App
类来完成这个任务。我们实例化几个 Device
对象,并将其提交给 Polling
对象。
package work.basil.example.devices;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
public class App
{
public static void main ( String[] args )
{
System.out.println ("Hello World! "+ Instant.now() );
App app = new App();
app.demo();
}
private void demo ( ) {
Polling polling = new Polling (
List.of (
new Device ( UUID.fromString ( "b60f3152-f425-4f50-b82e-28cc66acd650" ) , true ) ,
new Device ( UUID.fromString ( "a024571b-fc33-4026-b1ec-271f38f15e3f" ) , true ) ,
new Device ( UUID.fromString ( "2dd8f151-5351-4c14-a283-a5a40908009e" ) , true ) ,
new Device ( UUID.fromString ( "73a3dcfb-088d-4361-bebc-ab627f72589a" ) , true ) ,
new Device ( UUID.fromString ( "4d3cb1f3-9203-4034-b4de-ef84e6465dac" ) , true ) ,
new Device ( UUID.fromString ( "8a3b3cca-f9bf-47ed-8814-9074f0b346c7" ) , true ) ,
new Device ( UUID.fromString ( "b1c169fc-be87-4732-a0e9-bcb427715625" ) , true )
)
);
}
}