我正在尝试将使用多处理和队列来检测网络上的主机的代码转换为异步
#!/usr/bin/env python3
import asyncio
import socket
import time
import subprocess
import os
def pinger(ip):
"""
Do Ping
:return: ip
"""
print(f"{ip}")
DEVNULL = open(os.devnull, 'w')
try:
subprocess.check_call(
['ping', '-c1', ip],
stdout=DEVNULL
)
return True
except:
pass
def get_my_ip():
"""
Find my IP address
:return:
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("192.168.1.1", 80))
ip = s.getsockname()[0]
print("My ip is: {}".format(ip))
s.close()
return ip
async def main():
pool_size = 255
start = 102
# get my IP and compose a base like 192.168.1.xxx
ip_parts = get_my_ip().split('.')
base_ip = ip_parts[0] + '.' + ip_parts[1] + '.' + ip_parts[2] + '.'
tasks = []
for i in range(start, pool_size):
task = asyncio.create_task(pinger(f"{base_ip}{i}"))
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
但这会导致奇怪的结果:
task = asyncio.create_task(pinger(f"{base_ip}{i}"))
File "/usr/lib/python3.10/asyncio/tasks.py", line 337, in create_task
task = loop.create_task(coro)
File "/usr/lib/python3.10/asyncio/base_events.py", line 438, in create_task
task = tasks.Task(coro, loop=self, name=name)
TypeError: a coroutine was expected, got True
我做错了什么?我想一次性 ping 所有可能的 IP。
正如错误消息所说!我们来看看签名:
asyncio.create_task(coro,*,名称=无,上下文=无)
将 coro Corutine 包装到任务中并安排其执行。返回任务对象。
Pinger 不是协程!
>>> def some_synchronous_func():
... pass
>>>
>>> some_synchronous_func
<function some_synchronous_func at 0x0000028389343100>
但是异步函数是返回协程的函数。
>>> async def some_async_func():
... pass
>>>
>>> some_async_func
<function some_async_func at 0x0000028388D8BC40>
>>> some_async_func()
<coroutine object some_async_func at 0x000002838906FD70>
要解决此问题,您可以:
asyncio.to_thread()
asyncio.subprocess_open()
相反,如这个答案所写,您可以使子进程调用异步建议使用后者,因为它是异步原生的并且不会创建额外的线程。
import asyncio
import socket
import subprocess
from typing import Awaitable
# --- Config ---
IP_CHECK_START = 102
IP_CHECK_END = 255
# --- Async Check Call Implementations ---
async def _check_call(task_id: int, cmd: str) -> Awaitable[bool]:
"""Run a command and return True if successful."""
print(f"[Task {task_id}] started")
try:
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
await proc.wait()
if proc.returncode:
raise subprocess.CalledProcessError(proc.returncode, cmd)
finally:
print(f"[Task {task_id}] done")
async def _check_call_threaded(task_id: int, cmd: str) -> Awaitable[bool]:
"""Use to_thread to delegate check_call"""
print(f"[Task {task_id}] started")
try:
await asyncio.to_thread(
subprocess.check_call, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
finally:
print(f"[Task {task_id}] done")
# --- Utility Func ---
async def ping_dat(task_id: int, ip: str) -> Awaitable[bool]:
"""
Ping that!
Args:
task_id: Task's ID
ip: IP address to ping
Returns:
"""
try:
# native call
await _check_call(task_id, f"ping /w 2 {ip}")
# or threaded call - not recommended
# await _check_call_threaded(task_id, f"ping /w 2 {ip}")
print(f"[Task {task_id}] {ip} is reachable")
return True
except subprocess.CalledProcessError:
# on failed command with return code other than 0,
# CalledProcessError is raised for subprocess.check_call
print(f"[Task {task_id}] {ip} is not reachable")
return False
def get_my_ip():
"""
Find my IP address
:return:
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("192.168.1.1", 80))
ip = s.getsockname()[0]
print("My ip is: {}".format(ip))
s.close()
return ip
# --- Driver ---
async def main():
# gonna check only few for demo
# get my IP and compose a base like 192.168.1.xxx
ip_parts = get_my_ip().split('.')
base_ip = ".".join(ip_parts[:-1]) + "."
tasks = [
asyncio.create_task(ping_dat(idx, f"{base_ip}{addr}"))
for idx, addr in enumerate(range(IP_CHECK_START, IP_CHECK_END + 1))
]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
My ip is: [REDACTED]
[Task 0] started
[Task 1] started
[Task 2] started
[Task 3] started
[Task 4] started
[Task 3] done
[Task 3] [REDACTED] is reachable
[Task 1] done
[Task 1] [REDACTED] is not reachable
[Task 0] done
[Task 0] [REDACTED] is not reachable
[Task 4] done
[Task 4] [REDACTED] is not reachable
[Task 2] done
[Task 2] [REDACTED] is not reachable
如果您对异步本身感兴趣,请阅读下面的内容
复制我在其他答案
写的内容IO操作基本上就是这样:
“嘿操作系统,请为我做这个 IO。完成后叫醒我。” 线程 1 进入睡眠状态
一段时间后,操作系统打线程1“你的IO操作完成了,采取 然后回去工作。”
所以它所做的就是什么都不做 - 对于这种情况,又名 IO Bound 的东西, GIL可以安全地释放并让其他线程运行。内置 time.sleep、open()等函数实现了此类GIL释放 他们的 C 代码中的逻辑。
async
-await
调用链都应以异步库的 Awaitable
结束。>>> async def do_that():
... print("Doing that")
... await do_this()
... print("Done that")
>>>
>>> async def do_this():
... print("Doing this")
... await asyncio.sleep(5) # <---
... print("Done this")
>>>
>>> asyncio.run(do_that())
Doing that
Doing this
Done this
Done that
这是因为 asyncio 在内部(iirc)什么都不做,直到操作系统在某些事件发生时唤醒它们。在
async
-await
链的末尾有这样的代码,可以为我们监听事件,所以我们不必自己做。
...是的,老实说,这就是编写有效的异步代码所需的全部内容。如果有兴趣,您也可以在这里阅读更多有关 asyncio 缺点的内容!