在Polars库中使用sink_parquet时出错

问题描述 投票:0回答:1

我尝试从数据集中提取一些特征,然后使用 Python 中的 Polars 库将结果写入 Parquet 文件。这是我正在使用的代码:

import ipaddress

import numpy as np
import polars as pl


def extract_session_features(sessions: pl.LazyFrame) -> pl.LazyFrame:
    return (
        sessions.with_columns(
            (pl.col("dpkts") + pl.col("spkts")).alias("total_packets"),
            (pl.col("dbytes") + pl.col("sbytes")).alias("total_bytes"),
            (pl.col("dpkts") / pl.col("spkts")).alias("bytes_ratio"),
            (pl.col("dbytes") / pl.col("sbytes")).alias("packets_ratio"),
            (pl.col("spkts") / pl.col("dur")).alias("sent_packets_rate"),
            (pl.col("dpkts") / pl.col("dur")).alias("received_packets_rate"),
            (pl.col("sbytes") / pl.col("dur")).alias("sent_bytes_rate"),
            (pl.col("dbytes") / pl.col("dur")).alias("received_bytes_rate"),
            (pl.col("sbytes") / pl.col("spkts")).alias("mean_pkt_sent_size"),
            (pl.col("dbytes") / pl.col("dpkts")).alias("mean_pkt_recv_size"),
            (
                pl.col("Timestamp")
                .diff()
                .dt.total_seconds()
                .fill_null(0)
                .over("ID")
                .alias("time_since_last_session")
            ),
        )
        .with_columns(
            pl.when(pl.col("^.*(_ratio|_rate).*$").is_infinite())
            .then(-1)
            .otherwise(pl.col("^.*(_ratio|_rate).*$"))
            .name.keep()
        )
        .fill_nan(-1)
    )


filtered_sessions = pl.scan_parquet("./processed_merge_file_filtered.parquet")
print(filtered_sessions.head().collect(streaming=True))


sessions_features = extract_session_features(filtered_sessions)
sessions_features.sink_parquet("./sessions_features")

当我运行此代码时,出现以下错误:

thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-lazy/src/physical_plan/planner/lp.rs:153:28:
sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/home/cpinon/Documentos/Project/SourceCode/project/data/01_raw/sessions/test_feature_engineering.py", line 50, in <module>
    sessions_features.sink_parquet("./sessions_features")
  File "/home/cpinon/Documentos/Project/SourceCode/project/.venv/lib/python3.9/site-packages/polars/lazyframe/frame.py", line 1895, in sink_parquet
    return lf.sink_parquet(
pyo3_runtime.PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

错误消息建议使用collect().write_parquet(),但我不确定为什么不能使用sink_parquet()方法。如果我使用前一种解决方案,我的计算机会耗尽内存,因为它无法处理整个数据帧。

python parquet python-polars
1个回答
2
投票

流引擎并不支持所有极坐标操作。特别是,我猜这就是阻止您直播的原因:

(
    pl.col("Timestamp")
    .diff()
    .dt.total_seconds()
    .fill_null(0)
    .over("ID")
    .alias("time_since_last_session")
),

你可以通过这样做来验证

print(sessions_features.explain(streaming=True))

然后你会得到看起来有点像的东西:

print(pl.select(a=pl.lit(1)).lazy().select(pl.col('a').pow(2)).explain(streaming=True))
--- STREAMING
 SELECT [col("a").pow([2])] FROM
  DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: "None"  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

您希望看到

--- STREAMING
--- END STREAMING
之间的所有内容。流式传输不支持任何超出这些线的操作。

关于流媒体的官方文档有点宽松,但是这里一些额外的信息。

就获得您需要的内容而言,您可以像准下沉解决方法那样松散地执行一些操作,其中您必须获取

ID
的唯一列表,然后循环遍历过滤后的版本。

另一个想法是仅使用 pyarrow 数据集编写器或 duckdb (我对它们是否有具体知识)能够支持您的流媒体需求。

© www.soinside.com 2019 - 2024. All rights reserved.