我有以下简化代码:
import asyncio
import websockets
async def read_from_ws(websocket):
async for message in websocket:
print(f"Received message: {message}")
async def write_to_ws(websocket):
while True:
message = input("Enter a message to send: ")
await websocket.send(message)
async def main():
uri = "ws://localhost:12345"
async with websockets.connect(uri) as websocket:
# Launch two concurrent tasks: one for reading and one for writing
await asyncio.gather(
read_from_ws(websocket),
write_to_ws(websocket),
)
asyncio.run(main())
如您所见,有两个异步任务,一个仅处理读取,一个仅处理写入。
有人向我提到“Websocket 不是线程或任务安全的,它可能会导致数据损坏”,我应该使用
asyncio.Lock()
并包装读/写。
这到底意味着什么?
我使用的是python3.11。
与线程代码相比,基于 asyncio 的代码的最大优点之一是代码根本不会从函数中间跳转到函数中间的另一个函数 - 这只会发生在
await
表达式(或 async for
和 async with
) 声明 -
这意味着上面的代码应该没问题。我真的很怀疑
websockets 代码将是异步不安全的,以至于写入可能会在读取过程中发生 - 即使它们,(底层)套接字机制也应该只允许一个发生而不会干扰另一个。
但是,为了安全起见,并且不依赖其他人不做作业,添加显式锁可能是一个好主意。当一个 Lock 对象被“获取”时,任何其他试图“获取”的并发代码都必须等待它被释放。 这是创建 asyncio.Lock() 实例并使用这些方法的问题。
通常,人们会在
with
块中使用锁,它以更方便的方式进行获取和释放 - 但这不是一种合适的使用形式,因为您需要在
的新交互时激活锁for
循环执行)无论如何,这就是在这段代码中使用锁的方式:
import asyncio
import websockets
async def read_from_ws(websocket, lock):
# the "with" form can't be used here,
# because the whole "for" bock would have to be contained in it
# and it would hold the lock for the duration of the for loop
# (in this case, "forever")
await lock.acquire()
try:
async for message in websocket:
# this inner try/finally not really needed in this simple loop body
# but if you have a more complex for-body, with multiple
# execution paths (if/else, "continue" statements, and possibl exceptions raising, do it!
try:
# just the "async for" statement itself uses the websocket.
# so we free it to be used by others:
await lock.release()
print(f"Received message: {message}")
finally:
lock.acquire()
finally:
await lock.release()
async def write_to_ws(websocket, lock):
while True:
message = input("Enter a message to send: ")
# here we can use the simpler form of acquiring/releasing the lock
# which does away with the "try/finally" needs:
async with lock:
await websocket.send(message)
async def main():
lock = asyncio.Lock()
uri = "ws://localhost:12345"
async with websockets.connect(uri) as websocket:
# Launch two concurrent tasks: one for reading and one for writing
await asyncio.gather(
read_from_ws(websocket, lock),
write_to_ws(websocket, lock),
)
asyncio.run(main())