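For simplicity, I will leave out some details and try to highlight the main points:
I have a servlet that consumes user requests. When a user sends a request, I need to collect data from many endpoints. A user can have data in multiple sources, so I need to fetch all the data from all the sources and return it to the user. To get data from one endpoint I need to call an HTTP URL, and that takes a long time: anywhere from 1 second to 20 minutes.
For example, say we have a user, John, who has 3 document sources. He requests his documents from my servlet and I start collecting them. I request the data, gather it into a list, and then send it back to John. Obviously, if John has to wait long, he will get angry. Here is an example: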
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

@RestController
public class SearchDocumentsController {

    @GetMapping("/search/{userId}")
    public List<String> searchDocuments(@PathVariable String userId) {
        List<String> connections = getUserConnections(userId);
        List<String> documents = syncDocuments(userId, connections);
        return documents;
    }

    // Simulate a method that retrieves user connections
    private List<String> getUserConnections(String userId) {
        return List.of("one", "two", "three");
    }

    // This method is the one that takes a long time to execute
    public List<String> syncDocuments(String userId, List<String> connections) {
        List<String> documents = new ArrayList<>();
        for (String connection : connections) {
            List<String> documentsFromOneConnection = searchInDataSource(userId, connection);
            documents.addAll(documentsFromOneConnection);
        }
        return documents;
    }

    // Simulate a slow data source
    private List<String> searchInDataSource(String userId, String connection) {
        List<String> documents = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            sleepSeconds(3);
            documents.add("doc " + i + " from " + connection + " for " + userId);
        }
        return documents;
    }

    private void sleepSeconds(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
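This takes 27 seconds to finish (3 connections × 3 documents × 3 seconds each). My task is to make this process parallel. When I receive a request, I should create 3 threads (or some other number) and run them in parallel. It could look like this: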
// Fragment of SearchDocumentsController; additionally needs:
// import java.util.concurrent.CompletableFuture;

@GetMapping("/search/{userId}")
public List<String> searchDocuments(@PathVariable String userId) {
    long start = System.currentTimeMillis();
    List<String> connections = getUserConnections(userId);
    List<String> documents = asyncDocuments(userId, connections);
    long end = System.currentTimeMillis();
    System.out.println("Time taken: " + (end - start) / 1000 + " seconds");
    return documents;
}

// Runs one search per connection in parallel, then merges the results
public List<String> asyncDocuments(String userId, List<String> connections) {
    List<CompletableFuture<List<String>>> futures = new ArrayList<>();
    for (String connection : connections) {
        // No executor is passed here, so the common ForkJoinPool is used
        futures.add(CompletableFuture.supplyAsync(() -> searchInDataSource(userId, connection)));
    }
    // Block until all futures are completed
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    // By this point all futures are completed
    List<String> collectedResults = new ArrayList<>();
    for (CompletableFuture<List<String>> future : futures) {
        // join() returns immediately here because the future is already complete
        collectedResults.addAll(future.join());
    }
    return collectedResults;
}
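This version takes 9 seconds. In the real project I pass my own executor service to CompletableFuture, and it is not Spring Boot but a plain servlet.
Question: Is it OK to spawn a set of threads from a servlet? Is there a better way to get the same result? My main concern is resources. I use a global thread pool, so there is a limit, after which I need to return all the documents found up to that point, but it looks wrong anyway. I read in Java Concurrency in Practice that this is wrong (I don't remember the chapter), and I have seen some SO questions saying it is not a good approach, but I am looking for a good explanation with references to books, articles, etc. Also, if anyone knows an alternative pattern/approach/architecture for solving this task, I would be grateful to hear it.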
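You posted too much code to digest easily. But as far as I can tell, these points may help you.
Use an executor service to manage your threads. ExecutorService is AutoCloseable (as of Java 19), so you can conveniently use try-with-resources syntax to close the executor service automatically once the tasks are done.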
try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit tasks.
}
// Flow of control blocks here until tasks are completed/cancelled.
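On Java versions before 19, ExecutorService is not AutoCloseable, so the try-with-resources form above does not compile. A minimal sketch of a rough manual equivalent follows. The class name `ManualShutdownDemo`, the helper `runAll`, the pool size, and the one-minute wait are all assumptions made for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ManualShutdownDemo {

    // Hypothetical helper: submits one trivial task per connection,
    // collects the results, and shuts the executor down by hand.
    static List<String> runAll(List<String> connections) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String connection : connections) {
                futures.add(executorService.submit(() -> "docs from " + connection));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // Blocks until this task is done.
            }
            return results;
        } finally {
            // Stop accepting new tasks, then wait for submitted ones to finish.
            executorService.shutdown();
            if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
                executorService.shutdownNow(); // Give up and interrupt stragglers.
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(List.of("one", "two", "three")));
    }
}
```

The `finally` block guarantees the pool is shut down even if a task fails, which is the main thing try-with-resources buys you on newer Java versions.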
For simplicity and clarity, consider defining a separate class for your task. Implement Runnable or Callable; in your case, Callable.
package work.basil.example.threading.bunch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

public class TextRetriever implements Callable < List < String > >
{
    private final String userId;
    private final String connection;

    public TextRetriever ( final String userId , final String connection )
    {
        this.userId = userId;
        this.connection = connection;
    }

    @Override
    public List < String > call ( ) throws Exception
    {
        List < String > documents = new ArrayList <>( );
        // Simulate hitting a remote service multiple times to retrieve text.
        for ( int ordinal = 1 ; ordinal <= 3 ; ordinal++ )
        {
            int countSeconds = ThreadLocalRandom.current( ).nextInt( 2 , 6 );
            Thread.sleep( Duration.ofSeconds( countSeconds ) );
            documents.add( "Doc # " + ordinal + " from " + connection + " for " + userId + " in thread " + Thread.currentThread().threadId() );
        }
        return List.copyOf( documents ); // Return unmodifiable list, as general good habit.
    }
}
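Next we write an app that creates some dummy data, instantiates a bunch of tasks, and finally submits them all to an executor service. We capture the returned collection of Future objects to track the progress and results of the tasks.
If your tasks involve blocking code, such as network calls to external services, use virtual threads (Java 21+) for best performance. Here we use Executors to obtain an implementation of ExecutorService that creates a new virtual thread for each task.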
package work.basil.example.threading.bunch;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class App
{
    public static void main ( String[] args )
    {
        App app = new App( );
        app.demo( );
    }

    private void demo ( )
    {
        System.out.println("INFO - Demo start. Wait several seconds. " + Instant.now() );
        List < String > userIds = IntStream.rangeClosed( 101 , 105 ).mapToObj( String :: valueOf ).toList( );
        List < String > connections = List.of( "Alpha" , "Beta" , "Gamma" , "Delta" , "Epsilon" );
        Collection < TextRetriever > tasks = new ArrayList <>( userIds.size( ) );
        for ( int index = 0 ; index < userIds.size( ) ; index++ )
        {
            TextRetriever task = new TextRetriever( userIds.get( index ) , connections.get( index ) );
            tasks.add( task );
        }
        List < Future < List < String > > > futures = List.of();
        try (
            ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor( ) ;
        )
        {
            try
            {
                futures = executorService.invokeAll( tasks );
            } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
        }
        // Flow-of-control blocks here, until tasks are all completed/failed.
        for ( Future < List < String > > future : futures )
        {
            try
            {
                List < String > result = future.get();
                System.out.println( "result = " + result );
            } catch ( InterruptedException e )
            {
                throw new RuntimeException( e );
            } catch ( ExecutionException e )
            {
                throw new RuntimeException( e );
            }
        }
        System.out.println("INFO - Demo end. " + Instant.now() );
    }
}
This code works. Sample output:
INFO - Demo start. Wait a few seconds. 2024-07-16T18:49:57.722209Z
result = [Doc # 1 from Alpha for 101 in thread 21, Doc # 2 from Alpha for 101 in thread 21, Doc # 3 from Alpha for 101 in thread 21]
result = [Doc # 1 from Beta for 102 in thread 23, Doc # 2 from Beta for 102 in thread 23, Doc # 3 from Beta for 102 in thread 23]
result = [Doc # 1 from Gamma for 103 in thread 24, Doc # 2 from Gamma for 103 in thread 24, Doc # 3 from Gamma for 103 in thread 24]
result = [Doc # 1 from Delta for 104 in thread 25, Doc # 2 from Delta for 104 in thread 25, Doc # 3 from Delta for 104 in thread 25]
result = [Doc # 1 from Epsilon for 105 in thread 26, Doc # 2 from Epsilon for 105 in thread 26, Doc # 3 from Epsilon for 105 in thread 26]
INFO - Demo end. 2024-07-16T18:50:08.747660Z
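As a side note, the question mentions a limit after which whatever has been found so far must be returned. If that limit is a time limit, one possible approach is the timed overload of `ExecutorService.invokeAll`, which cancels any task that has not finished when the timeout expires. The sketch below is only an illustration under that assumption: the class `PartialResultsDemo`, the helpers `source` and `collectWithin`, and the timings are hypothetical, and a cached thread pool stands in for whichever executor you actually use (a virtual-thread executor could be swapped in on Java 21+).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PartialResultsDemo {

    // Hypothetical stand-in for a remote source that takes `millis` to answer.
    static Callable<List<String>> source(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return List.of("doc from " + name);
        };
    }

    // Runs all tasks, but returns only what finished within the timeout.
    static List<String> collectWithin(List<Callable<List<String>>> tasks, long timeoutMillis)
            throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            // Tasks still running when the timeout expires are cancelled.
            List<Future<List<String>>> futures =
                    executorService.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> documents = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                if (future.isCancelled()) {
                    continue; // Did not finish in time; skip it.
                }
                try {
                    documents.addAll(future.get()); // Already done, returns at once.
                } catch (ExecutionException e) {
                    // The task itself failed; this sketch simply skips that source.
                }
            }
            return documents;
        } finally {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<List<String>>> tasks =
                List.of(source("fast", 100), source("slow", 5_000));
        System.out.println(collectWithin(tasks, 1_000)); // prints [doc from fast]
    }
}
```

Cancellation interrupts the worker thread, so it only frees resources promptly if the blocking call inside the task responds to interruption.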
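However, you could squeeze out more performance by having each task create subtasks and submit them to another ExecutorService. Network calls are *very* slow, so spinning up that many tasks makes sense, especially with very "cheap" virtual threads that demand little CPU or memory.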
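The subtask idea can be sketched as follows: one task per connection, each of which fans out one subtask per document onto its own virtual-thread executor. This is a minimal illustration assuming Java 21+. The class `SubtaskDemo`, the methods `fetchOne`, `fetchAllFrom`, and `fetchAll`, the fixed count of three documents, and the 200 ms delay are all hypothetical stand-ins for real network calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubtaskDemo {

    // Hypothetical stand-in for one slow network call fetching a single document.
    static String fetchOne(String connection, int ordinal) throws InterruptedException {
        Thread.sleep(200);
        return "Doc # " + ordinal + " from " + connection;
    }

    // Task for one connection: runs its three document fetches as parallel subtasks.
    static List<String> fetchAllFrom(String connection) throws Exception {
        List<Callable<String>> subtasks = new ArrayList<>();
        for (int ordinal = 1; ordinal <= 3; ordinal++) {
            final int n = ordinal;
            subtasks.add(() -> fetchOne(connection, n));
        }
        try (ExecutorService inner = Executors.newVirtualThreadPerTaskExecutor()) {
            List<String> documents = new ArrayList<>();
            // invokeAll returns futures in the same order as the submitted tasks.
            for (Future<String> future : inner.invokeAll(subtasks)) {
                documents.add(future.get());
            }
            return documents;
        }
    }

    // Top level: one task per connection, each of which spawns its own subtasks.
    static List<String> fetchAll(List<String> connections) throws Exception {
        List<Callable<List<String>>> tasks = new ArrayList<>();
        for (String connection : connections) {
            tasks.add(() -> fetchAllFrom(connection));
        }
        try (ExecutorService outer = Executors.newVirtualThreadPerTaskExecutor()) {
            List<String> documents = new ArrayList<>();
            for (Future<List<String>> future : outer.invokeAll(tasks)) {
                documents.addAll(future.get());
            }
            return documents;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchAll(List.of("Alpha", "Beta")));
    }
}
```

With every fetch running on its own virtual thread, the total wait is roughly that of the single slowest call rather than the sum of all calls.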