并行运行if循环

问题描述 投票:3回答:3

我有一个大约400万行的数据集,我需要循环。数据结构是重复的ID彼此依赖,但数据在ID之间是独立的。对于每个ID,[i + 1]行依赖于[i]。这是一个可重复的例子。我确实认识到这个例子在内部函数方面不实用,但它只是对我所拥有的结构的一个演示。

set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
y = rep(0,15)

df = as.data.frame(cbind(ids,month,x,y))

for (i in 1:nrow(df)){
  if(i>1 && df[i,1]==df[i-1,1]){
    #Main functions go here
    df[i,4] = df[i-1,4]^2+df[i,3]
  }
  else {
    df[i,4] = 1
  }
}

问题实际上是真实函数的1000个循环需要大约90秒,因此400万行需要数天。以这种方式运行是不可行的。但是ID是独立的,不需要一起运行。我的问题是:有没有办法并行运行这种类型的循环?一个非常不优雅的解决方案是将文件拆分为50个部分而不拆分ID,只需在50个子文件上运行相同的代码即可。我认为应该有一种方法来编码。

编辑:添加月份列以显示行相互依赖的原因。要解决以下两条评论:

1)实际上有6-7行函数可以运行。我可以使用ifelse()多个函数吗? 2)所需的输出将是完整的数据帧。实际上有更多的列,但我需要数据框中的每一行。

   ids month  x      y
1    1     1 -1      1
2    1     2  1      2
3    1     3 10     14
4    1     4  2    198
5    1     5  3  39207
6    2     1 11      1
7    2     2  4      5
8    2     3 -4     21
9    2     4 -1    440
10   2     5  0 193600
11   3     1  8      1
12   3     2  4      5
13   3     3  4     29
14   3     4  3    844
15   3     5 -1 712335

EDIT2:我尝试从另一篇文章中应用foreach()包,但它似乎不起作用。这段代码将运行,但我认为问题是行在核心之间分配的方式。如果每一行按顺序发送到不同的核心,则相同的ID永远不会在同一个核心中。

library(foreach)
library(doParallel)


set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
y = rep(0,15)

df = as.data.frame(cbind(ids,month,x,y))

#setup parallel backend to use many processors
cores=detectCores()
cl <- makeCluster(cores[1]-1) #not to overload your computer
registerDoParallel(cl)

finalMatrix <- foreach(i=1:nrow(df), .combine=cbind) %dopar% {

  for (i in 1:nrow(df)){
    if(i>1 && df[i,1]==df[i-1,1]){
      #Main functions go here
      df[i,4] = df[i-1,4]^2+df[i,3]
    }
    else {
      df[i,4] = 1
    }
  }
}
#stop cluster
stopCluster(cl)
r for-loop parallel-processing
3个回答
1
投票

这是使用foreach的解决方案。很难说它在你的现实生活中如何运作,至少它与testdata一起工作......

首先,我生成一些testdata:

# function to generate testdata

genDat <- function(id){

  # observations per id, fixed or random
  n <- 50
  #n <- round(runif(1,5,1000))

  return(

    data.frame(id=id,month=rep(1:12,ceiling(n/12))[1:n],x=round(rnorm(n,2,5)),y=rep(0,n))

  )
}

#generate testdata

testdat <- do.call(rbind,lapply(1:90000,genDat))


> head(testdat)
  id month  x y
1  1     1  7 0
2  1     2  6 0
3  1     3 -9 0
4  1     4  3 0
5  1     5 -9 0
6  1     6  8 0


> str(testdat)
'data.frame':   4500000 obs. of  4 variables:
 $ id   : int  1 1 1 1 1 1 1 1 1 1 ...
 $ month: int  1 2 3 4 5 6 7 8 9 10 ...
 $ x    : num  7 6 -9 3 -9 8 -4 13 0 5 ...
 $ y    : num  0 0 0 0 0 0 0 0 0 0 ...

所以testdata有大约450万行,有9万个唯一ID。

现在,由于您的计算在ID之间是独立的,因此想法是将具有唯一ID的数据发送到每个核心...这最终也将摆脱ififelse条件的必要性。

为此,我首先生成一个带有开始和停止行索引的矩阵,以便将数据集拆分为唯一ID:

id_len <- rle(testdat$id)

ixmat <- cbind(c(1,head(cumsum(id_len$lengths)+1,-1)),cumsum(id_len$lengths))

然后可以将该矩阵传递给foreach以并行运行特定部分。

在这个例子中,我略微修改你的计算,以避免导致Inf的天文数值。

library(parallel)
library(doParallel)
library(iterators)

cl <- makeCluster(parallel::detectCores())
registerDoParallel(cl)   #create a cluster


r <-  foreach (i = iter(ixmat,by='row')) %dopar% {

  x <- testdat$x[i[1,1]:i[1,2]]
  y <- testdat$y[i[1,1]:i[1,2]]
  y[1] <- 1

  for(j in 2:length(y)){
    #y[j] <- (y[j-1]^2) + x[j] ##gets INF
    y[j] <- y[j-1] + x[j]
    }

  return(y)
}

parallel::stopCluster(cl)

最后,您可以替换原始数据框中的值:

testdat$y <- unlist(r)

至于时间,foreach循环在我的8核机器上运行大约40秒。


3
投票

所以,只需用Rcpp重新编码你的循环:

#include <Rcpp.h>
using namespace Rcpp;

// [[Rcpp::export]]
NumericVector fill_y(const NumericVector& x) {

  int n = x.length();
  NumericVector y(n); y[0] = 1;
  for (int i = 1; i < n; i++) {
    y[i] = pow(y[i - 1], 2) + x[i];
  }
  return y;
}

并且,要将其应用于每个组,请使用dplyr:

df %>%
  group_by(ids) %>%
  mutate(y2 = fill_y(x))

我认为这应该足够快,以便您不需要并行性。实际上我是在@Val's testdat上运行的,只花了2秒钟(用一台旧电脑)。

告诉我,如果没关系。否则,我将制作并行版本。


1
投票

基础R矩阵运算和来自meltdcast / data.table

正如上面的注释中所讨论的,此解决方案非常特定于示例中的用例,但可能适用于您的用例。

使用dcast.data.table包中的矩阵运算和melt.data.tabledata.table函数,可以实现从长格式到宽格式的快速转换,并且非常有效。

考虑到所有因素,较大的约束条件可能会比使用这些方法的处理时间多得多。

library(data.table)
set.seed(123)

id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)

month = rep(seq(1,5),3)

x = round(rnorm(15,2,5))
# y = rep(0,15) ## no need to pre-define y with this method

df = as.data.frame(cbind(ids,month,x))
setDT(df) ## Convert to data.table by reference

wide <- dcast.data.table(df, month ~ ids, value.var = "x") ## pivot to 'wide' format

mat <- data.matrix(wide[,-c("month")]) ## Convert to matrix
print(mat)

      1  2  3
[1,] -1 11  8
[2,]  1  4  4
[3,] 10 -4  4
[4,]  2 -1  3
[5,]  3  0 -1

然后以矩阵形式对其进行操作:

mat[1,] <- 1 ## fill the first row with 1's as in your example

for (i in 2:nrow(mat)){
  mat[i,] = mat[i-1L,]^2 + mat[i,]
}

print(mat)

         1      2      3
[1,]     1      1      1
[2,]     2      5      5
[3,]    14     21     29
[4,]   198    440    844
[5,] 39207 193600 712335

接下来,熔化回长格式,然后加入关键列idsmonth的原始数据:

yresult <- as.data.table(mat) ## convert back to data.table format
yresult[,month := wide[,month]] ## Add back the month column

ylong <- melt.data.table(yresult,
                         id.vars = "month",
                         variable.factor = FALSE,
                         variable.name = "ids",
                         value.name = "y") ## Pivot back to 'long' format

ylong[,ids := as.numeric(ids)] ## reclass ids to match input ids

setkey(ylong, ids, month) ## set keys for join on 'ids' and 'month'
setkey(df, ids,month)

merge(df,ylong) ## join data.table with the result

得出最终结果:

    ids month  x      y
 1:   1     1 -1      1
 2:   1     2  1      2
 3:   1     3 10     14
 4:   1     4  2    198
 5:   1     5  3  39207
 6:   2     1 11      1
 7:   2     2  4      5
 8:   2     3 -4     21
 9:   2     4 -1    440
10:   2     5  0 193600
11:   3     1  8      1
12:   3     2  4      5
13:   3     3  4     29
14:   3     4  3    844
15:   3     5 -1 712335

规模测试

为了测试和说明缩放,下面的函数testData通过交叉连接给定数量的id和给定的月数来生成数据集。然后,函数testFunc执行递归的行式矩阵运算。

testData <- function(id_count, month_count) {

  id_vector <- as.numeric(seq_len(id_count))
  months_vector <- seq_len(month_count)

  df <- CJ(ids = id_vector,month = months_vector)
  df[,x := rnorm(.N,0,0.1)]
  return(df)
}

testFunc <- function(df) {
  wide <- dcast.data.table(df,month ~ ids, value.var = "x")

  mat <- data.matrix(wide[,-c("month")])

  mat[1,] <- 1

  for (i in 2:nrow(mat)){
    mat[i,] = mat[i-1L,]^2 + mat[i,]
  }

  yresult <- as.data.table(mat)
  yresult[,month := wide[,month]]

  ylong <- melt.data.table(yresult,
                           id.vars = "month",
                           variable.factor = FALSE,
                           variable.name = "ids",
                           value.name = "y")

  ylong[,ids := as.numeric(ids)]

  setkey(ylong, ids, month)
  setkey(df, ids,month)

  merge(df,ylong)
}

With 90,000 ids and 45 months:

foo  <- testData(90000,45)

system.time({
  testFunc(foo)
})

   user  system elapsed 
  8.186   0.013   8.201 

使用单个线程,运行时间不到10秒。

With 100,000 ids and 1,000 months:

这三列输入data.table约为1.9GB

foo  <- testData(1e5,1e3)

system.time({
  testFunc(foo)
})

   user  system elapsed 
 52.790   4.046  57.031 

单线程运行时间不到一分钟似乎非常易于管理,具体取决于需要运行多少次。与往常一样,这可以通过改进我的代码或使用C++将递归部分转换为Rcpp来进一步加速,但避免学习C++的心理开销以及在工作流程中切换语言总是很好!

© www.soinside.com 2019 - 2024. All rights reserved.