Why can't a custom Python object be used with a ParDo Fn?

Problem description (votes: 0, answers: 2)

I am currently new to using Apache Beam in Python with the Dataflow runner. I am interested in creating a batch pipeline that publishes to Google Cloud PubSub; I have tinkered with the Beam Python APIs and found a solution. However, during my exploration I came across some interesting problems that made me curious.

1. The Successful Pipeline

Currently, my working Beam pipeline for publishing data from GCS in a batch manner looks like this:

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )
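For completeness, a minimal sketch of how such an entry point might be launched; the runner and flag values below are placeholders rather than something taken from the question, and any standard Beam pipeline options would do.

if __name__ == "__main__":
    import sys
    # Placeholder flags; project, region and bucket names are illustrative only.
    run_gcs_to_pubsub(sys.argv[1:] or [
        "--runner=DataflowRunner",
        "--project=my-gcp-project",
        "--region=us-central1",
        "--temp_location=gs://bucket/temp",
    ])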

2. The Unsuccessful Pipelines

Here, I attempted to make the publisher shared across the DoFn. I tried the following approaches.

a. Initializing the publisher inside the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
             max_bytes=1024,  # One kilobyte
             max_latency=1,  # One second
         )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b. Initializing the publisher outside the DoFn, and passing it into the DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    ...  # same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ...  # same as 1
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Both attempts to share the publisher (a. and b.) failed with the following error message:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

My questions are:

  1. Would the shared publisher implementation improve Beam pipeline performance? If so, then I would like to explore this solution.
  2. Why do these errors occur in my failing pipelines? Is it due to initializing a custom class object and passing it to the DoFn outside the process function? If so, how can I implement a pipeline so that I am able to reuse a custom object in a DoFn?

Thanks, and any help would be greatly appreciated.

EDIT: Solution

OK, so Ankur has explained why my problem occurred and discussed how serialization is done on a DoFn. Based on this knowledge, I now understand that there are two solutions for making a custom object shared/reusable in a DoFn:

  1. Make the custom object serializable: this allows the object to be initialized/made available during DoFn object creation (in __init__). The object must be serializable since it will be serialized during pipeline submission, in which the DoFn objects are created (__init__ is called). How to achieve this is answered in my answer below. I also found that this requirement is actually related to the Beam documentation under [1] and [2].
  2. Initialize the non-serializable object in a DoFn function other than __init__ to avoid serialization, since functions other than __init__ are not called during pipeline submission. Ankur's answer explains how to achieve this; a minimal sketch follows below.
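As a sketch of option 2 (the class name below is mine, not from the question or the answers): newer Beam Python SDKs also provide a DoFn.setup method that runs once per DoFn instance on the worker, after deserialization, which is a natural place to build a non-picklable client.

import apache_beam as beam

class PublishWithSetupFn(beam.DoFn):  # hypothetical name, for illustration only
    def __init__(self, topic_path):
        # Keep only plain, picklable state here.
        self.topic_path = topic_path
        self.publisher = None

    def setup(self):
        # Runs on the worker after the DoFn has been deserialized,
        # so the PublisherClient itself is never pickled.
        from google.cloud import pubsub_v1
        self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()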

References:

[1] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

python google-cloud-dataflow apache-beam
2 Answers
2 votes

PublisherClient cannot be pickled correctly. More on pickling here. Initializing the PublisherClient in the process method avoids the pickling of PublisherClient.

If the intent is to reuse the PublisherClient, I would recommend initializing the PublisherClient in the process method and storing it in self using the following code.

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        # Create the client lazily on the worker, once per DoFn instance,
        # so it is never pickled together with the DoFn.
        if not hasattr(self, 'publisher'):
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

0 votes

Thanks to Ankur, I found out that this problem is due to a pickling issue in Python. I then tried to isolate the problem by first resolving the pickling issue of the PublisherClient, and found the solution for sharing the PublisherClient across the DoFn on Beam.

In Python we can pickle a custom object with the dill package, and I realized that this package is already used in the Beam Python implementation for pickling objects. So I tried to troubleshoot the problem and discovered this error:

TypeError: no default __reduce__ due to non-trivial __cinit__
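For reference, the same failure can be reproduced outside Beam by pickling the client directly with dill (a sketch; it assumes google-cloud-pubsub and dill are installed and that Google application default credentials are available):

import dill
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Serializing the client drags in its gRPC channel, a Cython type with no
# default __reduce__, which raises the TypeError shown above.
dill.dumps(publisher)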

Then I tried to fix this error, and my pipeline now works!

Below is the solution:

from google.cloud.pubsub_v1 import PublisherClient

class PubsubClient(PublisherClient):
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)

# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

        from google.cloud import pubsub_v1
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )
        self.publisher = PubsubClient(batch_settings=batch_settings)
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))
        return future.result()

# ...the run_gcs_to_pubsub is the same as my successful pipeline

The solution works as follows: first, I subclass PublisherClient and implement the __reduce__ function myself. Note that because I only used the batch_settings property to initialize my PublisherClient, this property is enough to support my __reduce__ function. I then use this modified PubsubClient in my DoFn.
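The same __reduce__ pattern can be illustrated without Pub/Sub at all (a toy sketch, not the author's code): pickling records only the class and its constructor arguments, and unpickling re-runs __init__, so any non-picklable internals are rebuilt fresh on the other side.

import pickle
import threading

class Wrapper:
    def __init__(self, settings):
        self.settings = settings
        # Stand-in for a non-picklable resource such as a gRPC channel.
        self._lock = threading.Lock()

    def __reduce__(self):
        # Pickle only (class, constructor args); __init__ recreates the lock.
        return self.__class__, (self.settings,)

w = Wrapper({"max_bytes": 1024})
restored = pickle.loads(pickle.dumps(w))  # fails without __reduce__, works with it
assert restored.settings == {"max_bytes": 1024}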

Hopefully, with this new solution, my pipeline will gain a performance improvement.
