数据流:到发布消息的字符串

问题描述 投票:0回答:2

我正在尝试在Dataflow中进行单元测试。

对于该测试,在开始时,我将从一个简单的硬编码字符串开始。

问题是,我需要将该字符串转换为pubsub消息。我得到以下代码来做到这一点:

    // Create a PCollection from string a transform to pubsub message format
    PCollection<PubsubMessage> input = p.apply("input string", Create.of("test" + 
            ""))
            .apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(new PubsubMessage(c.element().getBytes(), null));
                }
            }));

但出现以下错误:

 java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    <...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    ... 50 more

我应该如何从字符串创建pubsub消息?

java google-cloud-dataflow apache-beam
2个回答

0
投票

在用户ParDos的可串行性要求下的Beam Programming Guide中,它提到了这一点:

在使用匿名内部类实例声明内联函数对象时请多加注意。在非静态上下文中,您的内部类实例将隐式包含指向封闭类和该类状态的指针。该封闭类也将被序列化,因此,应用于函数对象本身的相同考虑因素也将应用于此外部类。

我相信正在发生的事情是,您的匿名DoFn隐式包含指向您要在其中构造管道的类的指针,由于某种原因,这会导致此序列化失败。您可以通过将DoFn命名为子类而不是匿名来避免这种情况:

public class MyDoFn extends DoFn<String, PubsubMessage>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new PubsubMessage(c.element().getBytes(), null));
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.