我创建了一个类,并设置了 writer
和 reader
作为选项设置为 None
在 __init__
. 正确的用法需要调用 open
前,以确保 writer
和 reader
被正确初始化。
在 read_write
我检查的方法是 None
并在需要时自动进行初始化。但是Mypy仍然将其视为严格的可选检查错误。
import asyncio
from typing import Optional
class AsyncTest:
def __init__(self, host: str, port: int = 502) -> None:
self.host = host
self.port = port
self.writer: Optional[asyncio.StreamWriter] = None
self.reader: Optional[asyncio.StreamReader] = None
async def open(self) -> None:
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
async def read_write(self) -> None:
if self.writer is None or self.reader is None:
await self.open()
self.writer.write(b'')
await self.writer.drain()
resp = await self.reader.readexactly(100)
async def close(self) -> None:
if self.writer:
self.writer.close()
await self.writer.wait_closed()
给出。
$ mypy test.py
test.py:19: error: Item "None" of "Optional[StreamWriter]" has no attribute "write"
test.py:20: error: Item "None" of "Optional[StreamWriter]" has no attribute "drain"
test.py:21: error: Item "None" of "Optional[StreamReader]" has no attribute "readexactly"
现在如果我把 read_write
方法,将对 self.open()
与呼吁 open_connection
Mypy不再抱怨了。
async def read_write(self) -> None:
if self.writer is None or self.reader is None:
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
self.writer.write(b'')
await self.writer.drain()
resp = await self.reader.readexactly(100)
很明显,我可以在Mypy中禁用对该文件的严格的可选检查,作为一个临时的解决方案,但有什么方法可以让这个工作?这就需要Mypy按照方法调用到 open
并检查它是否设置了 self.writer
和 self.reader
属性。
在这个具体的小例子中 open_connection
可以直接放在里面。read_write
并完全去除 open
的方法,但这似乎不是最佳的,如果 open
方法增长。
MyPy 只知道签名,不知道副作用。的签名是 open
说不上 writer
和 reader
是在调用它之后定义的。
而不是从外部检查对象是否被 "打开",然后调用 open
否则,就把检查和打开移到一个方法中。这个方法可以静态地保证提供readwriter,并动态地将其记忆化。
class AsyncTest:
...
# method always provides valid reader/writer
async def _connection(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
# internally check if already opened
if self.reader is None or self.writer is None:
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
return self.reader, self.writer
async def read_write(self) -> None:
# other methods call connection method unconditionally
reader, writer = self._connection()
writer.write(b'')
await writer.drain()
resp = await reader.readexactly(100)