我无法处理来自外部API调用的元素

问题描述 投票:0回答:1

我正在尝试在 DoFn 类中对 Google Natural Language API 进行外部调用:

class RequestAPI(beam.DoFn):
    def setup(self):
        self._client = language_v1.LanguageServiceClient()

    def process(self, element):
        classify = False
        if element['lang'] =='en':
            classify = True
        document = {"document": {
            "type_": "PLAIN_TEXT", "content": element['text']},
            "features": {
                "extract_entities": True,
                "extract_document_sentiment": True,
                "extract_entity_sentiment": True,
                "classify_text": classify,
            },
            "encoding_type": "UTF8"
        }

        response = self._client.annotate_text(request=document)
        new_element = element
        new_element['analysis'] = {'sentences': response.sentences,
                                   'score': response.document_sentiment.score,
                                   'magnitude': response.document_sentiment.magnitude,
                                   'tokens': response.tokens,
                                   'lang_api': response.language,
                                   'entities': response.entities
                                   }
        print(new_element)
        return new_element

并且,我将这个课程称为:

data_enrich: PCollection = (
            tweets_data | beam.ParDo(RequestAPI())
            | 'write data' >> beam.io.WriteToText('data.txt')
    )

问题是,当我收到响应并返回新元素时,出现以下错误:

TypeError: cannot pickle 'google.protobuf.pyext._message.RepeatedCompositeContainer' object [while running 'ParDo(RequestAPI)']

我已经检查过 ptoblem 不是 API 响应或新元素的类型(是一个字典)。

我正在使用 python 3.7 和 apache beam 版本 2.43。

我希望你能帮助我或给我任何关于发生了什么事的线索。

我之前用 Map 转换尝试过,它给了我同样的错误,我看到了一些使用 apache beam 进行外部调用的示例,并且这些示例使用 DoFn 类,所以我尝试了它,但它不起作用。它给了我同样的错误。

python apache-beam
1个回答
0
投票

DoFn 的输出需要是一个迭代器。根据文档(第4.2.1.2章)

您的流程方法应该接受一个参数元素,即 输入

element
,然后 返回一个 iterable 及其输出值。你可以 通过发射单个元素和yield语句来实现这一点。 您还可以将 return 语句与可迭代对象一起使用,例如 list发电机

所以你应该将

RequestAPI
的输出更改为

return [new_element]

甚至更好

yield new_element
© www.soinside.com 2019 - 2024. All rights reserved.