我正在尝试在 Polars 中执行以下操作。 对于 B 列中低于 80 的值,将在 1 到 4 之间缩放,而对于高于 80 的值,将设置为 5。
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
df_pandas = pd.DataFrame(
{
"A": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
"B": [50, 300, 80, 12, 105, 78, 66, 42, 61.5, 35],
}
)
test_scaler = MinMaxScaler(feature_range=(1,4))
df_pandas.loc[df_pandas['B']<80, 'Test'] = test_scaler.fit_transform(df_pandas.loc[df_pandas['B']<80, "B"].values.reshape(-1,1))
df_pandas = df_pandas.fillna(5)
这就是我对 Polars 所做的:
# dt is a dictionary
dt = df.filter(
pl.col('B')<80
).to_dict(as_series=False)
below_80 = list(dt.keys())
dt_scale = list(
test_scaler.fit_transform(
np.array(dt['B']).reshape(-1,1)
).reshape(-1) # reshape back to one dimensional
)
# reassign to dictionary dt
dt['B'] = dt_scale
dt_scale_df = pl.DataFrame(dt)
dt_scale_df
dummy = df.join(
dt_scale_df, how="left", on="A"
).fill_null(5)
dummy = dummy.rename({"B_right": "Test"})
结果:
shape: (10, 3)
┌─────┬───────┬──────────┐
│ A ┆ B ┆ Test │
│ --- ┆ --- ┆ --- │
│ i64 ┆ f64 ┆ f64 │
╞═════╪═══════╪══════════╡
│ 1 ┆ 50.0 ┆ 2.727273 │
│ 2 ┆ 300.0 ┆ 5.0 │
│ 3 ┆ 80.0 ┆ 5.0 │
│ 4 ┆ 12.0 ┆ 1.0 │
│ 5 ┆ 105.0 ┆ 5.0 │
│ 6 ┆ 78.0 ┆ 4.0 │
│ 7 ┆ 66.0 ┆ 3.454545 │
│ 8 ┆ 42.0 ┆ 2.363636 │
│ 9 ┆ 61.5 ┆ 3.25 │
│ 10 ┆ 35.0 ┆ 2.045455 │
└─────┴───────┴──────────┘
有更好的方法吗?
好吧,我为您准备了 3 个示例,应该对您有所帮助,您应该优先选择最后一个。
因为您只想将缩放器应用到列的一部分,所以我们应该确保只将该部分数据发送到缩放器。这可以通过以下方式完成:
这需要一个将应用于分区的 python 函数。在函数本身中,我们必须检查我们所在的分区并相应地处理它。
df = pl.from_pandas(df_pandas)
min_max_sc = MinMaxScaler((1, 4))
def my_scaler(s: pl.Series) -> pl.Series:
if s.len() > 0 and s[0] > 80:
out = (s * 0 + 5)
else:
out = pl.Series(min_max_sc.fit_transform(s.to_numpy().reshape(-1, 1)).flatten())
# ensure all types are the same
return out.cast(pl.Float64)
df.with_columns(
pl.col("B").map_elements(my_scaler).over(pl.col("B") < 80).alias("Test")
)
这会将原始数据帧分区到包含不同分区的字典中。然后我们只根据需要修改分区。
parts = { key[0]: part for key, part in # partition_by returns keys as tuples
df
.with_columns((pl.col("B") < 80).alias("part"))
.partition_by("part", as_dict=True)
.items()
}
parts[True] = parts[True].with_columns(
pl.col("B").map_batches(
lambda s: pl.Series(min_max_sc.fit_transform(s.to_numpy().reshape(-1, 1)).flatten())
).alias("Test")
)
parts[False] = parts[False].with_columns(
pl.lit(5.0).alias("Test")
)
pl.concat(parts.values()).drop("part")
这个是我最喜欢的。我们可以创建一个创建极坐标表达式的函数,它是您需要的
min_max
缩放函数。这将具有最佳性能。
def min_max_scaler(col: str, predicate: pl.Expr):
x = pl.col(col)
x_min = x.filter(predicate).min()
x_max = x.filter(predicate).max()
# * 3 + 1 to set scale between 1 - 4
return (x - x_min) / (x_max - x_min) * 3 + 1
predicate = pl.col("B") < 80
df.with_columns(
pl.when(predicate)
.then(min_max_scaler("B", predicate))
.otherwise(5).alias("Test")
)