I wrote a CombineFn whose input is KV<String, TableRow> and whose output is KV<String, Iterable<TableRow>>. I want to use it with Combine.GroupedValues (or Combine.PerKey). The source seems to suggest this is possible, but I get the following error:
Incorrect number of type arguments for generic method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>) of type Combine; it cannot be parameterized with arguments <String, TableRow, Iterable<TableRow>>
We are using Beam v2.10. For context, we apply session windows to a PCollection of KV<String, TableRow>, then use GroupByKey to create a PCollection of KV<String, Iterable<TableRow>>. After this step, our CombineFn should reduce each group to a KV<String, Iterable<TableRow>> that contains an Iterable of TableRows created from the input.
The transform step:
public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    // group by step
    PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
        "Group by Key",
        GroupByKey.<String, TableRow>create()
    );
    // combine step
    PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
        "Generate New Rows",
        // errors here
        // Incorrect number of type arguments for generic
        // method <K, V>groupedValues(SerializableFunction<Iterable<V>,V>)
        // of type Combine; it cannot be parameterized with arguments
        // <String, TableRow, Iterable<TableRow>>
        Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
    );
    return combinedValues;
}
The combine function:
private static class CreateEvents extends CombineFn<KV<String, TableRow>, CreateEvents.Accum, KV<String, Iterable<TableRow>>> {
    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
        Double startTime = 0.0;
        Double endTime = 0.0;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, KV<String, TableRow> input) {
        // the earliest and latest times in the set of table rows is set on the accumulator
        return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        Accum merged = createAccumulator();
        for (Accum accumulator : accumulators) {
            // merge steps happen here to find the earliest and latest times
        }
        return merged;
    }

    @Override
    public KV<String, Iterable<TableRow>> extractOutput(Accum accumulator) {
        // this step will create two rows based on the start and end times found in this function
    }
}
I expected a CombineFn to be compatible with Combine.GroupedValues, as the documentation seems to suggest, but that does not appear to be the case. Combine.PerKey is another option, but we have not found a way to use it with this CombineFn either.
Related links: Documentation - Combine.GroupedValues; Documentation - Combine.PerKey; Documentation - Combine.CombineFn; Source - Combine.GroupedValues; Source - Combine.PerKey; Source - Combine.CombineFn
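The signature of CreateEvents looks slightly off. To be used after GroupByKey, it should be private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>>. The input type is the value type, TableRow, because Combine.groupedValues handles the key itself, and the combined output type is Iterable<TableRow>.
Here is the full code: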
public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    // group by step
    PCollection<KV<String, Iterable<TableRow>>> groupedValues = rows.apply(
        "Group by Key",
        GroupByKey.<String, TableRow>create()
    );
    // combine step: the type arguments are <key, input value, output value>,
    // which matches the groupedValues overload that accepts a CombineFn
    PCollection<KV<String, Iterable<TableRow>>> combinedValues = groupedValues.apply(
        "Generate New Rows",
        Combine.<String, TableRow, Iterable<TableRow>>groupedValues(new CreateEvents())
    );
    return combinedValues;
}
// Accum is written as CreateEvents.Accum because a nested type is not in scope
// inside its enclosing class's own extends clause.
private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>> {
    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
        Double startTime = 0.0;
        Double endTime = 0.0;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, TableRow input) {
        // the earliest and latest times in the set of table rows are set on the accumulator
        return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        Accum merged = createAccumulator();
        for (Accum accumulator : accumulators) {
            // merge steps happen here to find the earliest and latest times
        }
        return merged;
    }

    @Override
    public Iterable<TableRow> extractOutput(Accum accumulator) {
        // this step will create two rows based on the start and end times found in this function
        return null; // placeholder: replace with the two generated rows
    }
}
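To see why the CombineFn is parameterized on the value type rather than on KV, it may help to look at a Beam-free model of what a grouped combine does. The sketch below is only an illustration: SimpleCombineFn, combineGroupedValues, and MinMaxFn are hypothetical stand-ins written for this example (they are not Beam classes), and plain Double values stand in for TableRow.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Beam-free sketch. Every name here is a hypothetical stand-in that only
// mirrors the shape of CombineFn and Combine.groupedValues.
class GroupedCombineSketch {

    /** Stand-in with the same four methods as a CombineFn. */
    interface SimpleCombineFn<InputT, AccumT, OutputT> {
        AccumT createAccumulator();
        AccumT addInput(AccumT accumulator, InputT input);
        AccumT mergeAccumulators(Iterable<AccumT> accumulators);
        OutputT extractOutput(AccumT accumulator);
    }

    /** Applies fn to each key's values. The key itself is never passed to fn. */
    static <K, InputT, AccumT, OutputT> Map<K, OutputT> combineGroupedValues(
            Map<K, List<InputT>> grouped,
            SimpleCombineFn<InputT, AccumT, OutputT> fn) {
        Map<K, OutputT> result = new LinkedHashMap<>();
        for (Map.Entry<K, List<InputT>> entry : grouped.entrySet()) {
            AccumT accumulator = fn.createAccumulator();
            for (InputT value : entry.getValue()) {
                accumulator = fn.addInput(accumulator, value);
            }
            // The framework re-attaches the key to the combined output.
            result.put(entry.getKey(), fn.extractOutput(accumulator));
        }
        return result;
    }

    /** Value-only combiner: Double values in, a [min, max] list out. */
    static class MinMaxFn implements SimpleCombineFn<Double, double[], List<Double>> {
        public double[] createAccumulator() {
            return new double[] {Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY};
        }

        public double[] addInput(double[] accumulator, Double input) {
            accumulator[0] = Math.min(accumulator[0], input);
            accumulator[1] = Math.max(accumulator[1], input);
            return accumulator;
        }

        public double[] mergeAccumulators(Iterable<double[]> accumulators) {
            double[] merged = createAccumulator();
            for (double[] accumulator : accumulators) {
                merged[0] = Math.min(merged[0], accumulator[0]);
                merged[1] = Math.max(merged[1], accumulator[1]);
            }
            return merged;
        }

        public List<Double> extractOutput(double[] accumulator) {
            return Arrays.asList(accumulator[0], accumulator[1]);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Double>> grouped = new LinkedHashMap<>();
        grouped.put("session-a", Arrays.asList(3.0, 1.0, 2.0));
        grouped.put("session-b", Arrays.asList(10.0));
        // prints {session-a=[1.0, 3.0], session-b=[10.0, 10.0]}
        System.out.println(combineGroupedValues(grouped, new MinMaxFn()));
    }
}
```

The three type arguments passed to the grouped combine (key, input value, output value) line up with K, InputT, and OutputT here, which is why a CombineFn declared over KV<String, TableRow> does not fit.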
Alternatively, you can use Combine.perKey to do the grouping and the combining in a single, more concise step:
public PCollection<KV<String, Iterable<TableRow>>> expand(PCollection<KV<String, TableRow>> rows) {
    // combine step
    return rows.apply(Combine.perKey(new CreateEvents()));
}
private static class CreateEvents extends CombineFn<TableRow, CreateEvents.Accum, Iterable<TableRow>> {
    @DefaultCoder(AvroCoder.class)
    public static class Accum implements Serializable {
        Double startTime = 0.0;
        Double endTime = 0.0;
    }

    @Override
    public Accum createAccumulator() {
        return new Accum();
    }

    @Override
    public Accum addInput(Accum accumulator, TableRow input) {
        // the earliest and latest times in the set of table rows are set on the accumulator
        return accumulator;
    }

    @Override
    public Accum mergeAccumulators(Iterable<Accum> accumulators) {
        Accum merged = createAccumulator();
        for (Accum accumulator : accumulators) {
            // merge steps happen here to find the earliest and latest times
        }
        return merged;
    }

    @Override
    public Iterable<TableRow> extractOutput(Accum accumulator) {
        // this step will create two rows based on the start and end times found in this function
        return null; // placeholder: replace with the two generated rows
    }
}
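One thing to keep in mind with Combine.perKey is that a runner may apply the CombineFn to partial groups of values and merge the resulting accumulators later, so mergeAccumulators has to produce the same answer as adding every input to a single accumulator. The elided method bodies could look roughly like the Beam-free sketch below. It is a guess at the intent, not the original code: the bundle split and the perKey and kv helpers are hypothetical, plain doubles stand in for the time read from each TableRow, and the accumulator starts at +/- infinity instead of 0.0 (an assumption made here because a minimum taken against an initial 0.0 would stay at 0.0 for positive timestamps).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Beam-free sketch of per-key combining with partial accumulators.
// Accum, kv, and perKey are illustrative stand-ins, not Beam code.
class PerKeyCombineSketch {

    /** Earliest and latest time seen so far; starts empty so any input replaces it. */
    static class Accum {
        double startTime = Double.POSITIVE_INFINITY;
        double endTime = Double.NEGATIVE_INFINITY;
    }

    static Accum addInput(Accum accumulator, double time) {
        accumulator.startTime = Math.min(accumulator.startTime, time);
        accumulator.endTime = Math.max(accumulator.endTime, time);
        return accumulator;
    }

    static Accum mergeAccumulators(Iterable<Accum> accumulators) {
        Accum merged = new Accum();
        for (Accum accumulator : accumulators) {
            merged.startTime = Math.min(merged.startTime, accumulator.startTime);
            merged.endTime = Math.max(merged.endTime, accumulator.endTime);
        }
        return merged;
    }

    /** Two output values per key: the start time and the end time. */
    static List<Double> extractOutput(Accum accumulator) {
        return Arrays.asList(accumulator.startTime, accumulator.endTime);
    }

    /** Hypothetical helper that builds one key/value pair. */
    static Map.Entry<String, Double> kv(String key, double value) {
        return new SimpleEntry<>(key, value);
    }

    /** Combines each bundle on its own, then merges the partial accumulators per key. */
    static Map<String, List<Double>> perKey(List<List<Map.Entry<String, Double>>> bundles) {
        Map<String, List<Accum>> partials = new LinkedHashMap<>();
        for (List<Map.Entry<String, Double>> bundle : bundles) {
            Map<String, Accum> local = new LinkedHashMap<>();
            for (Map.Entry<String, Double> pair : bundle) {
                addInput(local.computeIfAbsent(pair.getKey(), k -> new Accum()), pair.getValue());
            }
            for (Map.Entry<String, Accum> entry : local.entrySet()) {
                partials.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
            }
        }
        Map<String, List<Double>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Accum>> entry : partials.entrySet()) {
            result.put(entry.getKey(), extractOutput(mergeAccumulators(entry.getValue())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> bundle1 =
            Arrays.asList(kv("session-a", 5.0), kv("session-b", 7.0), kv("session-a", 2.0));
        List<Map.Entry<String, Double>> bundle2 =
            Arrays.asList(kv("session-a", 9.0), kv("session-b", 4.0));
        // prints {session-a=[2.0, 9.0], session-b=[4.0, 7.0]}
        System.out.println(perKey(Arrays.asList(bundle1, bundle2)));
    }
}
```

Because min and max are associative and commutative, the result does not depend on how the inputs happen to be split into bundles, which is the property a CombineFn used with Combine.perKey needs.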