Parsing nested JSON into a Spark DataFrame with PySpark

Problem description

I would really appreciate some help parsing nested JSON data with PySpark-SQL. The data has the following schema (the blanks are redactions for confidentiality...):

Schema

root
 |-- location_info: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant_type: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |-- other_data: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- other_data_1: string (nullable = true)
 |    |    |    |    |-- other_data_2: string (nullable = true)
 |    |    |    |    |-- other_data_3: string (nullable = true)
 |    |    |    |    |-- other_data_4: string (nullable = true)
 |    |    |    |    |-- other_data_5: string (nullable = true)
 |    |    |
 |    |    |-- latitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- longitude: string (nullable = true)
 |    |    |
 |    |    |
 |    |    |
 |    |    |-- timezone: string (nullable = true)
 |-- restaurant_id: string (nullable = true)
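
For reference, a single record matching this schema would look roughly like the following (the values here are made up; the real ones are redacted):

{"location_info": [{"restaurant_type": "...",
                    "other_data": [{"other_data_1": "...", "other_data_2": "...",
                                    "other_data_3": "...", "other_data_4": "...",
                                    "other_data_5": "..."}],
                    "latitude": "2.0",
                    "longitude": "-8.0",
                    "timezone": "..."}],
 "restaurant_id": "25"}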

My goal: I essentially want to get the data into the following dataframe:

restaurant_id | latitude | longitude | timezone

What I have tried

from pyspark.sql.functions import col, explode

dfj = spark.read.option("multiLine", False).json("/file/path")

result = dfj.select(col('restaurant_id'),
  explode(col('location_info')).alias('location_info'))

# SQL operation
result.createOrReplaceTempView('result')

subset_data = spark.sql(
'''
SELECT restaurant_id, location_info.latitude,location_info.longitude,location_info.timestamp  
FROM result

'''
).show()  

# Also tried this to read in (note: splitting on "#!#" discards the
# "{" it replaced, so the resulting fragments are no longer valid JSON)
source_df_1 = spark.read.json(sc.wholeTextFiles("/file/path")
          .values()
          .flatMap(lambda x: x
                   .replace("{", "#!#")
                   .split("#!#")))

But strangely, it only gives me the first object, i.e. the first restaurant_id:

+-------------+--------+---------+--------------------+
|restaurant_id|latitude|longitude|           timestamp|
+-------------+--------+---------+--------------------+
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:00:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
|           25|     2.0|     -8.0|2020-03-06T03:01:...|
+-------------+--------+---------+--------------------+

My research suggests this may have to do with how the JSON file is structured at the source. For example:

{}{
}{
}

So it isn't multiline JSON or anything like that, just back-to-back objects. I'm also wondering what to do about this?
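
To make that concrete: a file of concatenated objects is neither one JSON document nor JSON Lines, so a plain parser rejects everything after the first object. A minimal illustration with Python's standard library (the record contents are hypothetical):

import json

raw = '{"restaurant_id": "25"}{"restaurant_id": "26"}'
try:
    json.loads(raw)
except json.JSONDecodeError as e:
    print(e)  # Extra data: line 1 column 24 (char 23)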

Thank you very much for reading; any help would be greatly appreciated. I know I can always rely on SO for help.

apache-spark pyspark apache-spark-sql bigdata databricks
1 Answer

I was able to solve this by reading in the JSON file, structured as described above, like this. Hope it helps!:

from pyspark.sql.functions import col, explode

# Reading multiple files in the dir: insert a newline before each
# top-level object so Spark can parse one JSON document per line
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")
          .values()
          .flatMap(lambda x: x
                   .replace('{"restaurant_id', '\n{"restaurant_id').split('\n')))


# explode to get one row per element of location_info, alongside restaurant_id
exploded_source_df_1 = source_df_1.select(col('restaurant_id'),
  explode(col('location_info')).alias('location_info'))


# Via a SQL operation: this solves the parsing problem
exploded_source_df_1.createOrReplaceTempView('result_1')

subset_data_1 = spark.sql(
'''
SELECT restaurant_id, location_info.latitude, location_info.longitude, location_info.timestamp
FROM result_1
'''
).persist()
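
For completeness, the same projection can be written with the DataFrame API directly, without the temp view (equivalent to the SQL above, assuming the same column names):

subset_data_1 = exploded_source_df_1.select(
    col('restaurant_id'),
    col('location_info.latitude'),
    col('location_info.longitude'),
    col('location_info.timestamp')
).persist()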