Apache Spark是一个用Scala编写的开源分布式数据处理引擎,为用户提供统一的API和分布式数据集。 Apache Spark的用例通常与机器/深度学习,图形处理有关。
我想读取一个ex:schema_file的文件,它将包含模式,并希望在代码中使用它来创建DataFrame我已经阅读了有关ConfigFactory以提供模式但不能将其用作...
我正在尝试使用spark scala代码来传输Twitter数据。我能够获取数据并创建数据帧并查看它。但是当我尝试提取status.getPlace.getCountry()时,我得到了一个java.lang ....
我正在使用to_utc_timestamp将时间戳转换为UTC时间。我在一个列中有日期时间,在另一列中有时区,当我通过时区时,它表示列不可调用:data_frame.withColumn('...
我有用于在python和R之间交换数据的羽毛格式文件sales.fea。在RI中使用以下命令:df = as.data.frame(feather :: read_feather(“sales.fea”))在python中我...
Spark Structured Streaming 2.2.1中没有按顺序发生到同一数据库接收器的两个Writestream。请建议如何按顺序执行它们。 val deleteSink = ds1.writestream ....
合并火花数据框中的行我有以下ID的数据名称Passport国家许可证UpdatedtimeStamp 1 Ostrich 12345 - ABC 11-02-2018 1 - - ...
我有一个名为values的矢量集合,我正在尝试将其转换为数据帧scala.collection.immutable.Vector [(String,Double)] = Vector((1,1.0),(2,2.4),(3,3.7) ),(4,5.0),(5,4.9))我有......
如何在DataFrame组中执行算术运算在Spark中进行聚合? [重复]
我有一个数据帧如下:val df = Seq((“x”,“y”,1),(“x”,“z”,2),(“x”,“a”,4),(“ x“,”a“,5),(”t“,”y“,1),(”t“,”y2“,6),(”t“,”y3“,3),(”t“ ,“y4”,5))。toDF(“F1”,“F2”,“F3”)+ --- + --- + - ...
将Spark Dataframe转换为Scala Map集合
我正在尝试找到将整个Spark数据帧转换为scala Map集合的最佳解决方案。最好说明如下:从这里开始(在Spark示例中):val df = sqlContext ....
我在第一次爆炸后使用以下命令进行第二次爆炸:myExplode = sqlContext.sql(“从myTable中选择爆炸(名称)作为name_x”)myExplode = sqlContext.sql(“select explode(...
如何在spark sql lag函数中添加if或case条件
需要在spark sql lag函数中添加一些条件,我的数据中有ID和日期,我想得到最近的非滞后日期。 id,日期er1,2018-01-19 er1,null er1,2018-02-10 er2,...
我正在尝试将具有嵌套结构类型(见下文)的DataFrame列扩展为多个列。我正在使用的Struct模式看起来像{“foo”:3,“bar”:{“baz”:2}}。理想情况下,我......
Scala / Spark:如何将此参数传递给.select语句
我有办法得到一个有效的数据框的子集:这工作val subset_cols = {joinCols:+ col} val df1_subset = df1.select(subset_cols.head,subset_cols.tail:_ *)这不起作用:(。 ..
在读取/加载时将原始JSON保留为Spark DataFrame中的列?
在将数据读入Spark DataFrame时,我一直在寻找一种将原始(JSON)数据添加为列的方法。我有一种方法可以通过连接执行此操作,但我希望有一种方法可以在...中执行此操作
是否有通用的方法来读取spark中的多线json。更具体的是火花?
我有一个多行json像这样{“_ id”:{“$ oid”:“50b59cd75bed76f46522c34e”},“student_id”:0,“class_id”:2,“得分”:[{“type”:“考试”, “得分”:57.92947112575566},{“type”:“...
我们正在运行连接到oracle并获取一些数据的spark工作。始终尝试0或1的JDBCRDD任务失败,并出现以下错误。在随后的尝试任务中完成。正如少数人所建议的......
以下是我正在尝试实现的操作:types = [“200”,“300”] def Count(ID):cnd = F.when((** F.col(“type”)类型**) ,1).otherwise(F.lit(0))返回F.sum(cnd).alias(“CountTypes”)...
我想使用sparklyr包打开gz文件,因为我在R上使用Spark。我知道我可以使用read.delim2(gzfile(“filename.csv.gz”),sep =“,”,header = FALSE )打开gz文件,我可以用...
我运行一个火花工作,它记录了该过程的进展情况。最后,它给出了两种类型的时间,指的是完成时间。这两种类型有什么区别。这读了......
在Spark Dataframe中的列列表中添加一列rowums
我有一个包含多个列的Spark数据帧。我想在数据帧上添加一列,它是一定数量的列的总和。例如,我的数据如下所示:ID var1 var2 var3 ...