如果你能想到一个好的,请更新标题!
我有以下结构的数据:
chr pos A_block A_val
2 05 7 A,T,C
2 11 7 T,C,G
2 15 7 AT,C,G
2 21 7 C,A,GT
2 31 7 T,C,CA
2 42 9 T,C,G
2 55 9 C,G,GC
2 61 9 A,GC,T
2 05 12 AC,TG,G
2 11 12 A,TC,TG
预期输出:为了学习,我只想重写输出文件,与输入文件相同,但使用我在下面提出的过程。
我想:step 01:
一次仅读取两个连续块的值(前7和9) - > step 02:
将该数据存储在字典中,其中block numbers
作为主要唯一键 - > step 03:
将该字典返回到预定义函数进行解析。 - >现在,读取块(9和12) - >重复相同的过程直到结束。
我想的是:
import req_packages
from collections import defaultdict
''' make a function that takes data from two blocks at a time '''
def parse_two_blocks(someData):
for key, vals in someData:
do ... something
write the obtained output
clear memory # to prevent memory buildup
''' Now, read the input file'''
with open('HaploBlock_toy.txt') as HaploBlocks:
header = HaploBlocks.readline()
# only reads the first line as header
''' create a empty dict or default dict. Which ever is better?'''
Hap_Dict = {}
Hap_Dict = defaultdict(list)
''' for rest of the lines '''
for lines in HaploBlocks:
values = lines.strip('\n').split('\t')
''' append the data to the dict for unique keys on the for loop, until the number of unique keys is 2 '''
Block = values[2]
Hap_Dict[Block].append(values[3])
do something to count the number of keys - how?
if keys_count > 2:
return parse_two_blocks(Hap_Dict)
elif keys_count < 2 or no new keys: # This one is odd and won't work I know.
end the program
因此,当代码被执行时,它将从块7和9读取数据,直到字典被填充并返回到预定义的函数。解析完成后,它现在可以保留前一个解析的后一个块中的数据。这样它只需要读取剩余的块。
预期输出:我现在面临的主要问题是能够一次读取两个块。我不想在`parse_two_blocks(someData)'中添加我想要解析信息的内在细节 - 这只是另一个问题。但是,让我们尝试重写与输入相同的输出。
将输入解析为块的动态列表(生成器)。迭代对。在评估对时,应该完成所有操作。也就是说,这些行中没有一行应该一次读取或存储整个csv文件。
#!/usr/bin/env python3
data = """chr pos A_block A_val
2 05 7 A,T,C
2 11 7 T,C,G
2 15 7 AT,C,G
2 21 7 C,A,GT
2 31 7 T,C,CA
2 42 9 T,C,G
2 55 9 C,G,GC
2 61 9 A,GC,T
2 05 12 AC,TG,G
2 11 12 A,TC,TG"""
import csv
import io
import itertools
import collections
import operator
from pprint import pprint
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
def one():
# read rows as tuples of values
c = csv.reader(io.StringIO(data), dialect=csv.excel_tab)
# read header row
keys = next(c)
block_index = keys.index('A_block')
# group rows by block numbers
blocks = itertools.groupby(c, key=operator.itemgetter(block_index))
# extract just the row values for each block
row_values = (tuple(v) for k, v in blocks)
# rearrange the values by column
unzipped_values = (zip(*v) for v in row_values)
# create a dictionary for each block
dict_blocks = (dict(zip(keys, v)) for v in unzipped_values)
yield from pairwise(dict_blocks)
def two():
c = csv.DictReader(io.StringIO(data), dialect=csv.excel_tab)
blocks = itertools.groupby(c, key=lambda x: x['A_block'])
yield from pairwise((k, list(v)) for k, v in blocks)
for a, b in one():
pprint(a)
pprint(b)
print()
产量(one
):
{'A_block': ('7', '7', '7', '7', '7'),
'A_val': ('A,T,C', 'T,C,G', 'AT,C,G', 'C,A,GT', 'T,C,CA'),
'chr': ('2', '2', '2', '2', '2'),
'pos': ('05', '11', '15', '21', '31')}
{'A_block': ('9', '9', '9'),
'A_val': ('T,C,G', 'C,G,GC', 'A,GC,T'),
'chr': ('2', '2', '2'),
'pos': ('42', '55', '61')}
{'A_block': ('9', '9', '9'),
'A_val': ('T,C,G', 'C,G,GC', 'A,GC,T'),
'chr': ('2', '2', '2'),
'pos': ('42', '55', '61')}
{'A_block': ('12', '12'),
'A_val': ('AC,TG,G', 'A,TC,TG'),
'chr': ('2', '2'),
'pos': ('05', '11')}
获取一个字符串并返回一个包含string内容的类文件对象。
来自csv.DictReader(file_object, dialect)
的csv module
返回每行的有序dict,其中从第一行获取的字段名称用作字段值的字典键。
groupby(iterable, key_function)
创建一个从迭代中返回连续键和组的迭代器。关键是计算每个元素的键值的函数。
lambda x: x['A_block']
一个临时函数,它接受一个名为
x
的输入并返回键'A_block'
的值
(k, list(v)) for k, v in blocks
groupby()
为值返回一个迭代器(只能使用一次)。这会将迭代器转换为列表。
“c - >(c0,c1),(c1,c2),(c2,cz),......”