algo.run runs continuously in the background, while Main.send makes periodic API calls. Since both are blocking operations, should I also make the methods in Algorithm async def? Additionally, how would I handle the edge case where Main.send tries to read a file from disk while algo.run is trying to save that same file at the same time? Any advice would be greatly appreciated!
import json
import random
import time

import requests

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API


class Main:
    def __init__(self):
        algo = Algorithm()
        # self.send()  ## These would block
        # algo.run()

    async def send(self):
        # Continuously listen on an endpoint
        while True:
            response = requests.get(ENDPOINT).json()
            if response['should_send']:
                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)
                # Send results to API
                requests.post(ENDPOINT, outputs)
            time.sleep(60)


class Algorithm:
    def __init__(self):
        pass

    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            time.sleep(60)
            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)
If your CPU-bound algorithm won't pause for I/O (and even if it does), run it in another thread. Asyncio has some facilities for calling code in other threads, which can be quite nice (even nicer than using concurrent.futures directly):
import json
import random
import time
import asyncio

import httpx  # (you will use this to replace requests)

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API


# let's keep things simple:
# in Python things that don't need to be
# a class, don't need to be a class!
# (if "send" had some state it would like
# to keep, a class could make sense)

async def main():
    # this now being async, it can orchestrate the
    # lifetime of Algorithm instances:
    algo = Algorithm()
    # create the "send" task, which will then
    # run whenever the async loop in the main
    # thread is idle!
    send_task = asyncio.create_task(send())
    # set up a task that will run "algo" in another thread:
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
    # pass control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)


async def send():
    # Continuously listen on an endpoint.
    # "requests" is not really an asynchronous lib -
    # use httpx instead:
    async with httpx.AsyncClient() as client:
        while True:
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:
                # Load the result from disk
                with open('results.json', 'r') as file:
                    outputs = json.load(file)
                # Send results to API
                await client.post(ENDPOINT, json=outputs)
            # pauses 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)


class Algorithm:
    def __init__(self):
        pass

    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            # this runs in another thread, so there is no
            # problem using a synchronous sleep:
            time.sleep(60)
            # Save result to disk
            with open('results.json', 'w') as file:
                json.dump(outputs, file)


if __name__ == "__main__":
    asyncio.run(main())
Actually, asyncio isn't even needed here; I only used it because you mentioned it in the question and your send method was already marked as async. The real deal is running the non-async "algorithm" code in a different thread. The asyncio scaffolding can, however, serve as the basis for other I/O-bound tasks you may want to interleave in the same project.
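To see in isolation what asyncio.to_thread buys you, here is a minimal, self-contained sketch (blocking_work is a made-up stand-in for the expensive algorithm; the timings are arbitrary):

```python
import asyncio
import time


def blocking_work():
    # stands in for a CPU-bound or otherwise blocking computation
    time.sleep(0.5)
    return 42


async def main():
    # the worker thread blocks on time.sleep, but the event loop
    # stays free: both awaitables make progress concurrently
    results = await asyncio.gather(
        asyncio.to_thread(blocking_work),
        asyncio.sleep(0.1, result='loop stayed responsive'),
    )
    return results


print(asyncio.run(main()))  # → [42, 'loop stayed responsive']
```

If the second task had to wait for blocking_work to finish before running, the whole point of the event loop would be lost; to_thread is what keeps the loop idle and available.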
As for your other question, about the same file being written and read concurrently: since you are making separate open calls, the OS will mostly do the right thing for you (well, Windows may give you an OSError if you try to open the file for reading while it is open for writing; MacOS, Linux and every other OS will just work). Say the write starts after the file has already been opened for reading: if the "open" operation has resolved, the program will simply read the previous version of the file, even though the filesystem is already showing the new version as it is written. In the opposite case, though - the save has started and is still in progress when the file is opened for reading - there is a risk of sending a partial file (or a 0-length file). If writing is fast and "send" can always afford to wait for it to complete, all you need is a lock (a threading.Lock in this case). If, on the other hand, writing the file takes a few seconds and you would rather not wait, the pattern is to write to a different file name and then, once writing is done, rename the new file over the old one in the writer task, using the locking mechanism:
import json
import random
import time
import asyncio
import threading

import httpx  # (you will use this to replace requests)

ENDPOINT = 'http://some_api.com/endpoint'  # Some external API


async def main():
    # this now being async, it can orchestrate the
    # lifetime of Algorithm instances:
    lock = threading.Lock()
    algo = Algorithm(lock)
    # create the "send" task, which will then
    # run whenever the async loop in the main
    # thread is idle!
    send_task = asyncio.create_task(send(lock))
    # set up a task that will run "algo" in another thread:
    algo_task = asyncio.create_task(asyncio.to_thread(algo.run))
    # pass control to the loop, staying idle and allowing
    # both tasks to run:
    await asyncio.gather(send_task, algo_task)


async def send(lock):
    # Continuously listen on an endpoint.
    # "requests" is not really an asynchronous lib -
    # use httpx instead:
    async with httpx.AsyncClient() as client:
        while True:
            response = (await client.get(ENDPOINT)).json()
            if response['should_send']:
                # Load the result from disk, holding the lock so
                # the writer can't truncate the file mid-read:
                with lock, open('results.json', 'r') as file:
                    outputs = json.load(file)
                # Send results to API
                await client.post(ENDPOINT, json=outputs)
            # pauses 60 seconds while allowing other
            # async tasks to run in the same thread:
            await asyncio.sleep(60)


class Algorithm:
    def __init__(self, lock):
        self.lock = lock

    def run(self):
        # Some expensive computations, running repeatedly in background
        while True:
            outputs = {'result': random.random()}
            # this runs in another thread, so there is no
            # problem using a synchronous sleep:
            time.sleep(60)
            # Save result to disk
            with self.lock, open('results.json', 'w') as file:
                json.dump(outputs, file)


if __name__ == "__main__":
    asyncio.run(main())
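The write-to-a-different-name-then-rename variant mentioned above could be sketched like this (save_atomically is a hypothetical helper, not part of the code above; note that os.replace is itself atomic on both POSIX and Windows, so with this approach a reader can never observe a half-written file even without the lock):

```python
import json
import os
import tempfile


def save_atomically(outputs, path='results.json'):
    # write the full payload to a temporary file in the same
    # directory, then rename it over the destination; a concurrent
    # reader sees either the complete old file or the complete new
    # file, never a partial one
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or '.',
                                    suffix='.tmp')
    with os.fdopen(fd, 'w') as file:
        json.dump(outputs, file)
    os.replace(tmp_path, path)


save_atomically({'result': 0.5})
with open('results.json') as file:
    print(json.load(file))  # → {'result': 0.5}
```

The temporary file must live in the same directory as the destination, because os.replace cannot atomically rename across filesystems.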