Running a function in the background while simultaneously sending (Python)

Question · 0 votes · 1 answer
algo.run runs continuously in the background, while Main.send makes periodic API calls. Since both are blocking operations, should I also make the methods of Algorithm async def? Additionally, how would I handle the edge case where Main.send tries to read a file from disk while algo.run is trying to save that same file at the same time?
Any advice would be appreciated!

import json
import random
import time

import requests

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API

class Main:
    def __init__(self):
        self.algo = Algorithm()
    
        # self.send() ## These would block
        # self.algo.run()
    
    async def send(self):

        # Continuously listen on an endpoint
        while True:

            response = requests.get(ENDPOINT).json()
            if response['should_send']:
    
                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)

                # Send results to API
                requests.post(ENDPOINT, outputs)
    
            time.sleep(60)

class Algorithm:
    def __init__(self):
        pass
    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result' : random.random()}
            time.sleep(60)

            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)

If your CPU-bound algorithm never pauses for I/O (and even if it does), run it in another thread. Asyncio has facilities for calling code in other threads, which can be quite nice (even nicer than concurrent.futures).
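To illustrate the point about calling blocking code from asyncio, here is a minimal, self-contained sketch (the function names are illustrative; asyncio.to_thread requires Python 3.9+). The blocking call runs in a worker thread while a coroutine keeps ticking in the event loop:

```python
import asyncio
import time

ticks = []

def blocking_work():
    # simulate a CPU-heavy / blocking call
    time.sleep(0.2)
    return "done"

async def heartbeat():
    # keeps running while blocking_work occupies another thread
    for _ in range(3):
        await asyncio.sleep(0.05)
        ticks.append("tick")  # proves the event loop stayed responsive

async def main():
    results = await asyncio.gather(
        asyncio.to_thread(blocking_work),  # off-loads the blocking call
        heartbeat(),
    )
    return results[0]

outcome = asyncio.run(main())
```

If blocking_work were awaited directly in the loop without to_thread, heartbeat could not run until it finished.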
Tags: python · asynchronous · concurrency · python-asyncio

1 Answer

0 votes
In your case, this could work:

import json
import random
import time
import asyncio

import httpx  # (you will use this to replace requests)

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API

# let's keep things simple:
# in Python, things that don't need to be
# a class don't need to be a class!

async def main():
    # this now being async, it can orchestrate the
    # lifetime of Algorithm instances:
    algo = Algorithm()

    # create the "send" task, which will then run
    # whenever the async loop in the main thread is idle:
    send_task = asyncio.create_task(send())

    # set up a task that will run "algo" in another thread:
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))

    # pass control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)

async def send():
    # Continuously listen on an endpoint
    async with httpx.AsyncClient() as client:
        while True:
            # "requests" is not really an asynchronous lib,
            # use httpx instead:
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:

                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)

                # Send results to API
                await client.post(ENDPOINT, json=outputs)

            # pause 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)

class Algorithm:
    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            # this runs in another thread, so a synchronous sleep is fine:
            time.sleep(60)

            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)

if __name__ == "__main__":
    asyncio.run(main())

Actually, asyncio isn't even needed here; I just used it because you mentioned it in the question, and your send method was marked async. The real deal is running the non-async "algorithm" code in a different thread.

However, the asyncio example can serve as the basis for other I/O-bound tasks you interleave in the same project.

As for your other question, about the file being written while it is read: since you are making separate open calls, the OS will do the right thing for you (well, Windows may give you an OSError if you try to open the file while the other handle is open; macOS, Linux, and essentially every other OS you can think of will cope).

Suppose the write lands after the file has been opened for reading: once the open call has resolved, the program will happily read the previous version of the file, even though the filesystem will show the new version as soon as the write completes.

In the opposite case, though, there is a risk of sending a partial (or zero-length) file: the save has started and is still in progress when the file is opened for reading. If writing is fast, and sending the file can always wait for it to complete, all you need is a lock (a threading.Lock in this case).

If, on the other hand, writing the file takes several seconds and you would rather not wait, the pattern is to write to a different filename and then, once writing is finished, have the writer task rename the new file over the old one, using the lock mechanism:

import json
import random
import time
import asyncio
import threading

import httpx  # (you will use this to replace requests)

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API

async def main():
    # a single lock shared by the reader and the writer:
    lock = threading.Lock()
    algo = Algorithm(lock)

    # create the "send" task, which will then run
    # whenever the async loop in the main thread is idle:
    send_task = asyncio.create_task(send(lock))

    # set up a task that will run "algo" in another thread:
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))

    # pass control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)

async def send(lock):
    # Continuously listen on an endpoint
    async with httpx.AsyncClient() as client:
        while True:
            # "requests" is not really an asynchronous lib,
            # use httpx instead:
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:

                # Load the result from disk, holding the lock so a
                # write in the other thread cannot overlap the read:
                with lock, open('results.json', 'r') as file:
                    outputs = json.load(file)

                # Send results to API
                await client.post(ENDPOINT, json=outputs)

            # pause 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)

class Algorithm:
    def __init__(self, lock):
        self.lock = lock

    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            # this runs in another thread, so a synchronous sleep is fine:
            time.sleep(60)

            # Save result to disk under the same lock:
            with self.lock, open('results.json', 'w') as file:
                json.dump(outputs, file)

if __name__ == "__main__":
    asyncio.run(main())
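The write-to-a-new-file-then-rename pattern mentioned above can be sketched like this (the helper name save_atomically and the use of tempfile/os.replace are my additions, not from the answer). Since os.replace swaps the files atomically on POSIX (and on Windows for same-volume paths), readers always see either the complete old file or the complete new one, even without holding a lock during the slow write:

```python
import json
import os
import random
import tempfile

RESULTS = 'results.json'

def save_atomically(outputs):
    # Write to a temporary file in the same directory, then rename it
    # over the old file; the slow part (writing) never touches the
    # file that readers open.
    directory = os.path.dirname(os.path.abspath(RESULTS))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.tmp')
    try:
        with os.fdopen(fd, 'w') as tmp:
            json.dump(outputs, tmp)
        os.replace(tmp_path, RESULTS)  # atomic swap
    except BaseException:
        os.unlink(tmp_path)  # don't leave a stray temp file behind
        raise

payload = {'result': random.random()}
save_atomically(payload)

with open(RESULTS) as f:
    loaded = json.load(f)
```

The temporary file must live in the same directory (really, the same filesystem) as the target, otherwise os.replace cannot be atomic.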
