PipedInputStream
和新的Java并发软件包?没有使用管道的类别来执行此任务的更好的方法?
BufferedOutputStream
写入文件可能就足够了。
如果您想对缓冲区大小和写入文件的块的大小进行更多控制,则可以做这样的事情:class Producer implements Runnable {
private final OutputStream out;
private final SomeDBClass db;
public Producer( OutputStream out, SomeDBClass db ){
this.out = out;
this.db = db;
}
public void run(){
// If you're writing to a text file you might want to wrap
// out in a Writer instead of using `write` directly.
while( db has more data ){
out.write( the data );
}
out.flush();
out.close();
}
}
class Consumer implements Runnable {
private final InputStream in;
private final OutputStream out;
public static final int CHUNKSIZE=512;
public Consumer( InputStream in, OutputStream out ){
this.out = out;
this.in = in;
}
public void run(){
byte[] chunk = new byte[CHUNKSIZE];
for( int bytesRead; -1 != (bytesRead = in.read(chunk,0,CHUNKSIZE) );;){
out.write(chunk, 0, bytesRead);
}
out.close();
}
}
在调用代码中:
FileOutputStream toFile = // Open the stream to a file
SomeDBClass db = // Set up the db connection
PipedInputStream pi = new PipedInputStream(); // Optionally specify a size
PipedOutputStream po = new PipedOutputStream( pi );
ExecutorService exec = Executors.newFixedThreadPool(2);
exec.submit( new Producer( po, db ) );
exec.submit( new Consumer( pi, toFile ) );
exec.shutdown();
也捕获可能会抛出的任何例外。
注意,如果这是您要做的,那么使用
ExecutorService
没有任何优势。当您有许多任务时,执行者很有用(太多,无法同时启动所有任务)。在这里,您只有两个线程必须同时运行,因此呼叫
Thread#start
ExecutionService
的类似API,您可以在具有
AsyncTaskExecutor
的Springboot应用中使用它这是一项可以显示其工作原理的测试
@EnableAsync
@SpringBootTest
class PipedInputOutputTest {
@Autowired private AsyncTaskExecutor taskExecutor;
@Test
void ensureThreadPoolTaskExecutor() {
assertThat(taskExecutor).isNotNull();
}
/**
* Show an example of piping from a source to become another input stream.
*/
@Test
void testPipedIO() throws IOException {
final var pipedOutputStream = new PipedOutputStream();
final var pipedInputStream = new PipedInputStream(pipedOutputStream);
Future<Void> pullDataFromSourceAndSendToPipeFuture = taskExecutor.submit(() -> {
var urlConnection = new URL("https://trajano.net").openConnection();
try (pipedOutputStream;
var sourceInputStream = urlConnection.getInputStream()) {
byte[] buf = new byte[2_000];
int bytesRead;
while ((bytesRead = sourceInputStream.read(buf)) != -1) {
pipedOutputStream.write(buf,0, bytesRead);
}
}
return null;
});
Future<Long> pullDataFromPipeToSomeProcessing = taskExecutor.submit(() -> {
try (pipedInputStream) {
long size = 0;
byte[] buf = new byte[1_000];
int bytesRead;
while ((bytesRead = pipedInputStream.read(buf)) != -1) {
size += bytesRead;
}
return size;
}
});
try {
pullDataFromSourceAndSendToPipeFuture.get();
long size = pullDataFromPipeToSomeProcessing.get();
assertThat(size).isPositive();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail("Thread interrupted");
} catch (ExecutionException e) {
fail("Execution error", e.getCause());
}
}
}
ExecutionService
的优点是减少
try
-
catch
块,因为它允许Callable<T>
Runnable
。 线程上的例外被捕获在catch (ExecutionException)
块中。通过减轻池的责任和配置,包括交换到虚拟线程,而不是在较新的
.中运行的人的线程池。