我正在尝试在 DoFn 类中对 Google Natural Language API 进行外部调用:
class RequestAPI(beam.DoFn):
def setup(self):
self._client = language_v1.LanguageServiceClient()
def process(self, element):
classify = False
if element['lang'] =='en':
classify = True
document = {"document": {
"type_": "PLAIN_TEXT", "content": element['text']},
"features": {
"extract_entities": True,
"extract_document_sentiment": True,
"extract_entity_sentiment": True,
"classify_text": classify,
},
"encoding_type": "UTF8"
}
response = self._client.annotate_text(request=document)
new_element = element
new_element['analysis'] = {'sentences': response.sentences,
'score': response.document_sentiment.score,
'magnitude': response.document_sentiment.magnitude,
'tokens': response.tokens,
'lang_api': response.language,
'entities': response.entities
}
print(new_element)
return new_element
并且,我将这个课程称为:
data_enrich: PCollection = (
tweets_data | beam.ParDo(RequestAPI())
| 'write data' >> beam.io.WriteToText('data.txt')
)
问题是,当我收到响应并返回新元素时,出现以下错误:
TypeError: cannot pickle 'google.protobuf.pyext._message.RepeatedCompositeContainer' object [while running 'ParDo(RequestAPI)']
我已经检查过 ptoblem 不是 API 响应或新元素的类型(是一个字典)。
我正在使用 python 3.7 和 apache beam 版本 2.43。
我希望你能帮助我或给我任何关于发生了什么事的线索。
我之前用 Map 转换尝试过,它给了我同样的错误,我看到了一些使用 apache beam 进行外部调用的示例,并且这些示例使用 DoFn 类,所以我尝试了它,但它不起作用。它给了我同样的错误。
DoFn 的输出需要是一个迭代器。根据文档(第4.2.1.2章)
您的流程方法应该接受一个参数元素,即 输入
,然后 返回一个 iterable 及其输出值。你可以 通过发射单个元素和yield语句来实现这一点。 您还可以将 return 语句与可迭代对象一起使用,例如 list 或 发电机。element
所以你应该将
RequestAPI
的输出更改为
return [new_element]
甚至更好
yield new_element