Linux 上 R 中并行 forEach 循环中的 RAM 使用问题

问题描述 投票:0回答:1

我目前正在 Linux 上的 RStudio 中开发一个项目,其中我尝试并行化一个 for 循环,该循环迭代自定义函数的多个参数组合。为了实现这一目标,我使用

foreach
doParallel
包。但是,在执行这些循环期间,我遇到了内存管理的重大问题。

当我运行 foreach 循环时,内存使用量随着循环的进行而不断增加。看来所使用的内存没有被释放回系统,即使不再需要它也是如此。

即使我完全停止正在运行的进程(通过按控制台中的

stop
按钮),使用的内存也不会被释放。

我已经尝试过:

  • 在 foreach 循环中使用 rm(...) 和 gc() 来删除不再需要的变量,这不会改变任何东西
  • 停止进程后使用 gc(),这也不会释放已用的 RAM
  • 清除所有对象,包括隐藏的对象,RAM使用率几乎没有变化(除了我用它删除的明显对象的大小)

立即释放 Ram 的唯一方法是停止集群或重新启动 R 会话。

我还查看了任务管理器,似乎为每个并行进程打开了一个 R 会话(我在 11 个核心上并行化),并且通过使用 gc() 和 rm() 我只能影响主 R 会话(图片上用蓝色标记)

https://i.sstatic.net/gwwFem7I.png

是否有其他人面临类似的问题,或者可以提供有关如何管理 R 中并行

foreach
循环中的内存使用情况的见解?或者是否还有其他消耗 RAM 更少的方法来并行化我的代码?

这是我的代码:

library(foreach)
library(doParallel)
library(doSNOW)
cl <- makeCluster(11, outfile = "", type = "SOCK")
registerDoSNOW(cl)
total_tasks = length(15:35) * length(1:20)
pb <- txtProgressBar(min = 0, max = total_tasks, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list (progress = progress)
start_time <- Sys.time()
set.seed(123)
error_count <- 0
export_list <- c("train_RGLM_pois", "predict_RGLM", "gini", "dat_neu_Freq", "bootstrap_df", "subsample_df", "random_feature_select2", "stepAIC", "cramersv", "error_count")
x <-
  foreach(i=15:30, .combine='cbind', .export = export_list, .options.snow = opts) %:%
  foreach(j=1:15, .combine='c', .export = export_list) %dopar% {
    if ((j+1) <= i){
      a <- 1:80000
      b <- 200001:280000
      c <- 400001:480000
      d <- 600001:680000
      model_list <- train_RGLM_pois(random_feature_select(dat_neu_Freq[c(a, b, c),], "Anzahl_VK", "JE",i), "Anzahl_VK", "JE", 20, i, j, 1)
      prediction <- predict_RGLM(model_list, dat_neu_Freq[d,])
      gini1 <- gini(data_real = dat_neu_Freq$Anzahl_VK[d], data_pred = prediction)
      
      model_list <- train_RGLM_pois(random_feature_select(dat_neu_Freq[c(a, b, d),], "Anzahl_VK", "JE",i), "Anzahl_VK", "JE", 20, i, j, 2)
      prediction <- predict_RGLM(model_list, dat_neu_Freq[c,])
      gini2 <- gini(data_real = dat_neu_Freq$Anzahl_VK[c], data_pred = prediction)
      
      model_list <- train_RGLM_pois(random_feature_select(dat_neu_Freq[c(a, c, d),], "Anzahl_VK", "JE",i), "Anzahl_VK", "JE", 20, i, j, 3)
      prediction <- predict_RGLM(model_list, dat_neu_Freq[b,])
      gini3 <- gini(data_real = dat_neu_Freq$Anzahl_VK[b], data_pred = prediction)
      
      model_list <- train_RGLM_pois(random_feature_select(dat_neu_Freq[c(b, c, d),], "Anzahl_VK", "JE",i), "Anzahl_VK", "JE", 20, i, j, 4)
      prediction <- predict_RGLM(model_list, dat_neu_Freq[a,])
      gini4 <- gini(data_real = dat_neu_Freq$Anzahl_VK[a], data_pred = prediction)
      
      #result <- paste(as.character(MSE1 + MSE2 + MSE3 + MSE4), ",", as.character(gini1 + gini2 + gini3 + gini4), ",", as.character(dev1 + dev2 + dev3 + dev4))
      result <- (gini1 + gini2 + gini3 + gini4)/4
    } else {
      result <- 0
    }
    result
  }
end_time <- Sys.time()
end_time-start_time
close(pb)
stopCluster(cl)
x # Anz_selected_features nach rechts, Anz_included_features nach unten
print(paste("Es gab", error_count, "Fehler"))

以及被调用的函数:

library(MASS)
library(confintr)

train_RGLM_pois <- function(data, zielvariable_as_string, offsetvariable_as_string, count_glms, count_features_selected, max_features_included, CV_cycle = 0){
  model_liste <- list()
  alleGini <- c()
  i <- 0
  while (i < count_glms){
    tryCatch({
      ind <- sample(1:nrow(data), round(nrow(data)*0.7), replace = FALSE)
      BS <- data[ind,]                                                              #Bootstrap Stichprobe
      OOB <- data[-ind,]                                                            #Out Of Bag Stichprobe für Testen des Modells
      model_leer <- glm(as.formula(paste("BS[[zielvariable_as_string]] ~ 1 + offset(log(", offsetvariable_as_string, "))", sep = "" )) , data = BS, family = poisson(link = "log"), control = glm.control(maxit = 100))    #poisson für Frequency. Für Severity wärs dann Gamma(link = "inverse")
      model_voll <- glm(as.formula(paste(zielvariable_as_string, " ~ ", paste(names(BS[, -which(names(BS) %in% c(zielvariable_as_string, offsetvariable_as_string))]), collapse = " + "), " + offset(log(", offsetvariable_as_string, "))", sep = "")), data = BS, family = poisson(link = "log"), control = glm.control(maxit = 100))
      n = nrow(BS)
      print(paste("iteration", i + 1, "on CV Cycle", CV_cycle, "with", count_features_selected, "selec and", max_features_included, "included"))
      capture.output({BIC_output <- stepAIC(model_leer, direction = "forward", k = log(n), scope = list(lower = model_leer, upper = model_voll))}, file = "/dev/null")
      #Der String für die finale Formel des StepBIC 
      varStrings <- strsplit(as.character(BIC_output$call)[2], " \\+ ")[[1]]
      #Finale Formel des StepBic evtl kürzen
      finale_formel <- paste(paste(varStrings[1:min(max_features_included, length(varStrings))], collapse = " + "), " + offset(log(", offsetvariable_as_string, "))", sep = "")
      model_final <- glm(as.formula(finale_formel), data = BS, family = poisson(link = "log"))
      #modelpredictions auf der OOB Stichprobe
      pred <- predict.glm(model_final, newdata = OOB, type = "response")
      OOB$pred <- pred
      gini <- gini(OOB[[zielvariable_as_string]], OOB$pred)
      rm(BS, OOB)
      capture.output({gc()}, file = "/dev/null")
      alleGini[i + 1] <- gini
      model_liste[[paste("model", i + 1, sep = "_")]] <- model_final
      i = i + 1
    }, error = function(e){
      assign("error_count", error_count + 1, envir = .GlobalEnv)
      print(paste("Error", e))
    })
  }
  model_liste[[count_glms + 1]] <- alleGini
  return(model_liste)
}


random_feature_select <- function(data, zielvariable, offset = "", size){
  zielvariable_index <- which(names(data) == zielvariable)
  offset_index <- which(names(data) == offset)
  ind <- sample(setdiff(1:ncol(data), c(zielvariable_index, offset_index)), size, replace = FALSE)
  return(data[,c(zielvariable_index, offset_index, ind)])
}


gini <- function(data_real, data_pred){
  data <- cbind(data_real, data_pred)
  sorted_data <- data[order(data[, 2]), ]
  n <- nrow(sorted_data)
  erg <- (2 * sum(seq_along(sorted_data[, 1]) * sorted_data[, 1])) / (n * sum(sorted_data[, 1])) - ((n + 1) / n)
  return(erg)
}

提前非常感谢!

致以诚挚的问候 芬恩

r foreach doparallel
1个回答
0
投票

也许你的进程被冻结了,你必须运行类似

pkill -9 [debugging process name]
的东西?

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.