(已解决(使用线程代替))asyncio ftp 从 1 个客户端同时并行推送到 n 个服务器

问题描述 投票:0回答:1

我尝试了“一切”,但没有运气......可能是我误用/误解了 asyncio。

[我的目标/用例]

将游戏从 1 台计算机(1Gb 网卡)以 10 个为一组推送到 16 个 Xbox(100Mb 网卡)(以利用 1:10 的带宽比)

[我的问题]

我的代码在每个文件/每个主机上同步运行:

  • 文件1推送到xbox1...然后xbox2...
  • 然后 File2 推送到 xbox1...然后 xbox2

[测试设置]

  • 1 个 FTP 客户端
  • 1 个 FTP 服务器,2 个 ≠ 用户

[我的代码]

我认为我面临的主要挑战是

ftp_push
函数循环自身并处理等待。

#! /usr/bin/env python3

import asyncio
import os
from ftplib import FTP
from tqdm import tqdm

game_dir = "~/360/Games/halo_3"
ftp_host = os.environ['ftp_host']
ftp_pass = os.environ['ftp_pass']
ftp_users = ["xbox1", "xbox2"]

def get_dir_size(path: str) -> int:
  """
  return path size in bytes
  """
  if not os.path.isdir(path):
    print(f"Error: {path} is not a directory")
    exit(1)
  size = 0
  with os.scandir(path) as x:
    for entry in x:
      if entry.is_file():
        size += entry.stat().st_size
      elif entry.is_dir():
        size += get_dir_size(entry.path)
  return size

async def ftp_login(host, user, passwd):
  ftp = FTP(host)
  ftp.login(user=user, passwd=passwd)
  print(ftp.getwelcome())
  return ftp

async def ftp_push(ftp_tqdm, path):
  ftp = ftp_tqdm[0]
  tqdm = ftp_tqdm[1]
  with os.scandir(path) as x:
    for entry in x:
      if entry.is_file():
        ftp.storbinary(f'STOR {os.path.basename(entry.path)}', open(entry.path, 'rb'), 2048, callback = lambda sent: tqdm.update(len(sent)))
      if entry.is_dir():
        ftp.mkd(os.path.basename(entry.path))
        ftp.cwd(os.path.basename(entry.path))
        await ftp_push(ftp_tqdm, entry.path)
        ftp.cwd('..')
  await asyncio.sleep(1)

async def main():
  dir_size = get_dir_size(game_dir)
  # create FTP instances (1 per xbox)
  ftps_tasks = await asyncio.wait([ftp_login(ftp_host, ftp_user, ftp_pass) for ftp_user in ftp_users])
  ftps = [i.result() for task in ftps_tasks for i in task]
  # create TQDM instances (1 per xbox)
  tqdms = [tqdm(unit = 'B', unit_scale = True, leave = False, miniters = 1, desc = ftp_suer, total = dir_size) for ftp_suer in ftp_users]
  # zip FTP & TQDM instances
  ftps_tqdms = list(zip(ftps, tqdms))
  # trigger ftp push
  await asyncio.wait([ftp_push(ftp_tqdm, game_dir) for ftp_tqdm in ftps_tqdms])


if __name__ == "__main__":
  try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
  except Exception as e:
    print (e)

如何让这段代码将数据推送到 // 中?

感谢您的宝贵帮助! 干杯!

python asynchronous async-await parallel-processing python-asyncio
1个回答
0
投票

[已解决]

嗨,

再次感谢@lord_haffi 花时间回答。我最终改变了使用 Python 线程的策略。

#! /usr/bin/env python3

import json
from live_xboxs import LiveXboxs
from ftp import Ftp
from multiprocessing.pool import ThreadPool
from toolbox import Toolbox
from tqdm import tqdm
import os

def starmap_ftp_push(ftp, path, xbox_tqdm, dir_tqdm):
  with os.scandir(path) as x:
    ls = ftp.nlst()
    for entry in x:
      if entry.name in ls:
        print(f"{xbox_tqdm.desc} => {entry.name} ({os.path.basename(path)}) exists in {ftp.pwd()}")
      else:
        if entry.is_file():
          ftp.storbinary(f'STOR {os.path.basename(entry.path)}', open(entry.path, 'rb'), 2048, callback = lambda sent: Toolbox.tqdm_update(xbox_tqdm, dir_tqdm, sent))
        if entry.is_dir():
          ftp.mkd(os.path.basename(entry.path))
          ftp.cwd(os.path.basename(entry.path))
          starmap_ftp_push(ftp, entry.path, xbox_tqdm, dir_tqdm)
          ftp.cwd('..')

def push_games(Ftps, live_xboxs):
  """
  Ftp push games in // to all alive xbox (chunks of max 10 xboxs)
  """
  print(f" push games to {GAMES_CWD}")
  threads = len(live_xboxs) if len(live_xboxs) <= 10 else 10
  with ThreadPool(threads) as pool:
    games_size = sum([Toolbox.get_dir_size(game) for game in GAMES])
    xbox_tqdms = [tqdm(unit = 'B', unit_scale = True, leave = False, miniters = 1, desc = xbox['name'], total = games_size, position = idx) for idx, xbox in enumerate(live_xboxs)]
    for game in GAMES:
      # Create tqdm instance (1 per game)
      game_size = Toolbox.get_dir_size(game)
      dir_tqdm = [tqdm(unit = 'B', unit_scale = True, leave = False, miniters = 1, desc = f"{xbox_tqdm.desc}/{game}", total = game_size, position = idx) for idx, xbox_tqdm in enumerate(xbox_tqdms, start=(len(xbox_tqdms)+1))]
      # Create a list of 1 game_dir the size of the ftps & tqdm lists
      game_path = [game for x in live_xboxs]
      ftps = [ftp.ftp for ftp in Ftps]
      args = list(zip(ftps, game_path, xbox_tqdms, dir_tqdm))
      [Ftp.ftp.cwd(GAMES_CWD) for Ftp in Ftps]
      pool.starmap(starmap_ftp_push, args)

# FTP login to each xbox
Ftps = [Ftp(xbox['ip'], xbox['name']) for xbox in live_xboxs]
# Push games
push_games(Ftps, live_xboxs)
© www.soinside.com 2019 - 2024. All rights reserved.