使用asyncio检测局域网内所有在线主机

问题描述 投票:0回答:1

我正在尝试将使用多处理和队列来检测网络上的主机的代码转换为异步

#!/usr/bin/env python3

import asyncio
import socket
import time
import subprocess
import os



def pinger(ip):
    """
    Do Ping
    :return: ip
    """
    print(f"{ip}")
    DEVNULL = open(os.devnull, 'w')
    try:
        subprocess.check_call(
            ['ping', '-c1', ip],
            stdout=DEVNULL
        )
        return True
    except:
        pass


def get_my_ip():
    """
    Find my IP address
    :return:
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("192.168.1.1", 80))
    ip = s.getsockname()[0]
    print("My ip is: {}".format(ip))
    s.close()
    return ip


async def main():
    pool_size = 255
    start = 102
    # get my IP and compose a base like 192.168.1.xxx
    ip_parts = get_my_ip().split('.')
    base_ip = ip_parts[0] + '.' + ip_parts[1] + '.' + ip_parts[2] + '.'

    tasks = []

    for i in range(start, pool_size):
        task = asyncio.create_task(pinger(f"{base_ip}{i}"))
        tasks.append(task)

    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

但这会导致奇怪的结果:

    task = asyncio.create_task(pinger(f"{base_ip}{i}"))
  File "/usr/lib/python3.10/asyncio/tasks.py", line 337, in create_task
    task = loop.create_task(coro)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 438, in create_task
    task = tasks.Task(coro, loop=self, name=name)
TypeError: a coroutine was expected, got True

我做错了什么?我想一次性 ping 所有可能的 IP。

python-3.x python-asyncio python-3.10
1个回答
0
投票

问题

正如错误消息所说!我们来看看签名:

asyncio.create_task(coro,*,名称=无,上下文=无)

将 coro Corutine 包装到任务中并安排其执行。返回任务对象。

Pinger 不是协程!

>>> def some_synchronous_func():
...     pass
>>>
>>> some_synchronous_func
<function some_synchronous_func at 0x0000028389343100>

但是异步函数是返回协程的函数。

>>> async def some_async_func():
...     pass
>>>
>>> some_async_func
<function some_async_func at 0x0000028388D8BC40>

>>> some_async_func()
<coroutine object some_async_func at 0x000002838906FD70>


修复

要解决此问题,您可以:

  • 使用
    asyncio.to_thread()
  • 指定同步函数调用
  • asyncio.subprocess_open()
    相反,如这个答案所写,您可以使子进程调用异步

建议使用后者,因为它是异步原生的并且不会创建额外的线程。


import asyncio
import socket
import subprocess


from typing import Awaitable


# --- Config ---

IP_CHECK_START = 102
IP_CHECK_END = 255


# --- Async Check Call Implementations ---

async def _check_call(task_id: int, cmd: str) -> Awaitable[bool]:
    """Run a command and return True if successful."""

    print(f"[Task {task_id}] started")
    try:
        proc = await asyncio.create_subprocess_shell(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        await proc.wait()
        if proc.returncode:
            raise subprocess.CalledProcessError(proc.returncode, cmd)
    finally:
        print(f"[Task {task_id}] done")


async def _check_call_threaded(task_id: int, cmd: str) -> Awaitable[bool]:
    """Use to_thread to delegate check_call"""

    print(f"[Task {task_id}] started")
    try:
        await asyncio.to_thread(
            subprocess.check_call, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
    finally:
        print(f"[Task {task_id}] done")


# --- Utility Func ---

async def ping_dat(task_id: int, ip: str) -> Awaitable[bool]:
    """
    Ping that!

    Args:
        task_id: Task's ID
        ip: IP address to ping

    Returns:

    """

    try:
        # native call
        await _check_call(task_id, f"ping /w 2 {ip}")

        # or threaded call - not recommended
        # await _check_call_threaded(task_id, f"ping /w 2 {ip}")

        print(f"[Task {task_id}] {ip} is reachable")
        return True

    except subprocess.CalledProcessError:
        # on failed command with return code other than 0,
        # CalledProcessError is raised for subprocess.check_call
        print(f"[Task {task_id}] {ip} is not reachable")
        return False


def get_my_ip():
    """
    Find my IP address
    :return:
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect(("192.168.1.1", 80))
    ip = s.getsockname()[0]
    print("My ip is: {}".format(ip))
    s.close()
    return ip


# --- Driver ---

async def main():
    # gonna check only few for demo

    # get my IP and compose a base like 192.168.1.xxx
    ip_parts = get_my_ip().split('.')
    base_ip = ".".join(ip_parts[:-1]) + "."

    tasks = [
        asyncio.create_task(ping_dat(idx, f"{base_ip}{addr}"))
        for idx, addr in enumerate(range(IP_CHECK_START, IP_CHECK_END + 1))
    ]

    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
My ip is: [REDACTED]
[Task 0] started
[Task 1] started
[Task 2] started
[Task 3] started
[Task 4] started
[Task 3] done
[Task 3] [REDACTED] is reachable
[Task 1] done
[Task 1] [REDACTED] is not reachable
[Task 0] done
[Task 0] [REDACTED] is not reachable
[Task 4] done
[Task 4] [REDACTED] is not reachable
[Task 2] done
[Task 2] [REDACTED] is not reachable

如果您对异步本身感兴趣,请阅读下面的内容



异步原生编程规则

1.请记住(异步)并发与并行之间的区别。前者辛苦,后者辛苦!

复制我在其他答案

写的内容

IO操作基本上就是这样:

“嘿操作系统,请为我做这个 IO。完成后叫醒我。” 线程 1 进入睡眠状态

一段时间后,操作系统打线程1“你的IO操作完成了,采取 然后回去工作。”

所以它所做的就是什么都不做 - 对于这种情况,又名 IO Bound 的东西, GIL可以安全地释放并让其他线程运行。内置 time.sleep、open()等函数实现了此类GIL释放 他们的 C 代码中的逻辑。


2.所有
async
-
await
调用链都应以异步库的
Awaitable
结束。

>>> async def do_that():
...     print("Doing that")
...     await do_this()
...     print("Done that")
>>>
>>> async def do_this():
...     print("Doing this")
...     await asyncio.sleep(5)  # <---
...     print("Done this")
>>>
>>> asyncio.run(do_that())
Doing that
Doing this
Done this
Done that

这是因为 asyncio 在内部(iirc)什么都不做,直到操作系统在某些事件发生时唤醒它们。在

async
-
await
链的末尾有这样的代码,可以为我们监听事件,所以我们不必自己做。


...是的,老实说,这就是编写有效的异步代码所需的全部内容。如果有兴趣,您也可以在这里阅读更多有关 asyncio 缺点的内容!

© www.soinside.com 2019 - 2024. All rights reserved.