I am using Apache Beam to process batch data. To do so, I create a PCollection from a List, but once the pipeline has finished executing I need to convert the results in the PCollection back into a List. Can anyone help me with this?
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
List<Map<String, String>> inputDataList = ... // API call results.
PCollection<Map<String, String>> pcollection = pipeline.apply(
    Create.of(inputDataList)
        .withCoder(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
pcollection = pcollection.apply(...); // applying a few transformations.
List<Map<String, String>> outputDataList = ?; // How to get a list from the PCollection?
I have not found much help on the internet; the solutions that are mentioned use methods that are not defined on PCollection, e.g. get(), GetAllOut(), etc.
It is not possible to convert a PCollection into a Collection, because an Apache Beam pipeline is only a logical plan of transforms and operations; the actual execution is performed by another engine such as Spark.
What you can do is have the PCollection write its results to a sink (a file, GCS, BigQuery, ...), and then read the results back from that sink.
See Apache Beam's execution model.
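For example, here is a minimal sketch of that sink round-trip, assuming a hypothetical local path /tmp/output and forcing a single shard so the output file name is predictable (TextIO, the single-shard naming, and the line-per-element serialization are my assumptions, not part of the question's code):

// Sketch only: write the PCollection to a text sink, run the pipeline,
// then read the results back as an ordinary Java List.
pcollection
    // Serialize each element to one line; a real pipeline would use a proper
    // JSON encoder instead of Map.toString().
    .apply(MapElements.into(TypeDescriptors.strings())
        .via((Map<String, String> m) -> m.toString()))
    .apply(TextIO.write()
        .to("/tmp/output")        // hypothetical local path
        .withSuffix(".txt")
        .withNumShards(1));       // one shard => one predictable file name

pipeline.run().waitUntilFinish(); // block until the sink has been written

// Outside the pipeline, read the sink back into a plain List.
List<String> outputLines =
    Files.readAllLines(Paths.get("/tmp/output-00000-of-00001.txt"));

Forcing a single shard serializes the write and only makes sense for small outputs; for anything large you would keep multiple shards (or use a structured sink such as BigQuery) and read them all back.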
Note that the Java object you create can get very large and is not processed in parallel (which is why you wanted to use Beam in the first place!). That is the second limitation of my approach.
The trick is that, when extending a Beam Combiner, you have to save the final accumulator into a Java object that lives outside the Pipeline. I do this by passing a reference to that object into the combiner's constructor. In the combiner's extractOutput method, you then add the elements of the final accumulator to the Java object (you cannot simply replace it, because Java will not pass a new instance back to the caller).
Here is an example. First, extend the Beam combiner:
public static class JsonWriter extends ArrayAgg.ArrayAggArray<String> {

    private static final long serialVersionUID = -5573525980983338238L;

    /**
     * Holds the JSON serialization of the records in the applicable PCollection.
     * This field serves as the output of the Combine operation on this PCollection.
     * It should be passed as an empty list to the constructor, and is populated by
     * the extractOutput method.
     */
    private List<String> records;

    /**
     * @param records the output of the Combine operation on this PCollection;
     * should be passed as an empty list to the constructor
     */
    public JsonWriter(List<String> records) {
        super();
        if (records == null || !records.isEmpty())
            throw new IllegalArgumentException("Records should be empty");
        this.records = records;
    }

    @Override
    public List<String> addInput(List<String> accum, String input) {
        return super.addInput(accum, input);
    }

    @Override
    public List<String> extractOutput(List<String> accumulator) {
        if (accumulator.isEmpty()) {
            return null;
        }
        // Copy the final accumulator into the externally held list; do not
        // replace the reference, since the caller would never see a new instance.
        this.records.addAll(accumulator);
        return accumulator;
    }

    /**
     * @return the records
     */
    public List<String> getRecords() {
        return records;
    }
}//class JsonWriter
The caller looks like this:
instance.recordsToJson(); //instance has a PCollection
pipeline.run().waitUntilFinish();
where recordsAsJson is an ordinary Java list, created before the pipeline runs and populated by extractOutput during execution (so it should only be read after waitUntilFinish returns):
List<String> recordsAsJson = new LinkedList<String>();
/**
* Maps the records in dataframe to their JSON representation
* and collects the resulting strings into the recordsAsJson list.
* Called as part of the Beam pipeline.
*/
public void recordsToJson() {
    JsonWriter writer = new JsonWriter(recordsAsJson);
    this.dataframe // a PCollection<Row>
        // Transform each record into a JSON string.
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via((Row record) -> MsdxInstance.recordToJson(record)))
        // Collect the JSON strings into a list.
        .apply(
            this.dataframe.getName() + "AsJson",
            Combine.globally(writer));
}//recordsToJson
Then you can use the list outside the pipeline like this:
/**
* Writes the JSON serialization of this dataframe to the destination specified by the generator.
* Called after the recordsToJson method and after the Beam pipeline runs.
*/
@Override
public MsdxWriter.Generator toJson(MsdxWriter.Generator generator) {
    if (this.dataframe.isBounded().equals(PCollection.IsBounded.UNBOUNDED))
        throw new IllegalArgumentException("Unbounded collection of records");
    AtomicBoolean firstRecord = new AtomicBoolean(true);
    try {
        generator.writeStartArray(false);
        recordsAsJson.forEach(record -> {
            try {
                // Write a separator before every record except the first.
                if (!firstRecord.get())
                    generator.writeArrayValueSeparator(false);
                else
                    firstRecord.set(false);
                generator.writeRaw(record);
            } catch (IOException e) {
                System.err.println(e.getMessage());
                e.printStackTrace();
            }
        });
        generator.writeEndArray(false);
    } catch (IOException e) {
        System.err.println(e.getMessage());
        e.printStackTrace();
    }
    return generator;
}//toJson
And finally:
window.println("cities:");
instance.toJson(MsdxOutputDestination.toStream(window.printStream()));
I hope this helps.