我尝试从数据集中提取一些特征,然后使用 Python 中的 Polars 库将结果写入 Parquet 文件。这是我正在使用的代码:
import ipaddress
import numpy as np
import polars as pl
def extract_session_features(sessions: pl.LazyFrame) -> pl.LazyFrame:
return (
sessions.with_columns(
(pl.col("dpkts") + pl.col("spkts")).alias("total_packets"),
(pl.col("dbytes") + pl.col("sbytes")).alias("total_bytes"),
(pl.col("dpkts") / pl.col("spkts")).alias("bytes_ratio"),
(pl.col("dbytes") / pl.col("sbytes")).alias("packets_ratio"),
(pl.col("spkts") / pl.col("dur")).alias("sent_packets_rate"),
(pl.col("dpkts") / pl.col("dur")).alias("received_packets_rate"),
(pl.col("sbytes") / pl.col("dur")).alias("sent_bytes_rate"),
(pl.col("dbytes") / pl.col("dur")).alias("received_bytes_rate"),
(pl.col("sbytes") / pl.col("spkts")).alias("mean_pkt_sent_size"),
(pl.col("dbytes") / pl.col("dpkts")).alias("mean_pkt_recv_size"),
(
pl.col("Timestamp")
.diff()
.dt.total_seconds()
.fill_null(0)
.over("ID")
.alias("time_since_last_session")
),
)
.with_columns(
pl.when(pl.col("^.*(_ratio|_rate).*$").is_infinite())
.then(-1)
.otherwise(pl.col("^.*(_ratio|_rate).*$"))
.name.keep()
)
.fill_nan(-1)
)
filtered_sessions = pl.scan_parquet("./processed_merge_file_filtered.parquet")
print(filtered_sessions.head().collect(streaming=True))
sessions_features = extract_session_features(filtered_sessions)
sessions_features.sink_parquet("./sessions_features")
当我运行此代码时,出现以下错误:
thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-lazy/src/physical_plan/planner/lp.rs:153:28:
sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
File "/home/cpinon/Documentos/Project/SourceCode/project/data/01_raw/sessions/test_feature_engineering.py", line 50, in <module>
sessions_features.sink_parquet("./sessions_features")
File "/home/cpinon/Documentos/Project/SourceCode/project/.venv/lib/python3.9/site-packages/polars/lazyframe/frame.py", line 1895, in sink_parquet
return lf.sink_parquet(
pyo3_runtime.PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
错误消息建议使用collect().write_parquet(),但我不确定为什么不能使用sink_parquet()方法。如果我使用前一种解决方案,我的计算机会耗尽内存,因为它无法处理整个数据帧。
流引擎并不支持所有极坐标操作。特别是,我猜这就是阻止您直播的原因:
(
pl.col("Timestamp")
.diff()
.dt.total_seconds()
.fill_null(0)
.over("ID")
.alias("time_since_last_session")
),
你可以通过这样做来验证
print(sessions_features.explain(streaming=True))
然后你会得到看起来有点像的东西:
print(pl.select(a=pl.lit(1)).lazy().select(pl.col('a').pow(2)).explain(streaming=True))
--- STREAMING
SELECT [col("a").pow([2])] FROM
DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: "None" --- END STREAMING
DF []; PROJECT */0 COLUMNS; SELECTION: "None"
您希望看到
--- STREAMING
和 --- END STREAMING
之间的所有内容。流式传输不支持任何超出这些线的操作。
关于流媒体的官方文档有点宽松,但是这里一些额外的信息。
就获得您需要的内容而言,您可以像准下沉解决方法那样松散地执行一些操作,其中您必须获取
ID
的唯一列表,然后循环遍历过滤后的版本。
另一个想法是仅使用 pyarrow 数据集编写器或 duckdb (我不对它们是否有具体知识)能够支持您的流媒体需求。