我真的很乐意为使用PySpark-SQL解析嵌套JSON数据提供一些帮助。数据具有以下架构(空格是出于保密目的而进行的编辑...)
模式
root
|-- location_info: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- restaurant_type: string (nullable = true)
| | |
| | |
| | |-- other_data: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- other_data_1 string (nullable = true)
| | | | |-- other_data_2: string (nullable = true)
| | | | |-- other_data_3: string (nullable = true)
| | | | |-- other_data_4: string (nullable = true)
| | | | |-- other_data_5: string (nullable = true)
| | |
| | |-- latitude: string (nullable = true)
| | |
| | |
| | |
| | |
| | |
| | |-- longitude: string (nullable = true)
| | |
| | |
| | |
| | |-- timezone: string (nullable = true)
|-- restaurant_id: string (nullable = true)
我的目标我本质上想将数据放入以下数据框
restaurant_id | latitude | longtitude | timezone
我已经尝试过
dfj = spark.read.option("multiLine", False).json("/file/path")
result = dfj.select(col('restaurant_id'),
explode(col('location_info')).alias('location_info') )
# SQL operation
result.createOrReplaceTempView('result')
subset_data = spark.sql(
'''
SELECT restaurant_id, location_info.latitude,location_info.longitude,location_info.timestamp
FROM result
'''
).show()
# Also tried this to read in
source_df_1 = spark.read.json(sc.wholeTextFiles("/file/path")
.values()
.flatMap(lambda x: x
.replace("{", "#!#")
.split("#!#")))
但是奇怪的是,它仅向我提供第一个对象或餐厅ID
+-------+-----------+------------+--------------------+
|restaurant_id|latitude|longitude|timestamp|
+-------+-----------+------------+--------------------+
| 25|2.0|-8.0|2020-03-06T03:00:...|
| 25|2.0|-8.0|2020-03-06T03:00:...|
| 25|2.0|-8.0|2020-03-06T03:00:...|
| 25|2.0|-8.0|2020-03-06T03:01:...|
| 25|2.0|-8.0|2020-03-06T03:01:...|
+-------+-----------+------------+--------------------+
我的研究表明,这可能与从源头构造JSON文件的方式有关。例如:
{}{
}{
}
因此不是多行或其他内容。也想知道该怎么办?
非常感谢您的阅读,我们将不胜感激。我知道我总是可以依靠SO来提供帮助
我能够通过阅读上文所述的JSON文件来解决此问题,希望对您有所帮助! :
# Reading multiple files in the dir
source_df_1 = spark.read.json(sc.wholeTextFiles("file_path/*")
.values()
.flatMap(lambda x: x
.replace('{"restaurant_id','\n{"restaurant_id' ).split('\n')))
# explode here to have restaurant_id, and nested data
exploded_source_df_1 = source_df_1.select(col('restaurant_id'),
explode(col('location_info')).alias('location_info') )
# Via SQL operation : this will solve the problem for parsing
exploded_source_df_1.createOrReplaceTempView('result_1')
subset_data_1 = spark.sql(
'''
SELECT restaurant_id, location_infos.latitude,location_infos.longitude,location_infos.timestamp
from result_1
'''
).persist()