在 Tornado 重写方法中调用异步函数

问题描述 投票:0回答:1

我正在尝试在龙卷风重写方法中调用异步函数。 最初,函数

generate()
是同步的,但这似乎阻塞了整个程序,包括
on_connection_close
方法

class Reader:
  
    def __init__(self, topic, group_id):
        self.topic = topic
        self.consumer = AIOKafkaConsumer(topic, bootstrap_servers="kafka:9092",
                                         group_id=group_id)

    async def read(self, sender):
        print("enter read...")
        #it seems the code below is blocking
        await self.consumer.start()
        async for message in self.consumer:      
            sender(json.dumps(consumed_message))
            await self.consumer.commit()

class SampleSockeHandler(tornado.websocket.WebSocketHandler):


    def on_connection_close(self):
        self.app.close()

    def on_message(self, message):
        generate(message)
        #await generate(message)

    async def generate(self, message):
      
        await self.reader.read(message)



print("enter read...")
仅在第一个
read()
调用中执行一次。在随后的
read()
调用中,它不再打印。 现在,当我注释掉它下面的所有代码时,即

await self.consumer.start()
        async for message in self.consumer:      
            sender(json.dumps(consumed_message))
            await self.consumer.commit()

又可以用了

如何解决这个问题?难道不应该是异步调用吗?


async def main():

    app = Application()

    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    asyncio.run(main())
python async-await python-asyncio tornado kafka-python
1个回答
0
投票

on_message()
可以是协程,并且如果您使用
async
,则必须是
await generate(message)

https://www.tornadoweb.org/en/stable/websocket.html#tornado.websocket.WebSocketHandler.on_message

© www.soinside.com 2019 - 2024. All rights reserved.