How to read a JSON file in Spark using Scala?

I want to read a JSON file with the following format:

{ "titlename": "periodic", "atom": [ { "usage": "neutron", "dailydata": [ { "utcacquisitiontime": "2017-03-27T22:00:00Z", "datatimezone": "+02:00", "intervalvalue": 28128, "intervaltime": 15 }, { "utcacquisitiontime": "2017-03-27T22:15:00Z", "datatimezone": "+02:00", "intervalvalue": 25687, "intervaltime": 15 } ] } ] }
I am writing my read statement as:

sqlContext.read.json("user/files_fold/testing-data.json").printSchema

But I am not getting the desired result:

root
 |-- _corrupt_record: string (nullable = true)

Please help me.

json scala apache-spark
5 Answers

5 votes
I would suggest reading the file whole and applying a function to convert it into single-line JSON format:

// wholeTextFiles yields (path, content) pairs; stripping the newlines
// turns each whole file into a single-line JSON string
val json = sc.wholeTextFiles("/user/files_fold/testing-data.json").
  map(tuple => tuple._2.replace("\n", "").trim)

val df = sqlContext.read.json(json)
You should end up with a valid DataFrame:
+--------------------------------------------------------------------------------------------------------+---------+
|atom                                                                                                    |titlename|
+--------------------------------------------------------------------------------------------------------+---------+
|[[WrappedArray([+02:00,15,28128,2017-03-27T22:00:00Z], [+02:00,15,25687,2017-03-27T22:15:00Z]),neutron]]|periodic |
+--------------------------------------------------------------------------------------------------------+---------+

and a valid schema:

root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)


Spark 2.2 introduced the multiLine option, which can be used to load JSON (as opposed to JSON Lines) files:

spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .json("/path/to/user.json")

3 votes

This has already been answered nicely by other contributors, but I had one question, which is how to access each nested value/unit of the DataFrame. For collections we can use explode, and struct-type fields can be referenced directly with dot (.) notation:

scala> import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.functions.{col, explode}

scala> val a = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json("file:///home/hdfs/spark_2.json")
a: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]

scala> a.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)

scala> val b = a.withColumn("exploded_atom", explode(col("atom")))
b: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 1 more field]

scala> b.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)

scala> val c = b.withColumn("exploded_atom_struct", explode(col("`exploded_atom`.dailydata")))
c: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 2 more fields]

scala> c.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)

scala> val d = c.withColumn("exploded_atom_struct_last", col("`exploded_atom_struct`.utcacquisitiontime"))
d: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string ... 3 more fields]

scala> d.printSchema
root
 |-- atom: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dailydata: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |    |-- usage: string (nullable = true)
 |-- titlename: string (nullable = true)
 |-- exploded_atom: struct (nullable = true)
 |    |-- dailydata: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- datatimezone: string (nullable = true)
 |    |    |    |-- intervaltime: long (nullable = true)
 |    |    |    |-- intervalvalue: long (nullable = true)
 |    |    |    |-- utcacquisitiontime: string (nullable = true)
 |    |-- usage: string (nullable = true)
 |-- exploded_atom_struct: struct (nullable = true)
 |    |-- datatimezone: string (nullable = true)
 |    |-- intervaltime: long (nullable = true)
 |    |-- intervalvalue: long (nullable = true)
 |    |-- utcacquisitiontime: string (nullable = true)
 |-- exploded_atom_struct_last: string (nullable = true)

scala> val d = c.select(col("titlename"), col("exploded_atom_struct.*"))
d: org.apache.spark.sql.DataFrame = [titlename: string, datatimezone: string ... 3 more fields]

scala> d.show
+---------+------------+------------+-------------+--------------------+
|titlename|datatimezone|intervaltime|intervalvalue|  utcacquisitiontime|
+---------+------------+------------+-------------+--------------------+
| periodic|      +02:00|          15|        28128|2017-03-27T22:00:00Z|
| periodic|      +02:00|          15|        25687|2017-03-27T22:15:00Z|
+---------+------------+------------+-------------+--------------------+
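
For reference, the step-by-step walkthrough above can also be compressed into a single select chain. This is just a sketch against the same schema (the flat name is mine):

import org.apache.spark.sql.functions.{col, explode}

// Explode the atom array, then each dailydata entry, then promote the
// struct fields to top-level columns via dot notation / .*
val flat = a
  .select(col("titlename"), explode(col("atom")).as("atom"))
  .select(col("titlename"), col("atom.usage").as("usage"),
    explode(col("atom.dailydata")).as("d"))
  .select(col("titlename"), col("usage"), col("d.*"))

flat.show(false)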

So, in case anyone with a similar question sees this one, I thought it was worth posting the walkthrough here.

1 vote


It could be related to the JSON object stored in your file. Could you print it, or make sure it is exactly the one you provided in the question? I ask because I took that one and it runs just fine:
val json =
  """
    |{
    |  "titlename": "periodic",
    |  "atom": [
    |    {
    |      "usage": "neutron",
    |      "dailydata": [
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:00:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 28128,
    |          "intervaltime": 15
    |        },
    |        {
    |          "utcacquisitiontime": "2017-03-27T22:15:00Z",
    |          "datatimezone": "+02:00",
    |          "intervalvalue": 25687,
    |          "intervaltime": 15
    |        }
    |      ]
    |    }
    |  ]
    |}
  """.stripMargin

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.read
  .json(spark.sparkContext.parallelize(Seq(json)))
  .printSchema()
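
Note that the json(RDD[String]) overload used above is deprecated as of Spark 2.2 in favor of json(Dataset[String]). A minimal sketch of the same check with the newer overload:

import spark.implicits._

// Same test, but feeding the string in as a Dataset[String]
spark.read.json(Seq(json).toDS).printSchema()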


0 votes

See the Apache Spark SQL docs on JSON datasets: by default, spark.read.json expects JSON Lines input, where each line must contain a separate, self-contained valid JSON object.
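
For illustration, a minimal sketch of what the default (non-multiLine) reader expects; the file name and contents here are hypothetical:

// events.jsonl -- one complete, self-contained JSON object per line:
//   {"titlename": "periodic"}
//   {"titlename": "daily"}
// This parses without producing a _corrupt_record column:
spark.read.json("events.jsonl").printSchema()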


  

0 votes

{ "titlename": "periodic","atom": [{ "usage": "neutron", "dailydata": [ {"utcacquisitiontime": "2017-03-27T22:00:00Z","datatimezone": "+02:00","intervalvalue": 28128,"intervaltime":15},{"utcacquisitiontime": "2017-03-27T22:15:00Z","datatimezone": "+02:00", "intervalvalue": 25687,"intervaltime": 15 }]}]}

Then:

val jsonDF = sqlContext.read.json("file")
jsonDF: org.apache.spark.sql.DataFrame = [atom: array<struct<dailydata:array<struct<datatimezone:string,intervaltime:bigint,intervalvalue:bigint,utcacquisitiontime:string>>,usage:string>>, titlename: string]

Alternatively, you just need to add the multiLine option to your read statement. The _corrupt_record appears because your JSON spans multiple lines:

spark.read
  .option("multiLine", true)
  .option("mode", "PERMISSIVE")
  .json("/path/to/user.json")
