如何组织从一个程序到另一个程序的数据传输?

问题描述 投票:0回答:1

我有一个电报机器人和一个直接与 api twitter 交互的程序,它们位于同一服务器上且位于同一目录中。我需要当用户将数据传递给处理程序时,他们通过 aioprocessing 转到另一个程序,然后转到队列

我的程序.PY

logging.basicConfig(level=logging.INFO)





task_pool = []
account_queue = aioprocessing.AioQueue()

class Account_Data:
        def __init__(self, auth_token, proxy_ip, proxy_port, proxy_login, proxy_pass, bearer_token, x_csrf_token, user_agent, nickname, conversations, gif_number, text_message, messages_in_second, work_durations, break_cycle):
            self.auth_token = auth_token
            self.proxy_ip = proxy_ip
            self.proxy_port = proxy_port
            self.proxy_login = proxy_login
            self.proxy_pass = proxy_pass
            self.bearer_token = bearer_token
            self.x_csrf_token = x_csrf_token
            self.user_agent = user_agent
            self.nickname = nickname
            self.conversations = conversations
            self.gif_number = gif_number
            self.text_message = text_message
            self.messages_in_second = messages_in_second
            self.work_durations = work_durations
            self.break_cycle = break_cycle

async def add_account_task(account):
    # Проверяем, существует ли уже задача для данного аккаунта
    if not any(task.get_name() == account.nickname for task in task_pool):
        logging.info(f"Добавляем задачу для аккаунта: {account.nickname}")
        task = asyncio.create_task(process_chat(account), name=account.nickname)
        task_pool.append(task)
    else:
        logging.info(f"Задача для аккаунта {account.nickname} уже существует")


async def process_queue():
    while True:
        # Проверяем очередь на наличие новых аккаунтов для добавления
        if not account_queue.empty():
            logging.info("Получение новых данных из очереди")
            account_data = await account_queue.coro_get()  # Получаем данные аккаунта из очереди
            await add_account_task(account_data)  # Добавляем задачу для аккаунта
        await asyncio.sleep(1)  # Небольшая задержка, чтобы избежать постоянной проверки



async def work_with_chats(accounts):

    accounts_data_list = []

    for nickname in accounts:
        account_data = await get_user_retweet_data(nickname)
        if account_data:
            account_obj = Account_Data(
                auth_token=account_data['auth_token'],
                proxy_ip=account_data['proxy_ip'],
                proxy_port=account_data['proxy_port'],
                proxy_login=account_data['proxy_login'],
                proxy_pass=account_data['proxy_pass'],
                bearer_token=account_data['bearer_token'],
                x_csrf_token=account_data['x_csrf_token'],
                user_agent=account_data['user_agent'],
                nickname=nickname,
                conversations = list(filter(lambda x: len(x.split("-")) == 1, account_data['conversations'])),
                gif_number=account_data['gif_number'],
                text_message=account_data['text_message'],
                messages_in_second=account_data['messages_in_second'],
                work_durations=account_data["work_duration"],
                break_cycle = False
            )
            accounts_data_list.append(account_obj)
        else:
            logging.error(f"Не удалось извлечь данные для аккаунта {nickname}")

        for account in accounts_data_list:
            await add_account_task(account)  # Добавляем задачу для каждого аккаунта    


async def work_with_gif(account, conversation_id, session):
                        ...................
async def ReTwit(account, conversation_id, session):
...........................................    

async def sending_message(account, conversation_id, session):
..............................................


async def process_chat(account):
    end_time = datetime.now() + timedelta(seconds=account.work_durations)
    while True:
        if account.break_cycle is True:
            logging.info(f"Удаляю задачу для {account.nickname}")
            task_pool[:] = [task for task in task_pool if task.get_name() != account.nickname]
            break
        for conversation_id in account.conversations:
            if datetime.now() >= end_time:
                # await message.answer(f"Процесс рассылок для аккаунта {account.nickname} успешно завершен по истечению времени")
                account.break_cycle = True  # Здесь должно быть присваивание, а не сравнение
                break

            proxies = f"socks5://{account.proxy_login}:{account.proxy_pass}@{account.proxy_ip}:{account.proxy_port}"
            async with httpx.AsyncClient(proxies=proxies) as session:
                await ReTwit(account, conversation_id, session)
                await work_with_gif(account, conversation_id, session)
                await sending_message(account, conversation_id, session)
                await asyncio.sleep(3600 / account.messages_in_second)
    
async def task_manager():
    while True:
        if task_pool:
            logging.info(f"Выполняем {len(task_pool)} задач.")
        else:
            logging.info("Пул задач пуст, ждем новые задачи.")
        await asyncio.sleep(5)



async def stop_account_task(nickname):
    """Функция для остановки задачи спамера по никнейму"""
    for task in task_pool:
        if task.get_name() == nickname:
            logging.info(f"Останавливаем задачу для {nickname}")
            task.cancel()  # Отменяем задачу
            try:
                if not task.done():
                    await task  # Ожидаем завершения задачи, если она ещё не завершена
            except asyncio.CancelledError:
                logging.info(f"Задача для аккаунта {nickname} была отменена")
            # Удаляем задачу из пула после её отмены
            task_pool.remove(task)
            logging.info(f"Задача для {nickname} удалена из пула задач")
            break
    else:
        logging.warning(f"Задача для аккаунта {nickname} не найдена")


async def main():
    await asyncio.gather(task_manager(), process_queue())  # Одновременное выполнение обработки задач и очереди


if __name__ == "__main__":
    asyncio.run(main())

................................................ ......................................................

MESSAGE_HANDLER.PY

async def start_program(self, callback_query: CallbackQuery, state: FSMContext):
    user_id = callback_query.from_user.id

    # Проверка наличия выбранных никнеймов для данного пользователя
    if user_id in self.user_selected_nicknames and self.user_selected_nicknames[user_id]:
        selected_nicknames = self.user_selected_nicknames[user_id]
        await callback_query.message.answer(f"Процесс спама для никнеймов: {', '.join(selected_nicknames)} запущен.")
        
        # Вызов функции work_with_chats с никнеймами текущего пользователя, добавление в очередь
        await self.task_queue.coro_put(list(selected_nicknames))
        
        # Очистка списка никнеймов после завершения
        self.user_selected_nicknames[user_id].clear()
        await state.clear()
    else:
        await callback_query.message.answer("Вы не выбрали ни одного никнейма.")
python queue microservices python-asyncio telegram-bot
1个回答
0
投票

查看 IPC 主题并选择适合您需求的解决方案。

https://docs.python.org/3/library/ipc.html

https://docs.python.org/3/library/signal.html

https://docs.python.org/3/library/mmap.html

我相信 MMAP 可能更适合您的情况。

© www.soinside.com 2019 - 2024. All rights reserved.