我正在尝试对惰性极坐标数据框执行枢轴操作。
这意味着,如果我收集数据,我就可以进行数据透视:
df = pl.read_parquet(path,low_memory=True)
pivoted_df = df.pivot(index=["ind1", "ind2", "ind3", "ind4"], columns="my_signal", values="Value", aggregate_function="mean")
这些代码行有效。
但是,如果我通过调用此方法使用惰性数据框:
df = pl.scan_parquet(path,low_memory=True)
我找不到执行相同算法的方法。
.pivot
无法应用于惰性对象。
重要提示:我在任何时候都不想用
df.collect()
收集数据,因为我有一个非常大的数据集,不适合内存。
我想在最后用
pivoted_df .sink_parquet()
保存惰性数据框,所以是的,我不想在任何时候收集数据。
提前谢谢您!
我尝试使用
.group_by
代替.pivot
:
grouped_df = df.group_by(["["ind1", "ind2", "ind3", "ind4"])
transformed_df = grouped_df.agg(**{f"{col}_mean": pl.col("Value").mean() for col in pl.col('my_signal').unique()})
但是我收到错误
'Expr' object is not iterable
,这是正常的,因为 pl.col('full_signal_name').unique()
是一个表达式。
还有其他选择吗?
示例:
df:
┌──────┬──────┬──────┬──────┬───────────┬───────┐
│ ind1 ┆ ind2 ┆ ind3 ┆ ind4 ┆ my_signal ┆ Value │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ str ┆ i64 │
╞══════╪══════╪══════╪══════╪═══════════╪═══════╡
│ www ┆ xxx ┆ yyy ┆ zzz ┆ a ┆ 1 │
│ www ┆ xxx ┆ yyy ┆ zzz ┆ a ┆ 1 │
│ www ┆ xxx ┆ yyy ┆ zzz ┆ a ┆ 1 │
│ www ┆ xxx ┆ yyy ┆ zzz ┆ b ┆ 2 │
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ b ┆ 2 │
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ b ┆ 2 │
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ c ┆ 3 │
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ c ┆ 3 │
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ c ┆ 3 │
└──────┴──────┴──────┴──────┴───────────┴───────┘
pivoted_df:
┌──────┬──────┬──────┬──────┬──────┬─────┬──────┐
│ ind1 ┆ ind2 ┆ ind3 ┆ ind4 ┆ a ┆ b ┆ c │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ f64 ┆ f64 ┆ f64 │
╞══════╪══════╪══════╪══════╪══════╪═════╪══════╡
│ www ┆ xxx ┆ yyy ┆ zzz ┆ 1.0 ┆ 2.0 ┆ null │
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ null ┆ 2.0 ┆ 3.0 │
└──────┴──────┴──────┴──────┴──────┴─────┴──────┘
pivot()
的文档中有一些建议:
请注意,pivot 仅在 eager 模式下可用。如果您提前知道唯一列值,则可以使用
在惰性模式下获得与上面相同的结果polars.LazyFrame.groupby()
根据您的用例调整此示例:
index = pl.col("ind1", "ind2", "ind3", "ind4")
columns = pl.col("my_signal")
values = pl.col("Value")
unique_column_values = list(df.unique("my_signal").select("my_signal").collect().to_series())
aggregate_function = lambda col: col.mean()
df.group_by(index).agg(
aggregate_function(values.filter(columns == value)).alias(value)
for value in unique_column_values
).collect()
┌──────┬──────┬──────┬──────┬─────┬──────┬──────┐
│ ind1 ┆ ind2 ┆ ind3 ┆ ind4 ┆ b ┆ a ┆ c │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ f64 ┆ f64 ┆ f64 │
╞══════╪══════╪══════╪══════╪═════╪══════╪══════╡
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ 2.0 ┆ null ┆ 3.0 │
│ www ┆ xxx ┆ yyy ┆ zzz ┆ 2.0 ┆ 1.0 ┆ null │
└──────┴──────┴──────┴──────┴─────┴──────┴──────┘
我也试图找到一种替代方法。由于我们想要动态列,自然的方法是使用 unnest()
的
Struct
方法。所以中间目标是最终得到这样的 DataFrame:
┌──────┬──────┬──────┬──────┬────────────────┐
│ ind1 ┆ ind2 ┆ ind3 ┆ ind4 ┆ my_signal │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ struct[3] │
╞══════╪══════╪══════╪══════╪════════════════╡
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ {2.0,3.0,null} │
│ www ┆ xxx ┆ yyy ┆ zzz ┆ {2.0,null,1.0} │
└──────┴──────┴──────┴──────┴────────────────┘
相当接近这个中间目标相对容易:
(
df
.group_by("ind1", "ind2", "ind3", "ind4", "my_signal")
.mean()
.group_by("ind1", "ind2", "ind3", "ind4")
.agg(pl.struct(pl.col("my_signal"),pl.col("Value")))
.collect()
)
┌──────┬──────┬──────┬──────┬────────────────────────┐
│ ind1 ┆ ind2 ┆ ind3 ┆ ind4 ┆ my_signal │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ list[struct[2]] │
╞══════╪══════╪══════╪══════╪════════════════════════╡
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ [{"b",2.0}, {"c",3.0}] │
│ www ┆ xxx ┆ yyy ┆ zzz ┆ [{"a",1.0}, {"b",2.0}] │
└──────┴──────┴──────┴──────┴────────────────────────┘
所以现在我们只需要找到将结构体列表
[{"b",2.0}, {"c",3.0}]
转换为结构体{a:2.0,b:3.0}
的方法即可。但是,我找不到一种简单的方法来做到这一点。也许你可以通过list_eval()
来做到?
不过有一个有点难看的蛮力方法 - 在组内进入 eager 模式并使用数据透视并转换回结构。我必须添加一个虚拟列,因为枢轴需要
index
列,而在我看来,我不需要它。
def transform(x):
return (
x
.list.explode().struct.unnest()
.with_columns(pl.lit(1).alias('i'))
.pivot(index="i", values="Value", columns="my_signal", aggregate_function='mean')
.drop('i')
.select(pl.struct(pl.all()))
.item()
)
(
df
.group_by("ind1", "ind2", "ind3", "ind4")
.agg(pl.struct(pl.col("my_signal"),pl.col("Value")))
.with_columns(pl.col("my_signal").map_elements(transform))
.unnest('my_signal')
.collect()
)
┌──────┬──────┬──────┬──────┬─────┬──────┬──────┐
│ ind1 ┆ ind2 ┆ ind3 ┆ ind4 ┆ b ┆ c ┆ a │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ str ┆ str ┆ f64 ┆ f64 ┆ f64 │
╞══════╪══════╪══════╪══════╪═════╪══════╪══════╡
│ fff ┆ xxx ┆ yyy ┆ zzz ┆ 2.0 ┆ 3.0 ┆ null │
│ www ┆ xxx ┆ yyy ┆ zzz ┆ 2.0 ┆ null ┆ 1.0 │
└──────┴──────┴──────┴──────┴─────┴──────┴──────┘
也许有更好的方法将
[{"b",2.0}, {"c",3.0}]
转换为{a:2.0,b:3.0}
,我需要稍后研究一下。