How to use a custom CombineFn with Combine.GroupedValues?


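I have written a CombineFn that takes KV<String, TableRow> as input and outputs KV<String, Iterable<TableRow>>. I want to use it with Combine.GroupedValues (or Combine.PerKey), and the source seems to indicate that this is possible, but I get the following error: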

Incorrect number of type arguments for generic method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) of type Combine; it cannot be parameterized with arguments <String, TableRow, Iterable<TableRow>>

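We are using Beam v2.10. For context, we apply session windows to a PCollection of KV<String, TableRow> and then use GroupByKey to produce a PCollection of KV<String, Iterable<TableRow>>. After that step, our CombineFn reduces each group to a KV<String, Iterable<TableRow>> that contains an Iterable of new TableRows built from the contents of the input rows.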
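The session-window step described above can be modeled in plain Java to show which rows end up in the same group. This is a sketch under stated assumptions, not Beam's Sessions implementation: the class SessionModel is hypothetical, timestamps are assumed to be sorted epoch milliseconds for a single key, and each event is treated as opening a window [t, t + gap) that merges with any overlapping window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, stdlib-only model of gap-based session windows for one key.
// It illustrates the merging rule only; it is not Beam's Sessions code.
final class SessionModel {
  // sortedTimes must be in ascending order (epoch millis).
  static List<List<Long>> sessions(List<Long> sortedTimes, long gapMillis) {
    List<List<Long>> result = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long sessionEnd = Long.MIN_VALUE; // exclusive end of the current merged window
    for (long t : sortedTimes) {
      // The window [t, t + gap) does not overlap the current session, so close it.
      if (!current.isEmpty() && t >= sessionEnd) {
        result.add(current);
        current = new ArrayList<>();
      }
      current.add(t);
      sessionEnd = Math.max(sessionEnd, t + gapMillis);
    }
    if (!current.isEmpty()) {
      result.add(current);
    }
    return result;
  }

  public static void main(String[] args) {
    // With a 2-second gap, the events form two sessions.
    System.out.println(sessions(Arrays.asList(0L, 1000L, 5000L, 5500L), 2000L));
    // prints [[0, 1000], [5000, 5500]]
  }
}
```

In the pipeline, each such session for a key becomes one KV<String, Iterable<TableRow>> element after GroupByKey, and that element is what the CombineFn then reduces.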

Transform step:

public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {

  // group by step
  PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
    "Group by Key",
    GroupByKey.<String, TableRow>create()
  );

  // combine step
  PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
    "Generate New Rows",
    // errors here
    // Incorrect number of type arguments for generic 
    // method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) 
    // of type Combine; it cannot be parameterized with arguments 
    // <String, TableRow, Iterable<TableRow>>
    Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
  );


  return combinedValues;
}

Combine function:

private static class CreateEvents extends CombineFn<KV<String, TableRow>, CreateEvents.Accum, KV<String, Iterable<TableRow>>> {

  @DefaultCoder(AvroCoder.class)
  public static class Accum implements Serializable {
    Double startTime = 0.0;
    Double endTime = 0.0;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accumulator, KV<String, TableRow> input) {
    // the earliest and latest times in the set of table rows is set on the accumulator

    return accumulator;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accumulators) {
    Accum merged = createAccumulator();
    for (Accum accumulator : accumulators) {
      // merge steps happen here to find the earliest and latest times
    }

    return merged;
  }

  @Override
  public KV<String, Iterable<TableRow>> extractOutput(Accum accumulator) {
    // this step will create two rows based on the start and end times found in this function
    return null; // placeholder so the snippet compiles
  }
}

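I expected the CombineFn to be compatible with Combine.GroupedValues, as the documentation seems to suggest. However, that is not the case. Combine.PerKey is another option, but we have not yet found a way to use it with a CombineFn.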

Related links: Documentation - Combine.GroupedValues; Documentation - Combine.PerKey; Documentation - Combine.CombineFn; Source - Combine.GroupedValues; Source - Combine.PerKey; Source - Combine.CombineFn

java google-cloud-dataflow apache-beam
1 Answer (0 votes)

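The signature of CreateEvents seems slightly off. To use it after GroupByKey, it should be private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>>. Combine.groupedValues applies the CombineFn only to the values inside each KV<String, Iterable<TableRow>> and keeps the key as-is, so the input here is TableRow and the output of the combine is Iterable<TableRow>; the resulting PCollection is still KV<String, Iterable<TableRow>>. The compiler message is misleading: with three explicit type arguments, only the <K, InputT, OutputT> overload groupedValues(GlobalCombineFn<? super InputT, ?, OutputT>) can apply, and it rejects the original CreateEvents because that fn's input type is KV<String, TableRow> rather than TableRow.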
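To illustrate why the CombineFn is typed on the value rather than on the KV, here is a stdlib-only model of what groupedValues does with each grouped element. SimpleCombineFn and GroupedValuesModel are hypothetical stand-ins that mirror the shape of Beam's CombineFn; they are not Beam's API. The key is carried through unchanged and only the elements of each group's Iterable reach addInput.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in mirroring CombineFn<InputT, AccumT, OutputT> (not the Beam API).
interface SimpleCombineFn<InputT, AccumT, OutputT> {
  AccumT createAccumulator();

  AccumT addInput(AccumT accumulator, InputT input);

  OutputT extractOutput(AccumT accumulator);
}

final class GroupedValuesModel {
  // Models KV<K, Iterable<InputT>> -> KV<K, OutputT>: the key never reaches the fn.
  static <K, InputT, AccumT, OutputT> Map<K, OutputT> groupedValues(
      Map<K, List<InputT>> grouped, SimpleCombineFn<InputT, AccumT, OutputT> fn) {
    Map<K, OutputT> out = new LinkedHashMap<>();
    for (Map.Entry<K, List<InputT>> entry : grouped.entrySet()) {
      AccumT acc = fn.createAccumulator();
      for (InputT value : entry.getValue()) {
        acc = fn.addInput(acc, value); // receives a value, not a key/value pair
      }
      out.put(entry.getKey(), fn.extractOutput(acc));
    }
    return out;
  }

  // Example fn typed on the value type only: counts the values in each group.
  static final SimpleCombineFn<String, Integer, Integer> COUNT =
      new SimpleCombineFn<String, Integer, Integer>() {
        @Override
        public Integer createAccumulator() {
          return 0;
        }

        @Override
        public Integer addInput(Integer accumulator, String input) {
          return accumulator + 1;
        }

        @Override
        public Integer extractOutput(Integer accumulator) {
          return accumulator;
        }
      };

  public static void main(String[] args) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    grouped.put("user-a", Arrays.asList("row1", "row2"));
    grouped.put("user-b", Arrays.asList("row3"));
    System.out.println(groupedValues(grouped, COUNT)); // prints {user-a=2, user-b=1}
  }
}
```

In Beam terms, COUNT corresponds to a CombineFn<String, Integer, Integer>. A CombineFn<KV<String, TableRow>, ...> would never type-check here, because the values it is handed are plain TableRows.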

Here is the full code:

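import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {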

    // group by step
    PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
        "Group by Key",
        GroupByKey.<String, TableRow>create()
    );

    // combine step
    PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
        "Generate New Rows",
        // compiles once CreateEvents takes TableRow values as input
        // (see the CombineFn signature below)
        Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
    );


    return combinedValues;
  }

  private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>> {

    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
      Double startTime = 0.0;
      Double endTime = 0.0;
    }

    @Override
    public Accum createAccumulator() {
      return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, TableRow input) {
      // the earliest and latest times in the set of table rows is set on the accumulator

      return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
      Accum merged = createAccumulator();
      for (Accum accumulator : accumulators) {
        // merge steps happen here to find the earliest and latest times
      }

      return merged;
    }

    @Override
    public Iterable<TableRow> extractOutput(Accum accumulator) {
      // this step will create two rows based on the start and end times found in this function
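      // placeholder: return a non-null Iterable, because IterableCoder cannot encode null
      return new java.util.ArrayList<>();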
    }
  }
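The placeholder methods above leave the earliest/latest logic open. One way to fill it in is sketched here in plain Java, using the same four method names as CombineFn. The sketch starts the accumulator at the identity values for min and max (positive and negative infinity) instead of 0.0, because a 0.0 start time would win every min comparison against positive timestamps. Rows are modeled as Map<String, Object>, which BigQuery's TableRow also is; the field names "time" and "kind" and the class MinMaxEvents are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch of the CreateEvents logic. The method names mirror CombineFn,
// but this class does not extend Beam's CombineFn.
final class MinMaxEvents {
  static final class Accum {
    // Identity values: any real timestamp replaces them on the first addInput.
    double startTime = Double.POSITIVE_INFINITY;
    double endTime = Double.NEGATIVE_INFINITY;
  }

  static Accum createAccumulator() {
    return new Accum();
  }

  // Hypothetical field "time" holds the event timestamp as a number.
  static Accum addInput(Accum acc, Map<String, Object> row) {
    double t = ((Number) row.get("time")).doubleValue();
    acc.startTime = Math.min(acc.startTime, t);
    acc.endTime = Math.max(acc.endTime, t);
    return acc;
  }

  static Accum mergeAccumulators(Iterable<Accum> accumulators) {
    Accum merged = createAccumulator();
    for (Accum a : accumulators) {
      merged.startTime = Math.min(merged.startTime, a.startTime);
      merged.endTime = Math.max(merged.endTime, a.endTime);
    }
    return merged;
  }

  // Emits two rows: one for the earliest time and one for the latest.
  static List<Map<String, Object>> extractOutput(Accum acc) {
    return Arrays.asList(row("start", acc.startTime), row("end", acc.endTime));
  }

  // Hypothetical row builder standing in for new TableRow().set(...).
  static Map<String, Object> row(String kind, double time) {
    Map<String, Object> r = new LinkedHashMap<>();
    r.put("kind", kind);
    r.put("time", time);
    return r;
  }

  public static void main(String[] args) {
    Accum a = addInput(createAccumulator(), row("in", 30.0));
    a = addInput(a, row("in", 10.0));
    Accum b = addInput(createAccumulator(), row("in", 50.0));
    System.out.println(extractOutput(mergeAccumulators(Arrays.asList(a, b))));
    // prints [{kind=start, time=10.0}, {kind=end, time=50.0}]
  }
}
```

Because min and max are associative and commutative, a runner may call addInput and mergeAccumulators in any grouping and still get the same two rows.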

Alternatively, you can use Combine.perKey to do the grouping and the combining in a single, more concise step:


  public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    // combine step
    return rows.apply(Combine.perKey(new CreateEvents()));
  }

  // CreateEvents is the same class as in the previous example.
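As an analogy from the Java standard library (not Beam), Combine.perKey plays a role similar to Collectors.groupingBy with a downstream Collector. A Collector's supplier, accumulator, combiner and finisher correspond roughly to createAccumulator, addInput, mergeAccumulators and extractOutput. The sketch below groups hypothetical (key, time) pairs and reduces each group to its earliest and latest time in one step, much as perKey replaces an explicit GroupByKey followed by groupedValues. The class PerKeyAnalogy and its helpers are illustrative only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class PerKeyAnalogy {
  // Accumulator is {min, max} of the times seen so far.
  static final Collector<Map.Entry<String, Double>, double[], List<Double>> MIN_MAX =
      Collector.of(
          // like createAccumulator: start from the identities for min and max
          () -> new double[] {Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY},
          // like addInput: only the value of each entry is inspected
          (acc, e) -> {
            acc[0] = Math.min(acc[0], e.getValue());
            acc[1] = Math.max(acc[1], e.getValue());
          },
          // like mergeAccumulators
          (x, y) -> {
            x[0] = Math.min(x[0], y[0]);
            x[1] = Math.max(x[1], y[1]);
            return x;
          },
          // like extractOutput
          acc -> Arrays.asList(acc[0], acc[1]));

  // Group by key and combine each key's values in one pass, like Combine.perKey.
  static Map<String, List<Double>> perKey(List<Map.Entry<String, Double>> rows) {
    return rows.stream()
        .collect(Collectors.groupingBy(Map.Entry::getKey, TreeMap::new, MIN_MAX));
  }

  // Hypothetical helper that builds one (key, time) pair.
  static Map.Entry<String, Double> row(String key, double time) {
    return new SimpleEntry<>(key, time);
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Double>> rows =
        Arrays.asList(row("a", 30.0), row("a", 10.0), row("b", 5.0));
    System.out.println(perKey(rows)); // prints {a=[10.0, 30.0], b=[5.0, 5.0]}
  }
}
```

The analogy is loose: Beam distributes the work and decides when to merge accumulators, whereas a sequential stream here only calls the combiner when it is run in parallel.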