我有服务器 IP:192.168.33.10 启动 schudeler
dask scheduler --host 0.0.0.0
这是该服务器中的主机,我有文件“/var/shared/job_skills.csv”,工作人员是
192.168.33.11,192.168.33.12 使用此 cmd 启动dask worker 192.168.33.10:8786 --local-directory /var/test --dashboard-address 8787 --host 0.0.0.0 --worker-port 39040 --nanny-port 39042
我想在master中启动脚本read_csv.py并将任务分发给worker,比如分块数据并进行聚合,每个worker返回结果并打印结果。
我想做这样的 主脚本“read_csv.py”中的这个脚本
import dask
import dask.dataframe as dd
from dask.distributed import Client
dask.config.set({"dataframe.convert-string": False})
client = Client("192.168.33.10:8786")
df = dd.read_csv("/var/shared/foo.csv")
df['job_skills'] = df['job_skills'].fillna('')
df = df["job_skills"].str.split(',').explode().str.strip()
grouped = df.value_counts().compute()
print(grouped)
工人们给我这样的:
2024-02-29 14:30:04,180 - distributed.worker - WARNING - Compute Failed
Key: ('str-strip-956897fad2adeffa85aa604734f0febb', 0)
Function: execute_task
args: ((subgraph_callable-50304a7f18bfe19fd3ff56b4a6d6db4f, 'str', 'strip', (), {}, 'explode-e2622e44a85e1396024ff799e4f97b6e', 'split', {'pat': ',', 'n': -1, 'expand': False}, 'getitem-f2b8974c7433cce59ce3453d7f35940e', 'job_skills', '', 'getitem-b9a23a03a236420b7c31ada8ec6055df', [(<function read_block_from_file at 0x7fd18829f920>, <OpenFile '/var/shared/foo.csv'>, 0, 1398, b'\n'), None, True, True]))
kwargs: {}
Exception: "FileNotFoundError(2, 'No such file or directory')"
我该如何解决这个问题?
解决此问题的一种方法,无需将文件复制给工作人员。
df = dd.read_csv("ssh://server:port//var/shared/foo.csv")
现在每个工作人员将直接从源中读取,并且仅读取将要处理的那些字节。
其他人可能的替代方案: