如何有条件地将PTransform应用到PCollection?

问题描述 投票:0回答:1

我有一个

PCollection
,并且希望在验证条件时应用自定义
PTransform
(该条件不取决于
Pcollection
内容)。

示例:我有日志,如果

PipelineOptions
中提供了日期,我想根据该日期进行过滤。

现在,我最好的解决方案是:

// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
if(!date.equals("")){
    logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
logs = logs.apply(...

它有效,但我不喜欢重新分配日志。更重要的是,我不喜欢打破

apply
的链条。这看起来不像是优雅的方式。

有某种条件

PTransform
吗?或者如果没有,将条件检查放在
PTransform
中并在未验证的情况下输出所有内容会更有效吗?

梦的例子:

PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
    .applyIf("FilterOnDate", ParDo.of(new DateFilterFn(date)), !date.equals(""))
    .apply(...
java google-cloud-dataflow apache-beam
1个回答
2
投票

不幸的是,Beam 没有任何类似

applyIf
的功能, 您当前的方法是进行此类条件过滤的一般方法。

PTransform 中的条件检查为每个元素添加了额外的操作,这将根据检查类型对性能产生影响。

如果可能,最好避免从管道进行变换,而不是让 PTransform 变得更复杂。

从代码美观的角度来看,您可以使用包装器变换来有条件地应用相关的过滤器 pardo。 示例:

public static class ConditionallyFilter
      extends PTransform<PCollection<LogRow>, PCollection<LogRow>> {
  private final String date;
  public ConditionallyFilter(String date){
    this.date = date;
  }
  @Override
  public PCollection<LogRow> expand(PCollection<LogRow> logs) {
    if(!date.equals("")){
      logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
    }
    return logs;
  }
} 


// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput())).apply(new ConditionallyFilter(date)).apply(...
© www.soinside.com 2019 - 2024. All rights reserved.