使用 pyspark 进行权重采样

问题描述 投票:0回答:4

我使用 PySpark 在 Spark 上有一个不平衡的数据帧。 我想重新采样以使其平衡。 我只在 PySpark 中找到了示例函数

sample(withReplacement, fraction, seed=None)

但我想对单位体积权重的数据帧进行采样 在Python中,我可以这样做

df.sample(n,Flase,weights=log(unitvolume))

有什么方法可以使用 PySpark 做同样的事情吗?

python apache-spark pyspark sampling
4个回答
3
投票

Spark 提供了分层采样工具,但这仅适用于分类数据。您可以尝试将其分桶:

from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import col, log

df_log = df.withColumn("log_unitvolume", log(col("unitvolume"))
splits = ... # A list of splits

bucketizer = Bucketizer(splits=splits, inputCol="log_unitvolume", outputCol="bucketed_log_unitvolume")

df_log_bucketed = bucketizer.transform(df_log)

计算统计数据:

counts = df.groupBy("bucketed_log_unitvolume")
fractions  = ...  # Define fractions from each bucket:

并使用这些进行采样:

df_log_bucketed.sampleBy("bucketed_log_unitvolume", fractions)

您还可以尝试将

log_unitvolume
重新调整为 [0, 1] 范围,然后:

from pyspark.sql.functions import rand 

df_log_rescaled.where(col("log_unitvolume_rescaled") < rand())

1
投票

我认为完全忽略 .sample() 函数会更好。使用统一随机数生成器可以实现无放回采样:

import pyspark.sql.functions as F

n_samples_appx = 100
total_weight = df.agg(F.sum('weight')).collect().values
df.filter(F.rand(seed=843) < F.col('weight') / total_weight * n_samples_appx)

这将随机包含/排除数据集中的行,这通常与替换采样相当。如果您的 RHS 超过 1,您应该小心解释——加权采样是一个微妙的过程,严格来说,只能在替换时执行。

因此,如果您想进行替换采样,您可以使用 F.rand() 来获取泊松分布的样本,它将告诉您要包含该行的多少个副本,并且您可以将该值视为权重,或者做一些烦人的连接和联合来重复您的行。但我发现这通常不是必需的。

您还可以使用哈希以可移植、可重复的方式执行此操作:

import pyspark.sql.functions as F

n_samples_appx = 100
total_weight = df.agg(F.sum('weight')).collect().values
df.filter(F.hash(F.col('id')) % (total_weight / n_samples_appx * F.col('weight')).astype('int') == 0)

这将以 1 模的速率进行采样,其中包含您的体重。

hash()
应该是一个一致且确定性的函数,但采样会像随机一样发生。


0
投票

这是一个函数,它从 PySpark DataFrame 中精确采样

n
行,其中权重列包含正权重(整数或浮点)。它使用
orderBy
limit

步骤:

  1. 通过将权重乘以从均匀分布中采样的数字来计算每行的排名。
  2. 按排名对行进行排序(降序)
  3. 使用 limit 将重新排序的 DataFrame 裁剪到前
    n
    行。

这是函数:

def weighted_sample(df: DataFrame, n: int, weight_col: str, seed=None) -> DataFrame:
    """
    Weighted sampling without replacement of n rows from a dataframe. Weights must be positive numbers.

    Parameters
    ----------
    df: DataFrame
        The dataframe to sample from
    n: int
        The number of rows to sample
    weights_col: str
        The name of the column that contains weights ⊆ R+
    """
    assert df.agg(F.min(weight_col)).collect()[0][0] > 0, 'weights must be positive'
    rank = (F.col(weight_col) * F.rand(seed=seed)).desc()
    return df.orderBy(rank).limit(n)

-1
投票

一种方法是使用

udf
制作采样柱。此列将包含一个随机数乘以您所需的体重。然后我们按采样列排序,取前N个。

考虑以下说明性示例:

创建虚拟数据

import numpy as np
import string
import pyspark.sql.functions as f

index = range(100)
weights = [i%26 for i in index]
labels = [string.ascii_uppercase[w] for w in weights]

df = sqlCtx.createDataFrame(
    zip(index, labels, weights),
    ('index', 'label', 'weight')
)

df.show(n=5)
#+-----+-----+------+
#|index|label|weight|
#+-----+-----+------+
#|    0|    A|     0|
#|    1|    B|     1|
#|    2|    C|     2|
#|    3|    D|     3|
#|    4|    E|     4|
#+-----+-----+------+
#only showing top 5 rows

添加采样列

在此示例中,我们希望使用列

weight
作为权重对 DataFrame 进行采样。我们定义一个
udf
,使用
numpy.random.random()
生成均匀随机数并乘以权重。然后我们在此列上使用
sort()
并使用
limit()
来获取所需的样本数量。

N = 10  # the number of samples

def get_sample_value(x):
    return np.random.random() * x

get_sample_value_udf = f.udf(get_sample_value, FloatType())

df_sample = df.withColumn('sampleVal', get_sample_value_udf(f.col('weight')))\
    .sort('sampleVal', ascending=False)\
    .select('index', 'label', 'weight')\
    .limit(N)

结果

正如预期的那样,DataFrame

df_sample
有 10 行,其内容往往包含靠近字母表末尾的字母(权重较高)。

df_sample.count()
#10

df_sample.show()
#+-----+-----+------+
#|index|label|weight|
#+-----+-----+------+
#|   23|    X|    23|
#|   73|    V|    21|
#|   46|    U|    20|
#|   25|    Z|    25|
#|   19|    T|    19|
#|   96|    S|    18|
#|   75|    X|    23|
#|   48|    W|    22|
#|   51|    Z|    25|
#|   69|    R|    17|
#+-----+-----+------+
© www.soinside.com 2019 - 2024. All rights reserved.