我使用以下脚本来监听来自公共电报频道和群组的新消息。
import configparser
from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events, sync
from telethon.tl.functions.messages import (GetHistoryRequest)
from telethon.tl.types import (
PeerChannel
)
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxxxx'
#target channels that you want to listen to:
input_channels = ('https://t.me/xxxxxx','https://t.me/xxxx','https://t.me/xxxxxxx')
#create a client
client = TelegramClient('anon', api_id, api_hash)
# Listen to messages from target channels
@client.on(events.NewMessage(chats=input_channels))
async def newMessageListener(event):
# Get message text
newMessage = event.message.message
print(newMessage)
with client:
client.run_until_disconnected()
当通道关闭时,我收到以下错误:ValueError:没有用户将“close_channel_name”作为用户名,并且我停止接收任何数据。
有办法识别无效通道吗?
到目前为止,我发现以下内容可以识别有效通道,但可能有更好的方法:
client.start()
result = client.get_entity('https://t.me/xxxxxx')
以下内容适用于在电视马拉松上捕获错误(在协程内)
import asyncio
from telethon import TelegramClient, events
session_name = 'anon'
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxx'
chat_list = ('https://t.me/xxxxxxxx', 'https://t.me/xxxxxxxxxx')
async def main():
async with TelegramClient(session_name, api_id, api_hash) as client:
@client.on(events.NewMessage(chats=chat_list))
async def handler(event):
print(event.message.message)
await client.run_until_disconnected()
def custom_exception_handler(loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
exception = context.get('exception')
if isinstance(exception, ValueError):
print(context['exception'])
loop.stop()
loop = asyncio.get_event_loop()
# Set custom handler
loop.set_exception_handler(custom_exception_handler)
loop.create_task(main())
loop.run_forever()
对 Stamatis 的答案进行的小研究:如果您使用
loop.create_task(..) + loop.run_forever()
运行任务,它就会起作用。
如果您使用
运行异步任务asyncio.run_until_complete(task())
,或telethon_bot.run_until_complete(task())
,或bot.run_until_disconnected()
,您将无法捕获所需的异常。
Python 关于 loop.run_until_complete(future) 的文档说:
Return the Future’s result or raise its exception.
在这种情况下,您可能会发现传统的错误:
try:
bot.run_until_compete(your_async_task())
except Exception as e:
handle_the_exception(e)
我不知道如何处理
bot.run_until_disconnected()
情况:两者都不起作用。
我尝试过但失败了:
try:
loop.create_task(task_with_error()) # Works, handles error the Stamatis's way
# Still doesn't catch anything (neither below or in Stamatis handler).
# bot.run_until_disconnected()
loop.create_task(bot._run_until_disconnected())
loop.run_forever()
except Exception as e:
print("==============Finally!==============")
print(e)