What problems arise when spawning a set of new threads from a servlet, and what alternatives exist?


For simplicity, I will omit some details and try to highlight the main points:

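I have a servlet that consumes user requests. When a user sends a request, I need to collect data from many endpoints. One user can have data in several sources, so I need to fetch all the data from all of them and return it in the response. To get data from one endpoint, I need to call an HTTP URL. This takes a lot of time, say anywhere from 1 second to 20 minutes.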

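For example, we have a user, John, who has 3 document sources. He requests his documents from my servlet, and I start collecting them. I request the data from each source, collect it in a list, and then return the list to John. Obviously, if John has to wait a long time, he will get angry. Here is an example: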

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@RestController
public class SearchDocumentsController {

    @GetMapping("/search/{userId}")
    public List<String> searchDocuments(@PathVariable String userId) {
        List<String> connections = getUserConnections(userId);
        List<String> documents = syncDocuments(userId, connections);
        return documents;
    }

    // Simulate a method that retrieves user connections
    private List<String> getUserConnections(String userId) {
        return List.of("one", "two", "three");
    }

    // This method is the one that takes a long time to execute
    public List<String> syncDocuments(String userId, List<String> connections) {
        List<String> documents = new ArrayList<>();
        for (String connection : connections) {
            List<String> documentsFromOneConnection = searchInDataSource(userId, connection);
            documents.addAll(documentsFromOneConnection);
        }
        return documents;
    }

    // Simulate a slow data source
    private List<String> searchInDataSource(String userId, String connection) {
        List<String> documents = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            sleepSeconds(3);
            documents.add("doc " + i + " from " + connection + " for " + userId);
        }
        return documents;
    }

    private void sleepSeconds(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

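This finishes in 27 seconds. My task is to make this process parallel. When I receive a request, I should create 3 threads (or some other number) and run them in parallel. It could look like this: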

    @GetMapping("/search/{userId}")
    public List<String> searchDocuments(@PathVariable String userId) {
        long start = System.currentTimeMillis();
        List<String> connections = getUserConnections(userId);
        List<String> documents = asyncDocuments(userId, connections);
        long end = System.currentTimeMillis();
        System.out.println("Time taken: " + (end - start) / 1000 + " seconds");
        return documents;
    }

    // This method is the one that takes a long time to execute
    public List<String> asyncDocuments(String userId, List<String> connections) {
        List<CompletableFuture<List<String>>> futures = new ArrayList<>();

        for (String connection : connections) {
            futures.add(CompletableFuture.supplyAsync(() -> searchInDataSource(userId, connection)));
        }

        // Block until all futures have completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // By this point all futures are completed.

        List<String> collectedResults = new ArrayList<>();
        for (CompletableFuture<List<String>> future : futures) {
            collectedResults.addAll(future.join()); // Returns immediately: the future has already completed.
        }

        return collectedResults;
    }

This version takes 9 seconds. In the real project I pass my own executor service to CompletableFuture, and it is not Spring Boot but a plain servlet.

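Question: Is it OK to spawn a set of threads from a servlet? Is there a better way to get the same result? My main concern is resources. I use a global thread pool, so there is a bound, after which I need to return all the documents found up to that point, but it looks wrong anyway. I read in Java Concurrency in Practice that this is wrong (I don't remember the chapter), and I have seen some SO questions saying it is not a good approach, but I am looking for a good explanation with references to books, articles, and so on. Also, if anyone knows an alternative pattern, approach, or architecture for solving this task, I would be glad to hear it.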

java multithreading design-patterns completable-future java-threads
1 Answer

You posted too much code to digest easily. But as far as I can tell, these points may help you.

Executor service

Use an executor service to manage your threads.

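In modern Java (Java 19 and later), ExecutorService is AutoCloseable. So you can conveniently use try-with-resources syntax to close the executor service automatically after its tasks are done.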

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit tasks.
}
// Flow of control blocks here until tasks are completed/cancelled.
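
To make the skeleton above concrete, here is a minimal sketch, assuming Java 19 or later. The class name CloseBlocksDemo, the fixed pool of 3 threads, and the 200 ms sleep are choices made only for this illustration. It shows that by the time the try block exits, every submitted task has run.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the question's code.
class CloseBlocksDemo
{
    // Runs three short tasks and returns the names of those that finished.
    static List < String > runTasks ( )
    {
        List < String > finished = new CopyOnWriteArrayList <>( ); // Thread-safe, tasks write concurrently.
        try (
                ExecutorService executorService = Executors.newFixedThreadPool( 3 ) ;
        )
        {
            for ( String name : List.of( "one" , "two" , "three" ) )
            {
                executorService.execute( ( ) -> {
                    try
                    {
                        Thread.sleep( 200 ); // Stand-in for slow work.
                    } catch ( InterruptedException e )
                    {
                        Thread.currentThread( ).interrupt( );
                        return;
                    }
                    finished.add( name );
                } );
            }
        }
        // close() was called implicitly and has waited for the submitted tasks.
        return List.copyOf( finished );
    }

    public static void main ( String[] args )
    {
        System.out.println( "finished count = " + runTasks( ).size( ) );
    }
}
```

The implicit close() shuts the executor down and waits for the submitted tasks, so no explicit shutdown or awaitTermination call is needed here.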

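For simplicity and clarity, consider defining a separate class for your task. Implement Runnable or Callable; in your case, Callable, because each task returns a result.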

package work.basil.example.threading.bunch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

public class TextRetriever implements Callable < List < String > >
{
    private final String userId;
    private final String connection;

    public TextRetriever ( final String userId , final String connection )
    {
        this.userId = userId;
        this.connection = connection;
    }

    @Override
    public List < String > call ( ) throws Exception
    {
        List < String > documents = new ArrayList <>( );
        // Simulate hitting a remote service multiple times to retrieve text.
        for ( int ordinal = 1 ; ordinal <= 3 ; ordinal++ )
        {
            int countSeconds = ThreadLocalRandom.current( ).nextInt( 2 , 6 );
            Thread.sleep( Duration.ofSeconds( countSeconds ) );
            documents.add( "Doc # " + ordinal + " from " + connection + " for " + userId + " in thread " + Thread.currentThread().threadId() );
        }
        return List.copyOf( documents ); // Return unmodifiable list, as general good habit.
    }
}

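Next we write an app that creates dummy data, then instantiates a bunch of tasks, and finally submits them all to an executor service. We capture the returned collection of Future objects to track the progress and results of the tasks.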

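If your tasks involve blocking code, such as making network calls to external services, use virtual threads (Java 21 and later) for best performance. Here we use Executors to obtain an implementation of ExecutorService that creates a new virtual thread for each task.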

package work.basil.example.threading.bunch;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App( );
        app.demo( );
    }

    private void demo ( )
    {
        System.out.println("INFO - Demo start. Wait a few seconds. " + Instant.now()  );

        List < String > userIds = IntStream.rangeClosed( 101 , 105 ).mapToObj( String :: valueOf ).toList( );
        List < String > connections = List.of( "Alpha" , "Beta" , "Gamma" , "Delta" , "Epsilon" );

        Collection < TextRetriever > tasks = new ArrayList <>( userIds.size( ) );
        for ( int index = 0 ; index < userIds.size( ) ; index++ )
        {
            TextRetriever task = new TextRetriever( userIds.get( index ) , connections.get( index ) );
            tasks.add( task );
        }

        List < Future < List < String > > > futures  = List.of();
        try (
                ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor( ) ;
        )
        {
            try
            {
                futures = executorService.invokeAll( tasks );
            } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
        }
        // Flow-of-control blocks here, until tasks are all completed/failed.
        for ( Future < List < String > > future : futures )
        {
            try
            {
                List < String > result = future.get();
                System.out.println( "result = " + result );
            } catch ( InterruptedException e )
            {
                throw new RuntimeException( e );
            } catch ( ExecutionException e )
            {
                throw new RuntimeException( e );
            }
        }

        System.out.println("INFO - Demo end. " + Instant.now()  );
    }
}

This code works. When run:

INFO - Demo start. Wait a few seconds. 2024-07-16T18:49:57.722209Z
result = [Doc # 1 from Alpha for 101 in thread 21, Doc # 2 from Alpha for 101 in thread 21, Doc # 3 from Alpha for 101 in thread 21]
result = [Doc # 1 from Beta for 102 in thread 23, Doc # 2 from Beta for 102 in thread 23, Doc # 3 from Beta for 102 in thread 23]
result = [Doc # 1 from Gamma for 103 in thread 24, Doc # 2 from Gamma for 103 in thread 24, Doc # 3 from Gamma for 103 in thread 24]
result = [Doc # 1 from Delta for 104 in thread 25, Doc # 2 from Delta for 104 in thread 25, Doc # 3 from Delta for 104 in thread 25]
result = [Doc # 1 from Epsilon for 105 in thread 26, Doc # 2 from Epsilon for 105 in thread 26, Doc # 3 from Epsilon for 105 in thread 26]
INFO - Demo end. 2024-07-16T18:50:08.747660Z
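
The question mentions a bound after which whatever was found so far must be returned. Assuming that bound is a time limit, one way to sketch it is the timed overload of invokeAll, which cancels any task that has not finished when the timeout expires. The names DeadlineDemo, fakeSource, and collectWithinDeadline are made up for this illustration, and the sleep stands in for the real HTTP call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the question's code.
class DeadlineDemo
{
    // Hypothetical stand-in for a remote source that answers after the given delay.
    static Callable < List < String > > fakeSource ( String connection , long millis )
    {
        return ( ) -> {
            Thread.sleep( millis );
            return List.of( "doc from " + connection );
        };
    }

    // Returns the documents of every task that completed within the time limit.
    static List < String > collectWithinDeadline ( List < Callable < List < String > > > tasks , long timeoutMillis )
    {
        List < String > documents = new ArrayList <>( );
        try (
                ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor( ) ;
        )
        {
            // Blocks until all tasks are done or the timeout expires, whichever comes first.
            List < Future < List < String > > > futures =
                    executorService.invokeAll( tasks , timeoutMillis , TimeUnit.MILLISECONDS );
            for ( Future < List < String > > future : futures )
            {
                if ( future.isCancelled( ) ) { continue; } // Too slow, cancelled by invokeAll.
                try
                {
                    documents.addAll( future.get( ) );
                } catch ( ExecutionException e )
                {
                    // This source failed; skip it and keep the rest.
                }
            }
        } catch ( InterruptedException e )
        {
            Thread.currentThread( ).interrupt( ); // Restore the flag and return what we have.
        }
        return documents;
    }

    public static void main ( String[] args )
    {
        List < Callable < List < String > > > tasks = List.of( fakeSource( "fast" , 50 ) , fakeSource( "slow" , 5_000 ) );
        System.out.println( collectWithinDeadline( tasks , 1_000 ) );
    }
}
```

Cancellation interrupts the slow task, so a task that ignores interruption would still hold up close(). Whether silently dropping slow or failed sources is acceptable depends on what the user should see.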

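However, you could squeeze out more performance by having each task create subtasks, then submit those to another ExecutorService. Network calls are *very* slow. So spinning up that many tasks makes sense, especially with very "cheap" virtual threads that demand little CPU and memory.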
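
As a rough sketch of that subtask idea, assuming Java 21 or later: each outer task handles one connection and fans its individual document fetches out to a second, inner executor. FanOutDemo, fetchDocument, fetchFromConnection, and fetchAll are hypothetical names, and the 300 ms sleep stands in for one network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the question's code.
class FanOutDemo
{
    // Hypothetical stand-in for one slow network call that fetches one document.
    static String fetchDocument ( String connection , int ordinal )
    {
        try { Thread.sleep( 300 ); } catch ( InterruptedException e ) { Thread.currentThread( ).interrupt( ); }
        return "Doc # " + ordinal + " from " + connection;
    }

    // Outer task body: one subtask per document, run on an inner executor.
    static List < String > fetchFromConnection ( String connection , int count )
    {
        List < Callable < String > > subtasks = new ArrayList <>( );
        for ( int ordinal = 1 ; ordinal <= count ; ordinal++ )
        {
            final int n = ordinal;
            subtasks.add( ( ) -> fetchDocument( connection , n ) );
        }
        List < String > documents = new ArrayList <>( );
        try ( ExecutorService inner = Executors.newVirtualThreadPerTaskExecutor( ) )
        {
            // invokeAll returns the futures in the same order as the subtasks.
            for ( Future < String > future : inner.invokeAll( subtasks ) )
            {
                documents.add( future.get( ) );
            }
        } catch ( InterruptedException | ExecutionException e ) { throw new RuntimeException( e ); }
        return documents;
    }

    // One outer task per connection, each of which fans out again.
    static List < String > fetchAll ( List < String > connections , int countPerConnection )
    {
        List < Callable < List < String > > > tasks = new ArrayList <>( );
        for ( String connection : connections )
        {
            tasks.add( ( ) -> fetchFromConnection( connection , countPerConnection ) );
        }
        List < String > all = new ArrayList <>( );
        try ( ExecutorService outer = Executors.newVirtualThreadPerTaskExecutor( ) )
        {
            for ( Future < List < String > > future : outer.invokeAll( tasks ) )
            {
                all.addAll( future.get( ) );
            }
        } catch ( InterruptedException | ExecutionException e ) { throw new RuntimeException( e ); }
        return all;
    }

    public static void main ( String[] args )
    {
        System.out.println( fetchAll( List.of( "Alpha" , "Beta" , "Gamma" ) , 3 ) );
    }
}
```

With every fetch on its own virtual thread, the total wait is roughly that of the slowest single call rather than the sum of the calls. That only helps if the remote services can tolerate the extra concurrent requests, which is an assumption to verify for real endpoints.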
