Python 专家们晚上好,
我正在尝试实现一个 python 管道,它将跨多个日期、主题(即参与者)和时间戳预处理 .avro 数据文件。请注意,数据按日期和参与者组织在单独的文件夹中,每个主题的不同时间段有多个文件。文件结构看起来像这样:
实际结构的主要区别是真实的文件名,当然还有日期、主题和与时间相关的 avro 文件的数量。每次处理一个主题、日期和时间点需要进行大量的预处理。我想调整我当前的脚本,但不知道如何做。这是我迄今为止在单个 avro 文件上工作的片段:
1。导入目录:
from avro.datafile import DataFileReader
import json
import csv
import os
2。创建一个包含单个 Avro 文件路径的变量,以及另一个保存预处理数据的输出目录的变量:
avro_file_path = "/Users/ymhs/projects/01-01-2024/sub-001/Time1.avro"
output_dir = "/Users/ymhs/projects/preproc/"
3。将数据导出到 csv 文件以获取不同的感兴趣度量。我以“加速度计”数据的代码块为例。
acc = data["rawData"]["accelerometer"]
timestamp = [round(acc["timestampStart"] + i * (1e6 / acc["samplingFrequency"]))
for i in range(len(acc["x"]))]
with open(os.path.join(output_dir, 'accelerometer.csv'), 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "x", "y", "z"])
writer.writerows([[ts, x, y, z] for ts, x, y, z in zip(timestamp, acc["x"], acc["y"], acc["z"])])
这会生成一个可用于后续分析的 cvs 文件,但它只会生成单个 avro 文件的数据价值。此外,结果文件的结果命名约定在名称中不具有主题ID或日期。我不知道如何将这些组件添加到文件名中,因为它们是可变的。
我觉得这个问题的解决方案应该是直接的,但我是一个Python新手,任何建议将不胜感激。我非常感谢您能够提供的任何指导。
您已经完成了大部分工作。 Python 能够迭代文件结构并将文件名存储为变量。 您可以使用当前代码一次处理每个文件,并在创建新文件时使用文件名变量。
pathlib
模块是标准库的一部分,可以帮助查找您想要的文件。
from avro.datafile import DataFileReader
from avro.io import DatumReader
import json
import csv
import os
from pathlib import Path
avro_root_dir = Path("/Users/ymhs/projects/")
output_dir = Path("/Users/ymhs/projects/preproc/")
# match the format: mm-dd-yyyy/{anything}/{filename}.avro
for file in avro_root_dir.glob('[0-9][0-9]-[0-9][0-9]-[0-9][0-9][0-9][0-9]/*/*.avro'):
with file.open('rb') as fp:
# add your code to extract the `data` variable here
data = DataFileReader(fp, DatumReader())
acc = data["rawData"]["accelerometer"]
timestamp = [round(acc["timestampStart"] + i * (1e6 / acc["samplingFrequency"]))
for i in range(len(acc["x"]))]
_*, date, subject, filename = file.parts
outfile = output_dir / date / subject / filename.replace('.avro', '.csv')
with outfile.open('w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["unix_timestamp", "x", "y", "z"])
writer.writerows([[ts, x, y, z] for ts, x, y, z in zip(timestamp, acc["x"], acc["y"], acc["z"])])