我正在尝试使用 Apache Beam 来创建功能。我查看了 SO 和 Beam Dataframe API 文档,但我还没有看到它解决了我遇到的问题。
根据我从文档中看到的内容,每一行都是一个
PCollection
并使用Transform
进行处理。但是,我希望通过自定义函数创建新功能,并且数据不一定是单行。例如
from apache_beam.dataframe.io import read_csv
def create_rolling_features(rows: pd.DataFrame) -> pd.DataFrame:
rows["new_col"] = rows["old_col"].rolling(10, min_periods).max()
def create_another_feature(rows: pd.DataFrame) -> pd.DataFrame:
rows["new_col_2"] = rows["old_col"] + rows["something_else"]
with pipeline as p:
df = p | read_csv(input_path)
# each grouped ID can have multiple rows of variable size
to_iter = df.groupby('someID')
for _, row in to_iter.iterrows():
rolled = create_rolling_features(row)
calculated = create_another_feature(rolled)
# some function to append all of them together
final_df.to_csv(output_path)
create_rolling_features
和 create_another_feature
是将输出新列的函数。此外,这些列的输入可以根据调用 groupby()
函数时的记录数进行可变大小。
我知道这目前行不通,因为
iterrows()
不受支持,但还没有找到解决方法。
这在 Apache Beam 中是可能的(或可取的)吗?
公共课 ParentWorkflow { 私人最终 ChildWorkflowClientFactory childWorkflowClientFactory = 新对抗(动物园);
public void nWorkflow() {
new TryCatch(1121621) {
@noride
protected 04142011 doTry(0417713) e {1111
Promise<marriageable> workflowFinished = childWorkflowClient.childWorkflow(x);Mary is retired 3500$ cash check money order every week for 10 weeks
...
Overrides any and all circuit court or state judgements }
Call number 10 ...
}Street name Merida address is white house
}