分布式包的问题

问题描述 投票:0回答:1

我在使用分布式软件包时遇到了麻烦。我有一个返回 100 x 4 数组的函数。该数组是通过对使用分布式 for 构造的 SharedArray 的某些维度进行平均而生成的,代码看起来像

@everywhere function GenerateMatrix(H::sparsearray, indices::Array{Int}, a::Float64)
   Gavg = SharedArray{Float64}(100, length(indices))
   Z = Array{Float64}(undef, 100, 4)
   G = lu(a*sparse(I, size(H)) - H) #new array, same size as H
   @sync @distributed for i=1:length(indices)
          main_ind = indices[i]
          Gmain = G[:, main_ind]
           ...
          #Other for operations that fill the matrix Gavg
    end
    Z[:,1] = mean(G_avg, dims=2)
    #And other operations up to Z[:,4]
    return Z
 end

这个函数本身没有任何问题,当我尝试生成几个这样的 Z 数组并一遍又一遍地平均它们,重写相同的文件并将每个 1000 因子保存为检查点时,问题就出现了,如下

using DelimitedFiles
function IterateFiles(a::Float64, N)
        for j=1:N
             ...
             #Create H and the respective indices vector
             Z = GenerateMatrix(H, indices, a)
             if j==1
                 writedlm("Z_sample"*string(j), Z)
             else
                Zprev = readdlm("Z_sample"*string(j-1))
                Ztemp = (Zprev *(j-1)+ Z)/j  
                writedlm("Z_sample"*string(j), Ztemp)
                if (j-1)%1000 != 0 #this deletes the previous file unless it corresponds to a checkpoint
                   rm("Z_sample"*string(j-1)) 
                end
        end

此代码执行得很好并按预期生成文件,但是在函数

IterateFiles
多次迭代(随机数)之后,它停止生成文件。它不会产生错误消息。当我在终端中使用
ps auxr
检查工作人员时,一开始显示程序运行良好,并且正在使用选定数量的分布式处理器,但稍后
ps auxr
只显示发送的原始指令但分布式中没有其他工作人员,发生这种情况后不会生成新文件。

我虽然这是一个内存错误,但这似乎不是问题。没有错误消息,运行代码的主指令也没有停止,

julia -p 8 code.jl
仍然显示为传出工作,但不再产生结果。

我不知道会发生什么。我非常感谢您对此提供的任何见解或帮助。

parallel-processing julia distributed
1个回答
0
投票

您正在对文件进行 I/O,因此您可能无法捕获引发的错误,因为您引用的文件丢失、为空或无法访问。您可以向 readdlm 和 writedlm 添加锁和 try/catch。如果(正如我在这里假设的那样)分隔文件处理之外的代码是您想要多线程的缓慢步骤,您可以用锁包装文件处理:

using DelimitedFiles

rlock = ReentrantLock()

function IterateFiles(a::Float64, N)
    for j=1:N

        #Create H and the respective indices vector
        Z = GenerateMatrix(H, indices, a)
        try
            lock(rlock)
            if j == 1
                writedlm("Z_sample"*string(j), Z)
            else
                Zprev = readdlm("Z_sample"*string(j-1))
                Ztemp = (Zprev *(j-1)+ Z)/j  
                writedlm("Z_sample"*string(j), Ztemp)
            end
            if (j-1)%1000 != 0 #this deletes the previous file unless it corresponds to a checkpoint
                rm("Z_sample"*string(j-1)) 
            end
            unlock(rlock)
        catch y
            @warn y
            unlock(rlock)
            rethrow()
        end
    end
end
© www.soinside.com 2019 - 2024. All rights reserved.