在 Python 中迭代文件和目录以清理 Avro 数据

问题描述 投票:0回答:1

Python 专家们晚上好,

我正在尝试实现一个 python 管道,它将跨多个日期、主题(即参与者)和时间戳预处理 .avro 数据文件。请注意,数据按日期和参与者组织在单独的文件夹中,每个主题的不同时间段有多个文件。文件结构看起来像这样:

enter image description here

实际结构的主要区别是真实的文件名,当然还有日期、主题和与时间相关的 avro 文件的数量。每次处理一个主题、日期和时间点需要进行大量的预处理。我想调整我当前的脚本,但不知道如何做。这是我迄今为止在单个 avro 文件上工作的片段:

1。导入目录:

    from avro.datafile import DataFileReader
    import json
    import csv
    import os

2。创建一个包含单个 Avro 文件路径的变量,以及另一个保存预处理数据的输出目录的变量:

avro_file_path = "/Users/ymhs/projects/01-01-2024/sub-001/Time1.avro"
output_dir = "/Users/ymhs/projects/preproc/"

3。将数据导出到 csv 文件以获取不同的感兴趣度量。我以“加速度计”数据的代码块为例。

acc = data["rawData"]["accelerometer"]

timestamp = [round(acc["timestampStart"] + i * (1e6 / acc["samplingFrequency"]))
             for i in range(len(acc["x"]))]
with open(os.path.join(output_dir, 'accelerometer.csv'), 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["unix_timestamp", "x", "y", "z"])
    writer.writerows([[ts, x, y, z] for ts, x, y, z in zip(timestamp, acc["x"], acc["y"], acc["z"])])

这会生成一个可用于后续分析的 cvs 文件,但它只会生成单个 avro 文件的数据价值。此外,结果文件的结果命名约定在名称中不具有主题ID或日期。我不知道如何将这些组件添加到文件名中,因为它们是可变的。

我觉得这个问题的解决方案应该是直接的,但我是一个Python新手,任何建议将不胜感激。我非常感谢您能够提供的任何指导。

python python-3.x loops for-loop avro
1个回答
0
投票

您已经完成了大部分工作。 Python 能够迭代文件结构并将文件名存储为变量。 您可以使用当前代码一次处理每个文件,并在创建新文件时使用文件名变量。

pathlib
模块是标准库的一部分,可以帮助查找您想要的文件。

from avro.datafile import DataFileReader
from avro.io import DatumReader
import json
import csv
import os
from pathlib import Path

avro_root_dir = Path("/Users/ymhs/projects/")
output_dir = Path("/Users/ymhs/projects/preproc/")

# match the format: mm-dd-yyyy/{anything}/{filename}.avro
for file in avro_root_dir.glob('[0-9][0-9]-[0-9][0-9]-[0-9][0-9][0-9][0-9]/*/*.avro'):
    with file.open('rb') as fp:
        # add your code to extract the `data` variable here
        data = DataFileReader(fp, DatumReader())
    acc = data["rawData"]["accelerometer"]
    timestamp = [round(acc["timestampStart"] + i * (1e6 / acc["samplingFrequency"]))
                 for i in range(len(acc["x"]))]

    _*, date, subject, filename = file.parts
    outfile = output_dir / date / subject / filename.replace('.avro', '.csv')
    with outfile.open('w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["unix_timestamp", "x", "y", "z"])
        writer.writerows([[ts, x, y, z] for ts, x, y, z in zip(timestamp, acc["x"], acc["y"], acc["z"])])
© www.soinside.com 2019 - 2024. All rights reserved.