Saving and loading an RDD (pyspark) to a pickle file changes the order of the SparseVectors


I trained tf-idf on a pre-tokenized (unigram tokenizer) dataset, using pyspark's HashingTF and IDF implementations to convert it from

list[list(token1, token2, token3, ...)]

into an RDD. I tried to save the RDD with the tf-idf values, but when I save the output to a file and then load it back, the loaded RDD contains the same SparseVectors as the one I saved, except that the order now seems to start from a random SparseVector and then continue in the original order from there.

All of the important parts of my code:

from pyspark.mllib.feature import HashingTF, IDF
from pyspark.sql import SparkSession
# Assumed imports (not shown in the original snippet): HuggingFace datasets and
# tokenizers, which provide load_from_disk and Tokenizer.from_file used below.
from datasets import load_from_disk
from tokenizers import Tokenizer

spark = SparkSession.builder.appName('tf-idf').getOrCreate()
sc = spark.sparkContext

data = load_from_disk("pre cleaned data")

tokenizer = Tokenizer.from_file("pre trained tokenizer")
tokenized_data = tokenizer.encode_batch(data["content"])
tokenized_data = [doc.tokens for doc in tokenized_data] # keep only the token list for each document

rdd_data = sc.parallelize(tokenized_data) #converting to RDD so it works with IDF

hashingTF = HashingTF(numFeatures = 1<<21)
htf_data = hashingTF.transform(rdd_data)

idf = IDF().fit(htf_data)
tfidf_data = idf.transform(htf_data)


tfidf_data.saveAsPickleFile("some/path")

print(tfidf_data.collect()) # Outputs a list of sparse vectors containing numFeatures and a dictionary of hash and tf-idf values, looks like this: list[SparseVector(NumFeatures, {hash_value: tf-idf_value, ...}), ...]


# ----- pretend like you are in a new function or file now -----


spark = SparkSession.builder.appName('tf-idf').getOrCreate()
sc = spark.sparkContext

ti = sc.pickleFile("some/path")

print(ti.collect())
# Outputs the same kind of list of SparseVectors as above:
# list[SparseVector(numFeatures, {hash_value: tf-idf_value, ...}), ...]
# HOWEVER this time the order of the SparseVectors is not the same as when they
# were originally saved. All of the SparseVectors still exist somewhere in the RDD
# (I checked); loading the pickle file just seems to reorder them.

To illustrate what happens, suppose we label each SparseVector with an id starting from 0 (say we have 6 of them; in my case there are 8600). In the original RDD the order would be 0, 1, 2, 3, 4, 5. In the RDD read back from the pickle file, assuming each SparseVector keeps the same id, we now get 3, 4, 5, 0, 1, 2. It looks like a seemingly random SparseVector is now first in the RDD, the next one is the one that originally followed it, and so on; once it reaches the last of the original SparseVectors, it wraps back around to the first one.
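A minimal sketch of the same round trip on a plain RDD (toy data and a hypothetical path, not my actual pipeline); with more than one partition the reloaded order can come back shifted in the same way:

# Toy reproduction (hypothetical): save a small multi-partition RDD, reload it,
# and compare the element order before and after the round trip.
toy = sc.parallelize(range(6), numSlices=3)
print(toy.collect())                    # [0, 1, 2, 3, 4, 5]
toy.saveAsPickleFile("some/toy_path")   # hypothetical path
print(sc.pickleFile("some/toy_path").collect())  # may come back shifted, e.g. [2, 3, 4, 5, 0, 1]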

python python-3.x pyspark rdd tf-idf
1 Answer

I believe Spark does not guarantee that the order of elements is preserved when you save an RDD to a file and load it back.

You need to explicitly pair each element with an index, and sort the elements by that index after loading.

This way the order is preserved regardless of how the data is partitioned and written out across nodes.

More details: https://spark.apache.org/docs/latest/rdd-programming-guide.html

I believe this code should work:


from pyspark.mllib.feature import HashingTF, IDF
from pyspark.sql import SparkSession
# Assumed imports, as in the question: HuggingFace datasets / tokenizers
from datasets import load_from_disk
from tokenizers import Tokenizer


spark = SparkSession.builder.appName('tf-idf').getOrCreate()
sc = spark.sparkContext


data = load_from_disk("pre cleaned data")

# Tokenize data
tokenizer = Tokenizer.from_file("pre trained tokenizer")
tokenized_data = tokenizer.encode_batch(data["content"])
tokenized_data = [doc.tokens for doc in tokenized_data]  # Keep only the token list for each document

# Convert to RDD
rdd_data = sc.parallelize(tokenized_data)

# Apply HashingTF
hashingTF = HashingTF(numFeatures=1<<21)
htf_data = hashingTF.transform(rdd_data)

# Fit and transform using IDF
idf = IDF().fit(htf_data)
tfidf_data = idf.transform(htf_data)

# Pair SparseVectors with their indices
indexed_tfidf_data = tfidf_data.zipWithIndex().map(lambda x: (x[1], x[0]))

# Save the indexed RDD as a pickle file
indexed_tfidf_data.saveAsPickleFile("some/path")

# Load the indexed RDD from the pickle file
loaded_indexed_tfidf_data = sc.pickleFile("some/path")

# Sort the loaded RDD by the indices
sorted_tfidf_data = loaded_indexed_tfidf_data.sortByKey().map(lambda x: x[1])

# Collect and print the sorted RDD
print(sorted_tfidf_data.collect())
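As an optional sanity check (a sketch, assuming the data is small enough to collect to the driver), the re-sorted RDD can be compared against the original. Note that sortByKey triggers a shuffle; for small data you could also just collect the (index, vector) pairs and sort them locally.

# Sanity check (sketch): SparseVector supports ==, so the two collected lists
# can be compared element-wise. Only do this if the data fits in driver memory.
assert sorted_tfidf_data.collect() == tfidf_data.collect()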