在此question中,我们知道
PCollection<String> lines = p.apply(TextIO.read() .from("gs://some-bucket/many/files/*") .withHintMatchesManyFiles());
使用此提示会导致转换以优化的方式执行,以读取较大的文件数:在这种情况下,可以读取的文件数实际上是无限的,并且最有可能比没有此提示的情况下管道运行更快,更便宜且更可靠。
但是,流水线的步骤卡在下面的代码中
PCollection<String> lines = pipeline.apply("readDataFromGCS",
TextIO.read().from(sourcePath + "/prefix*")
.withHintMatchesManyFiles()
.watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));
而且每分钟大约有10〜30MB新文件上传到GCS。
但是,我们尝试从GCS in pub/sub读取文件,管道可以正常工作。
raw_event = p.apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
.apply("Read file matchall", FileIO.matchAll())
.apply("Read file match", FileIO.readMatches())
.apply("Read file", TextIO.readFiles());
我在这里想念什么吗?还是有其他方法可以更有效地从GCS读取大量文件?
我的管道的工作流程是从GCS读取数据,并在数据处理后下沉到Pub / Sub。
Beam版本:2.16.0