I have a Python script that is meant to process some large files and write the results to a new txt file. I have simplified it into code example 1.

Code example 1:
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from collections import OrderedDict
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    result = '\t'.join(scores)
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    # do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        arg_list = []
        parm1, parm2, parm3 = '1', 0, 0
        for line in large_file:
            if line.startswith('#'):
                continue
            else:
                # do something to update parm1, parm2, parm3...
                arg_list.append((parm1, parm2, parm3, output_file))
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_list, chunksize=int(max_processes/2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = f"para_scores.txt"
    max_processes = int(os.cpu_count()/2)  # Set the maximum number of processes
    main(large_path, output_path, max_processes)
I realized that arg_list could get very large if large_file is big, and I'm not sure there would be enough memory available for it. So I tried using a yield generator instead of a plain Python list, as in code example 2. It runs without errors but doesn't produce anything.

Code example 2:
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from collections import OrderedDict
import os

def process(args):
    parm1, parm2, parm3, output_file = args
    scores = []
    # do something with scores...
    result = '\t'.join(scores)
    output_file.write(result)

def main(large_file_path, output_path, max_processes):
    # do something ...
    with open(large_file_path, 'r') as large_file, open(output_path, 'w') as output_file:
        def arg_generator(large_file, output_file):
            parm1, parm2, parm3 = '1', 0, 0
            for line in large_file:
                if line.startswith('#'):
                    continue
                else:
                    # do something to update parm1, parm2, parm3...
                    yield (parm1, parm2, parm3, output_file)
        with ProcessPoolExecutor(max_processes) as executor:
            executor.map(process, arg_generator(large_file, output_file), chunksize=int(max_processes/2))

if __name__ == "__main__":
    large_path = "/path/to/large_file"
    output_path = f"para_scores.txt"
    max_processes = int(os.cpu_count()/2)  # Set the maximum number of processes
    main(large_path, output_path, max_processes)
I'm running the code on an Ubuntu 20.04.6 LTS server with Python 3.9.18.

So, can ProcessPoolExecutor be used with a yield generator in Python? Or is there something wrong with how executor.map is used? What should I do to make this work?
Yes, it can. Here is an example:
from concurrent.futures import ProcessPoolExecutor

def process(x):
    print(x * 2)

def gen():
    for i in range(3):
        yield i

def main():
    print('starting')
    with ProcessPoolExecutor() as executor:
        executor.map(process, gen())
    print('done')

main()
Output:
starting
0
2
4
done
The worker processes don't know where their arguments come from; each one receives a single item, not the whole generator. Using a generator makes no difference here.
However, trying to pass an open file handle does seem to break things in a strange way:
from concurrent.futures import ProcessPoolExecutor

def process(f):
    print('processing', f)

def main():
    print('starting')
    with open('path_to_file.txt') as f:
        with ProcessPoolExecutor() as executor:
            executor.map(process, [f])
    print('done')

main()
The output is missing "processing":
starting
done
Instead, pass the file name to each process as a string. Use a different file name for each process so they don't overwrite each other.
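A minimal sketch of that approach, adapted from the question's code: each task receives an output path (a plain string) instead of a shared handle and opens its own file inside the worker. The output_dir parameter, the para_scores_{i}.txt naming, and the per-line index are illustrative assumptions, not part of the original script:

from concurrent.futures import ProcessPoolExecutor
import os

def process(args):
    # The last element is now a plain string path, which pickles cleanly.
    parm1, parm2, parm3, part_path = args
    scores = []
    # do something with scores...
    result = '\t'.join(scores)
    with open(part_path, 'w') as part_file:  # each process writes its own file
        part_file.write(result)

def arg_generator(large_file, output_dir):
    parm1, parm2, parm3 = '1', 0, 0
    for i, line in enumerate(large_file):
        if line.startswith('#'):
            continue
        # do something to update parm1, parm2, parm3...
        part_path = os.path.join(output_dir, f"para_scores_{i}.txt")  # unique name per task
        yield (parm1, parm2, parm3, part_path)

def main(large_file_path, output_dir, max_processes):
    with open(large_file_path, 'r') as large_file:
        with ProcessPoolExecutor(max_processes) as executor:
            # Iterating over the results re-raises any exception raised in a worker
            # (or while pickling its arguments), so failures are not silently lost.
            for _ in executor.map(process, arg_generator(large_file, output_dir),
                                  chunksize=max(1, int(max_processes / 2))):
                pass

if __name__ == "__main__":
    main("/path/to/large_file", "/path/to/output_dir", max_processes=int(os.cpu_count() / 2))

The per-task files can then be concatenated into a single para_scores.txt afterwards if one output file is needed.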