Pyspark:洗牌RDD

问题描述 投票:7回答:2

我正在尝试随机调整RDD中元素的顺序。我目前的方法是用洗牌整数的 RDD 压缩元素,然后用这些整数连接。

然而,pyspark只用了1亿个整数就结束了。我使用的是下面的代码。

我的问题是:有没有更好的方法,要么用随机索引进行压缩,要么用其他方式进行洗牌?

我试过用随机键排序,可以用,但很慢。

def random_indices(n):
    """
    return an iterable of random indices in range(0,n)
    """
    indices = range(n)
    random.shuffle(indices)
    return indices

在 pyspark 中发生了以下情况。

Using Python version 2.7.3 (default, Jun 22 2015 19:33:41)
SparkContext available as sc.
>>> import clean
>>> clean.sc = sc
>>> clean.random_indices(100000000)
Killed
python hadoop apache-spark bigdata pyspark
2个回答
5
投票

一种可能的方法是使用以下方法添加随机键 mapParitions

import os
import numpy as np

swap = lambda x: (x[1], x[0])

def add_random_key(it):
    # make sure we get a proper random seed
    seed = int(os.urandom(4).encode('hex'), 16) 
    # create separate generator
    rs = np.random.RandomState(seed)
    # Could be randint if you prefer integers
    return ((rs.rand(), swap(x)) for x in it)

rdd_with_keys = (rdd
  # It will be used as final key. If you don't accept gaps 
  # use zipWithIndex but this should be cheaper 
  .zipWithUniqueId()
  .mapPartitions(add_random_key, preservesPartitioning=True))

接下来可以重新分区,对每个分区进行排序,提取值。

n = rdd.getNumPartitions()
(rdd_with_keys
    # partition by random key to put data on random partition 
    .partitionBy(n)
    # Sort partition by random value to ensure random order on partition
    .mapPartitions(sorted, preservesPartitioning=True)
    # Extract (unique_id, value) pairs
    .values())

如果每个分区的排序还是太慢的话,可以用Fisher -Yates shuffle来代替。

如果你只是需要一个随机数据,那么你可以使用 mllib.RandomRDDs

from pyspark.mllib.random import RandomRDDs

RandomRDDs.uniformRDD(sc, n)

理论上可以用输入的方式进行压缩。rdd 但它需要匹配每个分区的元素数量。


-1
投票

pyspark工作了!

from random import randrange
data_rnd = data.sortBy(lambda x: randrange(1000000))

© www.soinside.com 2019 - 2024. All rights reserved.