我有一个大约400万行的数据集,我需要循环。数据结构是重复的ID彼此依赖,但数据在ID之间是独立的。对于每个ID,[i + 1]行依赖于[i]。这是一个可重复的例子。我确实认识到这个例子在内部函数方面不实用,但它只是对我所拥有的结构的一个演示。
set.seed(123)
id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)
month = rep(seq(1,5),3)
x = round(rnorm(15,2,5))
y = rep(0,15)
df = as.data.frame(cbind(ids,month,x,y))
for (i in 1:nrow(df)){
if(i>1 && df[i,1]==df[i-1,1]){
#Main functions go here
df[i,4] = df[i-1,4]^2+df[i,3]
}
else {
df[i,4] = 1
}
}
问题实际上是真实函数的1000个循环需要大约90秒,因此400万行需要数天。以这种方式运行是不可行的。但是ID是独立的,不需要一起运行。我的问题是:有没有办法并行运行这种类型的循环?一个非常不优雅的解决方案是将文件拆分为50个部分而不拆分ID,只需在50个子文件上运行相同的代码即可。我认为应该有一种方法来编码。
编辑:添加月份列以显示行相互依赖的原因。要解决以下两条评论:
1)实际上有6-7行函数可以运行。我可以使用ifelse()多个函数吗? 2)所需的输出将是完整的数据帧。实际上有更多的列,但我需要数据框中的每一行。
ids month x y
1 1 1 -1 1
2 1 2 1 2
3 1 3 10 14
4 1 4 2 198
5 1 5 3 39207
6 2 1 11 1
7 2 2 4 5
8 2 3 -4 21
9 2 4 -1 440
10 2 5 0 193600
11 3 1 8 1
12 3 2 4 5
13 3 3 4 29
14 3 4 3 844
15 3 5 -1 712335
EDIT2:我尝试从另一篇文章中应用foreach()包,但它似乎不起作用。这段代码将运行,但我认为问题是行在核心之间分配的方式。如果每一行按顺序发送到不同的核心,则相同的ID永远不会在同一个核心中。
library(foreach)
library(doParallel)
set.seed(123)
id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)
month = rep(seq(1,5),3)
x = round(rnorm(15,2,5))
y = rep(0,15)
df = as.data.frame(cbind(ids,month,x,y))
#setup parallel backend to use many processors
cores=detectCores()
cl <- makeCluster(cores[1]-1) #not to overload your computer
registerDoParallel(cl)
finalMatrix <- foreach(i=1:nrow(df), .combine=cbind) %dopar% {
for (i in 1:nrow(df)){
if(i>1 && df[i,1]==df[i-1,1]){
#Main functions go here
df[i,4] = df[i-1,4]^2+df[i,3]
}
else {
df[i,4] = 1
}
}
}
#stop cluster
stopCluster(cl)
这是使用foreach
的解决方案。很难说它在你的现实生活中如何运作,至少它与testdata一起工作......
首先,我生成一些testdata:
# function to generate testdata
genDat <- function(id){
# observations per id, fixed or random
n <- 50
#n <- round(runif(1,5,1000))
return(
data.frame(id=id,month=rep(1:12,ceiling(n/12))[1:n],x=round(rnorm(n,2,5)),y=rep(0,n))
)
}
#generate testdata
testdat <- do.call(rbind,lapply(1:90000,genDat))
> head(testdat)
id month x y
1 1 1 7 0
2 1 2 6 0
3 1 3 -9 0
4 1 4 3 0
5 1 5 -9 0
6 1 6 8 0
> str(testdat)
'data.frame': 4500000 obs. of 4 variables:
$ id : int 1 1 1 1 1 1 1 1 1 1 ...
$ month: int 1 2 3 4 5 6 7 8 9 10 ...
$ x : num 7 6 -9 3 -9 8 -4 13 0 5 ...
$ y : num 0 0 0 0 0 0 0 0 0 0 ...
所以testdata有大约450万行,有9万个唯一ID。
现在,由于您的计算在ID之间是独立的,因此想法是将具有唯一ID的数据发送到每个核心...这最终也将摆脱if
或ifelse
条件的必要性。
为此,我首先生成一个带有开始和停止行索引的矩阵,以便将数据集拆分为唯一ID:
id_len <- rle(testdat$id)
ixmat <- cbind(c(1,head(cumsum(id_len$lengths)+1,-1)),cumsum(id_len$lengths))
然后可以将该矩阵传递给foreach
以并行运行特定部分。
在这个例子中,我略微修改你的计算,以避免导致Inf
的天文数值。
library(parallel)
library(doParallel)
library(iterators)
cl <- makeCluster(parallel::detectCores())
registerDoParallel(cl) #create a cluster
r <- foreach (i = iter(ixmat,by='row')) %dopar% {
x <- testdat$x[i[1,1]:i[1,2]]
y <- testdat$y[i[1,1]:i[1,2]]
y[1] <- 1
for(j in 2:length(y)){
#y[j] <- (y[j-1]^2) + x[j] ##gets INF
y[j] <- y[j-1] + x[j]
}
return(y)
}
parallel::stopCluster(cl)
最后,您可以替换原始数据框中的值:
testdat$y <- unlist(r)
至于时间,foreach循环在我的8核机器上运行大约40秒。
所以,只需用Rcpp重新编码你的循环:
#include <Rcpp.h>
using namespace Rcpp;
// [[Rcpp::export]]
NumericVector fill_y(const NumericVector& x) {
int n = x.length();
NumericVector y(n); y[0] = 1;
for (int i = 1; i < n; i++) {
y[i] = pow(y[i - 1], 2) + x[i];
}
return y;
}
并且,要将其应用于每个组,请使用dplyr:
df %>%
group_by(ids) %>%
mutate(y2 = fill_y(x))
我认为这应该足够快,以便您不需要并行性。实际上我是在@Val's testdat
上运行的,只花了2秒钟(用一台旧电脑)。
告诉我,如果没关系。否则,我将制作并行版本。
melt
的dcast
/ data.table
正如上面的注释中所讨论的,此解决方案非常特定于示例中的用例,但可能适用于您的用例。
使用dcast.data.table
包中的矩阵运算和melt.data.table
和data.table
函数,可以实现从长格式到宽格式的快速转换,并且非常有效。
考虑到所有因素,较大的约束条件可能会比使用这些方法的处理时间多得多。
library(data.table)
set.seed(123)
id1 = rep(1,5)
id2 = rep(2,5)
id3 = rep(3,5)
ids = c(id1,id2,id3)
month = rep(seq(1,5),3)
x = round(rnorm(15,2,5))
# y = rep(0,15) ## no need to pre-define y with this method
df = as.data.frame(cbind(ids,month,x))
setDT(df) ## Convert to data.table by reference
wide <- dcast.data.table(df, month ~ ids, value.var = "x") ## pivot to 'wide' format
mat <- data.matrix(wide[,-c("month")]) ## Convert to matrix
print(mat)
给
1 2 3
[1,] -1 11 8
[2,] 1 4 4
[3,] 10 -4 4
[4,] 2 -1 3
[5,] 3 0 -1
然后以矩阵形式对其进行操作:
mat[1,] <- 1 ## fill the first row with 1's as in your example
for (i in 2:nrow(mat)){
mat[i,] = mat[i-1L,]^2 + mat[i,]
}
print(mat)
给
1 2 3
[1,] 1 1 1
[2,] 2 5 5
[3,] 14 21 29
[4,] 198 440 844
[5,] 39207 193600 712335
接下来,熔化回长格式,然后加入关键列ids
和month
的原始数据:
yresult <- as.data.table(mat) ## convert back to data.table format
yresult[,month := wide[,month]] ## Add back the month column
ylong <- melt.data.table(yresult,
id.vars = "month",
variable.factor = FALSE,
variable.name = "ids",
value.name = "y") ## Pivot back to 'long' format
ylong[,ids := as.numeric(ids)] ## reclass ids to match input ids
setkey(ylong, ids, month) ## set keys for join on 'ids' and 'month'
setkey(df, ids,month)
merge(df,ylong) ## join data.table with the result
得出最终结果:
ids month x y
1: 1 1 -1 1
2: 1 2 1 2
3: 1 3 10 14
4: 1 4 2 198
5: 1 5 3 39207
6: 2 1 11 1
7: 2 2 4 5
8: 2 3 -4 21
9: 2 4 -1 440
10: 2 5 0 193600
11: 3 1 8 1
12: 3 2 4 5
13: 3 3 4 29
14: 3 4 3 844
15: 3 5 -1 712335
为了测试和说明缩放,下面的函数testData
通过交叉连接给定数量的id和给定的月数来生成数据集。然后,函数testFunc
执行递归的行式矩阵运算。
testData <- function(id_count, month_count) {
id_vector <- as.numeric(seq_len(id_count))
months_vector <- seq_len(month_count)
df <- CJ(ids = id_vector,month = months_vector)
df[,x := rnorm(.N,0,0.1)]
return(df)
}
testFunc <- function(df) {
wide <- dcast.data.table(df,month ~ ids, value.var = "x")
mat <- data.matrix(wide[,-c("month")])
mat[1,] <- 1
for (i in 2:nrow(mat)){
mat[i,] = mat[i-1L,]^2 + mat[i,]
}
yresult <- as.data.table(mat)
yresult[,month := wide[,month]]
ylong <- melt.data.table(yresult,
id.vars = "month",
variable.factor = FALSE,
variable.name = "ids",
value.name = "y")
ylong[,ids := as.numeric(ids)]
setkey(ylong, ids, month)
setkey(df, ids,month)
merge(df,ylong)
}
ids
and 45 months
:foo <- testData(90000,45)
system.time({
testFunc(foo)
})
user system elapsed
8.186 0.013 8.201
使用单个线程,运行时间不到10秒。
ids
and 1,000 months
:这三列输入data.table约为1.9GB
foo <- testData(1e5,1e3)
system.time({
testFunc(foo)
})
user system elapsed
52.790 4.046 57.031
单线程运行时间不到一分钟似乎非常易于管理,具体取决于需要运行多少次。与往常一样,这可以通过改进我的代码或使用C++
将递归部分转换为Rcpp
来进一步加速,但避免学习C++
的心理开销以及在工作流程中切换语言总是很好!