我正在尝试使用 asyncio 和 aiohttp 库异步发送 HTTP POST,但我想立即向服务器发出请求。在下面的代码中,生产者和消费者通过队列不断进行通信。当消费者遇到满足特定条件(在本例中为偶数条件)的数据时,它会尝试向服务器发送 POST 请求。
代码中有两个方法:#METHOD 1 和 #METHOD 2。但是,两者都存在一些问题。使用 # 方法 1,功能可以正常工作,但存在一个问题,即消费者在收到 POST 结果之前会被阻止(生产者没有被阻止,但消费者被阻止)。我想要的是消费者继续独立处理生产者产生的数据,即使到服务器的 POST 正在等待。
所以,我尝试了#方法2,但这里也有一个问题。 # 方法2 使用create_task 创建一个独立的任务来处理POST 请求。问题是create_task只调度事件循环中的任务;它不保证立即执行。
总之,我想立即执行向服务器发送POST请求(只需发送),但使用create_task异步处理服务器对POST响应的验证。如果 aiohttp 库无法做到这一点,我可能需要探索其他库。有办法实现我想要的吗?
import asyncio
import aiohttp
import random
async def my_send_post(session, url, headers, params):
resp = await session.post(url=url, headers=headers, json=params)
resp = await resp.json()
print(resp)
return
async def producer(queue):
x = 0
while True:
# produce an item
print(f'producing {x}')
# simulate i/o operation using sleep
await asyncio.sleep(1)
# put the item in the queue
await queue.put(x)
x += 1
async def consumer(queue):
session = aiohttp.ClientSession()
url = 'https://api.testurl.com/'
params = {}
headers = request_headers(params) # just get headers using other library
while True:
# wait for an item from the producer
item = await queue.get()
if item %2 == 0:
# METHOD 1
resp = await session.post(url=url, headers=headers, json=params)
resp = await resp.json()
print(resp)
# METHOD 2
asyncio.create_task(my_send_post(session=session, url=url, headers=headers, params=params))
else:
print(f"consuming odd number{item}")
async def main():
queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
if __name__ == "__main__":
asyncio.run(main())`
问题是create_task只调度事件循环中的任务;它不保证立即执行。
没有任何方法可以保证这一点,除非您知道应用程序中的所有变量,但您应该能够足够接近。
首先要了解的是,create_task() 安排任务在循环的下一次迭代中运行(它显然不会立即执行任何内容,因为没有
await
以便屈服于事件循环)。
接下来要理解的是,
asyncio.sleep(0)
可用于让出事件循环进行 1 次迭代。
那么这只是当前正在运行哪些任务的问题。如果只有 2 个任务正在运行,那么我们知道在屈服于事件循环后,新任务将是下一个运行的任务,因此我们可以确定它会立即执行。
如果还有其他任务在运行,那么可能需要更长的时间(即它需要完成当前迭代中任何剩余任务的一个步骤,然后是在它之前安排的下一次迭代中的任何任务。但是,我们知道它将在当前任务恢复之前运行)。
这是一个非常详细的解释,但基本上,你可能只想这样做:
t = asyncio.create_task(...)
await asyncio.sleep(0)
此外,顺便说一句,您应该在某些时候保留对任务的引用并对其进行
await
,否则您可能会丢失异常(在使用调试模式时可能会收到有关此问题的警告(python -X dev
))。如果您想轻松地将任务扔到后台,我建议使用 aiojobs (尽管 asyncio 本身可能很快就会有一些东西来处理这个问题)。