我有一个数据库,其中有一个带有链接的表。
我设法发现可以借助分区将列表拆分为更小的列表。 根据这篇文章,Partition类似乎是最快的(https://e.printstacktrace.blog/divide-a-list-to-lists-of-n-size-in-Java-8/)
将它们分成更小的列表后,我想使用这些链接并同时从它们中抓取数据。我本可以使用一个列表,然后:
linkList.parallelStream().forEach(link -> {
ScrapeLink(link);});
并设置 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "5");
但就我而言,我想将它们分割成更小的列表,然后将并行流传输到另一种方法,即我使用 ScraperAPI 在一个会话中分割每个链接(例如,使用 session_number 通过设置 session_number=123 来重用相同的代理。)
所以当我有这样的列表时: 最终列表链接 = Arrays.asList("link1","link2","link3","link4","link5","link6","link7");
System.out.println(Partition.ofSize(numbers, 3));
我会有[[link1, link2, link3], [link4, link5, link6], [link7]] 但是当我想同时在多个线程中处理这些小链表时该怎么办呢?
我的想法是使用 Java 8 Streams。但他们可能是更好的方法?
您可以使用默认 forkjoinpool(如您提到的容量为 5)
还有为您的子列表定义的自定义线程池。
因此,您需要首先创建一个像这样的可运行类,稍后将其提交到“新”线程池
@AllArgsConstructor
public void LinkProcessorTask implements Runnable {
private String link;
@Override
public void run() {
//do something with your link in the sublist
}
}
public void doWork() {
List<List<String>> subListsOfLinks = .... // partitioning function
subListsOfLinks.parallelStream().forEach(list -> {
ExecutorService executorService = Executors.newFixedThreadPool(4 //concurrency);
for(String link: list) {
LinkProcessorTask linkProcessorTask = new LinkProcessorTask(link);
executorService.submit(linkProcessorTask);
executorService.awaitTermination(//Timeout);
}
})
}
现在您可以自己设计决定是否要使这个新线程池成为具有固定并发性的全局线程池。或者您想在 ForkJoinPool 中调用。
如果你走进去,
total number of threads spawned = ForkJoinPoolConcurrency * CustomThreadPoolConcurrency.
否则就只是
ForkJoinPoolConcurrency + CustomThreadPoolConcurrency.
取决于您的机器等多种因素。
如果您想先等待集合中的所有链接完成然后再继续,您可以使用
CountDownLatch
来避免使用繁重的awaitTermination 方法。
不要使用流来安排工作,很容易将输入流(通过迭代器)连接到您的工作人员。 这不是必需的(除非任务非常快),但是,如果由于某种原因您需要分块获取数据,您可以直接执行。
原因是您没有处理流数据,您可以更好地控制任务的执行方式。
例如:
import lombok.SneakyThrows;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SharedJobPool {
@SneakyThrows
public static void main(String... args) {
int WORKERS = 5;
int CHUNK = 5;
Iterator<Integer> jobs = IntStream.range(0, 47).iterator();
List<Thread> workers = IntStream.range(0, WORKERS).mapToObj(j -> new Thread(() -> {
while (true) {
int[] chunk = new int[CHUNK];
int size = 0;
synchronized (jobs) {
while (size < CHUNK && jobs.hasNext())
chunk[size++] = jobs.next();
}
if (size == 0)
break;
slowJobProccesor(j, chunk);
}
})).collect(Collectors.toList());
for (Thread worker : workers)
worker.start();
for (Thread worker : workers)
worker.join();
}
@SneakyThrows
private static void slowJobProccesor(int j, int[] n) {
Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 1_500));
System.out.printf(" Thread #%d done job: %s%n", j, Arrays.stream(n).mapToObj(Integer::toString).collect(Collectors.joining(", ")));
}
}
有输出
Thread #1 done job: 5, 6, 7, 8, 9
Thread #2 done job: 10, 11, 12, 13, 14
Thread #4 done job: 20, 21, 22, 23, 24
Thread #3 done job: 15, 16, 17, 18, 19
Thread #0 done job: 0, 1, 2, 3, 4
Thread #2 done job: 30, 31, 32, 33, 34
Thread #1 done job: 25, 26, 27, 28, 29
Thread #4 done job: 35, 36, 37, 38, 39
Thread #3 done job: 40, 41, 42, 43, 44
Thread #0 done job: 45, 46, 0, 0, 0