我的文件结构中有多个JSON文件:
等等。
所有 JSON 文件都具有相同的架构。
但是,我在将此数据提取到 Databricks 中时遇到问题。我已将
recursiveFileLookup
选项设置为 true
,我可以摄取它,但最终每行一个文档(每个文件位于一行)。
有什么方法可以将Databricks中的数据合并为一行吗?
此外,如果有人有将此数据引入 Azure 数据工厂中的数据流的解决方案,请分享!
谢谢!
我在Azure databricks中尝试了以下方法:
使用
recursiveFileLookup
读取文件路径并将数据合并到一行中。
df_files = spark.read.option("recursiveFileLookup", "true").json("/FileStore/tables/Commodity1/Interval/")
df_files.show(truncate=False)
+----------+----------+-----+
|commodity |interval |value|
+----------+----------+-----+
|Commodity1|2024-01-01|100 |
|Commodity1|2024-01-01|200 |
|Commodity1|2024-01-01|300 |
|Commodity1|2024-01-02|150 |
|Commodity1|2024-01-02|250 |
+----------+----------+-----+
merged_df = df_files.agg(collect_list("value").alias("merged_values"))
merged_df.show(truncate=False)
+-------------------------+
|merged_values |
+-------------------------+
|[100, 200, 300, 150, 250]|
+-------------------------+
grouped_df = df_files.groupBy("interval").agg(collect_list("value").alias("merged_values"))
grouped_df.show(truncate=False)
结果:
+----------+---------------+
|interval |merged_values |
+----------+---------------+
|2024-01-01|[100, 200, 300]|
|2024-01-02|[150, 250] |
+----------+---------------+
上面的代码读取嵌套目录结构中的所有JSON文件 将所有值聚合到一行内的单个列表中并按间隔分组并收集每个日期的值