我正在尝试使用 Mastodon.py 异步传输来自 Mastodon 的通知。要运行异步代码,我尝试使用 Trio
我尝试过以下方法:
...
from mastodon import Mastodon, StreamListener
...
def main():
...
access_token = os.getenv("MASTODON_ACCESS_TOKEN")
api_base_url = os.getenv("MASTODON_API_BASE_URL")
# Login
mastodon = Mastodon(
access_token=access_token,
api_base_url=api_base_url
)
# Show username
logger.info(f"Logged in as {mastodon.account_verify_credentials()['username']}")
...
logger.info("Starting user stream")
user = mastodon.stream_user(TheStreamListener())
logger.info("Started user stream")
try:
trio.run(user)
except KeyboardInterrupt:
logger.info("Stopping user stream")
user.close()
logger.info("Stopped user stream")
class TheStreamListener(StreamListener):
def on_update(self, status):
logger.info(f"Got update: {status['content']}")
...
def on_notification(self, notification):
if notification['type'] == 'mention':
logger.opt(colors=True).info(f"Got <blue>mention</blue> from {notification['account']['username']}") # noqa E501
...
main()
我预计在运行
Started user stream
后会记录 trio.run()
。但是,该流似乎不是异步的,因为记录的最后一条消息是Starting user stream
。我知道 run_async
有一个 mastodon.stream_user()
参数,但无法让它异步运行。
我已经尝试过
mastodon.stream_user(TheStreamListener(), run_async=True, reconnect_async=True)
。但是,我不确定如何在流立即退出时保持流运行,同时运行其他代码。
我最终成功实现了以下代码:
def main():
logger.info("Starting user stream")
stream = mastodon.stream_user(TheStreamListener(),
run_async=True)logger.info("Started user stream")
trio.run(sleep_or_not, stream)
async def sleep_or_not(stream):
# Experimenting with running other code while the stream is running
try:
async with trio.open_nursery() as nursery:
nursery.start_soon(trio.sleep_forever)
nursery.start_soon(print_time)
except KeyboardInterrupt:
...