我有按年、月和日期(嵌套文件夹)存储在不同文件夹中的 blob 数据,每天都会刷新。 我需要设计一个管道,它将有效地将历史数据从 blob 加载到 azure databricks。您能否指导一下在databricks中存储每日和历史数据的正确方法?
遵循的步骤:使用存储 blob 贡献者创建挂载点,并能够访问示例数据,例如 2024/18/7/table_name.parquet 的一个 parquet 文件。
每天自动化加载历史数据的方式应该是什么。 谢谢你
在这里,将数据从存储帐户复制到 databricks 时需要考虑两件事。
将所有文件复制到databricks中的同一个文件,即将源文件合并到databricks中的单个目标文件中。
在这里,第一件事是您需要将所有旧日期文件复制到 databricks 文件中。然后每天将每日日期文件合并到同一个 databricks 文件中。
用于将迄今为止所有日期文件复制到目标文件的代码。
df1=spark.read.parquet('/mnt/mymount/2024/*/*/*.parquet',inferSchema=True)
df1.write.parquet('/dbfs/FileStore/tables/target.parquet')
dbutils.fs.ls('/dbfs/FileStore/tables')
现在,拿另一个笔记本并使用下面的代码将每日日期文件加载到同一位置。
from datetime import datetime
temp_date=datetime.today().strftime('%Y/%d/%m')
mydate=temp_date[0:temp_date.rfind('/')]+'/'+str(int(temp_date.split('/')[-1]))
print(mydate)
mydf=spark.read.parquet('/dbfs/FileStore/tables/target.parquet',inferSchema=True)
files_path='/mnt/mymount/'+mydate+'/*.parquet'
mydf.union(spark.read.parquet(files_path,inferSchema=True))
mydf.display()
mydf.write.mode("overwrite").parquet('/dbfs/FileStore/tables/target.parquet')
您需要每天安排此笔记本,以便它复制每日数据。
将所有文件作为单个文件复制到数据块中,具有相同的文件夹结构。
使用下面的代码首先将到目前为止的文件复制到目标。
import glob, os
list_of_paths=[]
for file in glob.iglob('/dbfs/mnt/mymount/2024/**/*.parquet',recursive=True):
list_of_paths.append(file.replace('/dbfs',''))
print(list_of_paths)
for i in list_of_paths:
spark.read.parquet(i,inferSchema=True).write.parquet('/dbfs/FileStore/tables/'+i.replace('/mnt/mymount/',''))
此代码递归获取所有文件路径,并通过创建与源相同的结构复制到目标位置。
对于日常工作,请在新笔记本中使用以下代码。
files_today_path='/mnt/mymount/'+mydate
list_today_paths=[]
for i in dbutils.fs.ls(files_today_path):
list_today_paths.append(i.path)
print(list_today_paths)
for i in list_of_paths:
spark.read.parquet(i,inferSchema=True).write.mode("overwrite").parquet('/dbfs/FileStore/tables/'+i.replace('dbfs:/mnt/mymount/',''))
像上面一样获取文件夹的当前日期格式并使用它。每天安排这个笔记本。