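I am running a Dataflow pipeline in which I have to collate data into a single pandas DataFrame for use in the next step. To do this I use a DoFn class and define the __init__, process and finish_bundle functions as shown below. I expect one output in which all entries are collated into one DataFrame. I then pass this output to the next step of the pipeline as a singleton side input.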
    import logging

    import apache_beam as beam
    import pandas as pd
    from apache_beam.utils.windowed_value import WindowedValue

    class collate_ga_data(beam.DoFn):
        def __init__(self):
            self._ga_data = pd.DataFrame()
            self.window = beam.window.GlobalWindow()
            logging.info("In INITIALIZATION : {0}".format(self.window))

        def process(self, element, window=beam.DoFn.WindowParam):
            self.window = window
            logging.info("In PROCESS : {0}".format(self.window))
            # Append the incoming dict as a one-row DataFrame.
            self._ga_data = self._ga_data.append(pd.DataFrame({k: [v] for k, v in element.items()}))

        def finish_bundle(self):
            logging.info(" The shape of ga_dataset imported : {0}".format(self._ga_data.shape))
            logging.info("In FINISH BUNDLE : {0}".format(self.window))
            # Emits one value per bundle, not one value per pipeline run.
            yield WindowedValue(self._ga_data, 0, windows=[self.window])
This code works perfectly with the DirectRunner and gives the expected result, but with the Dataflow runner it raises the following error:
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 649, in do_work
work_executor.execute()
File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 178, in execute
op.finish()
File "apache_beam/runners/worker/operations.py", line 611, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 612, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/common.py", line 824, in apache_beam.runners.common.DoFnRunner.finish
File "apache_beam/runners/common.py", line 808, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 806, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 398, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 401, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 959, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 610, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/usr/local/lib/python3.6/site-packages/apache_beam/transforms/sideinputs.py", line 65, in __getitem__
_FilteringIterable(self._iterable, target_window), self._view_options)
File "/usr/local/lib/python3.6/site-packages/apache_beam/pvalue.py", line 443, in _from_runtime_iterable
len(head), str(head[0]), str(head[1])))
ValueError: PCollection of size 2 with more than one element accessed as a singleton view. First two elements encountered are "Empty DataFrame
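I did some digging and found that the DoFn produces three outputs: one is the required DataFrame and the other two are empty DataFrames. In other words, finish_bundle yields three results, and I don't know why. I don't want to use any windowing here, but according to the documentation the output of finish_bundle must be a windowed value, so I use the global window there.
The log output for the code above is as follows: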
2020-02-27T16:32:45.331291913Z The shape of ga_dataset imported : (0, 0) I
2020-02-27T16:32:45.331489801Z In FINISH BUNDLE : GlobalWindow I
2020-02-27T16:32:45.390583276Z The shape of ga_dataset imported : (0, 0) I
2020-02-27T16:32:45.390754222Z In FINISH BUNDLE : GlobalWindow I
2020-02-27T16:32:48.639126300Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.641757011Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.644909381Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.647359848Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.649686336Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.651899814Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.654145240Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.656555175Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.658823966Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.660887002Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.663397789Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.665476560Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.667604684Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.669671535Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.672025680Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.674037218Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.676348209Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.678587436Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.680708885Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.682787656Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.685523986Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.687734365Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.689816713Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.691826343Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.693920373Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.696102380Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.698341846Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.700649023Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.703155755Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.705482244Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.707590818Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.709594726Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.711608886Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.713906288Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.716273546Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.718636035Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.720866918Z In PROCESS : GlobalWindow I
2020-02-27T16:32:48.723044872Z The shape of ga_dataset imported : (37, 8) I
2020-02-27T16:32:48.723157405Z In FINISH BUNDLE : GlobalWindow I
Dataflow uses only one worker for this pipeline. Does anyone know why this happens?
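According to the Beam execution model, "The division of the collection into bundles is arbitrary and selected by the runner." This is why finish_bundle can be called multiple times: it runs once per bundle, and each call emits whatever has been collected so far, including an empty DataFrame.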
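To make the per-bundle behaviour concrete, here is a small pure-Python sketch that does not use Beam at all. `CollateRows` and `run_in_bundles` are hypothetical stand-ins I made up for a DoFn and a runner, and the split into two empty bundles plus one full bundle is an assumption chosen to resemble the log in the question, not something Dataflow guarantees.

```python
class CollateRows:
    """Toy stand-in for a DoFn that buffers rows and emits them per bundle."""

    def __init__(self):
        self.rows = []

    def start_bundle(self):
        # Reset the buffer so each bundle only reports its own rows.
        self.rows = []

    def process(self, element):
        self.rows.append(element)

    def finish_bundle(self):
        # Called once per bundle, so it yields once per bundle.
        yield list(self.rows)


def run_in_bundles(fn, bundles):
    """Hypothetical mini-runner: drives the bundle lifecycle for each bundle."""
    outputs = []
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            fn.process(element)
        outputs.extend(fn.finish_bundle())
    return outputs


elements = [{"id": i} for i in range(37)]

# A runner that happens to put everything into one bundle: one output.
one_bundle = run_in_bundles(CollateRows(), [elements])

# A runner that also schedules two empty bundles: three outputs.
three_bundles = run_in_bundles(CollateRows(), [[], [], elements])

print(len(one_bundle))                        # 1
print([len(rows) for rows in three_bundles])  # [0, 0, 37]
```

With three outputs in the collection, reading it as a singleton side input fails in the same way as the ValueError in the traceback.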
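It seems your problem would be better solved with a CombineFn applied through CombineGlobally, using a DataFrame as the accumulator. Please see the Beam programming guide (section 4.2.4, Combine) for instructions on how to implement it.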