KeyError: '' empty-string error in PySpark (Spark RDD)


I am working on a simple exercise that recommends new friends from an edge-list friendship graph by counting mutual friends, computing the top 20 mutual-friend candidates for each user under certain filter conditions. I am using Spark RDDs for this task.

Below is my edge list all_friends, which stores the friendship edges as key-value pairs. The graph is undirected, so for every ('0', '1') there is also a ('1', '0'):

    all_friends.take(4)
    [('0', '1'), ('0', '2'), ('1', '0'), ('1', '3')]
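For reference, one way an edge list like this could be built is sketched below. It assumes a hypothetical whitespace-separated edge file friends.txt (one edge per line, e.g. "0 1") and is not part of the original code, which only shows the resulting RDD.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("friend-recs").getOrCreate()
    sc = spark.sparkContext

    # "friends.txt" is a hypothetical file with one edge per line, e.g. "0 1";
    # both directions are emitted so the resulting graph is undirected.
    all_friends = sc.textFile("friends.txt") \
        .map(lambda line: line.split()) \
        .flatMap(lambda e: [(e[0], e[1]), (e[1], e[0])]) \
        .distinct()

    print(all_friends.take(4))  # e.g. [('0', '1'), ('0', '2'), ('1', '0'), ('1', '3')]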

Part of my code therefore looks like this:

    from collections import Counter

    # after the join, each entry has the form (k, (v1, v2)), hence the lambda in .map()
    results = all_friends\
        .join(all_friends)\
        .filter(filter_conditions)\
        .map(lambda af1f2: (af1f2[1][0], af1f2[1][1]))\
        .groupByKey()\
        .mapValues(lambda v: Counter(v).most_common(20))

However, after the map I get a KeyError, shown below. The same thing happens if I put .keys().collect() right after the map. This is strange, because I don't understand why Spark is looking for the key '' (empty string) when it clearly does not exist in the original RDD. I am not sure whether it has something to do with the full outer join. Can anyone advise?

 An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 78.0 failed 3 times, most recent failure: Lost task 1.2 in stage 78.0 (TID 291, 100.103.89.116, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-155-140ba198945e>", line 2, in <lambda>
KeyError: ''
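A quick sanity check along these lines (a debugging sketch, not part of the original post) could confirm whether an empty-string key or value actually appears in the data, before or after the self-join:

    # Debugging sketch (assumption, not from the original code): count records
    # containing an empty string, before and after the self-join.
    print(all_friends.filter(lambda kv: kv[0] == '' or kv[1] == '').count())
    print(all_friends.join(all_friends).filter(lambda kv: kv[0] == '').count())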
python apache-spark pyspark rdd
1 Answer

Your filter_conditions condition looks incorrect. Here is working code with a pseudo filter:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from collections import Counter

conf = SparkConf().setAppName('Python Spark').set("spark.executor.memory", "1g")
spark_session = SparkSession.builder.config(conf=conf).getOrCreate()

all_friends = spark_session.sparkContext.parallelize([('0', '1'), ('0', '2'), ('1', '0'), ('1', '3'), ('1', '3')])


# [('0', '1'), ('0', '2'), ('1', '0'), ('1', '3')]

# print(all_friends.take(4))

def filter_conditions(c):
    # pseudo filter: c is (key, (friend_a, friend_b)) after the self-join;
    # returning c (truthy) keeps records whose key is '1', returning None drops the rest
    if c[0] == '1':
        return c


results = all_friends.join(all_friends).filter(filter_conditions).map(
    lambda af1f2: (af1f2[1][0], af1f2[1][1])).groupByKey().mapValues(lambda v: Counter(v).most_common(20))

print(results.collect())

Output:

[('3', [('3', 4), ('0', 2)]), ('0', [('3', 2), ('0', 1)])]
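A side note on the pseudo filter: RDD.filter only needs a truthy/falsy result, so returning the record itself (or implicitly None) works, but an explicit boolean predicate expresses the same condition more directly, e.g.:

def filter_conditions(c):
    # c has the form (key, (friend_a, friend_b)) after the self-join;
    # keep only records belonging to user '1'
    return c[0] == '1'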