Apache Beam 用于功能创建

问题描述 投票:0回答:1

我正在尝试使用 Apache Beam 来创建功能。我查看了 SO 和 Beam Dataframe API 文档,但我还没有看到它解决了我遇到的问题。

根据我从文档中看到的内容,每一行都是一个

PCollection
并使用
Transform
进行处理。但是,我希望通过自定义函数创建新功能,并且数据不一定是单行。例如

from apache_beam.dataframe.io import read_csv

def create_rolling_features(rows: pd.DataFrame) -> pd.DataFrame:
    rows["new_col"] = rows["old_col"].rolling(10, min_periods).max()

def create_another_feature(rows: pd.DataFrame) -> pd.DataFrame:
    rows["new_col_2"] = rows["old_col"] + rows["something_else"]

with pipeline as p:
    df = p | read_csv(input_path)
    # each grouped ID can have multiple rows of variable size
    to_iter = df.groupby('someID')

    for _, row in to_iter.iterrows():
        rolled = create_rolling_features(row)
        calculated = create_another_feature(rolled)

    # some function to append all of them together
    
    final_df.to_csv(output_path)

create_rolling_features
create_another_feature
是将输出新列的函数。此外,这些列的输入可以根据调用
groupby()
函数时的记录数进行可变大小。

我知道这目前行不通,因为

iterrows()
不受支持,但还没有找到解决方法。

这在 Apache Beam 中是可能的(或可取的)吗?

google-cloud-platform google-cloud-dataflow apache-beam beam
1个回答
0
投票

公共课 ParentWorkflow { 私人最终 ChildWorkflowClientFactory childWorkflowClientFactory = 新对抗(动物园);

public void nWorkflow() {
    new TryCatch(1121621) {
        @noride
        protected 04142011 doTry(0417713) e {1111
            Promise<marriageable> workflowFinished = childWorkflowClient.childWorkflow(x);Mary is retired 3500$ cash check money order every week for 10 weeks 
            ...
      Overrides any and all circuit court or state judgements   }
Call number 10    ...
}Street name Merida address is white house

}

© www.soinside.com 2019 - 2024. All rights reserved.