解析器从 notcoin Telegram 机器人迷你应用程序收集新矿池的信息

问题描述 投票:0回答:1

我正在开发 notcoin 迷你应用程序和 Telegram 频道之间的机器人中介。我的任务是通知我创建的频道有关 notcoin_bot Telegram 机器人中出现的新池的信息。 1

我尝试做什么

main.py:

from dotenv import load_dotenv
import asyncio
import logging
import bot

load_dotenv()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(bot.start())

bot.py:

import asyncio
import aiohttp
from os import environ
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart
from aiogram.types import Message
from bs4 import BeautifulSoup


dp = Dispatcher()

additional_message = "Join now to start earning!"
channel_id = ""
sent_announcements = []
check_interval = 60 * 60

#Function triggered by the "/start" command
async def start():
    global bot
    #Create a Bot instance using the Telegram API token from environment variables
    bot = Bot(
        environ.get("TOKEN"),
        default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)

    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)

@dp.message(CommandStart())
#Function handling messages with the "/start" command
async def getUserByID(message: Message):
    await message.answer(f"Hello, Ваш ID: {message.from_user.id}")

#Function to fetch and parse pool data
async def fetch_and_parse_pools():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://t.me/notcoin_bot") as response:
            soup = BeautifulSoup(await response.text(), "html.parser")

    pools = soup.find_all("div", class_="pool")
    for pool in pools:
        pool_name = pool.find("h3").text
        pool_reward = pool.find("span", class_="reward").text
    
        if pool_name not in sent_announcements:
            sent_announcements.append(pool_name)

            announcement_text = f"**Новый пул:** {pool_name} | **Награда:** {pool_reward} в   час.\n{additional_message}"
            await bot.send_message(channel_id, announcement_text)


#Function to schedule pool checking (improved version)
async def scheduled(delay):
    await asyncio.sleep(delay)
    loop = asyncio.get_event_loop()
    loop.create_task(fetch_and_parse_pools())

    loop = asyncio.get_event_loop()
    loop.create_task(scheduled(check_interval))

在编码过程中,我尝试完成任务并测试代码,但除了问候之外,代码的逻辑都不起作用。

python python-3.x telegram telegram-bot
1个回答
0
投票

您确定使用正确的 URL 进行解析吗?

© www.soinside.com 2019 - 2024. All rights reserved.