我正在尝试创建一个管道,等待GCS文件夹中的新csv文件来处理它们并将输出写入BigQuery。
我写了以下代码:
public static void main(String[] args) {
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class));
TableReference tableRef = new TableReference();
tableRef.setProjectId(PROJECT_ID);
tableRef.setDatasetId(DATASET_ID);
tableRef.setTableId(TABLE_ID);
//Pipeline p = Pipeline.create(PipelineOptionsFactory.as(Options.class));
// Read files as they arrive in GS
p.apply("ReadFile", TextIO.read()
.from("gs://mybucket/*.csv")
.watchForNewFiles(
// Check for new files every 30 seconds
Duration.standardSeconds(30),
// Never stop checking for new files
Watch.Growth.<String>never()
)
)
.apply(ParDo.of(new DoFn<String, Segment>() {
@ProcessElement
public void processElement(ProcessContext c) {
String[] items = c.element().split(",");
if (items[0].startsWith("_", 1)) {
// Skip header (the header is starting with _comment)
LOG.info("Skipped header");
return;
}
Segment segment = new Segment(items);
c.output(segment);
}
}))
.apply(ParDo.of(new FormatSegment()))
.apply(BigQueryIO.writeTableRows()
.to(tableRef)
.withSchema(FormatSegment.getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
// Run the pipeline.
p.run();
}
如果我删除watchForNewFiles
部分我的代码工作得很好(我看到关于写入GCS临时位置的并行化的INFO日志,最终输出写入BigQuery)。
但是如果我让watchForNewFiles
(上面的代码)然后我只看到1个INFO日志(关于写入GCS临时位置)并且执行被卡住了。 BigQuery中没有更多日志,也没有错误和输出。
任何的想法?
看起来当使用waitForNewFiles()
时,我们必须使用BigQueryIO.Write.Method.STREAMING_INSERTS
方法写入BigQuery。
有效的代码现在是这样的:
.apply(BigQueryIO.writeTableRows()
.to(tableRef)
.withSchema(FormatSegment.getSchema())
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
使用DataflowRunner我尝试使用此错误.. java.lang.UnsupportedOperationException:DataflowRunner当前不支持可拆分的DoFn:org.apache.beam.sdk.transforms.Watch$WatchGrowthFn@4a1691ac
对于直接跑步者,我看到它轮询,但其余的管道似乎没有发射并且没有错误。写入数据存储区和bigquery。