将数据从 blob 存储提取到 Databricks[自动化]

问题描述 投票:0回答:1

我有按年、月和日期(嵌套文件夹)存储在不同文件夹中的 blob 数据,每天都会刷新。 我需要设计一个管道,它将有效地将历史数据从 blob 加载到 azure databricks。您能否指导一下在databricks中存储每日和历史数据的正确方法?

遵循的步骤:使用存储 blob 贡献者创建挂载点,并能够访问示例数据,例如 2024/18/7/table_name.parquet 的一个 parquet 文件。

每天自动化加载历史数据的方式应该是什么。 谢谢你

azure pyspark automation azure-blob-storage parquet
1个回答
0
投票

在这里,将数据从存储帐户复制到 databricks 时需要考虑两件事。

  1. 将所有文件复制到databricks中的同一个文件,即将源文件合并到databricks中的单个目标文件中。

    在这里,第一件事是您需要将所有旧日期文件复制到 databricks 文件中。然后每天将每日日期文件合并到同一个 databricks 文件中。

    用于将迄今为止所有日期文件复制到目标文件的代码。

    df1=spark.read.parquet('/mnt/mymount/2024/*/*/*.parquet',inferSchema=True)
    df1.write.parquet('/dbfs/FileStore/tables/target.parquet')
    dbutils.fs.ls('/dbfs/FileStore/tables')
    

    enter image description here

    现在,拿另一个笔记本并使用下面的代码将每日日期文件加载到同一位置。

    from datetime import datetime
    temp_date=datetime.today().strftime('%Y/%d/%m')
    mydate=temp_date[0:temp_date.rfind('/')]+'/'+str(int(temp_date.split('/')[-1]))
    print(mydate)
    
    mydf=spark.read.parquet('/dbfs/FileStore/tables/target.parquet',inferSchema=True)
    files_path='/mnt/mymount/'+mydate+'/*.parquet'
    mydf.union(spark.read.parquet(files_path,inferSchema=True))
    mydf.display()
    mydf.write.mode("overwrite").parquet('/dbfs/FileStore/tables/target.parquet')
    

    enter image description here

    您需要每天安排此笔记本,以便它复制每日数据。

    enter image description here

  2. 将所有文件作为单个文件复制到数据块中,具有相同的文件夹结构。

    使用下面的代码首先将到目前为止的文件复制到目标。

    import glob, os
    list_of_paths=[]
    for file in glob.iglob('/dbfs/mnt/mymount/2024/**/*.parquet',recursive=True):
        list_of_paths.append(file.replace('/dbfs',''))
    print(list_of_paths)
    
    for i in list_of_paths:
        spark.read.parquet(i,inferSchema=True).write.parquet('/dbfs/FileStore/tables/'+i.replace('/mnt/mymount/',''))
    

    此代码递归获取所有文件路径,并通过创建与源相同的结构复制到目标位置。

    enter image description here

    对于日常工作,请在新笔记本中使用以下代码。

    files_today_path='/mnt/mymount/'+mydate
    list_today_paths=[]
    for i in dbutils.fs.ls(files_today_path):
        list_today_paths.append(i.path)
    print(list_today_paths)
    for i in list_of_paths:
        spark.read.parquet(i,inferSchema=True).write.mode("overwrite").parquet('/dbfs/FileStore/tables/'+i.replace('dbfs:/mnt/mymount/',''))
    

    像上面一样获取文件夹的当前日期格式并使用它。每天安排这个笔记本。

© www.soinside.com 2019 - 2024. All rights reserved.