使用Python Polars执行基于整数的滚动窗口group_

问题描述 投票:0回答:2

我有一个基于外/内循环的函数,我正在尝试使用 Python Polars DataFrames 进行矢量化。该函数是移动平均线的一种,将用于过滤时间序列金融数据。这是函数:

def ma_j(df_src: pl.DataFrame, depth: float):

    jrc04 = 0.0
    jrc05 = 0.0
    jrc06 = 0.0
    jrc08 = 0.0

    series = df_src['close']

    for x in range(0, len(series)):
        if x >= x - depth*2:
            for k in np.arange(start=math.ceil(depth), stop=0, step=-1):
                jrc04 = jrc04 + abs(series[x-k] - series[x-(k+1)])
                jrc05 = jrc05 + (depth + k) * abs(series[x-k] - series[x-(k+1)])
                jrc06 = jrc06 + series[x-(k+1)]
        else:
            jrc03 = abs(series - (series[1]))
            jrc13 = abs(series[x-depth] - series[x - (depth+1)])
            jrc04 = jrc04 - jrc13 + jrc03
            jrc05 = jrc05 - jrc04 + jrc03 * depth
            jrc06 = jrc06 - series[x - (depth+1)] + series[x-1]
        jrc08 = abs(depth * series[x] - jrc06)

    if jrc05 == 0.0:
        ma = 0.0
    else:
        ma = jrc08/jrc05

    return ma

对我来说棘手的一点是多重内循环回顾(对于 k in...)。我查看了多个在

group_by_dynamic
数据上使用
timeseries
的示例。例如,这里。我还看到了
rolling
的示例,但这似乎仍然使用句点。

但是,我想去掉

timeseries
并只使用源
Series
。这是否意味着我需要在整数范围内分组?

使用此数据示例:

import polars as pl
import numpy as np

i, t, v = np.arange(0, 50, 1), np.arange(0, 100, 2), np.random.randint(1,101,50)
df = pl.DataFrame({"i": i, "t": t, "rand": v})
df = df.with_columns((pl.datetime(2022,10,30) + pl.duration(seconds=df["t"])).alias("datetime")).drop("t")
cols = ["i", "datetime", "rand"]
df = df.select(cols)

数据框看起来像这样:

shape: (50, 3)
┌─────┬─────────────────────┬──────┐
│ i   ┆ datetime            ┆ rand │
│ --- ┆ ---                 ┆ ---  │
│ i64 ┆ datetime[μs]        ┆ i64  │
╞═════╪═════════════════════╪══════╡
│ 0   ┆ 2022-10-30 00:00:00 ┆ 87   │
│ 1   ┆ 2022-10-30 00:00:02 ┆ 66   │
│ 2   ┆ 2022-10-30 00:00:04 ┆ 30   │
│ 3   ┆ 2022-10-30 00:00:06 ┆ 87   │
│ 4   ┆ 2022-10-30 00:00:08 ┆ 74   │
│ …   ┆ …                   ┆ …    │
│ 45  ┆ 2022-10-30 00:01:30 ┆ 91   │
│ 46  ┆ 2022-10-30 00:01:32 ┆ 52   │
│ 47  ┆ 2022-10-30 00:01:34 ┆ 68   │
│ 48  ┆ 2022-10-30 00:01:36 ┆ 26   │
│ 49  ┆ 2022-10-30 00:01:38 ┆ 99   │
└─────┴─────────────────────┴──────┘

...我可以像这样按日期时间进行分组”:

df.group_by_dynamic("datetime", every="10s").agg(
    pl.col("rand").mean().alias('rolling mean')
)

这给出了: enter image description here

但这有3个问题:

  • 我不想对日期时间进行分组...我想对 [x] 大小的容器中的每一行(也许
    i
    ?)进行分组。
  • 我需要每一行的值
  • 我想根据上面函数中的各种情况定义聚合函数

关于如何使用 Polars 来解决这个问题,有什么建议吗?谢谢。

----------编辑1

遵循@ritchie46的很棒的建议(谢谢伙计!),这是groupby:

result_grp = (
    df
    .rolling(index_column="i", period="10i")
    .agg(
        pl.len().alias("rolling_slots"),
        pl.col("rand").mean().alias("roll_mean")
    )
)

df2 = df.select(
    pl.all(),
    result_grp.get_column("rolling_slots"),
    result_grp.get_column("roll_mean"),
)

现在给出:

shape: (50, 5)
┌─────┬─────────────────────┬──────┬───────────────┬───────────┐
│ i   ┆ datetime            ┆ rand ┆ rolling_slots ┆ roll_mean │
│ --- ┆ ---                 ┆ ---  ┆ ---           ┆ ---       │
│ i64 ┆ datetime[μs]        ┆ i64  ┆ u32           ┆ f64       │
╞═════╪═════════════════════╪══════╪═══════════════╪═══════════╡
│ 0   ┆ 2022-10-30 00:00:00 ┆ 23   ┆ 1             ┆ 23.0      │
│ 1   ┆ 2022-10-30 00:00:02 ┆ 72   ┆ 2             ┆ 47.5      │
│ 2   ┆ 2022-10-30 00:00:04 ┆ 46   ┆ 3             ┆ 47.0      │
│ 3   ┆ 2022-10-30 00:00:06 ┆ 37   ┆ 4             ┆ 44.5      │
│ 4   ┆ 2022-10-30 00:00:08 ┆ 12   ┆ 5             ┆ 38.0      │
│ …   ┆ …                   ┆ …    ┆ …             ┆ …         │
│ 45  ┆ 2022-10-30 00:01:30 ┆ 95   ┆ 10            ┆ 53.7      │
│ 46  ┆ 2022-10-30 00:01:32 ┆ 100  ┆ 10            ┆ 62.7      │
│ 47  ┆ 2022-10-30 00:01:34 ┆ 6    ┆ 10            ┆ 62.2      │
│ 48  ┆ 2022-10-30 00:01:36 ┆ 27   ┆ 10            ┆ 56.5      │
│ 49  ┆ 2022-10-30 00:01:38 ┆ 33   ┆ 10            ┆ 54.5      │
└─────┴─────────────────────┴──────┴───────────────┴───────────┘

这太棒了;现在,我如何对分组值应用自定义函数,而不是mean(),例如:

f_jparams(depth_array, jrc04, jrc05, jrc06, jrc08):
    
    _depth = len(depth_array)
    
    if len(depth_array) > 3:
        for x in np.arange(start=1, stop=len(depth_array), step=1):
            jrc04 = jrc04 + abs(depth_array[x] - depth_array[x-1])
            jrc05 = jrc05 + (_depth+x) * abs(depth_array[x] - depth_array[x-1])
            jrc06 = jrc06 + depth_array[x-1]
    else:
        jrc03 = abs(depth_array[_depth-1] - depth_array[_depth-2])
        jrc13 = abs(depth_array[0] - depth_array[1])
        jrc04 = jrc04 - jrc13 + jrc03
        jrc05 = jrc05 - jrc04 + jrc03*_depth
        jrc06 = jrc06 - depth_array[1] + depth_array[_depth-2]
        
    jrc08 = abs(_depth * depth_array[0] - jrc06)
    
    if jrc05 == 0.0:
        ma = 0.0
    else:
        ma = jrc08/jrc05
        
    return ma, jrc04, jrc05, jrc06, jrc08

谢谢!

----编辑2:

多亏了这篇post,我可以将

rand
滚动组中的项目收集到每行的列表中:

depth = 10

result_grp = (
    df
    .rolling(
        index_column="i", 
        period=str(depth) + "i",
        # offset="0i",
        # closed="left"
    )
    .agg(
        pl.len().alias("rolling_slots"),
        pl.col("rand").mean().alias("roll_mean"),
        pl.col("rand").name.suffix('_val_list'),
    )
)

df2 = df.select(
    pl.all(),
    result_grp.get_column("rolling_slots"),
    result_grp.get_column("roll_mean"),
    result_grp.get_column("rand_val_list"),
)

同样从这篇文章中,我看到了一种使滚动窗口

period
成为变量的方法;不错!

有没有办法同时使用

get_columns
exclude
这样我就不必列出我想要的每一个列?

数据框现在看起来像:

shape: (50, 6)
┌─────┬─────────────────────┬──────┬───────────────┬───────────┬─────────────────┐
│ i   ┆ datetime            ┆ rand ┆ rolling_slots ┆ roll_mean ┆ rand_val_list   │
│ --- ┆ ---                 ┆ ---  ┆ ---           ┆ ---       ┆ ---             │
│ i64 ┆ datetime[μs]        ┆ i64  ┆ u32           ┆ f64       ┆ list[i64]       │
╞═════╪═════════════════════╪══════╪═══════════════╪═══════════╪═════════════════╡
│ 0   ┆ 2022-10-30 00:00:00 ┆ 23   ┆ 1             ┆ 23.0      ┆ [23]            │
│ 1   ┆ 2022-10-30 00:00:02 ┆ 72   ┆ 2             ┆ 47.5      ┆ [23, 72]        │
│ 2   ┆ 2022-10-30 00:00:04 ┆ 46   ┆ 3             ┆ 47.0      ┆ [23, 72, 46]    │
│ 3   ┆ 2022-10-30 00:00:06 ┆ 37   ┆ 4             ┆ 44.5      ┆ [23, 72, … 37]  │
│ 4   ┆ 2022-10-30 00:00:08 ┆ 12   ┆ 5             ┆ 38.0      ┆ [23, 72, … 12]  │
│ …   ┆ …                   ┆ …    ┆ …             ┆ …         ┆ …               │
│ 45  ┆ 2022-10-30 00:01:30 ┆ 95   ┆ 10            ┆ 53.7      ┆ [10, 11, … 95]  │
│ 46  ┆ 2022-10-30 00:01:32 ┆ 100  ┆ 10            ┆ 62.7      ┆ [11, 84, … 100] │
│ 47  ┆ 2022-10-30 00:01:34 ┆ 6    ┆ 10            ┆ 62.2      ┆ [84, 53, … 6]   │
│ 48  ┆ 2022-10-30 00:01:36 ┆ 27   ┆ 10            ┆ 56.5      ┆ [53, 46, … 27]  │
│ 49  ┆ 2022-10-30 00:01:38 ┆ 33   ┆ 10            ┆ 54.5      ┆ [46, 56, … 33]  │
└─────┴─────────────────────┴──────┴───────────────┴───────────┴─────────────────┘

我现在是否应该重新循环遍历

rand_val_list
列并将每个分组值列表发送到我的函数?或者有更好的
polars
方法吗?

再次感谢!

python dataframe python-polars
2个回答
2
投票

您在寻找

periods="10i"
吗?

Polars

rolling
接受具有以下查询语言的
period
参数:

        - 1ns   (1 nanosecond)
        - 1us   (1 microsecond)
        - 1ms   (1 millisecond)
        - 1s    (1 second)
        - 1m    (1 minute)
        - 1h    (1 hour)
        - 1d    (1 day)
        - 1w    (1 week)
        - 1mo   (1 calendar month)
        - 1y    (1 calendar year)
        - 1i    (1 index count)

其中

i
只是索引/行数。

因此,在您的数据上,我们计算槽数的滚动 group_by 将给出:

(df.rolling(index_column="i", period="10i")
   .agg(
       pl.len().alias("rolling_slots")
   )
)
shape: (50, 2)
┌─────┬───────────────┐
│ i   ┆ rolling_slots │
│ --- ┆ ---           │
│ i64 ┆ u32           │
╞═════╪═══════════════╡
│ 0   ┆ 1             │
│ 1   ┆ 2             │
│ 2   ┆ 3             │
│ 3   ┆ 4             │
│ 4   ┆ 5             │
│ …   ┆ …             │
│ 45  ┆ 10            │
│ 46  ┆ 10            │
│ 47  ┆ 10            │
│ 48  ┆ 10            │
│ 49  ┆ 10            │
└─────┴───────────────┘

-1
投票
def ma_j(df_src: pl.DataFrame, depth: float):
    jrc04 = 0.0
    jrc05 = 0.0
    jrc06 = 0.0
    jrc08 = 0.0

    series = df_src['close']

    for x in range(0, len(series)):
        if x >= x - depth*2:
            for k in np.arange(start=math.ceil(depth), stop=0, step=-1):
                jrc04 = jrc04 + abs(series[x-k] - series[x-(k+1)])
                jrc05 = jrc05 + (depth + k) * abs(series[x-k] - series[x-(k+1)])
                jrc06 = jrc06 + series[x-(k+1)]
        else:
            jrc03 = abs(series - (series[1]))
            jrc13 = abs(series[x-depth] - series[x - (depth+1)])
            jrc04 = jrc04 - jrc13 + jrc03
            jrc05 = jrc05 - jrc04 + jrc03 * depth
            jrc06 = jrc06 - series[x - (depth+1)] + series[x-1]
        jrc08 = abs(depth * series[x] - jrc06)

    if jrc05 == 0.0:
        ma = 0.0
    else:
        ma = jrc08/jrc05

    return ma
© www.soinside.com 2019 - 2024. All rights reserved.