apache-spark 相关问题

Apache Spark是一个用Scala编写的开源分布式数据处理引擎,为用户提供统一的API和分布式数据集。 Apache Spark的用例通常与机器/深度学习,图形处理有关。

如何从代码外部提供spark / scala中的模式

我想读取一个ex:schema_file的文件,它将包含模式,并希望在代码中使用它来创建DataFrame我已经阅读了有关ConfigFactory以提供模式但不能将其用作...

回答 1 投票 0

Twitter使用Spark流式传输

我正在尝试使用spark scala代码来传输Twitter数据。我能够获取数据并创建数据帧并查看它。但是当我尝试提取status.getPlace.getCountry()时,我得到了一个java.lang ....

回答 2 投票 0

传递列以转换为时区值

我正在使用to_utc_timestamp将时间戳转换为UTC时间。我在一个列中有日期时间,在另一列中有时区,当我通过时区时,它表示列不可调用:data_frame.withColumn('...

回答 1 投票 0

如何原生阅读羽毛文件?

我有用于在python和R之间交换数据的羽毛格式文件sales.fea。在RI中使用以下命令:df = as.data.frame(feather :: read_feather(“sales.fea”))在python中我...

回答 3 投票 7

Spark结构化流多个WriteStream到同一个接收器

Spark Structured Streaming 2.2.1中没有按顺序发生到同一数据库接收器的两个Writestream。请建议如何按顺序执行它们。 val deleteSink = ds1.writestream ....

回答 1 投票 2

合并spark scala Dataframe中的行

合并火花数据框中的行我有以下ID的数据名称Passport国家许可证UpdatedtimeStamp 1 Ostrich 12345 - ABC 11-02-2018 1 - - ...

回答 2 投票 3

将Vector集合转换为dataframe时出现架构错误

我有一个名为values的矢量集合,我正在尝试将其转换为数据帧scala.collection.immutable.Vector [(String,Double)] = Vector((1,1.0),(2,2.4),(3,3.7) ),(4,5.0),(5,4.9))我有......

回答 1 投票 0

如何在DataFrame组中执行算术运算在Spark中进行聚合? [重复]

我有一个数据帧如下:val df = Seq((“x”,“y”,1),(“x”,“z”,2),(“x”,“a”,4),(“ x“,”a“,5),(”t“,”y“,1),(”t“,”y2“,6),(”t“,”y3“,3),(”t“ ,“y4”,5))。toDF(“F1”,“F2”,“F3”)+ --- + --- + - ...

回答 2 投票 -1

将Spark Dataframe转换为Scala Map集合

我正在尝试找到将整个Spark数据帧转换为scala Map集合的最佳解决方案。最好说明如下:从这里开始(在Spark示例中):val df = sqlContext ....

回答 2 投票 8

SparkSQL在第一次爆炸后第二次爆炸

我在第一次爆炸后使用以下命令进行第二次爆炸:myExplode = sqlContext.sql(“从myTable中选择爆炸(名称)作为name_x”)myExplode = sqlContext.sql(“select explode(...

回答 1 投票 0

如何在spark sql lag函数中添加if或case条件

需要在spark sql lag函数中添加一些条件,我的数据中有ID和日期,我想得到最近的非滞后日期。 id,日期er1,2018-01-19 er1,null er1,2018-02-10 er2,...

回答 1 投票 0

如何将嵌套的Struct列展开为多列?

我正在尝试将具有嵌套结构类型(见下文)的DataFrame列扩展为多个列。我正在使用的Struct模式看起来像{“foo”:3,“bar”:{“baz”:2}}。理想情况下,我......

回答 2 投票 2

Scala / Spark:如何将此参数传递给.select语句

我有办法得到一个有效的数据框的子集:这工作val subset_cols = {joinCols:+ col} val df1_subset = df1.select(subset_cols.head,subset_cols.tail:_ *)这不起作用:(。 ..

回答 1 投票 -3

在读取/加载时将原始JSON保留为Spark DataFrame中的列?

在将数据读入Spark DataFrame时,我一直在寻找一种将原始(JSON)数据添加为列的方法。我有一种方法可以通过连接执行此操作,但我希望有一种方法可以在...中执行此操作

回答 2 投票 3

是否有通用的方法来读取spark中的多线json。更具体的是火花?

我有一个多行json像这样{“_ id”:{“$ oid”:“50b59cd75bed76f46522c34e”},“student_id”:0,“class_id”:2,“得分”:[{“type”:“考试”, “得分”:57.92947112575566},{“type”:“...

回答 1 投票 0

Spark Job在第一次尝试时无法连接到oracle

我们正在运行连接到oracle并获取一些数据的spark工作。始终尝试0或1的JDBCRDD任务失败,并出现以下错误。在随后的尝试任务中完成。正如少数人所建议的......

回答 3 投票 0

PySpark列表中的项目

以下是我正在尝试实现的操作:types = [“200”,“300”] def Count(ID):cnd = F.when((** F.col(“type”)类型**) ,1).otherwise(F.lit(0))返回F.sum(cnd).alias(“CountTypes”)...

回答 1 投票 1

如何在R中使用sparklyr打开“GZ FILE”?

我想使用sparklyr包打开gz文件,因为我在R上使用Spark。我知道我可以使用read.delim2(gzfile(“filename.csv.gz”),sep =“,”,header = FALSE )打开gz文件,我可以用...

回答 1 投票 1

Spark运行日志中两个DAG调度程序时间有什么区别?

我运行一个火花工作,它记录了该过程的进展情况。最后,它给出了两种类型的时间,指的是完成时间。这两种类型有什么区别。这读了......

回答 1 投票 1

在Spark Dataframe中的列列表中添加一列rowums

我有一个包含多个列的Spark数据帧。我想在数据帧上添加一列,它是一定数量的列的总和。例如,我的数据如下所示:ID var1 var2 var3 ...

回答 4 投票 16

© www.soinside.com 2019 - 2024. All rights reserved.