我有一个电报机器人和一个直接与 api twitter 交互的程序,它们位于同一服务器上且位于同一目录中。我需要当用户将数据传递给处理程序时,他们通过 aioprocessing 转到另一个程序,然后转到队列
我的程序.PY
logging.basicConfig(level=logging.INFO)
task_pool = []
account_queue = aioprocessing.AioQueue()
class Account_Data:
def __init__(self, auth_token, proxy_ip, proxy_port, proxy_login, proxy_pass, bearer_token, x_csrf_token, user_agent, nickname, conversations, gif_number, text_message, messages_in_second, work_durations, break_cycle):
self.auth_token = auth_token
self.proxy_ip = proxy_ip
self.proxy_port = proxy_port
self.proxy_login = proxy_login
self.proxy_pass = proxy_pass
self.bearer_token = bearer_token
self.x_csrf_token = x_csrf_token
self.user_agent = user_agent
self.nickname = nickname
self.conversations = conversations
self.gif_number = gif_number
self.text_message = text_message
self.messages_in_second = messages_in_second
self.work_durations = work_durations
self.break_cycle = break_cycle
async def add_account_task(account):
# Проверяем, существует ли уже задача для данного аккаунта
if not any(task.get_name() == account.nickname for task in task_pool):
logging.info(f"Добавляем задачу для аккаунта: {account.nickname}")
task = asyncio.create_task(process_chat(account), name=account.nickname)
task_pool.append(task)
else:
logging.info(f"Задача для аккаунта {account.nickname} уже существует")
async def process_queue():
while True:
# Проверяем очередь на наличие новых аккаунтов для добавления
if not account_queue.empty():
logging.info("Получение новых данных из очереди")
account_data = await account_queue.coro_get() # Получаем данные аккаунта из очереди
await add_account_task(account_data) # Добавляем задачу для аккаунта
await asyncio.sleep(1) # Небольшая задержка, чтобы избежать постоянной проверки
async def work_with_chats(accounts):
accounts_data_list = []
for nickname in accounts:
account_data = await get_user_retweet_data(nickname)
if account_data:
account_obj = Account_Data(
auth_token=account_data['auth_token'],
proxy_ip=account_data['proxy_ip'],
proxy_port=account_data['proxy_port'],
proxy_login=account_data['proxy_login'],
proxy_pass=account_data['proxy_pass'],
bearer_token=account_data['bearer_token'],
x_csrf_token=account_data['x_csrf_token'],
user_agent=account_data['user_agent'],
nickname=nickname,
conversations = list(filter(lambda x: len(x.split("-")) == 1, account_data['conversations'])),
gif_number=account_data['gif_number'],
text_message=account_data['text_message'],
messages_in_second=account_data['messages_in_second'],
work_durations=account_data["work_duration"],
break_cycle = False
)
accounts_data_list.append(account_obj)
else:
logging.error(f"Не удалось извлечь данные для аккаунта {nickname}")
for account in accounts_data_list:
await add_account_task(account) # Добавляем задачу для каждого аккаунта
async def work_with_gif(account, conversation_id, session):
...................
async def ReTwit(account, conversation_id, session):
...........................................
async def sending_message(account, conversation_id, session):
..............................................
async def process_chat(account):
end_time = datetime.now() + timedelta(seconds=account.work_durations)
while True:
if account.break_cycle is True:
logging.info(f"Удаляю задачу для {account.nickname}")
task_pool[:] = [task for task in task_pool if task.get_name() != account.nickname]
break
for conversation_id in account.conversations:
if datetime.now() >= end_time:
# await message.answer(f"Процесс рассылок для аккаунта {account.nickname} успешно завершен по истечению времени")
account.break_cycle = True # Здесь должно быть присваивание, а не сравнение
break
proxies = f"socks5://{account.proxy_login}:{account.proxy_pass}@{account.proxy_ip}:{account.proxy_port}"
async with httpx.AsyncClient(proxies=proxies) as session:
await ReTwit(account, conversation_id, session)
await work_with_gif(account, conversation_id, session)
await sending_message(account, conversation_id, session)
await asyncio.sleep(3600 / account.messages_in_second)
async def task_manager():
while True:
if task_pool:
logging.info(f"Выполняем {len(task_pool)} задач.")
else:
logging.info("Пул задач пуст, ждем новые задачи.")
await asyncio.sleep(5)
async def stop_account_task(nickname):
"""Функция для остановки задачи спамера по никнейму"""
for task in task_pool:
if task.get_name() == nickname:
logging.info(f"Останавливаем задачу для {nickname}")
task.cancel() # Отменяем задачу
try:
if not task.done():
await task # Ожидаем завершения задачи, если она ещё не завершена
except asyncio.CancelledError:
logging.info(f"Задача для аккаунта {nickname} была отменена")
# Удаляем задачу из пула после её отмены
task_pool.remove(task)
logging.info(f"Задача для {nickname} удалена из пула задач")
break
else:
logging.warning(f"Задача для аккаунта {nickname} не найдена")
async def main():
await asyncio.gather(task_manager(), process_queue()) # Одновременное выполнение обработки задач и очереди
if __name__ == "__main__":
asyncio.run(main())
................................................ ......................................................
MESSAGE_HANDLER.PY
async def start_program(self, callback_query: CallbackQuery, state: FSMContext):
user_id = callback_query.from_user.id
# Проверка наличия выбранных никнеймов для данного пользователя
if user_id in self.user_selected_nicknames and self.user_selected_nicknames[user_id]:
selected_nicknames = self.user_selected_nicknames[user_id]
await callback_query.message.answer(f"Процесс спама для никнеймов: {', '.join(selected_nicknames)} запущен.")
# Вызов функции work_with_chats с никнеймами текущего пользователя, добавление в очередь
await self.task_queue.coro_put(list(selected_nicknames))
# Очистка списка никнеймов после завершения
self.user_selected_nicknames[user_id].clear()
await state.clear()
else:
await callback_query.message.answer("Вы не выбрали ни одного никнейма.")
查看 IPC 主题并选择适合您需求的解决方案。
https://docs.python.org/3/library/ipc.html
https://docs.python.org/3/library/signal.html
https://docs.python.org/3/library/mmap.html
我相信 MMAP 可能更适合您的情况。