rdd 相关问题

弹性分布式数据集(RDD)是一种分布式内存抽象,允许程序员在大型集群上执行内存计算,同时保留MapReduce等数据流模型的容错能力。

Pyspark RDDReducebyKey()

`嗨,我想了解我的代码中的问题出在哪里。 #代码 rdd_avg=sc.parallelize([("a",10),("b",15),("c",20),("a",8)]) rdd_sp1=rdd_avg.map(lambd...

回答 1 投票 0

如何将RDD元素类型从list改为Int?

目前,我的 RDD 看起来像 [[2475], [1900], [2300]] 我希望我的 RDD 是这样的 [2475, 1900, 2300]

回答 2 投票 0

spark Scala 中基于自定义文件大小的分区器

我有一个要求,我有一个 s3 文件路径列表及其文件大小 Seq[(String, Int)],所以我创建了一个相同的 RDD val rdd: RDD[(String, Int)] = driverContext.sc.parallelize(

回答 1 投票 0

PySpark 中分组数据的直方图

我有由日期时间、ID 和速度组成的数据,我希望使用 PySpark 获取每个 ID 的速度直方图数据(起点/终点和计数)。样本数据: df = Spark.createDat...

回答 1 投票 0

ValueError:RDD 为空——Pyspark(Windows 独立版)

我正在尝试创建一个RDD,但spark没有创建它,抛出错误,粘贴在下面; 数据=记录.map(lambda r:LabeledPoint(extract_label(r),extract_features(r))) 第一个点=数据.第一个...

回答 2 投票 0

多次行动会引发失败

我是 Spark 新手。 我在将 df 保存到 Hive 表的部分遇到了一些问题。 def insert_into_hive_table(df: DataFrame, table_name: str): # 用于调试 - 此操作正在运行...

回答 1 投票 0

rdd.zipWithIndex() 在非常大的数据集上抛出 IllegalArgumentException

我正在 Azure Databricks 中运行 python 笔记本。尝试使用 rdd.zipWithIndex() 添加行号时出现 IllegalArgumentException 错误。该文件大小为 2.72 GB,有 1238951 行(我

回答 2 投票 0

如何在 Apache Spark 中对整数列表进行排序?

最近我开始使用 Apache Spark 对大量数据进行排序。 在我的初始测试中,我尝试在 PySpark 上并行对整数列表进行排序,但显然使用

回答 2 投票 0

(Spark 3.3.2 OpenJDK19 PySpark Pandas_UDF Python3.10 Ubuntu22.04 Dockerized)测试脚本产生类型错误:'JavaPackage'对象不可调用

我创建了一个安装 Ubuntu 22.04、Python 3.10、Spark 3.3.2、Hadoop 3、Scala 13 和 Open JDK 19 的 docker 容器。 在 AWS 中部署代码之前,我目前正在使用它作为测试环境。 这

回答 0 投票 0

PySpark:如何根据多个条件附加来自其他 pyspark 数据框的新列?

我有pyspark df1 |编号 |名称 |电子邮件|年龄|大学| |---| ------+ --------------+---+--------| |12 |斯塔 |[email protected] |25 |clg1 | |21 |丹尼 |[email protected] |23 |clg2 | |37 |

回答 0 投票 0

PySpark - 读取检查点数据帧

我目前正在使用 pyspark 为机器学习应用程序执行一些数据清理。 最后一个会话崩溃了,但我设置了一个 checkpointdir 并检查了我的 DataFrame。 现在我有

回答 1 投票 0

使用 sc.textFile() 加载本地文件以激发

问题 如何使用 sc.textFile 从本地文件系统加载文件到 Spark?我需要更改任何 -env 变量吗?此外,当我在未安装 Hadoop 的 Windows 上尝试相同操作时,我...

回答 4 投票 0

使用 BinaryClassificationMetrics 时在 Spark 中使用 map 和 reduce 进行并行计算?

我正在尝试通过使用 map 和 reduce 而不是 for 循环来并行化 Spark 中 AUC 的计算。 但是,因为我有另一个 RDD sc.parallelize(recModelPredictionsAndLabels) 而我不能...

回答 0 投票 0

PySpark - 读取文本文件并忽略行内的换行

我有一个格式如下的文本文件: A,"123 主街 林肯, NE 55555",13343 B,"345 学校街",23432 我想将其作为 2 行而不是 3 行摄取,但是有没有办法点燃...

回答 0 投票 0

在 Scala Spark 中从具有嵌套序列的数据集创建 Dataframe 的问题

我正在尝试从包含嵌套序列的序列创建数据框,但出现 scala.Match 错误。 val data = Seq(("Java", Seq(Seq(true, 5L), 0, 7.5)), ("Python&quo...

回答 1 投票 0

为什么 getNumPartitions() 为同一数据集返回不同的值?

我有一个主要数据集。我需要应用一些过滤和扩充方法,这些方法需要一些 groupby 并在此数据集上加入。 当我在完成程序后运行 df.rdd.getNumPartitions() 时......

回答 0 投票 0

如何在 Spark 中计算单次扫描中的字数和对数

我有一个由单词和数字组成的字符串标记数组,我正在尝试在 Apache Spark 中同时计算单个单词、单词-单词对和数字-单词对的计数。我...

回答 0 投票 0

Java Apache Spark RDD 持久化

我有这样的代码: JavaPairRDD A = JavaPairRDD B; B.persist(StorageLevel.MEMORY_AND_DISK()) 稍后,变量 A 可能会或可能不会重新分配给另一个 JavaPairR 的转换...

回答 0 投票 0

如何在此 Pyspark mapreduce 代码中拆分年份?

我需要为每个单词计算每年有多少篇文章包含它。我一直坚持如何用单词来划分年份,因为我不断得到与日期相连的第一个单词,如图所示

回答 2 投票 0

在同一数据框中合并 RDD 映射结果列的方法

我正在 pyspark 代码中进行一次转换并创建新列。我观察到 source_df 正在被新列取代。是否可以将新列与现有数据框合并

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.