将每行一个文档的递归 JSON 数据摄取到 Databricks 或 Azure 数据工厂中

问题描述 投票:0回答:1

我的文件结构中有多个JSON文件:

  • 商品1
    • 间隔
      • YYYY
        • MM
          • DD
            • 文件1.json
    • 间隔
      • YYYY
        • MM
          • DD
            • 文件2.json
          • DD
            • 文件3.json
        • MM
          • DD
            • 文件4.json
          • DD
            • 文件5.json

等等。

所有 JSON 文件都具有相同的架构。

但是,我在将此数据提取到 Databricks 中时遇到问题。我已将

recursiveFileLookup
选项设置为
true
,我可以摄取它,但最终每行一个文档(每个文件位于一行)。

有什么方法可以将Databricks中的数据合并为一行吗?

此外,如果有人有将此数据引入 Azure 数据工厂中的数据流的解决方案,请分享!

谢谢!

arrays json azure-data-factory azure-databricks data-ingestion
1个回答
0
投票

我在Azure databricks中尝试了以下方法:

使用

recursiveFileLookup
读取文件路径并将数据合并到一行中。

df_files = spark.read.option("recursiveFileLookup", "true").json("/FileStore/tables/Commodity1/Interval/")
df_files.show(truncate=False)
+----------+----------+-----+
|commodity |interval  |value|
+----------+----------+-----+
|Commodity1|2024-01-01|100  |
|Commodity1|2024-01-01|200  |
|Commodity1|2024-01-01|300  |
|Commodity1|2024-01-02|150  |
|Commodity1|2024-01-02|250  |
+----------+----------+-----+
merged_df = df_files.agg(collect_list("value").alias("merged_values"))
merged_df.show(truncate=False)
+-------------------------+
|merged_values            |
+-------------------------+
|[100, 200, 300, 150, 250]|
+-------------------------+
grouped_df = df_files.groupBy("interval").agg(collect_list("value").alias("merged_values"))
grouped_df.show(truncate=False)

结果:

+----------+---------------+
|interval  |merged_values  |
+----------+---------------+
|2024-01-01|[100, 200, 300]|
|2024-01-02|[150, 250]     |
+----------+---------------+

上面的代码读取嵌套目录结构中的所有JSON文件 将所有值聚合到一行内的单个列表中并按间隔分组并收集每个日期的值

© www.soinside.com 2019 - 2024. All rights reserved.