如何使用 Java 将 PCollection 转换为 Apache Beam 中的列表集合?

问题描述 投票:0回答:2

我正在使用 Apache Beam 来处理批处理数据,为此,我从 List 创建 PCollection 对象,但是一旦完成执行管道处理,我需要将 PCollection 中的结果转换回 List 集合。有人可以帮我解决这个问题吗?

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

List<Map<String,String>> inputDataList = ...//API call results.

PCollection pcollection = pipeline.apply(Create.of(inputDataList).withCoder(MapCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of())));

pcollection = pcollection.apply()// applying few transformation.

List<Map<String,String>> outputDataList = ? //How to get list from pcollection object?

在互联网上没有找到太多帮助,无论提到什么解决方案,其方法都没有为 PCollection 定义。例如get()、GetAllOut() 等

java apache-beam


,因为 Apache Beam 只是转换和操作的逻辑计划,但执行实际上是由 Spark 等其他引擎完成的。

您可以做的就是让 PCollection 将结果写入接收器(可以是文件、GCS、BigQuery,..),然后从该接收器中读取结果。

查看 Apache Beam 的 执行模型

其实我也刚刚遇到这个问题。与上面的答案和评论相反,可以将 PCollection 写入 Java 列表,尽管这样做有一些限制,正如我在下面讨论的那样。至于您想要这样做的原因,有几种情况不足以使用 PCollections:

    您想要使用遗留 API,尤其是其中的类不可 Java 序列化的 API。可串行化是在 Beam 集合上创建转换的要求。
  1. 您希望将 PCollection 的内容写入文件、数据库或具有内置 IO 连接器的服务之一之外的某个目标。就我而言,我想写入 Java OutputStreams,它是文件、url 和 Swing 组件(如 JTextArea)的共同点; OutputStream 及其子类不可序列化。虽然 Beam 可以做一些复杂的事情,比如处理流数据,但做一些简单的事情,比如向屏幕窗口写入文本,却出人意料地困难。
为了将 PCollection 写入 Java 列表(或其他 Java 集合),您可以使用 Beam Combiner。就我而言,我使用 ArrayAggArray 组合器。写入数据是一个两个阶段的过程。首先,应用组合器并运行管道。然后,您在 Pipeline 之外处理生成的 Java 对象。这是该方法的第一个限制。运行 Pipeline 后,您将无法再访问它创建的任何 PCollection,因此本质上任何后处理都不得回调任何 Beam 对象。

请注意,您创建的 Java 对象可能非常大,并且无法并行处理(这就是您首先要使用 Beam 的原因!)。这是我的方法的第二个限制。

技巧在于,在扩展 Beam Combiner 时,您必须将最终的累加器保存到 Pipeline 外部的 Java 对象中。我通过在组合器的构造函数中传递对对象的引用来完成此操作。在Combiner的extractOutput方法中,您必须将最终累加器的元素添加到Java对象中(您不能简单地替换它,因为Java不会将新实例传递回调用者)。

这是一个例子: 首先,扩展光束组合器:

public static class JsonWriter extends ArrayAgg.ArrayAggArray<String> { private static final long serialVersionUID = -5573525980983338238L; /** * Holds the JSON serialization of the records in applicable PCollection. * This field serves as the output of the Combine operation on this PCollection. * It should be passed as an empty list to the constructor, and is populated by the * extractOutput method. */ private List<String> records; /** * @param records the output of the Combine operation on this PCollection; * should be passed as an empty list to the constructor * */ public JsonWriter(List<String> records) { super(); if(records== null || records.size()!=0) throw new IllegalArgumentException("Records should be empty"); this.records= records; } @Override public List<String> addInput( List<String> accum, String input) { return super.addInput(accum, input); } @Override public List<String> extractOutput( List<String> accumulator) { if (accumulator.isEmpty()) { return null; } this.records.addAll(accumulator); return accumulator; } /** * @return the records */ public List<String> getRecords() { return records; } }//class JsonWriter

instance.recordsToJson(); //instance has a PCollection pipeline.run().waitUntilFinish();

recordsAsJson = new LinkedList<String>(); /** * Maps the records in dataframe to their JSON representation * and collects the resulting strings into the recordsAsJson list. * Called as part of the Beam pipeline. */ public void recordsToJson() { JsonWriter writer= new JsonWriter(recordsAsJson); this.dataframe //a PCollection<Row> //transform each record into a JSON string .apply( MapElements.into(TypeDescriptors.strings()) .via((Row record) -> MsdxInstance.recordToJson(record))) //Collect the JSON strings into a list .apply( this.dataframe.getName() + "AsJson", Combine.globally(writer)); }//recordsToJson

/** * Writes the JSON serialization of this dataframe to the destination specified by the generator. * Called after the recordsToJson method and after the Beam pipeline runs. */ @Override public MsdxWriter.Generator toJson(MsdxWriter.Generator generator) { if(this.dataframe.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) throw new IllegalArgumentException("Unbounded collection of records"); AtomicBoolean firstRecord= new AtomicBoolean(true); try { generator.writeStartArray(false); recordsAsJson.forEach(record -> { try { if(!firstRecord.get()) generator.writeArrayValueSeparator(false); else firstRecord.set(false); generator.writeRaw(record); } catch (IOException e) { System.err.println(e.getMessage()); e.printStackTrace(); } }); generator.writeEndArray(false); } catch (IOException e) { System.err.println(e.getMessage()); e.printStackTrace(); } return generator; }//toJson

window.println("cities:"); instance.toJson(MsdxOutputDestination.toStream(window.printStream()));

© www.soinside.com 2019 - 2024. All rights reserved.