我目前正在 Linux 上的 RStudio 中开发一个项目,其中我尝试并行化一个 for 循环,该循环迭代自定义函数的多个参数组合。为了实现这一目标,我使用
foreach
和 doParallel
包。但是,在执行这些循环期间,我遇到了内存管理的重大问题。
当我运行 foreach 循环时,内存使用量随着循环的进行而不断增加。看来所使用的内存没有被释放回系统,即使不再需要它也是如此。
即使我完全停止正在运行的进程(通过按控制台中的
stop
按钮),使用的内存也不会被释放。
我已经尝试过:
目前唯一能立即释放 RAM 的方法是停止集群或重新启动 R 会话。
我还查看了任务管理器,似乎为每个并行进程打开了一个 R 会话(我在 11 个核心上并行化),并且通过使用 gc() 和 rm() 我只能影响主 R 会话(图片上用蓝色标记)
(https://i.sstatic.net/gwwFem7I.png)
是否有其他人面临类似的问题,或者可以提供有关如何管理 R 中并行
foreach
循环中的内存使用情况的见解?或者是否还有其他消耗 RAM 更少的方法来并行化我的代码?
这是我的代码:
library(foreach)
library(doParallel)
library(doSNOW)
# Parallel grid search: for every combination of i (number of randomly
# selected features) and j (maximum number of features kept in each GLM) a
# 4-fold cross-validated Gini score is computed on the worker processes.
cl <- makeCluster(11, outfile = "", type = "SOCK")
registerDoSNOW(cl)

# The parameter grid is defined once so that the progress bar maximum always
# matches the number of tasks that are actually dispatched.
i_values <- 15:30
j_values <- 1:15
total_tasks <- length(i_values) * length(j_values)
pb <- txtProgressBar(min = 0, max = total_tasks, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
start_time <- Sys.time()
# NOTE(review): set.seed() is called in this (master) session only; the
# sampling happens inside the worker processes - confirm whether the workers
# need their own seeding for reproducible results.
set.seed(123)
error_count <- 0
# The list names random_feature_select because that is the function the loop
# body calls.
export_list <- c("train_RGLM_pois", "predict_RGLM", "gini", "dat_neu_Freq", "bootstrap_df", "subsample_df", "random_feature_select", "stepAIC", "cramersv", "error_count")
# tryCatch(finally = ...) closes the progress bar and stops the cluster even
# when the loop fails or is interrupted, so the worker R sessions do not stay
# alive after an aborted run.
x <- tryCatch(
  foreach(i = i_values, .combine = "cbind", .export = export_list, .options.snow = opts) %:%
    foreach(j = j_values, .combine = "c", .export = export_list) %dopar% {
      if ((j + 1) <= i) {
        # Row blocks of the four cross-validation folds.
        fold_rows <- list(1:80000, 200001:280000, 400001:480000, 600001:680000)
        fold_gini <- numeric(length(fold_rows))
        for (cv_cycle in seq_along(fold_rows)) {
          # Cycle 1 holds out the last block, cycle 4 the first one; the
          # remaining blocks keep their original order in the training set.
          test_fold <- length(fold_rows) - cv_cycle + 1
          train_idx <- unlist(fold_rows[-test_fold])
          test_idx <- fold_rows[[test_fold]]
          model_list <- train_RGLM_pois(random_feature_select(dat_neu_Freq[train_idx, ], "Anzahl_VK", "JE", i), "Anzahl_VK", "JE", 20, i, j, cv_cycle)
          prediction <- predict_RGLM(model_list, dat_neu_Freq[test_idx, ])
          fold_gini[cv_cycle] <- gini(data_real = dat_neu_Freq$Anzahl_VK[test_idx], data_pred = prediction)
          # Drop the fitted models before the next fold is trained.
          rm(model_list, prediction)
        }
        result <- mean(fold_gini)
      } else {
        result <- 0
      }
      result
    },
  finally = {
    end_time <- Sys.time()
    close(pb)
    stopCluster(cl)
  }
)
end_time - start_time
x # number of selected features across columns, number of included features down rows
# NOTE(review): error_count is incremented inside the worker processes (see
# train_RGLM_pois); the value printed here is the one held by this session -
# confirm whether worker-side errors are meant to be reported here.
print(paste("Es gab", error_count, "Fehler"))
以及被调用的函数:
library(MASS)
library(confintr)
#' Train an ensemble of Poisson GLMs on random subsamples
#'
#' Repeats the following until `count_glms` models have been fitted: draw 70 %
#' of the rows without replacement, run forward selection with a BIC penalty
#' (`stepAIC` with `k = log(n)`) between an offset-only model and the model
#' with all predictors, truncate the resulting formula to at most
#' `max_features_included` terms, refit, and score the refit on the held-out
#' rows with `gini()`.
#'
#' @param data Data frame holding the target, the offset column and the
#'   candidate predictors.
#' @param zielvariable_as_string Name of the target column.
#' @param offsetvariable_as_string Name of the column used as `offset(log(.))`.
#' @param count_glms Number of models to fit.
#' @param count_features_selected Only used in the progress message.
#' @param max_features_included Maximum number of formula terms kept after the
#'   stepwise search.
#' @param CV_cycle Only used in the progress message.
#' @param max_consecutive_failures Number of failed fits in a row after which
#'   the function stops with an error instead of retrying forever.
#' @return A list with one fitted `glm` per model (`model_1`, `model_2`, ...)
#'   followed by the vector of held-out Gini scores at position
#'   `count_glms + 1`.
train_RGLM_pois <- function(data, zielvariable_as_string, offsetvariable_as_string, count_glms, count_features_selected, max_features_included, CV_cycle = 0, max_consecutive_failures = 50) {
  model_liste <- list()
  alleGini <- c()
  # setdiff() on the names still works when a single predictor is left;
  # names(BS[, -idx]) would drop to a vector and lose that predictor.
  predictor_names <- setdiff(names(data), c(zielvariable_as_string, offsetvariable_as_string))
  offset_term <- paste0("offset(log(", offsetvariable_as_string, "))")
  consecutive_failures <- 0
  i <- 0
  while (i < count_glms) {
    fit_ok <- tryCatch({
      ind <- sample(seq_len(nrow(data)), round(nrow(data) * 0.7), replace = FALSE)
      # BS, n and zielvariable_as_string must keep these names: the formula
      # strings and the stepAIC call below refer to them.
      BS <- data[ind, ] # training subsample
      OOB <- data[-ind, ] # held-out rows used to evaluate the model
      # poisson for frequency; for severity this would be Gamma(link = "inverse")
      model_leer <- glm(as.formula(paste0("BS[[zielvariable_as_string]] ~ 1 + ", offset_term)), data = BS, family = poisson(link = "log"), control = glm.control(maxit = 100))
      model_voll <- glm(as.formula(paste0(zielvariable_as_string, " ~ ", paste(predictor_names, collapse = " + "), " + ", offset_term)), data = BS, family = poisson(link = "log"), control = glm.control(maxit = 100))
      n <- nrow(BS)
      print(paste("iteration", i + 1, "on CV Cycle", CV_cycle, "with", count_features_selected, "selec and", max_features_included, "included"))
      # nullfile() is the portable spelling of "/dev/null"
      capture.output({BIC_output <- stepAIC(model_leer, direction = "forward", k = log(n), scope = list(lower = model_leer, upper = model_voll))}, file = nullfile())
      # formula string of the model chosen by the stepwise BIC search
      varStrings <- strsplit(as.character(BIC_output$call)[2], " \\+ ")[[1]]
      # shorten the final formula if it has more terms than allowed
      finale_formel <- paste0(paste(varStrings[1:min(max_features_included, length(varStrings))], collapse = " + "), " + ", offset_term)
      model_final <- glm(as.formula(finale_formel), data = BS, family = poisson(link = "log"))
      # model predictions on the held-out rows
      pred <- predict.glm(model_final, newdata = OOB, type = "response")
      OOB$pred <- pred
      # named oob_gini so the local value does not shadow the gini() function
      oob_gini <- gini(OOB[[zielvariable_as_string]], OOB$pred)
      rm(BS, OOB)
      capture.output({gc()}, file = nullfile())
      alleGini[i + 1] <- oob_gini
      model_liste[[paste("model", i + 1, sep = "_")]] <- model_final
      i <- i + 1
      TRUE
    }, error = function(e) {
      # get0() keeps the handler itself from failing when error_count has not
      # been created in the global environment yet.
      assign("error_count", get0("error_count", envir = .GlobalEnv, ifnotfound = 0) + 1, envir = .GlobalEnv)
      print(paste("Error", e))
      FALSE
    })
    if (fit_ok) {
      consecutive_failures <- 0
    } else {
      consecutive_failures <- consecutive_failures + 1
      # A fit that fails every time would otherwise keep this loop running
      # forever, because i only advances on success.
      if (consecutive_failures >= max_consecutive_failures) {
        stop("train_RGLM_pois: ", consecutive_failures, " consecutive model fits failed; giving up", call. = FALSE)
      }
    }
  }
  model_liste[[count_glms + 1]] <- alleGini
  model_liste
}
#' Keep the target, the offset and a random subset of the other columns
#'
#' @param data Data frame to select columns from.
#' @param zielvariable Name of the target column; always kept and placed first.
#' @param offset Name of the offset column; kept after the target. The default
#'   `""` matches no column.
#' @param size Number of remaining columns to draw, without replacement.
#' @return A data frame with the target column, the offset column (if found)
#'   and `size` randomly chosen other columns.
random_feature_select <- function(data, zielvariable, offset = "", size) {
  zielvariable_index <- which(names(data) == zielvariable)
  offset_index <- which(names(data) == offset)
  candidates <- setdiff(seq_len(ncol(data)), c(zielvariable_index, offset_index))
  # Draw positions and index into the candidates: sample(x, size) would treat
  # a single remaining candidate k as the range 1:k and could hand back the
  # target or offset column instead.
  ind <- candidates[sample.int(length(candidates), size, replace = FALSE)]
  # drop = FALSE keeps the result a data frame when only one column is left.
  data[, c(zielvariable_index, offset_index, ind), drop = FALSE]
}
#' Gini-type ranking score of predictions against observed values
#'
#' Orders the observed values by ascending prediction and returns
#' `2 * sum(rank * observed) / (n * sum(observed)) - (n + 1) / n`.
#'
#' @param data_real Observed values.
#' @param data_pred Predicted values, one per observation.
#' @return A single numeric score.
gini <- function(data_real, data_pred) {
  # cbind() would silently recycle the shorter input; a score computed from
  # recycled values is meaningless, so mismatched inputs are rejected.
  stopifnot(NROW(data_real) == NROW(data_pred))
  data <- cbind(data_real, data_pred)
  # drop = FALSE keeps the matrix shape when there is a single observation.
  sorted_data <- data[order(data[, 2]), , drop = FALSE]
  n <- nrow(sorted_data)
  erg <- (2 * sum(seq_len(n) * sorted_data[, 1])) / (n * sum(sorted_data[, 1])) - ((n + 1) / n)
  erg
}
提前非常感谢!
致以诚挚的问候 芬恩
也许你的进程卡住了?可以尝试运行类似下面的命令来强制结束它:
pkill -9 [debugging process name]