I'm trying to run 6 python processes, 4 of which complete super fast and 2 of which take quite a bit longer. I'm asking now so this doesn't turn into a follow-up question.
I'm trying to run the 4 quick ones one after another, and run that batch alongside the 2 that take longer. All of the functions are completely unique and not iterable.
The code below works, but it will run on 6 cores. I need to allow for my users only having 4 cores, and I don't want to slow their machines to a crawl.
I hope the brackets explain what I'm trying to achieve: p1+p2+(p3->p4->p5->p6)
Finally, I want to print "done" once at the end. Right now it prints the completion time at the start of the processes, once per job, instead of waiting for the jobs to finish and printing once.
I have tried async; before I even got to splitting the jobs up, I hit a problem where it looked like it completed but didn't actually work at all (no files were processed), although it did print "done" once at the end, which is what I'm after.
All the help I can find online shows iterated processes and treats every job as equal, and I'm very new to python, so maybe with better words for my problem I would find my answer.
if __name__ == '__main__':
    p1 = Process(target=job1)
    p1.start()
    p2 = Process(target=job2)
    p2.start()
    p3 = Process(target=job3)
    p3.start()
    p4 = Process(target=job4)
    p4.start()
    p5 = Process(target=job5)
    p5.start()
    p6 = Process(target=job6)
    p6.start()
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
if __name__ == '__main__':
    with Pool() as pool:
        p1 = pool.map_async(job1,range(1))
        p2 = pool.map_async(job2,range(1))
        p3 = pool.map_async(job3,range(1))
        p4 = pool.map_async(job4,range(1))
        p5 = pool.map_async(job5,range(1))
        p6 = pool.map_async(job6,range(1))
        print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
Process gets the work done but the print is wrong, while map_async does not get the work done at all.
Adding the full code, comments and the desired results below.
# import required modules
import os
import pandas as pd
import warnings
from datetime import date
import time
from multiprocessing import Process
from multiprocessing.pool import Pool
warnings.simplefilter("ignore")
OverallStart = time.process_time()
# Get User Paths
ORIGINALPATH = "c:/OriginalPATH"
FINALPATH = "c:/FinalPATH/"
CONVERTED = FINALPATH+"Converted/"
PROCESSED = FINALPATH+"Processed/"
#Filenames
job1 = "Refused Details"
job2 = "CSA Details"
job3 = "CSAT Details"
job4 = "RCR Details"
job5 = "AHT Details"
job6 = "Transferred Details"
C = "Converted"
D = "Processed"
X = ".xlsx"
V = ".csv"
DT = date.today().strftime("_%d_%m_%Y")
#Define Functions
def fjob1():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job1+X):
        print("No "+job1+" found or already processed")
    else:
        RFSDdf = pd.read_excel(ORIGINALPATH+job1+X,header=2,engine='openpyxl')
        RFSDdf.to_csv(CONVERTED+job1+"/"+job1+DT+V, index=False)
        os.rename(ORIGINALPATH+job1+X, PROCESSED+job1+"/"+job1+DT+X)
        print(job1+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob2():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job2+X):
        print("No "+job2+" found or already processed")
    else:
        TRSFDdf = pd.read_excel(ORIGINALPATH+job2+X,header=2,engine='openpyxl')
        TRSFDdf.to_csv(CONVERTED+job2+"/"+job2+DT+V, index=False)
        os.rename(ORIGINALPATH+job2+X, PROCESSED+job2+"/"+job2+DT+X)
        print(job2+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob3():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job3+X):
        print("No "+job3+" found or already processed")
    else:
        CSAdf = pd.read_excel(ORIGINALPATH+job3+X,header=2,engine='openpyxl')
        CSAdf.to_csv(CONVERTED+job3+"/"+job3+DT+V, index=False)
        os.rename(ORIGINALPATH+job3+X, PROCESSED+job3+"/"+job3+DT+X)
        print(job3+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob4():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job4+X):
        print("No "+job4+" found or already processed")
    else:
        CSATdf = pd.read_excel(ORIGINALPATH+job4+X,header=2,engine='openpyxl')
        CSATdf.to_csv(CONVERTED+job4+"/"+job4+DT+V, index=False)
        os.rename(ORIGINALPATH+job4+X, PROCESSED+job4+"/"+job4+DT+X)
        print(job4+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob5():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job5+X):
        print("No "+job5+" found or already processed")
    else:
        RCRdf = pd.read_excel(ORIGINALPATH+job5+X,header=2,engine='openpyxl')
        RCRdf.to_csv(CONVERTED+job5+"/"+job5+DT+V, index=False)
        os.rename(ORIGINALPATH+job5+X, PROCESSED+job5+"/"+job5+DT+X)
        print(job5+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
def fjob6():
    start = time.process_time()
    if not os.path.exists(ORIGINALPATH+job6+X):
        print("No "+job6+" found or already processed")
    else:
        AHTdf = pd.read_excel(ORIGINALPATH+job6+X,header=2,engine='openpyxl')
        AHTdf.to_csv(CONVERTED+job6+"/"+job6+DT+V, index=False)
        os.rename(ORIGINALPATH+job6+X, PROCESSED+job6+"/"+job6+DT+X)
        print(job6+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
Starting the functions, method 1
if __name__ == '__main__':
    p1 = Process(target=fjob1)
    p1.start()
    p2 = Process(target=fjob2)
    p2.start()
    p3 = Process(target=fjob3)
    p3.start()
    p4 = Process(target=fjob4)
    p4.start()
    p5 = Process(target=fjob5)
    p5.start()
    p6 = Process(target=fjob6)
    p6.start()
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
Method 1 output
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
All processed in 0.0 seconds
job3 Processed in 11.0 seconds
job4 Processed in 12.0 seconds
job5 Processed in 14.0 seconds
job6 Processed in 17.0 seconds
job2 Processed in 41.0 seconds
job1 Processed in 58.0 seconds
All the jobs run at the same time and complete in the times above, after all of my functions have been called.
Starting the functions, method 2
if __name__ == '__main__':
    with Pool() as pool:
        p1 = pool.map_async(fjob1,range(1))
        p2 = pool.map_async(fjob2,range(1))
        p3 = pool.map_async(fjob3,range(1))
        p4 = pool.map_async(fjob4,range(1))
        p5 = pool.map_async(fjob5,range(1))
        p6 = pool.map_async(fjob6,range(1))
        print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
Method 2 output
All processed in 0.0 seconds
The jobs report as done right at the start and no file conversion takes place at all.
Desired output
job3 Processed in 11.0 seconds
job4 Processed in 12.0 seconds
job5 Processed in 14.0 seconds
job2 Processed in 41.0 seconds
job6 Processed in 17.0 seconds
job1 Processed in 58.0 seconds
All processed in 58.0 seconds
Mainly, I want to measure the final overall runtime once all of the functions have finished.
Secondly, I want job1, job2 and job3 to start at the same time. job4 will wait for job3 to finish, job5 will wait for job4 to finish, and job6 will wait for job5 to finish.
I would create a multiprocessing pool of size 4. I would then submit the two slow-running tasks and use a 'multiprocessing.Barrier' instance to wait for them to start. The Barrier instance is initialized with parties=3. The two slow tasks and the main process then call wait on the Barrier instance. These 3 processes block until all 3 of them have issued the wait, and then all 3 are released. That way the main process will not go on to submit the fast-running tasks until it knows the 2 slow-running tasks are already running in 2 of the pool's processes. It then submits the 4 fast-running tasks, which will use the remaining 2 pool processes.
import time

def init_pool_processes(b):
    global barrier
    barrier = b

def slow_task(i):
    t = time.time()
    barrier.wait()
    print('slow task', i, 'started', flush=True)
    # Emulate doing something useful:
    s = 0
    for _ in range(200_000_000):
        s += 1
    print('slow task', i, 'ended.', 'running time =', time.time() - t, flush=True)

def fast_task(i):
    t = time.time()
    print('fast task', i, 'started', flush=True)
    s = 0
    for _ in range(30_000_000):
        s += 1
    print('fast task', i, 'ended.', 'running time =', time.time() - t, flush=True)

if __name__ == '__main__':
    from multiprocessing import Pool, Barrier

    barrier = Barrier(parties=3)
    pool = Pool(4, initializer=init_pool_processes, initargs=(barrier,))
    t = time.time()
    # Submit the slow tasks:
    for i in (1, 2):
        pool.apply_async(slow_task, args=(i,))
    # Wait for the slow tasks to start:
    barrier.wait()
    # Now submit the 4 fast tasks, which will use the remaining
    # 2 pool processes:
    for i in (1, 2, 3, 4):
        pool.apply_async(fast_task, args=(i,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total running time =', time.time() - t)
Prints:
slow task 2 started
slow task 1 started
fast task 1 started
fast task 2 started
fast task 2 ended. running time = 1.9650518894195557
fast task 3 started
fast task 1 ended. running time = 2.0210554599761963
fast task 4 started
fast task 4 ended. running time = 2.0868680477142334
fast task 3 ended. running time = 2.1768722534179688
slow task 2 ended. running time = 11.62192964553833
slow task 1 ended. running time = 11.636930227279663
Total running time = 11.7549307346344
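A couple of notes on applying this to the question's own code. The "All processed in 0.0 seconds" lines appear because time.process_time() only measures CPU time used by the parent process (which does almost no work itself) and because the print runs before any join. The map_async attempt most likely fails because map_async(fjob1, range(1)) calls fjob1(0) while fjob1 takes no arguments; since the AsyncResult is never waited on, the error is silently discarded, and leaving the "with Pool()" block terminates the workers before anything runs. Below is a minimal sketch, not the only way to do it, of the p1+p2+(p3->p4->p5->p6) shape from the question: it reuses the fjob functions defined in the question, introduces a hypothetical chained_jobs wrapper (my name, for illustration) so the four quick jobs run strictly one after another in a single worker, and switches the overall timer to time.time().

import time
from multiprocessing import Pool

def chained_jobs():
    # Hypothetical wrapper: run the four quick jobs in order inside a
    # single pool process, giving the p3->p4->p5->p6 chain.
    fjob3()
    fjob4()
    fjob5()
    fjob6()

if __name__ == '__main__':
    OverallStart = time.time()  # wall-clock time, not CPU time of the parent
    with Pool(3) as pool:       # 3 workers are enough: fjob1, fjob2 and the chain
        r1 = pool.apply_async(fjob1)
        r2 = pool.apply_async(fjob2)
        r3 = pool.apply_async(chained_jobs)
        pool.close()            # no more tasks will be submitted
        pool.join()             # block until every submitted task has finished
    print('All processed in '+str(round(time.time() - OverallStart,1))+' seconds')

Calling r1.get(), r2.get() and r3.get() after the join would also re-raise any exception that occurred inside a job, which is exactly how the silent failure in the map_async version would have become visible.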