I am working on an Apache Beam Dataflow job that stores enriched streaming data in Firestore. If any event data is lost during processing, I push those records to a BigQuery table. Below I am sharing the relevant code snippets. The EnrichEventsWithFirestore class fetches the data needed for the Firestore writes. I use tagged outputs to capture the failed records and push them to BigQuery, as shown in the pipeline. However, I keep running into this error:

AttributeError: 'PBegin' object has no attribute 'windowing'

How can I fix it?
class EnrichEventsWithFirestore(beam.DoFn):
    def __init__(self, database):
        self.database = database
        self.client = None

    def setup(self):
        if not self.client:
            self.client = firestore.Client(project='temp_myself', database=self.database)

    def process(self, element):
        list_ids = []
        for ele in element:
            Id = ele[0]['Id']
            list_ids.append(self.client.document('cust_add', str(Id)))
        docs = self.client.get_all(references=list_ids)
        Id_map = {}
        for doc in docs:
            doc_dict = doc.to_dict()
            if doc_dict:
                Id_map[doc_dict['Id']] = {
                    "c_num": doc_dict.get('c_num', "NA"),
                    "ct_num": doc_dict.get("ct_num", "NA")
                }
        enriched_data = []
        for ele in element:
            tran_data, item_data = ele
            Id = int(tran_data['Id'])
            filename = str(tran_data['filename'])
            tdatetime = tran_data['tdatetime']
            if Id in Id_map:
                ct_num = Id_map[Id]['ct_num']
                c_num = Id_map[Id]['c_num']
            else:
                yield beam.pvalue.TaggedOutput('discarded',
                    [{'Id': Id, 'filename': filename, 'tdatetime': tdatetime}]
                )
                continue
            tran_data["ct_num"] = ct_num
            tran_data["c_num"] = c_num
            enriched_data.append((tran_data, item_data))
        yield enriched_data
# Pipeline Code
def run(pipeline_args):
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline = beam.Pipeline(options=pipeline_options)
    if pipeline:
        results = (
            pipeline
            | "Process File Firestore" >> beam.FlatMap(parseForFirestore, campaign__data)
            | "Window and Batch Data Firestore" >> beam.WindowInto(
                beam.window.FixedWindows(5),
                trigger=Repeatedly(AfterAny(AfterCount(100), AfterProcessingTime(5))),
                accumulation_mode=AccumulationMode.DISCARDING,
                allowed_lateness=600
            )
            | "Group Data Firestore" >> beam.GroupBy(lambda s: assign_random_int())
            | "Extract Values Firestore" >> beam.Values()
            | "Enrich Events with Firestore Data" >> beam.ParDo(
                EnrichEventsWithFirestore('temp_myself')
            ).with_outputs('discarded', main='not_discarded')
        )
        results.not_discarded | "Push Data to Firestore" >> beam.ParDo(InsertToFireStoreInBatches('temp_myself'))
        results.discarded | "Write to BQ" >> beam.io.WriteToBigQuery(
            'bq_table',
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_APPEND",
            ignore_unknown_columns=True,
            method="STREAMING_INSERTS",
            schema=tab_schema
        )
        pipeline.run()
Your pipeline does not seem to have a defined starting point.

The first transform in every Apache Beam pipeline is special: it receives a PBegin object rather than a PCollection the way regular transforms do. PBegin is a special object that represents the start of the pipeline. You have not defined a source, such as beam.Create, beam.ReadFrom*, and so on.

So FlatMap receives a PBegin while it expects a PCollection, hence the error.

Try adding a source between pipeline and the beam.FlatMap transform.
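For example, here is a minimal sketch of the fix, assuming your events arrive on a Pub/Sub subscription (the "Read Events" label and the subscription path below are placeholders, not part of your code):

results = (
    pipeline
    # Root transform: consumes PBegin and emits the PCollection that the
    # downstream FlatMap expects. Replace the subscription with your own.
    | "Read Events" >> beam.io.ReadFromPubSub(
        subscription='projects/temp_myself/subscriptions/events-sub'
    )
    | "Process File Firestore" >> beam.FlatMap(parseForFirestore, campaign__data)
    # ... the rest of the pipeline (WindowInto, GroupBy, ParDo, ...) stays unchanged
)

Note that ReadFromPubSub emits the message payload as bytes, so parseForFirestore may need to decode it first. For a quick local test you could instead start with beam.Create([...]) over a few sample elements, but for a streaming Dataflow job an unbounded source such as ReadFromPubSub is the usual choice.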