我正在尝试在龙卷风重写方法中调用异步函数。 最初,函数
generate()
是同步的,但这似乎阻塞了整个程序,包括on_connection_close
方法
class Reader:
def __init__(self, topic, group_id):
self.topic = topic
self.consumer = AIOKafkaConsumer(topic, bootstrap_servers="kafka:9092",
group_id=group_id)
async def read(self, sender):
print("enter read...")
#it seems the code below is blocking
await self.consumer.start()
async for message in self.consumer:
sender(json.dumps(consumed_message))
await self.consumer.commit()
class SampleSockeHandler(tornado.websocket.WebSocketHandler):
def on_connection_close(self):
self.app.close()
def on_message(self, message):
generate(message)
#await generate(message)
async def generate(self, message):
await self.reader.read(message)
print("enter read...")
仅在第一个 read()
调用中执行一次。在随后的 read()
调用中,它不再打印。
现在,当我注释掉它下面的所有代码时,即
await self.consumer.start()
async for message in self.consumer:
sender(json.dumps(consumed_message))
await self.consumer.commit()
又可以用了
如何解决这个问题?难道不应该是异步调用吗?
async def main():
app = Application()
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
asyncio.run(main())
on_message()
可以是协程,并且如果您使用 async
,则必须是 await generate(message)
。
https://www.tornadoweb.org/en/stable/websocket.html#tornado.websocket.WebSocketHandler.on_message