我使用 mpirun 通过 MPI 并行化执行 R 脚本,使用 Snowfall 包。
我设置了并行化 '''
sfInit(并行= TRUE) ''' 那么我有 *** MPI_Init 发生错误 *** 在 NULL 通信器上 *** MPI_ERRORS_ARE_FATAL(此通信器中的进程现在将中止, *** 以及可能是您的 MPI 工作) [red075.cluster.local:190363] MPI_INIT 完成之前本地中止已成功完成,但无法聚合错误消息,并且无法保证所有其他进程都被终止!
当我使用 mpirun 启动脚本并在脚本中添加 '''sfInit(parallel = TRUE, type = "MPI")'' 时,出现错误 makeMPIcluster(.sfOption$nodes, outfile = tmp, homousous = TRUE, 中出现错误: 大小为 39 的 MPI 集群已在运行 sfInit(parallel = TRUE, type = "MPI") 中的错误: 雪簇启动失败! makeMPIcluster(.sfOption$nodes, outfile = tmp, homousous = TRUE, 中出现错误: 大小为 39 的 MPI 集群已在运行 makeMPIcluster(.sfOption$nodes, outfile = tmp, homousous = TRUE, 中出现错误: 大小为 39 的 MPI 集群已在运行 执行停止
我尝试不使用 sfInit,但代码将以顺序模式而不是并行模式运行。 '''
sfSource("CopulaFunctions.R") ''' 那么我有: 调用降雪函数而不先调用“sfInit”或在 sfStop() 之后调用。 现在调用“sfInit()”。 降雪1.84-6.3初始化:顺序执行,一个CPU。
===== 编辑:添加 SLURM 作业脚本 在 R 代码中我使用
library(snowfall)
sfSetMaxCPUs( number = 40 )
sfInit(parallel = TRUE, cpus = 40, type = "MPI") ...etc
是的,我在 MPI 集群上提交了 Slurm 作业
#!/bin/bash
#SBATCH --nodes=1 # Number of nodes requested
#SBATCH --ntasks-per-node=40 # Tasks per node
#SBATCH --time=20:00:00
module load openmpi/4.1.4/intel
module load gsl/2.7
module load R
# Run R script in batch mode
mpirun -n 40 /local/software/R/4.2.1/build-gcc/lib64/R/library/snow/RMPISNOW CMD BATCH test_rmpi.R outputrmpi.out
========更新:这是修复错误并能够成功运行代码后的代码。
library(xts)
library(rugarch)
library(mvtnorm)
library(copula)
library(pbdMPI)
library(parallel)
cores <- 40
# here reading the data (reading or defining n, N, lag, q, V,ll)
# DAX 30
DAX.data <- read.csv("GDAXI.csv", header=TRUE) # make sure csv-file is in working directory!
DAX <- rev(DAX.data$Close) # DAX closing prices from 2000-01-01 to 2020-12-31
Datum.DAX <- rev(DAX.data$Date)[-1] # read in dates and ...
Datum.DAX <- as.Date(Datum.DAX, "%m/%d/%y") # ... convert dates.
lr.DAX <- diff( log(DAX) ) # Investigate losses
lr.DAX <- xts(x = lr.DAX, order.by = Datum.DAX) # create xts-object for easier manipulation
# GSPC
GSPC.data <- read.csv("GSPC.csv", header=TRUE) # make sure csv-file is in working directory!
GSPC <- rev(GSPC.data$Close) # GSPC closing prices from 2000-01-01 to 2020-12-31
Datum.GSPC <- rev(GSPC.data$Date)[-1] # read in dates and ...
Datum.GSPC <- as.Date(Datum.GSPC, "%m/%d/%y") # ... convert dates.
lr.GSPC <- diff( log(GSPC) ) # Investigate losses
lr.GSPC <- xts(x = lr.GSPC, order.by = Datum.GSPC) # create xts-object for easier manipulation
# merge both DAX and GSPC
V.lr <- merge.xts(lr.DAX, lr.GSPC, all=FALSE) # merged log-returns and ...
V.ll <- -V.lr # ... log-losses
Datum <- index(V.lr) # save dates for later use
N <- nrow(V.lr) # total length of time series (includes some observations after 2019)
n <- 1000 # length of rolling window
# ===========================================================
# 2. Rolling-window analysis of CoVaR and VaR forecasts
# ===========================================================
lag <- 1 # use estimated model to forecast (lag)-steps-ahead, then re-estimate (i.e., we use daily re-estimates)
p <- 0.95 # equals \alpha in paper
q <- 0.95
source("CopulaFunctions.R")
source("MLEstimation.R")
source("RMApplication.R")
wrapper <- function(i, n, lag, p, q, spec, V.ll){
(set.seed(12345 + i) ) # set rng stream for ith loop item
# 1. Estimate rolling-window parameters
R <- t(coredata(V.ll[(i-n+1) : (i+lag), ]))
Est <- GARCH.est(R, spec, lag, v=10)
if (is.logical(Est) && !Est) {
print("Estimation failed, skipping...")
return(NA)
} # if estimators did not converge
# 2. Produce rolling window CoVaR forecasts
R <- t(coredata( V.ll[(i+1) : (i+lag), ] ))
CoVaR.forecasts.t <- CoVaR.t.forecast(R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.t, lag, p, q)
CoVaR.forecasts.Gauss <- CoVaR.Gauss.forecast( R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.Gauss, lag, p, q)
return( list(CoVaR.forecasts.t = CoVaR.forecasts.t,
CoVaR.forecasts.Gauss = CoVaR.forecasts.Gauss) )
}
loop <- seq.int(from = n, to = N- 1, by = lag)
# GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "fGARCH", garchOrder = c(1, 1), submodel = "GARCH"),
mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
distribution.model = "norm") # simple GARCH(1,1) specification
Forecasts.GARCH <- mclapply(loop, wrapper,n =n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)
print(Forecasts.GARCH)
# GJR-GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "gjrGARCH", garchOrder = c(1, 1)),
mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
distribution.model = "norm") # GJR-GARCH(1,1) specification
Forecasts.GJR <- mclapply(loop,wrapper,n= n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)
print(Forecasts.GJR)
finalize()
我现在唯一的问题是如何在2个节点中运行它,我尝试请求2个节点,但不幸的是它计算了每次迭代两次。
任何建议如何处理。谢谢你
从您在另一个问题中的评论中,我发现您尝试运行的代码很容易修改,以便与 MPI(通过 pbdMPI)和 Unix fork(通过
parallel::mclapply()
)的组合运行。
雪包早于 MPI 和 Slurm。尽管它已经更新,但它是为管理者-工作者编程而设计的,这并不是 MPI 在当今 Linux 集群上的典型使用方式。
此外,每个节点运行 40 个 MPI 实例将为每个节点制作 40 个数据副本。更好的方法是在节点内使用
parallel::mclapply
并使用 MPI 跨节点进行扩展。
尝试以下方法
code.R
library(xts)
library(rugarch)
library(mvtnorm)
library(copula)
library(pbdMPI)
library(parallel)
cores <- 40
# here reading the data (reading or defining n, N, lag, q, V,ll)
source("CopulaFunctions.R")
source("MLEstimation.R")
source("RMApplication.R")
wrapper <- function(i, loop, n, lag, p, q, spec, V.ll){
comm.set.stream(i) # set rng stream for ith loop item
i <- loop[i] # select ith loop item
# 1. Estimate rolling-window parameters
R <- t(coredata(V.ll[(i-n+1) : (i+lag), ]))
Est <- GARCH.est(R, spec, lag, v=10)
if(is.na(Est)){return(NA)} # if estimators did not converge
# 2. Produce rolling window CoVaR forecasts
R <- t(coredata( V.ll[(i+1) : (i+lag), ] ))
CoVaR.forecasts.t <- CoVaR.t.forecast(R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.t, lag, p, q)
CoVaR.forecasts.Gauss <- CoVaR.Gauss.forecast( R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.Gauss, lag, p, q)
return( list(CoVaR.forecasts.t = CoVaR.forecasts.t,
CoVaR.forecasts.Gauss = CoVaR.forecasts.Gauss) )
}
loop <- seq.int(from = n, to = N-lag, by = lag) # loop over these
my_vec <- comm.chunk(length(loop), form = "vector", rng = TRUE, seed = 12345)
# GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "fGARCH", garchOrder = c(1, 1), submodel = "GARCH"),
mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
distribution.model = "norm") # simple GARCH(1,1) specification
Forecasts.GARCH <- mclapply(my_vec, wrapper, loop=loop, n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)
saveRDS(Forecasts.GARCH, file = paste0("GARCH", comm.rank(), ".rds"))
# GJR-GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "gjrGARCH", garchOrder = c(1, 1)),
mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
distribution.model = "norm") # GJR-GARCH(1,1) specification
Forecasts.GJR <- mclapply(my_vec, wrapper, loop=loop, n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)
saveRDS(Forecasts.GJR, file = paste0("GJR", comm.rank(), ".rds"))
finalize()
使用此 SLURM.sh 将其提交到 Slurm:
#!/bin/bash
#SBATCH --output=output.job9.log
#SBATCH --error=error.job9.log
#SBATCH --nodes=2
#SBATCH --exclusive
#SBATCH --time=22:30:00
module load openmpi/4.1.4/intel
module load gsl/2.7
module load R/4.2.1
export OMPI_MCA_mpi_warn_on_fork=0
time mpirun --map-by ppr:1:node Rscript code.R
这将运行
code.R
的两个副本,每个副本都在其自己的循环部分上工作。它假设您的节点具有 40 个或更多核心。使用 mclapply()
的一大优点是输入数据位于共享内存中并且不会被复制。同时,您可以使用 SLURM.sh 脚本扩展到任意数量的节点 - code.R 通过将 length(loop)
拆分为 my_vec
块进行调整。每个循环索引都有自己的 RNG,因此可以通过节点和核心之间的任何分割来重现。
请注意,我已删除您的打印语句,因为最好只返回结果。不建议从并行运行的 40 个核心进行打印。另请注意,每个节点将结果(包含
length(my_vec)
元素的列表)保存在唯一命名的 .rds
文件中。您的运行时间是通过 shell 脚本中的 time
完成的。
我没有运行这个,因为我没有你的数据。让我知道这对你来说是如何运行的。