rdd 相关问题

弹性分布式数据集(RDD)是一种分布式内存抽象,允许程序员在大型集群上执行内存计算,同时保留MapReduce等数据流模型的容错能力。

如何使用PySpark清理rdd或DataFrame(drop nulls和duplicates)

我是Python / PySpark的新手,在使用Mac终端之前我无法清理数据。我想删除包含空值或重复行的任何行。我用.distinct()和...

回答 1 投票 0

spark根据hive表映射名称和id将数据帧的列从name转换为id

首先,我们在hive中有一个hive表类:id |名字1 |历史2 |艺术...然后我们从mongodb读取一本书集来激发数据帧:bookname |类别欧洲| ...

回答 1 投票 0

如何在没有嵌套转换的情况下创建新的RDD

我想创建一个RDD,其记录格式如下:( trip,(起始站详细信息),(结束站详细信息))import org.apache.spark._ val input1 = sc.textFile(“data / trips / * “)......

回答 1 投票 -2

ReduceByKey on元组的Iterable值

我试图计算特定日期的特定项目的外观。我的输入结构是Date \ tItem1:AppearencesOfItem1,...,ItemN:AppearencesOfItemN例20/10/2000 \ tItem1:1,Item2:5 20 / ...

回答 2 投票 2

将Spark DataFrame转换为HashMaps的HashMap

我的数据框如下所示:column1_ID column2 column3 column4 A_123 12 A 1 A_123 12 B 2 A_123 23 A 1 B_456 56 DB 4 B_456 ...

回答 2 投票 1

Spark scala使用Row和Schema从rdd创建dataFrame

我正在尝试从RDD创建一个数据帧,以便能够使用以下格式写入json。示例json如下所示(预期输出)“1234”:[{loc:'abc',...

回答 2 投票 0

如何在多个RDD上使用groupByKey()?

我有多个RDD,其中包含一个公共字段CustomerId。例如:debitcardRdd的数据为(CustomerId,debitField1,debitField2,......),creditcardRdd的数据为(CustomerId,creditField1,...

回答 2 投票 0

如何删除csv文件中的最后一行

我是新来的火花我想从csv文件中删除标题和最后一行注释xyz“id”,“member_id”“60045257”,“63989975”,“60981766”,“65023535”,总金额:4444228900 ...

回答 1 投票 0

Spark RDD map和mappartitions,生成的顺序是一样的吗?

以下内容:rdd.map(x => xx)与rdd.mapPartions(x => x.map(c => cc))将在两种情况下生成的转换后的RDD具有相同的顺序吗?

回答 1 投票 0

计算标记到该键的集合的每个值的键数

我有一对像这样的RDD:id值id1 set(1232,3,1,93,35)id2 set(321,42,5,13)id3 set(1233,3,5)id4 set(1232,56, 3,35,5)现在,我想得到包含在......中的每个值的总计数。

回答 2 投票 1

Pyspark:数据框的词典列表

我有一个数据框,其中我有一列,每行包含一个字典列表:[Row(payload = u“[{'key1':'value1'},{'key2':'value2'},{'key3 ':'value3'},{...}]“),行(有效负载= u”[{'key1':'...

回答 2 投票 0

无法使用scala在spark中使用groupByKey对2个值执行聚合

这个问题是关于使用scala的spark中的groupByKey()。考虑下面的数据名称,标记,价值克里斯,30,1克里斯,35,1罗伯特,12,1罗伯特,20,1创建低于rdd val dataRDD = sc.parallelize(List(...

回答 2 投票 1

Pyspark - 基于RDD中的键的总和[重复]

我有一个csv文件,如下所示:ID,NAME,SUBJECT,MARKS 1,ABC,ECONOMICS,50 1,ABC,SCIENCE,60 1,ABC,ENGLISH,70 2,XYZ,ECONOMICS,50 2,XYZ,ENGLISH, 40 2,XYZ,SCIENCE,65我能够将其加载到RDD ...

回答 1 投票 -2

如何彻底驱逐Spark中持久的“流块”

- 更新 - 我们发现当遇到“com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException时出现以下情况:无法更新检查点 - 实例不保留租约...

回答 1 投票 1

无法使用take()操作打印RDD内容

当我尝试使用first()动作打印RDD内容时,我可以使用foreach循环打印它。但是使用take()动作它不会打印出内容。使用first()myRDD.first()。foreach(println)1 2013 -...

回答 1 投票 0

什么是在不相交集上对行进行聚类的正确JavaRDD转换

我在JavaPairRDD中设置了行 其中MyPojo是一个带有属性的pojo(让我们称之为HashSet 值)。现在我想基于任何...集群(合并)我的行...

回答 1 投票 0

如何在pyspark做年龄段?

假设我有一个带有数字列Age的数据框。我想根据Age Something中的值生成一个新列:Age |新时代范围-------------------------------...

回答 2 投票 0

如何使用Spark查找中位数和分位数

如何使用分布式方法,IPython和Spark查找整数RDD的中位数? RDD大约为700,000个元素,因此太大而无法收集和找到中位数。这个...

回答 4 投票 55

如何根据pyspark中的条件组合dataFrame中的行

我必须为应用程序处理包含日志(入口和出口)的数据帧。数据如下:USER | DATETIME | IN_OUT --------------------------------- 0002 2018/08/28 12:00 ...

回答 1 投票 0

pyspark中的函数input()

我的问题在于,当我输入p的值时,没有任何事情发生,它没有追求执行:有没有办法解决它?从pyspark导入导入系统SparkContext sc = SparkContext(“local”,“...

回答 2 投票 2

© www.soinside.com 2019 - 2024. All rights reserved.