I have tried "everything" with no luck... most likely I am misusing/misunderstanding asyncio.
[My goal / use case]
Push games from 1 computer (1 Gb NIC) to 16 Xboxes (100 Mb NICs) in batches of 10, to take advantage of the 1:10 bandwidth ratio (10 × 100 Mb ≈ 1 Gb, so ten concurrent transfers should saturate the server NIC).
[My problem]
My code runs synchronously, file by file and host by host:
[Test setup]
[My code]
I think the main challenge I am facing is that the ftp_push function recurses into itself and handles the awaits.
#! /usr/bin/env python3
import asyncio
import os
from ftplib import FTP
from tqdm import tqdm
game_dir = "~/360/Games/halo_3"
ftp_host = os.environ['ftp_host']
ftp_pass = os.environ['ftp_pass']
ftp_users = ["xbox1", "xbox2"]
def get_dir_size(path: str) -> int:
    """
    return path size in bytes
    """
    if not os.path.isdir(path):
        print(f"Error: {path} is not a directory")
        exit(1)
    size = 0
    with os.scandir(path) as x:
        for entry in x:
            if entry.is_file():
                size += entry.stat().st_size
            elif entry.is_dir():
                size += get_dir_size(entry.path)
    return size

async def ftp_login(host, user, passwd):
    # log in to one host and return the FTP connection
    ftp = FTP(host)
    ftp.login(user=user, passwd=passwd)
    print(ftp.getwelcome())
    return ftp

async def ftp_push(ftp_tqdm, path):
    # walk `path`: STOR each file, recurse into sub-directories,
    # and update the host's tqdm bar from the storbinary callback
    ftp = ftp_tqdm[0]
    tqdm = ftp_tqdm[1]
    with os.scandir(path) as x:
        for entry in x:
            if entry.is_file():
                ftp.storbinary(f'STOR {os.path.basename(entry.path)}', open(entry.path, 'rb'), 2048, callback=lambda sent: tqdm.update(len(sent)))
            if entry.is_dir():
                ftp.mkd(os.path.basename(entry.path))
                ftp.cwd(os.path.basename(entry.path))
                await ftp_push(ftp_tqdm, entry.path)
                ftp.cwd('..')
    await asyncio.sleep(1)

async def main():
    dir_size = get_dir_size(game_dir)
    # create FTP instances (1 per xbox)
    ftps_tasks = await asyncio.wait([ftp_login(ftp_host, ftp_user, ftp_pass) for ftp_user in ftp_users])
    ftps = [i.result() for task in ftps_tasks for i in task]
    # create TQDM instances (1 per xbox)
    tqdms = [tqdm(unit='B', unit_scale=True, leave=False, miniters=1, desc=ftp_user, total=dir_size) for ftp_user in ftp_users]
    # zip FTP & TQDM instances
    ftps_tqdms = list(zip(ftps, tqdms))
    # trigger ftp push
    await asyncio.wait([ftp_push(ftp_tqdm, game_dir) for ftp_tqdm in ftps_tqdms])

if __name__ == "__main__":
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    except Exception as e:
        print(e)
How can I get this code to push the data in parallel?
Thanks for your valuable help! Cheers!
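For reference, the ftplib calls inside ftp_push (storbinary, mkd, cwd) are blocking, so the single-threaded event loop can only switch between hosts at the lone await asyncio.sleep(1). One way to keep the asyncio structure would be to hand the blocking work to worker threads, for example with asyncio.to_thread (Python 3.9+). Below is a minimal sketch, assuming a hypothetical synchronous push_one_host helper that wraps the existing login and push logic; it is not the approach that was finally used (see [SOLVED]).

#! /usr/bin/env python3
import asyncio
from ftplib import FTP

def push_one_host(host, user, passwd, path):
    # Hypothetical blocking helper: log in and push `path` with plain ftplib
    # calls, much like the original ftp_login + ftp_push pair.
    ftp = FTP(host)
    ftp.login(user=user, passwd=passwd)
    ...  # walk `path` and ftp.storbinary(...) each file
    ftp.quit()

async def main():
    users = ["xbox1", "xbox2"]
    # asyncio.to_thread runs each blocking helper in its own worker thread,
    # so the uploads to the different hosts overlap on the network.
    await asyncio.gather(
        *[asyncio.to_thread(push_one_host, "ftp_host", user, "ftp_pass", "~/360/Games/halo_3")
          for user in users]
    )

if __name__ == "__main__":
    asyncio.run(main())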
[SOLVED]
Hi,
Thanks again to @lord_haffi for taking the time to answer. I ended up changing strategy and going with Python threads.
#! /usr/bin/env python3
import json
from live_xboxs import LiveXboxs
from ftp import Ftp
from multiprocessing.pool import ThreadPool
from toolbox import Toolbox
from tqdm import tqdm
import os
def starmap_ftp_push(ftp, path, xbox_tqdm, dir_tqdm):
    with os.scandir(path) as x:
        ls = ftp.nlst()
        for entry in x:
            if entry.name in ls:
                print(f"{xbox_tqdm.desc} => {entry.name} ({os.path.basename(path)}) exists in {ftp.pwd()}")
            else:
                if entry.is_file():
                    ftp.storbinary(f'STOR {os.path.basename(entry.path)}', open(entry.path, 'rb'), 2048, callback=lambda sent: Toolbox.tqdm_update(xbox_tqdm, dir_tqdm, sent))
                if entry.is_dir():
                    ftp.mkd(os.path.basename(entry.path))
                    ftp.cwd(os.path.basename(entry.path))
                    starmap_ftp_push(ftp, entry.path, xbox_tqdm, dir_tqdm)
                    ftp.cwd('..')

def push_games(Ftps, live_xboxs):
    """
    FTP push games in parallel to all live Xboxes (chunks of max 10 Xboxes)
    """
    print(f" push games to {GAMES_CWD}")
    threads = len(live_xboxs) if len(live_xboxs) <= 10 else 10
    with ThreadPool(threads) as pool:
        games_size = sum([Toolbox.get_dir_size(game) for game in GAMES])
        xbox_tqdms = [tqdm(unit='B', unit_scale=True, leave=False, miniters=1, desc=xbox['name'], total=games_size, position=idx) for idx, xbox in enumerate(live_xboxs)]
        for game in GAMES:
            # Create tqdm instance (1 per game)
            game_size = Toolbox.get_dir_size(game)
            dir_tqdm = [tqdm(unit='B', unit_scale=True, leave=False, miniters=1, desc=f"{xbox_tqdm.desc}/{game}", total=game_size, position=idx) for idx, xbox_tqdm in enumerate(xbox_tqdms, start=(len(xbox_tqdms)+1))]
            # Create a list of 1 game_dir the size of the ftps & tqdm lists
            game_path = [game for x in live_xboxs]
            ftps = [ftp.ftp for ftp in Ftps]
            args = list(zip(ftps, game_path, xbox_tqdms, dir_tqdm))
            [Ftp.ftp.cwd(GAMES_CWD) for Ftp in Ftps]
            pool.starmap(starmap_ftp_push, args)

# GAMES, GAMES_CWD and live_xboxs come from elsewhere in the full script (not shown here)
# FTP login to each xbox
Ftps = [Ftp(xbox['ip'], xbox['name']) for xbox in live_xboxs]
# Push games
push_games(Ftps, live_xboxs)
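The switch to multiprocessing.pool.ThreadPool works because each blocking ftplib transfer spends its time waiting on socket I/O, during which the GIL is released, so up to 10 uploads genuinely overlap on the network. A self-contained sketch of the same pattern with a stand-in upload function (the names here are illustrative, not taken from the script above):

from multiprocessing.pool import ThreadPool
import time

def upload(host, path):
    # Stand-in for a blocking ftplib transfer; the sleep releases the GIL
    # just as socket I/O does during a real STOR.
    time.sleep(1)
    return f"{host}: pushed {path}"

hosts = [f"xbox{i}" for i in range(1, 17)]
args = [(host, "halo_3") for host in hosts]

# At most 10 transfers in flight at once, matching the 1:10 bandwidth ratio.
with ThreadPool(min(10, len(hosts))) as pool:
    for result in pool.starmap(upload, args):
        print(result)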