我正在尝试在Dataflow中进行单元测试。
对于该测试,在开始时,我将从一个简单的硬编码字符串开始。
问题是,我需要将该字符串转换为pubsub消息。我得到以下代码来做到这一点:
// Create a PCollection from string a transform to pubsub message format
PCollection<PubsubMessage> input = p.apply("input string", Create.of("test" +
""))
.apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new PubsubMessage(c.element().getBytes(), null));
}
}));
但出现以下错误:
java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
<...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
... 50 more
我应该如何从字符串创建pubsub消息?
在用户ParDos的可串行性要求下的Beam Programming Guide中,它提到了这一点:
在使用匿名内部类实例声明内联函数对象时请多加注意。在非静态上下文中,您的内部类实例将隐式包含指向封闭类和该类状态的指针。该封闭类也将被序列化,因此,应用于函数对象本身的相同考虑因素也将应用于此外部类。
我相信正在发生的事情是,您的匿名DoFn隐式包含指向您要在其中构造管道的类的指针,由于某种原因,这会导致此序列化失败。您可以通过将DoFn命名为子类而不是匿名来避免这种情况:
public class MyDoFn extends DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new PubsubMessage(c.element().getBytes(), null));
}
}