I am trying to use Python multiprocessing to run some jobs at the same time but not others, then print the total time taken at the end


I am trying to run 6 processes in Python, 4 of which finish super fast and 2 of which take much longer. I'm asking now so this doesn't become a follow-up question.

I am trying to run the 4 quick tasks one after another, and run that batch alongside the 2 tasks that take longer. All the functions are quite unique and not iterable.

The code below works, but it will run on 6 cores. I need to allow for my users only having 4 cores, and I don't want to slow their machines to a crawl.
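
If the worry is core count: a multiprocessing.Pool can be capped explicitly, so no more than four tasks ever run at once. A minimal sketch (the function name work is just a placeholder):

    from multiprocessing import Pool

    def work(i):
        return i * i   # placeholder task

    if __name__ == '__main__':
        # At most 4 worker processes no matter how many tasks are submitted;
        # the remaining tasks queue until a worker becomes free.
        with Pool(processes=4) as pool:
            print(pool.map(work, range(8)))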

I hope the brackets explain what I'm trying to achieve: p1+p2+(p3->p4->p5->p6)

At the end I want to print how it all went. Right now it prints the completion time as the processes start, once per job, instead of waiting for the jobs to finish and printing once.
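
Two things likely explain that: time.process_time() measures only the parent process's CPU time (not wall-clock time, and not the children's work), so it reports ~0.0; and nothing join()s the processes before the print, so it fires immediately. Because the print also sits outside the if __name__ == '__main__': guard, each spawned child re-executes it on import, which is why it appears once per job. A minimal sketch of the timing pattern, with time.sleep standing in for the real jobs:

    import time
    from multiprocessing import Process

    def job(seconds):
        time.sleep(seconds)  # stand-in for the real conversion work

    if __name__ == '__main__':
        overall = time.time()  # wall-clock start, unlike time.process_time()
        procs = [Process(target=job, args=(s,)) for s in (1, 2)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()   # block until every child has finished
        print('All processed in ' + str(round(time.time() - overall, 1)) + ' seconds')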

I tried async, and before I even got to the point of splitting the jobs up I hit a problem where it looked finished but didn't actually work at all (no files were processed), although it did print the completion message once at the end, which is what I was looking for.

All the help I can find online shows iterable processes and treats all jobs as equal, and I'm quite new to Python; I might find my answer if I knew the right words to describe my problem.

    if __name__ == '__main__':
        p1 = Process(target=job1)
        p1.start()
        p2 = Process(target=job2)
        p2.start()
        p3 = Process(target=job3)
        p3.start()
        p4 = Process(target=job4)
        p4.start()
        p5 = Process(target=job5)
        p5.start()
        p6 = Process(target=job6)
        p6.start()
    
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')
    
    
    if __name__ == '__main__':
        with Pool() as pool:
            p1 = pool.map_async(job1,range(1))
            p2 = pool.map_async(job2,range(1))
            p3 = pool.map_async(job3,range(1))
            p4 = pool.map_async(job4,range(1))
            p5 = pool.map_async(job5,range(1))
            p6 = pool.map_async(job6,range(1))
            print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')

Process gets the work done but the print is wrong, while map_async doesn't do the work at all.
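
The map_async behaviour is explainable: leaving the with Pool() block calls terminate() on the pool without waiting for the results, so the tasks may be killed before they even start, and any exception raised in a worker is held until .get() is called on the AsyncResult. An exception is guaranteed here, because map_async(fjob1, range(1)) calls fjob1(0) while fjob1 takes no arguments. A self-contained sketch that surfaces the hidden error (task is a stand-in for the fjob functions below):

    from multiprocessing import Pool

    def task():            # takes no arguments, like the fjob functions
        print('converting')

    if __name__ == '__main__':
        with Pool() as pool:
            res = pool.map_async(task, range(1))   # will try to call task(0)
            res.get()   # blocks until done and re-raises the worker's TypeError

Running this raises TypeError: task() takes 0 positional arguments but 1 was given; pool.apply_async(task), which passes no arguments, would avoid it.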

Adding the full code, comments and desired output:

    # import required modules
    import os
    import pandas as pd
    import warnings
    from datetime import date
    import time
    from multiprocessing import Process
    from multiprocessing.pool import Pool
    warnings.simplefilter("ignore")
    OverallStart  = time.process_time()
    
    
    # Get User Paths
    ORIGINALPATH = "c:/OriginalPATH"
    FINALPATH = "c:/FinalPATH/"
    CONVERTED = FINALPATH+"Converted/"
    PROCESSED = FINALPATH+"Processed/"
    
    #Filenames
    job1 = "Refused Details"
    job2 = "CSA Details"
    job3 = "CSAT Details"
    job4 = "RCR Details"
    job5 = "AHT Details"
    job6 = "Transferred Details"
    C = "Converted"
    D = "Processed"
    X = ".xlsx"
    V = ".csv"
    DT = date.today().strftime("_%d_%m_%Y")
    
    
    #Define Functions
    
    def fjob1(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job1+X):
            print ("No "+job1+" found or already processed")
        else:
            RFSDdf = pd.read_excel(ORIGINALPATH+job1+X,header=2,engine='openpyxl')
            RFSDdf.to_csv(CONVERTED+job1+"/"+job1+DT+V, index=False)
            os.rename(ORIGINALPATH+job1+X, PROCESSED+job1+"/"+job1+DT+X)
            print(job1+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob2(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job2+X):
            print ("No "+job2+" found or already processed")
        else:
            TRSFDdf = pd.read_excel(ORIGINALPATH+job2+X,header=2,engine='openpyxl')
            TRSFDdf.to_csv(CONVERTED+job2+"/"+job2+DT+V, index=False)
            os.rename(ORIGINALPATH+job2+X, PROCESSED+job2+"/"+job2+DT+X)
            print(job2+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob3(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job3+X):
            print ("No "+job3+" found or already processed")
        else:
            CSAdf = pd.read_excel(ORIGINALPATH+job3+X,header=2,engine='openpyxl')
            CSAdf.to_csv(CONVERTED+job3+"/"+job3+DT+V, index=False)
            os.rename(ORIGINALPATH+job3+X, PROCESSED+job3+"/"+job3+DT+X)
            print(job3+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob4(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job4+X):
            print ("No "+job4+" found or already processed")
        else:
            CSATdf = pd.read_excel(ORIGINALPATH+job4+X,header=2,engine='openpyxl')
            CSATdf.to_csv(CONVERTED+job4+"/"+job4+DT+V, index=False)
            os.rename(ORIGINALPATH+job4+X, PROCESSED+job4+"/"+job4+DT+X)
            print(job4+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob5(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job5+X):
            print ("No "+job5+" found or already processed")
        else:
            RCRdf = pd.read_excel(ORIGINALPATH+job5+X,header=2,engine='openpyxl')
            RCRdf.to_csv(CONVERTED+job5+"/"+job5+DT+V, index=False)
            os.rename(ORIGINALPATH+job5+X, PROCESSED+job5+"/"+job5+DT+X)
            print(job5+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
    
    def fjob6(): 
        start = time.process_time()
        if not os.path.exists(ORIGINALPATH+job6+X):
            print ("No "+job6+" found or already processed")
        else:
            AHTdf = pd.read_excel(ORIGINALPATH+job6+X,header=2,engine='openpyxl')
            AHTdf.to_csv(CONVERTED+job6+"/"+job6+DT+V, index=False)
            os.rename(ORIGINALPATH+job6+X, PROCESSED+job6+"/"+job6+DT+X)
            print(job6+' Processed in '+str(round(time.process_time() - start,1))+' seconds')
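
As an aside: the six fjob functions are identical apart from the filename and the DataFrame variable, so they could collapse into one parameterized helper. A sketch reusing the constants above (it also swaps in time.time(), since process_time() excludes time spent waiting on disk):

    def convert(job):
        # One conversion job: read the .xlsx, write the .csv, archive the original.
        start = time.time()
        if not os.path.exists(ORIGINALPATH + job + X):
            print("No " + job + " found or already processed")
        else:
            df = pd.read_excel(ORIGINALPATH + job + X, header=2, engine='openpyxl')
            df.to_csv(CONVERTED + job + "/" + job + DT + V, index=False)
            os.rename(ORIGINALPATH + job + X, PROCESSED + job + "/" + job + DT + X)
            print(job + ' Processed in ' + str(round(time.time() - start, 1)) + ' seconds')

Each job would then launch as Process(target=convert, args=(job1,)) and so on.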

Starting the functions, method one

    if __name__ == '__main__':
        p1 = Process(target=fjob1)
        p1.start()
        p2 = Process(target=fjob2)
        p2.start()
        p3 = Process(target=fjob3)
        p3.start()
        p4 = Process(target=fjob4)
        p4.start()
        p5 = Process(target=fjob5)
        p5.start()
        p6 = Process(target=fjob6)
        p6.start()
    
    print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')

Method 1 output

    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    All processed in 0.0 seconds
    job3 Processed in 11.0 seconds
    job4 Processed in 12.0 seconds
    job5 Processed in 14.0 seconds
    job6 Processed in 17.0 seconds
    job2 Processed in 41.0 seconds
    job1 Processed in 58.0 seconds

All the jobs ran at the same time and finished in the times shown above, after all my functions had been called.

Starting the functions, method two

    if __name__ == '__main__':
        with Pool() as pool:
            p1 = pool.map_async(fjob1,range(1))
            p2 = pool.map_async(fjob2,range(1))
            p3 = pool.map_async(fjob3,range(1))
            p4 = pool.map_async(fjob4,range(1))
            p5 = pool.map_async(fjob5,range(1))
            p6 = pool.map_async(fjob6,range(1))
            print('All processed in '+str(round(time.process_time() - OverallStart,1))+' seconds')

Method 2 output

    All processed in 0.0 seconds

The jobs "complete" the moment they start, and no file conversion actually happens.

Desired output

    job3 Processed in 11.0 seconds
    job4 Processed in 12.0 seconds
    job5 Processed in 14.0 seconds
    job2 Processed in 41.0 seconds
    job6 Processed in 17.0 seconds
    job1 Processed in 58.0 seconds
    All processed in 58.0 seconds

Mainly, I want to measure the final runtime once all the functions have completed.

Secondly, I want job1, job2 and job3 to start at the same time; job4 will wait for job3 to finish, job5 will wait for job4, and job6 will wait for job5.
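
One way to get exactly that shape, sketched with the fjob functions above: run fjob1 and fjob2 in their own processes and wrap the quick chain in a third worker, so at most three processes run at once and the final print waits on all of them:

    def chain():
        # The four quick jobs, strictly one after another: job3 -> job4 -> job5 -> job6
        fjob3()
        fjob4()
        fjob5()
        fjob6()

    if __name__ == '__main__':
        OverallStart = time.time()   # wall clock, so child work is counted
        procs = [Process(target=fjob1),
                 Process(target=fjob2),
                 Process(target=chain)]   # 3 concurrent processes, fine on 4 cores
        for p in procs:
            p.start()
        for p in procs:
            p.join()   # wait for everything before printing the total
        print('All processed in ' + str(round(time.time() - OverallStart, 1)) + ' seconds')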

python python-3.x asynchronous multiprocessing threadpool
1 Answer

I would create a multiprocessing pool of size 4. I would then submit the two slow-running tasks and use a multiprocessing.Barrier instance to wait for them to start.

The Barrier instance is initialized with parties=3. The two slow tasks and the main process then call wait on the Barrier instance. All 3 processes block until each of them has issued the wait, at which point all 3 are released. This way the main process will not proceed past submitting the slow-running tasks until it knows that the 2 slow tasks are actually running in 2 of the pool's processes. I then submit the 4 fast-running tasks, which will use the remaining 2 pool processes.

import time

def init_pool_processes(b):
    global barrier

    barrier = b

def slow_task(i):
    t = time.time()
    barrier.wait()
    print('slow task', i, 'started', flush=True)

    # Emulate doing something useful:
    s = 0
    for _ in range(200_000_000):
        s += 1

    print('slow task', i, 'ended.', 'running time =', time.time() - t, flush=True)

def fast_task(i):
    t = time.time()
    print('fast task', i, 'started', flush=True)

    s = 0
    for _ in range(30_000_000):
        s += 1

    print('fast task', i, 'ended.', 'running time =', time.time() - t, flush=True)

if __name__ == '__main__':
    from multiprocessing import Pool, Barrier

    barrier = Barrier(parties=3)

    pool = Pool(4, initializer=init_pool_processes, initargs=(barrier,))

    t = time.time()
    # Submit the slow tasks:
    for i in (1, 2):
        pool.apply_async(slow_task, args=(i,))

    # Wait for the slow tasks to start:
    barrier.wait()

    # Now submit the 4 fast tasks, which will use the remaining
    # 2 pool processes:
    for i in (1, 2, 3, 4):
        pool.apply_async(fast_task, args=(i,))

    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total running time =', time.time() - t)

Prints:

slow task 2 started
slow task 1 started
fast task 1 started
fast task 2 started
fast task 2 ended. running time = 1.9650518894195557
fast task 3 started
fast task 1 ended. running time = 2.0210554599761963
fast task 4 started
fast task 4 ended. running time = 2.0868680477142334
fast task 3 ended. running time = 2.1768722534179688
slow task 2 ended. running time = 11.62192964553833
slow task 1 ended. running time = 11.636930227279663
Total running time = 11.7549307346344