{
"titlename": "periodic",
"atom": [
{
"usage": "neutron",
"dailydata": [
{
"utcacquisitiontime": "2017-03-27T22:00:00Z",
"datatimezone": "+02:00",
"intervalvalue": 28128,
"intervaltime": 15
},
{
"utcacquisitiontime": "2017-03-27T22:15:00Z",
"datatimezone": "+02:00",
"intervalvalue": 25687,
"intervaltime": 15
}
]
}
]
}
I am writing my read line as:
sqlContext.read.json("user/files_fold/testing-data.json").printSchema
But I am not getting the required result -
root
|-- _corrupt_record: string (nullable = true)
Please help me.
I suggest using wholeTextFiles to read the file as a single record and strip out the newlines, since before Spark 2.2 read.json expects one JSON document per line:

val json = sc.wholeTextFiles("/user/files_fold/testing-data.json")
  .map(tuple => tuple._2.replace("\n", "").trim)
val df = sqlContext.read.json(json)
You should end up with a valid DataFrame:
+--------------------------------------------------------------------------------------------------------+---------+
|atom |titlename|
+--------------------------------------------------------------------------------------------------------+---------+
|[[WrappedArray([+02:00,15,28128,2017-03-27T22:00:00Z], [+02:00,15,25687,2017-03-27T22:15:00Z]),neutron]]|periodic |
+--------------------------------------------------------------------------------------------------------+---------+
and the valid schema:
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
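A note on why this works: wholeTextFiles returns one (path, content) pair per file, so the multi-line JSON arrives as a single string instead of being split line by line. On Spark 2.x, where read.json on an RDD[String] is deprecated, the same idea can be written with a Dataset[String] (a sketch, assuming the same file path):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// One record per file, so the multi-line JSON survives as a single string.
val json = spark.sparkContext
  .wholeTextFiles("/user/files_fold/testing-data.json")
  .map(_._2.replace("\n", "").trim)

val df = spark.read.json(json.toDS())
df.printSchema()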
Spark 2.2 introduced the multiLine option, which can be used to load JSON (as opposed to JSONL) files:
spark.read
.option("multiLine", true).option("mode", "PERMISSIVE")
.json("/path/to/user.json")
This has already been answered well by the other contributors, but one question I had was how to access each nested value/unit of the DataFrame. So: for collections we can use explode, and struct fields can be reached with dot (.) notation:
scala> val a = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("file:///home/hdfs/spark_2.json")
a: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]
scala> a.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
scala> val b = a.withColumn("exploded_atom", explode(col("atom")))
b: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 1 more field]
scala> b.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
|-- exploded_atom: struct (nullable = true)
| |-- dailydata: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- datatimezone: string (nullable = true)
| | | |-- intervaltime: long (nullable = true)
| | | |-- intervalvalue: long (nullable = true)
| | | |-- utcacquisitiontime: string (nullable = true)
| |-- usage: string (nullable = true)
scala>
scala> val c = b.withColumn("exploded_atom_struct", explode(col("`exploded_atom`.dailydata")))
c: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 2 more fields]
scala>
scala> c.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
|-- exploded_atom: struct (nullable = true)
| |-- dailydata: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- datatimezone: string (nullable = true)
| | | |-- intervaltime: long (nullable = true)
| | | |-- intervalvalue: long (nullable = true)
| | | |-- utcacquisitiontime: string (nullable = true)
| |-- usage: string (nullable = true)
|-- exploded_atom_struct: struct (nullable = true)
| |-- datatimezone: string (nullable = true)
| |-- intervaltime: long (nullable = true)
| |-- intervalvalue: long (nullable = true)
| |-- utcacquisitiontime: string (nullable = true)
scala> val d = c.withColumn("exploded_atom_struct_last", col("`exploded_atom_struct`.utcacquisitiontime"))
d: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 3 more fields]
scala> d.printSchema
root
|-- atom: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dailydata: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- datatimezone: string (nullable = true)
| | | | |-- intervaltime: long (nullable = true)
| | | | |-- intervalvalue: long (nullable = true)
| | | | |-- utcacquisitiontime: string (nullable = true)
| | |-- usage: string (nullable = true)
|-- titlename: string (nullable = true)
|-- exploded_atom: struct (nullable = true)
| |-- dailydata: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- datatimezone: string (nullable = true)
| | | |-- intervaltime: long (nullable = true)
| | | |-- intervalvalue: long (nullable = true)
| | | |-- utcacquisitiontime: string (nullable = true)
| |-- usage: string (nullable = true)
|-- exploded_atom_struct: struct (nullable = true)
| |-- datatimezone: string (nullable = true)
| |-- intervaltime: long (nullable = true)
| |-- intervalvalue: long (nullable = true)
| |-- utcacquisitiontime: string (nullable = true)
|-- exploded_atom_struct_last: string (nullable = true)
scala> val d = c.select(col("titlename"), col("exploded_atom_struct.*"))
d: org.apache.spark.sql.DataFrame = [titlename: string, datatimezone: string ... 3 more fields]
scala> d.show
+---------+------------+------------+-------------+--------------------+
|titlename|datatimezone|intervaltime|intervalvalue| utcacquisitiontime|
+---------+------------+------------+-------------+--------------------+
| periodic| +02:00| 15| 28128|2017-03-27T22:00:00Z|
| periodic| +02:00| 15| 25687|2017-03-27T22:15:00Z|
+---------+------------+------------+-------------+--------------------+
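For reference, the intermediate DataFrames b, c and d above can be collapsed into a single chain (a sketch reusing the DataFrame a loaded above; atom_elem and reading are illustrative column names):

import org.apache.spark.sql.functions.{col, explode}

// Explode the outer atom array, then the inner dailydata array,
// then project the leaf fields.
val flat = a
  .withColumn("atom_elem", explode(col("atom")))
  .withColumn("reading", explode(col("atom_elem.dailydata")))
  .select(col("titlename"), col("atom_elem.usage"), col("reading.*"))

flat.show(false)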
So, in case anyone with a similar problem sees this question, I thought it worth posting here.
It probably has to do with the JSON object stored in your file. Could you print it, or make sure it is exactly what you provided in the question? I ask because I took that one and it runs fine:
val json =
"""
|{
| "titlename": "periodic",
| "atom": [
| {
| "usage": "neutron",
| "dailydata": [
| {
| "utcacquisitiontime": "2017-03-27T22:00:00Z",
| "datatimezone": "+02:00",
| "intervalvalue": 28128,
| "intervaltime": 15
| },
| {
| "utcacquisitiontime": "2017-03-27T22:15:00Z",
| "datatimezone": "+02:00",
| "intervalvalue": 25687,
| "intervaltime": 15
| }
| ]
| }
| ]
|}
""".stripMargin
val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.read
.json(spark.sparkContext.parallelize(Seq(json)))
.printSchema()
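(Side note: as of Spark 2.2, read.json on an RDD[String] is deprecated in favor of the equivalent Dataset[String] overload - a sketch:)

import spark.implicits._
spark.read.json(Seq(json).toDS).printSchema()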
Apache Spark SQL Docs
Alternatively, with the whole JSON object on a single line:
{ "titlename": "periodic","atom": [{ "usage": "neutron", "dailydata": [ {"utcacquisitiontime": "2017-03-27T22:00:00Z","datatimezone": "+02:00","intervalvalue": 28128,"intervaltime":15},{"utcacquisitiontime": "2017-03-27T22:15:00Z","datatimezone": "+02:00", "intervalvalue": 25687,"intervaltime": 15 }]}]}
val jsonDF = sqlContext.read.json("file")
jsonDF: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]
You just need to add this option to your read statement. The _corrupt_record happens because your JSON spans multiple lines, so you need option("multiLine", true):
spark.read
  .option("multiLine", true).option("mode", "PERMISSIVE")
  .json("/path/to/user.json")