我有一个
PCollection
,并且希望在验证条件时应用自定义 PTransform
(该条件不取决于 Pcollection
内容)。
示例:我有日志,如果
PipelineOptions
中提供了日期,我想根据该日期进行过滤。
现在,我最好的解决方案是:
// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
logs = logs.apply(...
它有效,但我不喜欢重新分配日志。更重要的是,我不喜欢打破
apply
的链条。这看起来不像是优雅的方式。
有某种条件
PTransform
吗?或者如果没有,将条件检查放在 PTransform
中并在未验证的情况下输出所有内容会更有效吗?
梦的例子:
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
.applyIf("FilterOnDate", ParDo.of(new DateFilterFn(date)), !date.equals(""))
.apply(...
不幸的是,Beam 没有任何类似
applyIf
的功能,
您当前的方法是进行此类条件过滤的一般方法。
PTransform 中的条件检查为每个元素添加了额外的操作,这将根据检查类型对性能产生影响。
如果可能,最好避免从管道进行变换,而不是让 PTransform 变得更复杂。
从代码美观的角度来看,您可以使用包装器变换来有条件地应用相关的过滤器 pardo。 示例:
public static class ConditionallyFilter
extends PTransform<PCollection<LogRow>, PCollection<LogRow>> {
private final String date;
public ConditionallyFilter(String date){
this.date = date;
}
@Override
public PCollection<LogRow> expand(PCollection<LogRow> logs) {
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
return logs;
}
}
// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput())).apply(new ConditionallyFilter(date)).apply(...