How do I pass a variable from outside the pipeline into a ParDo function, and from there on to the Dataflow job? Below is an example where I try to derive fileDate before the pipeline is created and pass it to the ParDo function. I declared the variable on the options interface:
public interface CsvToBq extends DataflowPipelineOptions {
    @Description("File Date")
    String getFileDate();
    void setFileDate(String value);
}
and I set the value in the main method as:
public static void main(String[] args) {
    PipelineOptionsFactory.register(CsvToBq.class);
    CsvToBq options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(CsvToBq.class);

    Date date = new Date();
    String fileDate = formatter.format(date);
    options.setFileDate(fileDate);
and I am accessing the variable inside the ParDo function as:
private static class WikiParDo extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        PipelineOptions options = c.getPipelineOptions();
        String fileDate = options.getFileDate();
        String[] split = c.element().split(",");
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = getTableSchema().getFields().get(i);
            row.set(col.getName(), split[i]);
        }
        row.set("file_date", fileDate);
        c.output(row);
    }
}
Here is the complete code:
public class CsvToBq {

    public static void main(String[] args) {
        PipelineOptionsFactory.register(CsvToBq.class);
        CsvToBq options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(CsvToBq.class);

        Date date = new Date();
        String fileDate = formatter.format(date);
        options.setFileDate(fileDate);

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("READ", TextIO.read().from("gs://bucket/file.csv"))
                .apply("TRANSFORM", ParDo.of(new WikiParDo()))
                .apply("WRITE", BigQueryIO.writeTableRows()
                        .to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));

        pipeline.run();
    }

    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
        fields.add(new TableFieldSchema().setName("language").setType("STRING"));
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("file_date").setType("STRING"));
        return new TableSchema().setFields(fields);
    }

    public interface CsvToBq extends DataflowPipelineOptions {
        @Description("File Date")
        String getFileDate();
        void setFileDate(String value);
    }

    private static class WikiParDo extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            PipelineOptions options = c.getPipelineOptions();
            String fileDate = options.getFileDate();
            String[] split = c.element().split(",");
            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {
                TableFieldSchema col = getTableSchema().getFields().get(i);
                row.set(col.getName(), split[i]);
            }
            row.set("file_date", fileDate);
            c.output(row);
        }
    }
}
However, this does not work. I have tried using StaticValueProvider and side inputs, but they do not seem to fit this purpose.
I think you will need the following:
CsvToBq options = c.getPipelineOptions().as(CsvToBq.class);
String fileDate = options.getFileDate();
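Put together, the processElement from your question would then look roughly like this (just a sketch of your own code with the cast applied, assuming the CsvToBq options interface shown above):

private static class WikiParDo extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // Cast the generic PipelineOptions to the custom options interface
        // so that getFileDate() becomes visible.
        CsvToBq options = c.getPipelineOptions().as(CsvToBq.class);
        String fileDate = options.getFileDate();
        String[] split = c.element().split(",");
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = getTableSchema().getFields().get(i);
            row.set(col.getName(), split[i]);
        }
        row.set("file_date", fileDate);
        c.output(row);
    }
}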
Also, if you are not planning to use ValueProviders (currently a requirement for passing parameters to Dataflow templates), you can do something like this:
private static class WikiParDo extends DoFn<String, TableRow> {
    String fileName;

    public WikiParDo(String fileName) {
        this.fileName = fileName;
    }
Note that whatever you store this way must be serializable. joda.time Instant objects are fine, if I remember correctly.
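For example, your DoFn could be adapted roughly like this (a sketch only; the value is captured as a field when the DoFn is constructed and serialized along with it):

private static class WikiParDo extends DoFn<String, TableRow> {
    private final String fileDate; // a plain String, which is serializable

    public WikiParDo(String fileDate) {
        this.fileDate = fileDate;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] split = c.element().split(",");
        TableRow row = new TableRow();
        for (int i = 0; i < split.length; i++) {
            TableFieldSchema col = getTableSchema().getFields().get(i);
            row.set(col.getName(), split[i]);
        }
        row.set("file_date", fileDate);
        c.output(row);
    }
}

and at pipeline construction time:

.apply("TRANSFORM", ParDo.of(new WikiParDo(fileDate)))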
I accomplished this using StaticValueProvider. See here for more information.
We need to define an interface that extends the options, as below:
@Description("File Date")
ValueProvider<String[]> getFileDate();
void setFileDate(ValueProvider<String[]> value);
In the main method we can set the variable as:
Date date = new Date();
String fileDate = formatter.format(date);
and it can be passed in the pipeline's apply transform as:
ParDo.of(new WikiParDo(StaticValueProvider.of(fileDate)))
and inside the ParDo function it can be used as below:
ValueProvider<String> fDate;

WikiParDo(ValueProvider<String> fileDate) {
    this.fDate = fileDate;
}

// the value is then read with fDate.get()
Here is the complete code:
public class CsvToBq {

    public interface CsvToBq extends DataflowPipelineOptions {
        @Description("File Date")
        ValueProvider<String> getFileDate();
        void setFileDate(ValueProvider<String> value);
    }

    private static class WikiParDo extends DoFn<String, TableRow> {
        ValueProvider<String> fDate;

        WikiParDo(ValueProvider<String> fileDate) {
            this.fDate = fileDate;
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            String[] split = c.element().split(",");
            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {
                TableFieldSchema col = getTableSchema().getFields().get(i);
                row.set(col.getName(), split[i]);
            }
            row.set("file_date", fDate.get());
            c.output(row);
        }
    }

    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
        fields.add(new TableFieldSchema().setName("language").setType("STRING"));
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("file_date").setType("STRING"));
        return new TableSchema().setFields(fields);
    }

    public static void main(String[] args) {
        PipelineOptionsFactory.register(CsvToBq.class);
        CsvToBq options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToBq.class);

        Date date = new Date();
        String fileDate = formatter.format(date);

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("READ", TextIO.read().from("gs://bucket/file.csv"))
                .apply("TRANSFORM", ParDo.of(new WikiParDo(StaticValueProvider.of(fileDate))))
                .apply("WRITE", BigQueryIO.writeTableRows()
                        .to(String.format("%s:dataset_name.wiki_demo", options.getProject()))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(getTableSchema()));

        pipeline.run();
    }
}
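As a side note, if the value should come from a pipeline option rather than being computed in main, the ValueProvider declared on the options interface can be handed to the DoFn directly (a sketch, not part of the code above; the value would then be supplied with --fileDate=... on the command line, which is what makes this work for Dataflow templates):

CsvToBq options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CsvToBq.class);

Pipeline pipeline = Pipeline.create(options);
pipeline.apply("READ", TextIO.read().from("gs://bucket/file.csv"))
        // options.getFileDate() returns a ValueProvider<String>; it is only
        // resolved with get() inside processElement, at execution time.
        .apply("TRANSFORM", ParDo.of(new WikiParDo(options.getFileDate())));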