使用执行权限和管道阅读器/pipedWriter(或PipedInputStream/pipedOutputStream)用于消费者生产商

问题描述 投票:0回答:2
有一个示例用于使用

PipedInputStream

和新的Java并发软件包?

没有使用管道的类别来执行此任务的更好的方法?

对于您的任务,在您从数据库中读取时,只使用单个线程并使用

BufferedOutputStream

写入文件可能就足够了。

如果您想对缓冲区大小和写入文件的块的大小进行更多控制,则可以做这样的事情:
java producer-consumer executorservice java.util.concurrent
2个回答
5
投票
class Producer implements Runnable { private final OutputStream out; private final SomeDBClass db; public Producer( OutputStream out, SomeDBClass db ){ this.out = out; this.db = db; } public void run(){ // If you're writing to a text file you might want to wrap // out in a Writer instead of using `write` directly. while( db has more data ){ out.write( the data ); } out.flush(); out.close(); } } class Consumer implements Runnable { private final InputStream in; private final OutputStream out; public static final int CHUNKSIZE=512; public Consumer( InputStream in, OutputStream out ){ this.out = out; this.in = in; } public void run(){ byte[] chunk = new byte[CHUNKSIZE]; for( int bytesRead; -1 != (bytesRead = in.read(chunk,0,CHUNKSIZE) );;){ out.write(chunk, 0, bytesRead); } out.close(); } }

在调用代码中:

FileOutputStream toFile = // Open the stream to a file SomeDBClass db = // Set up the db connection PipedInputStream pi = new PipedInputStream(); // Optionally specify a size PipedOutputStream po = new PipedOutputStream( pi ); ExecutorService exec = Executors.newFixedThreadPool(2); exec.submit( new Producer( po, db ) ); exec.submit( new Consumer( pi, toFile ) ); exec.shutdown();

也捕获可能会抛出的任何例外。

注意,如果这是您要做的,那么使用

ExecutorService
没有任何优势。当您有许多任务时,执行者很有用(太多,无法同时启动所有任务)。在这里,您只有两个线程必须同时运行,因此呼叫
Thread#start
    直接将其开销较少。
  • Spring Boot提供了一个名为
ExecutionService

的类似API,您可以在具有

AsyncTaskExecutor
的Springboot应用中使用它
这是一项可以显示其工作原理的测试

@EnableAsync

0
投票
从编码的角度使用

@SpringBootTest class PipedInputOutputTest { @Autowired private AsyncTaskExecutor taskExecutor; @Test void ensureThreadPoolTaskExecutor() { assertThat(taskExecutor).isNotNull(); } /** * Show an example of piping from a source to become another input stream. */ @Test void testPipedIO() throws IOException { final var pipedOutputStream = new PipedOutputStream(); final var pipedInputStream = new PipedInputStream(pipedOutputStream); Future<Void> pullDataFromSourceAndSendToPipeFuture = taskExecutor.submit(() -> { var urlConnection = new URL("https://trajano.net").openConnection(); try (pipedOutputStream; var sourceInputStream = urlConnection.getInputStream()) { byte[] buf = new byte[2_000]; int bytesRead; while ((bytesRead = sourceInputStream.read(buf)) != -1) { pipedOutputStream.write(buf,0, bytesRead); } } return null; }); Future<Long> pullDataFromPipeToSomeProcessing = taskExecutor.submit(() -> { try (pipedInputStream) { long size = 0; byte[] buf = new byte[1_000]; int bytesRead; while ((bytesRead = pipedInputStream.read(buf)) != -1) { size += bytesRead; } return size; } }); try { pullDataFromSourceAndSendToPipeFuture.get(); long size = pullDataFromPipeToSomeProcessing.get(); assertThat(size).isPositive(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); fail("Thread interrupted"); } catch (ExecutionException e) { fail("Execution error", e.getCause()); } } }

ExecutionService
的优点是减少
try
-
catch

块,因为它允许

Callable<T>

blove the,它允许抛出异常而不是
Runnable
。 线程上的例外被捕获在

catch (ExecutionException)

块中。
通过减轻池的责任和配置,包括交换到虚拟线程,而不是在较新的
.
中运行的人的线程池。
	
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.