我正在为 GCP Dataflow 批处理进行 POC。
我想传递 Pandas Dataframe 作为批量输入并执行列式转换并再次返回同一批次。
我参考下面链接中提供的 MultipyByTwo 示例 https://beam.apache.org/documentation/programming-guide/
当我输入 Pandas Dataframe process_batch 函数时未执行。
您能否告诉我原因,如果可能,请提供示例。
代码- pd = read_excel(路径)
result1 = ( pd | "测试批次" >> beam.ParDo(TestBatch(argv)))
DoFn 类 -
类(TestBatch(beam.DoFn): def init(self, args: 任意): self.args = args 打印(“(TestBatch.init”)
def setup(self) -> None:
print("(TestBatch.setup")
def process_batch(self, batch: pd.DataFrame) -> pd.DataFrame:
print("(TestBatch.process_batch")
print(batch)
yield batch
我建议你看一下Beam Dataframes,例如,https://beam.apache.org/releases/pydoc/current/apache_beam.dataframe.io.html#apache_beam.dataframe.io.read_excel。这允许你更有效地处理数据帧。
对于您的情况,此代码示例应该有效:
import apache_beam as beam
import pandas as pd
df = pd.read_csv("beers.csv")
class ProcessDf(beam.DoFn):
def process(self, e: pd.DataFrame):
print(e)
return e
with beam.Pipeline() as p:
p | beam.Create([df]) | beam.ParDo(ProcessDf())
Beam 管道需要从 Pipeline 开始。
https://beam.apache.org/get-started/ 有更多信息。 Tour of Beam 是学习一些基本概念的好地方。