我只是想知道在Apache Spark中RDD
和DataFrame
(Spark 2.0.0 DataFrame只是Dataset[Row]
的类型别名)有什么区别?
你能把一个转换成另一个吗?
通过谷歌搜索“DataFrame定义”来定义DataFrame
:
数据框是一个表或二维数组结构,其中每列包含一个变量的测量值,每行包含一个案例。
因此,DataFrame
由于其表格格式而具有额外的元数据,这允许Spark在最终查询上运行某些优化。
另一方面,RDD
仅仅是一个弹性分布式数据集,它更像是一个无法优化的数据黑盒,因为可以对其执行的操作不受约束。
但是,您可以通过其RDD
方法从DataFrame转到rdd
,您可以通过RDD
方法从DataFrame
转到toDF
(如果RDD是表格格式)
通常,由于内置的查询优化,建议尽可能使用DataFrame
。
Dataframe是Row对象的RDD,每个对象代表一条记录。 Dataframe还知道其行的模式(即数据字段)。虽然Dataframes看起来像常规RDD,但在内部它们以更有效的方式存储数据,利用其架构。此外,它们还提供RDD上不可用的新操作,例如运行SQL查询的功能。可以从外部数据源,查询结果或常规RDD创建数据帧。
参考文献:Zaharia M.,et al。学习星火(O'Reilly,2015)
我希望它有所帮助!
您可以将RDD与结构化和非结构化一起使用,其中Dataframe / Dataset只能处理结构化和半结构化数据(它具有适当的模式)
DataFrame是具有架构的RDD。您可以将其视为关系数据库表,因为每列都有一个名称和一个已知类型。 DataFrames的强大之处在于,当您从结构化数据集(Json,Parquet ..)创建DataFrame时,Spark能够通过对整个(Json,Parquet ..)数据集进行传递来推断模式。被装载。然后,在计算执行计划时,Spark可以使用模式并进行更好的计算优化。请注意,在Spark v1.3.0之前,DataFrame称为SchemaRDD
所有优秀的答案和使用每个API都有一些权衡。数据集是为了解决很多问题而构建的超级API,但很多时候如果你了解你的数据并且如果处理算法被优化以便在单次传递到大数据时做很多事情,则RDD仍然效果最好,那么RDD似乎是最佳选择。
使用数据集API的聚合仍然消耗内存,并且随着时间的推移会变得更好。
第一件事是
DataFrame
是从SchemaRDD
演变而来的。
是的.. Dataframe
和RDD
之间的转换是绝对可能的。
以下是一些示例代码段。
df.rdd
是RDD[Row]
以下是一些创建数据框的选项。
yourrddOffrow.toDF
转换为DataFrame
。createDataFrame
val df = spark.createDataFrame(rddOfRow, schema)
架构可以来自以下某些选项as described by nice SO post.. 从scala案例类和scala反射api
import org.apache.spark.sql.catalyst.ScalaReflection val schema = ScalaReflection.schemaFor[YourScalacaseClass].dataType.asInstanceOf[StructType]
或使用
Encoders
import org.apache.spark.sql.Encoders val mySchema = Encoders.product[MyCaseClass].schema
如Schema所描述,也可以使用
StructType
和StructField
创建val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("col1", DoubleType, true)) .add(StructField("col2", DoubleType, true)) etc...
In fact there Are Now 3 Apache Spark APIs..
RDD
API:自1.0发布以来,
RDD
(Resilient Distributed Dataset)API一直在Spark中。
RDD
API提供了许多转换方法,例如map
(),filter
()和reduce
(),用于对数据执行计算。这些方法中的每一种都产生代表转换数据的新RDD
。但是,这些方法只是定义要执行的操作,并且在调用操作方法之前不会执行转换。动作方法的示例是collect
()和saveAsObjectFile
()。
RDD示例:
rdd.filter(_.age > 21) // transformation
.map(_.last)// transformation
.saveAsObjectFile("under21.bin") // action
示例:使用RDD按属性过滤
rdd.filter(_.age > 21)
DataFrame
APISpark 1.3引入了一个新的
DataFrame
API,作为Project Tungsten计划的一部分,旨在提高Spark的性能和可扩展性。DataFrame
API引入了描述数据的模式概念,允许Spark管理模式,只在节点之间传递数据,比使用Java序列化更有效。
DataFrame
API与RDD
API完全不同,因为它是用于构建Spark的Catalyst优化器然后可以执行的关系查询计划的API。对于熟悉构建查询计划的开发人员而言,API很自然
示例SQL样式:
df.filter("age > 21");
限制:因为代码是按名称引用数据属性,所以编译器无法捕获任何错误。如果属性名称不正确,则只有在创建查询计划时才会在运行时检测到错误。
DataFrame
API的另一个缺点是它非常以scala为中心,虽然它支持Java,但支持有限。
例如,当从现有的DataFrame
的Java对象创建RDD
时,Spark的Catalyst优化器无法推断架构并假设DataFrame中的任何对象都实现了scala.Product
接口。 Scala case class
开箱即用,因为他们实现了这个界面。
Dataset
API
Dataset
API作为Spark 1.6中的API预览发布,旨在提供两全其美的优势;熟悉的面向对象编程风格和RDD
API的编译时类型安全性,但具有Catalyst查询优化器的性能优势。数据集也使用与DataFrame
API相同的高效堆外存储机制。在序列化数据时,
Dataset
API具有编码器的概念,可在JVM表示(对象)和Spark的内部二进制格式之间进行转换。 Spark具有非常先进的内置编码器,它们生成字节代码以与堆外数据交互,并提供对各个属性的按需访问,而无需对整个对象进行反序列化。 Spark尚未提供用于实现自定义编码器的API,但计划在将来的版本中使用。此外,
Dataset
API旨在与Java和Scala同样良好地工作。使用Java对象时,重要的是它们完全符合bean。
示例Dataset
API SQL样式:
dataset.filter(_.age < 21);
进一步阅读... databricks article
Apache Spark提供三种类型的API
以下是RDD,Dataframe和Dataset之间的API比较。
Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨群集节点分区的元素的集合,可以并行操作。
Spark在Spark 1.3版本中引入了Dataframes。 Dataframe克服了RDD所面临的主要挑战。
DataFrame是组织到命名列中的分布式数据集合。它在概念上等同于关系数据库或R / Python Dataframe中的表。与Dataframe一起,Spark还引入了催化剂优化器,它利用高级编程功能构建可扩展的查询优化器。
1.Analyzing a logical plan to resolve references
2.Logical plan optimization
3.Physical planning
4.Code generation to compile parts of the query to Java bytecode.
例:
case class Person(name : String , age : Int)
val dataframe = sqlContext.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
当您使用多个转换和聚合步骤时,这尤其具有挑战性。
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
Dataset API是DataFrames的扩展,它提供了一种类型安全的,面向对象的编程接口。它是一个强类型,不可变的对象集合,映射到关系模式。
在数据集的核心,API是一个称为编码器的新概念,它负责在JVM对象和表格表示之间进行转换。表格表示使用Spark内部Tungsten二进制格式存储,允许对序列化数据进行操作并提高内存利用率。 Spark 1.6支持自动生成各种类型的编码器,包括基本类型(例如String,Integer,Long),Scala案例类和Java Bean。
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContext.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
例:
ds.select(col("name").as[String], $"age".as[Int]).collect()
不支持Python和R:从1.6版开始,Datasets仅支持Scala和Java。 Python支持将引入Python 2.0。
与现有的RDD和Dataframe API相比,Datasets API带来了一些优势,具有更好的类型安全性和函数式编程。由于API中的类型转换要求的挑战,您仍然不会需要类型安全性并且会使代码变得脆弱。
Spark提供的主要抽象是弹性分布式数据集(RDD),它是跨群集节点分区的元素的集合,可以并行操作。
它可以轻松高效地处理结构化数据以及非结构化数据。
Spark在Spark 1.3版本中引入了Dataframes。 Dataframe克服了RDD所面临的主要挑战。
DataFrame是组织到命名列中的分布式数据集合。它在概念上等同于关系数据库或R / Python Dataframe中的表。与Dataframe一起,Spark还引入了催化剂优化器,它利用高级编程功能构建可扩展的查询优化器。
1.Analyzing a logical plan to resolve references
2.Logical plan optimization
3.Physical planning
4.Code generation to compile parts of the query to Java bytecode.
例:
case class Person(name : String , age : Int)
val dataframe = sqlContect.read.json("people.json")
dataframe.filter("salary > 10000").show
=> throws Exception : cannot resolve 'salary' given input age , name
当您使用多个转换和聚合步骤时,这尤其具有挑战性。
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContect.createDataframe(personRDD)
personDF.rdd // returns RDD[Row] , does not returns RDD[Person]
Dataset API是DataFrames的扩展,它提供了一种类型安全的,面向对象的编程接口。它是一个强类型,不可变的对象集合,映射到关系模式。
在数据集的核心,API是一个称为编码器的新概念,它负责在JVM对象和表格表示之间进行转换。表格表示使用Spark内部Tungsten二进制格式存储,允许对序列化数据进行操作并提高内存利用率。 Spark 1.6支持自动生成各种类型的编码器,包括基本类型(例如String,Integer,Long),Scala案例类和Java Bean。
例:
case class Person(name : String , age : Int)
val personRDD = sc.makeRDD(Seq(Person("A",10),Person("B",20)))
val personDF = sqlContect.createDataframe(personRDD)
val ds:Dataset[Person] = personDF.as[Person]
ds.filter(p => p.age > 25)
ds.filter(p => p.salary > 25)
// error : value salary is not a member of person
ds.rdd // returns RDD[Person]
例:
ds.select(col("name").as[String], $"age".as[Int]).collect()
不支持Python和R:从1.6版开始,Datasets仅支持Scala和Java。 Python支持将引入Python 2.0。
与现有的RDD和Dataframe API相比,Datasets API带来了一些优势,具有更好的类型安全性和函数式编程。由于API中的类型转换要求的挑战,您仍然不会需要类型安全性并且会使代码变得脆弱。
RDD
是一个容错的容错集合,可以并行操作。
DataFrame
是一个组织成命名列的数据集。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在底层具有更丰富的优化。
Dataset
是一个分布式数据集合。数据集是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的优点。
注意:
Scala / Java中的行数据集(
Dataset[Row]
)通常称为DataFrame。
问:你可以将一个转换为另一个,如RDD到DataFrame,反之亦然?
1. RDD
与DataFrame
与.toDF()
val rowsRdd: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
val df = spark.createDataFrame(rowsRdd).toDF("id", "val1", "val2")
df.show()
+------+----+----+
| id|val1|val2|
+------+----+----+
| first| 2.0| 7.0|
|second| 3.5| 2.5|
| third| 7.0| 5.9|
+------+----+----+
更多方式:Convert an RDD object to Dataframe in Spark
2. DataFrame
/ DataSet
与RDD
用.rdd()
方法
val rowsRdd: RDD[Row] = df.rdd() // DataFrame to RDD
简单地说RDD
是核心组件,但DataFrame
是spark 1.30中引入的API。
数据分区的集合称为RDD
。这些RDD
必须遵循以下几个属性:
这里RDD
是结构化的或非结构化的。
DataFrame
是Scala,Java,Python和R中提供的API。它允许处理任何类型的结构化和半结构化数据。要定义DataFrame
,将分布式数据的集合组织到名为DataFrame
的命名列中。您可以轻松优化RDDs
中的DataFrame
。您可以使用DataFrame
一次处理JSON数据,镶木地板数据,HiveQL数据。
val sampleRDD = sqlContext.jsonFile("hdfs://localhost:9000/jsondata.json")
val sample_DF = sampleRDD.toDF()
这里Sample_DF视为DataFrame
。 sampleRDD
(原始数据)称为RDD
。
因为DataFrame
是弱类型的,开发人员没有获得类型系统的好处。例如,假设您想从SQL读取内容并在其上运行一些聚合:
val people = sqlContext.read.parquet("...")
val department = sqlContext.read.parquet("...")
people.filter("age > 30")
.join(department, people("deptId") === department("id"))
.groupBy(department("name"), "gender")
.agg(avg(people("salary")), max(people("age")))
当你说people("deptId")
,你没有回到Int
或Long
时,你会得到一个你需要操作的Column
物体。在具有诸如Scala之类的丰富类型系统的语言中,最终会失去所有类型安全性,从而增加了在编译时可以发现的事物的运行时错误的数量。
相反,DataSet[T]
是打字的。当你这样做时:
val people: People = val people = sqlContext.read.parquet("...").as[People]
你实际上得到了一个People
对象,其中deptId
是一个实际的整数类型而不是列类型,因此利用了类型系统。
从Spark 2.0开始,DataFrame和DataSet API将统一起来,其中DataFrame
将成为DataSet[Row]
的类型别名。
大多数答案都是正确的,只想在这里添加一点
在Spark 2.0中,两个API(DataFrame + DataSet)将统一到一个API中。
“统一DataFrame和数据集:在Scala和Java中,DataFrame和Dataset已经统一,即DataFrame只是Row数据集的类型别名。在Python和R中,由于缺乏类型安全性,DataFrame是主要的编程接口。”
数据集与RDD类似,但是,它们不使用Java序列化或Kryo,而是使用专用的编码器来序列化对象以便通过网络进行处理或传输。
Spark SQL支持两种不同的方法将现有RDD转换为数据集。第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以使代码更加简洁,并且在编写Spark应用程序时已经了解了模式。
创建数据集的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有RDD。虽然此方法更详细,但它允许您在直到运行时才知道列及其类型时构造数据集。
在这里,您可以找到RDD到Dataframe对话的答案
DataFrame等同于RDBMS中的表,也可以通过类似于RDD中“本机”分布式集合的方式进行操作。与RDD不同,Dataframes跟踪架构并支持各种关系操作,从而实现更优化的执行。每个DataFrame对象代表一个逻辑计划,但由于它们的“惰性”特性,在用户调用特定的“输出操作”之前不会执行任何操作。