Apache Spark是一个用Scala编写的开源分布式数据处理引擎,为用户提供统一的API和分布式数据集。 Apache Spark的用例通常与机器/深度学习,图形处理有关。
如何在DataFrame组中执行算术运算在Spark中进行聚合? [重复]
我有一个数据帧如下:val df = Seq((“x”,“y”,1),(“x”,“z”,2),(“x”,“a”,4),(“ x“,”a“,5),(”t“,”y“,1),(”t“,”y2“,6),(”t“,”y3“,3),(”t“ ,“y4”,5))。toDF(“F1”,“F2”,“F3”)+ --- + --- + - ...
将Spark Dataframe转换为Scala Map集合
我正在尝试找到将整个Spark数据帧转换为scala Map集合的最佳解决方案。最好说明如下:从这里开始(在Spark示例中):val df = sqlContext ....
我在第一次爆炸后使用以下命令进行第二次爆炸:myExplode = sqlContext.sql(“从myTable中选择爆炸(名称)作为name_x”)myExplode = sqlContext.sql(“select explode(...
如何在spark sql lag函数中添加if或case条件
需要在spark sql lag函数中添加一些条件,我的数据中有ID和日期,我想得到最近的非滞后日期。 id,日期er1,2018-01-19 er1,null er1,2018-02-10 er2,...
我正在尝试将具有嵌套结构类型(见下文)的DataFrame列扩展为多个列。我正在使用的Struct模式看起来像{“foo”:3,“bar”:{“baz”:2}}。理想情况下,我......
Scala / Spark:如何将此参数传递给.select语句
我有办法得到一个有效的数据框的子集:这工作val subset_cols = {joinCols:+ col} val df1_subset = df1.select(subset_cols.head,subset_cols.tail:_ *)这不起作用:(。 ..
在读取/加载时将原始JSON保留为Spark DataFrame中的列?
在将数据读入Spark DataFrame时,我一直在寻找一种将原始(JSON)数据添加为列的方法。我有一种方法可以通过连接执行此操作,但我希望有一种方法可以在...中执行此操作
是否有通用的方法来读取spark中的多线json。更具体的是火花?
我有一个多行json像这样{“_ id”:{“$ oid”:“50b59cd75bed76f46522c34e”},“student_id”:0,“class_id”:2,“得分”:[{“type”:“考试”, “得分”:57.92947112575566},{“type”:“...
我们正在运行连接到oracle并获取一些数据的spark工作。始终尝试0或1的JDBCRDD任务失败,并出现以下错误。在随后的尝试任务中完成。正如少数人所建议的......
以下是我正在尝试实现的操作:types = [“200”,“300”] def Count(ID):cnd = F.when((** F.col(“type”)类型**) ,1).otherwise(F.lit(0))返回F.sum(cnd).alias(“CountTypes”)...
我想使用sparklyr包打开gz文件,因为我在R上使用Spark。我知道我可以使用read.delim2(gzfile(“filename.csv.gz”),sep =“,”,header = FALSE )打开gz文件,我可以用...
我运行一个火花工作,它记录了该过程的进展情况。最后,它给出了两种类型的时间,指的是完成时间。这两种类型有什么区别。这读了......
在Spark Dataframe中的列列表中添加一列rowums
我有一个包含多个列的Spark数据帧。我想在数据帧上添加一列,它是一定数量的列的总和。例如,我的数据如下所示:ID var1 var2 var3 ...
我如何能够找到匹配字符串的出现,如下面的代码片段,我能够将过滤后的字符串作为输出,但不是出现导入org.apache.spark._ import org ....
我想在full_outer_join的基础上加入两个数据帧,并尝试在连接的结果集中添加一个新列,它告诉我匹配的记录,单独的左数据框中的不匹配记录和...
这个答案很好地解释了如何使用pyspark的groupby和pandas_udf来进行自定义聚合。但是,我不可能手动声明我的架构,如示例的这一部分所示......
如何在spark shell中注册Java SPark UDF?
下面是我的java udf代码,包com.udf; import org.apache.spark.sql.api.java.UDF1;公共类SparkUDF实现UDF1 {@Override public String call(String arg)...
我想使用Spark来解析网络消息,并以有状态的方式将它们分组为逻辑实体。问题描述假设每条消息都在输入数据帧的一行中,......
当我打印出我的rdd的第一个元素如下:print(“input = {}”。format(input.take(1)[0]))我得到一个结果:(u'motor',[0.001, ...,0.9])[0.001,...,0.9]的类型是一个列表。 ...
pyspark读取csv文件multiLine选项不适用于具有换行符spark2.3和spark2.2的记录
我正在尝试使用pyspark csv reader读取dat文件,它包含换行符(“\ n”)作为数据的一部分。 Spark无法将此文件作为单列读取,而是将其视为新的...