响应式 API 需要调用并行的下游 API 并等待响应

问题描述 投票:0回答:1

我有许多设备(目前有八个,但将来可能会更多),它们带有简单的阻塞 HTTP REST API。进行 GET 调用来给出一个值,API 会响应成功或失败,但始终返回 200。还有另一个 GET 来查找当前设置。因此,典型的顺序是:

http://\<some address\>/devicename/set/10

http://\<some address\>/devicename/get

我有一个 Spring Boot 反应式 API 来并行调用当前使用的这些下游 API

Mono.zip()
。我只有
set()
编码。当我调用反应式 API 时,它会进行所有
set()
调用,并向反应式 API 调用者发回显示结果的响应。

我遇到的问题是:我需要以编程方式等待所有

set()
调用的结果,执行
get()
调用,等待这些响应,然后可选地执行更多轮
set() -> get()
直到所有值都在合规性或重试次数已用完。

我尝试了多种变体:

  1. block()
    - 运行时出现
    block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
  2. 错误
  3. CountDownLatch
    位于
    subscribe()
    内 - 永远不会完成且不会递减锁存器
  4. subscribe()
    (几种变体)- 仍然不等待所有回复。

我觉得我错过了一些微不足道的东西。显然,在反应式 API 中的某个地方,等待正在发生,因为它总是带着所有已完成的

set()
响应返回给调用者。通常,这需要 30 到 40 秒才能完成,这是预期的,也是可以的。

我需要能够在用户代码中执行此操作,并启动提到的其他调用。我用浏览器访问了响应式 API,老实说,即使浏览器调用超时,也没关系,只要工作完成即可。在所有情况下,我确实需要保留对下游的并行调用,因为每个设备都可以同时执行工作。

java spring-boot rest reactive-programming
1个回答
0
投票

不需要反应式

反应式编程是一种过渡技术,旨在最大限度地提高多线程工作负载的性能。响应式编程令人困惑,难以推理,而且几乎不可能调试。

随着 Java 的 Project Loom 中虚拟线程的创建,对响应式的需求很大程度上消失了。请参阅 Java 21 虚拟线程是否解决了切换到反应式单线程框架的主要原因?,以及 Brian Goetz 的视频

通过虚拟线程,您可以获得反应式的高性能,而且还可以获得简单代码的简单性,此外,您还可以利用我们现代工具的所有调试和线程跟踪功能。

您的应用场景

在您的情况下,您似乎有一堆想要同时运行的“Setter”任务。所有这些任务完成后,您需要运行一堆“Getter”任务来验证所有设置的结果。因此,显然您遇到的情况是,那些“Setter”任务可能会失败,或者他们的工作可能会被其他一些未指定的进程撤消。

执行服务

要同时执行一堆任务,请在 Java 5 及更高版本中使用

ExecutorService

如果这些任务涉及阻塞,例如文件 I/O、数据库访问、日志记录、套接字连接、Web 服务调用等,请在 Java 21 及更高版本中使用由

虚拟线程
支持的 ExecutorService 实现。

尝试资源语法

在 Java 19 及更高版本中,

ExecutorService
对象是
AutoCloseable
。这意味着您可以使用方便的 try-with-resources 语法在提交的任务完成后自动关闭
ExecutorService

因此,在您的情况下,您可以在 try-with-resources 中使用

ExecutorService
来运行第一组任务。然后

示例应用程序

这里有一些示例代码来演示。

首先是一个简单的

Device
类,其中包含一个可变字段,可以由我们的
Setter
任务更改并由我们的
Getter
任务检索。

package work.basil.example.devices;

import java.util.UUID;

public class Device
{
    final UUID id;
    boolean setting;  // Mutable.

    public Device ( final UUID aDeviceId , final boolean aSetting )
    {
        this.id = aDeviceId;
        this.setting = aSetting;
    }
}

还有

Polling
课程:

  1. 运行一堆 Setter 任务
  2. Setter
    任务完成后,运行一堆 Getter 任务。
  3. 验证所有结果是否符合我们的预期/希望。
package work.basil.example.devices;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class Polling
{    // Fields
    final private Collection < Device > devices;

    // Constructor
    public Polling ( final Collection < Device > devices )
    {
        this.devices = devices;
        System.out.println ( "devices = " + devices );

        // Run Setting tasks
        try
                (
                        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
                )
        {
            // Make a Setter task for each device. Submit task to executor service.
            devices.stream ( ).map ( Setter :: new ).forEach ( executorService :: submit );
        }
        // At this point the flow-of-control stops until all submitted tasks are done.

        // Run Getting tasks.
        List < Future < Boolean > > futures = new ArrayList <> ( );
        try
                (
                        ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
                )
        {
            // Make a Setter task for each device. Submit task to executor service.
            for ( Device device : devices )
            {
                futures.add ( executorService.submit ( new Getter ( device ) ) );
            }
        }
        // At this point the flow-of-control stops until all submitted tasks are done.

        long countDevicesNotSetToTrue = futures.stream ( ).filter ( future -> !future.resultNow ( ) ).count ( );
        if ( countDevicesNotSetToTrue > 0 )
        {
            System.out.println ("Oops, not all set. Try again." );
        }
    }
}

// Getter task
class Getter implements Callable < Boolean >
{
    private final Device device;

    public Getter ( final Device aDevice ) { this.device = aDevice; }

    @Override
    public Boolean call ( )
    {
        // Sleep to simulate some blocking work, such as network calls, logging, database access, file I/O, etc.
        try { Thread.sleep ( Duration.ofSeconds ( ThreadLocalRandom.current ( ).nextInt ( 7 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException ( e ); }

        boolean val = this.device.setting;
        System.out.println ( "Getting " + val + " for " + this.device.id );
        return val;
    }
}

// Setter task
class Setter implements Runnable
{
    private final Device device;

    public Setter ( final Device device ) { this.device = device; }

    @Override
    public void run ( )
    {
        // Sleep to simulate some blocking work, such as network calls, logging, database access, file I/O, etc.
        try { Thread.sleep ( Duration.ofSeconds ( ThreadLocalRandom.current ( ).nextInt ( 7 ) ) ); } catch ( InterruptedException e ) { throw new RuntimeException ( e ); }

        // Set the setting, but not truthfully. Simulate the situation implied in the Question where the setting either fails or is later flipped.
        this.device.setting = ThreadLocalRandom.current ( ).nextBoolean ( ); // Simulate the situation where setting may fail or may be changed later.
        System.out.println ( "Setting device value now for: " + this.device.id + ". Tried to set to `true`` but actually set to: " + this.device.setting );
    }
}

最后我们有一个

App
类来完成这个任务。我们实例化几个
Device
对象,并将其提交给
Polling
对象。

package work.basil.example.devices;

import java.time.Instant;
import java.util.List;
import java.util.UUID;

public class App
{
    public static void main ( String[] args )
    {
        System.out.println ("Hello World! "+ Instant.now() );
        App app = new App();
        app.demo();
    }

    private void demo ( ) {
        Polling polling = new Polling (
                List.of (
                        new Device ( UUID.fromString ( "b60f3152-f425-4f50-b82e-28cc66acd650" ) , true ) ,
                        new Device ( UUID.fromString ( "a024571b-fc33-4026-b1ec-271f38f15e3f" ) , true ) ,
                        new Device ( UUID.fromString ( "2dd8f151-5351-4c14-a283-a5a40908009e" ) , true ) ,
                        new Device ( UUID.fromString ( "73a3dcfb-088d-4361-bebc-ab627f72589a" ) , true ) ,
                        new Device ( UUID.fromString ( "4d3cb1f3-9203-4034-b4de-ef84e6465dac" ) , true ) ,
                        new Device ( UUID.fromString ( "8a3b3cca-f9bf-47ed-8814-9074f0b346c7" ) , true ) ,
                        new Device ( UUID.fromString ( "b1c169fc-be87-4732-a0e9-bcb427715625" ) , true )
                )
        );
    }
}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.