我想创建一个 Rscript,它使用
dopar
来计算使用多个内核的多个文件的行数,然后输出文件名的 TSV 以及行数(除以 4)。
我使用的是 Ubuntu 20.04。如果我通过打开 R 并复制/粘贴来迭代使用以下脚本,它就会起作用。如果我在命令行中使用
Rscript
使用脚本,则会出现以下错误:ssh: connect to host 0.0.0.20 port 22: Connection timed out
。我认为这是 PSOCK 集群注册的问题。如何在命令行中使用 Rscript
使该脚本正常工作?
(注意:迭代输入它,我会加载库,然后只需定义
raw.path
、min.reads
和 n.cores
,然后再毫无问题地复制和粘贴脚本的其余部分。)
#! /usr/bin/Rscript
######
## Counting and excluding low read fastqs
# This script counts the number of reads in a fastq and moves those under a certain
# threshold to a separate file
## Collect arguments
args <- commandArgs(TRUE)
## Parse arguments (we expect the form --arg=value)
parseArgs <- function(x) strsplit(sub("^--", "", x), "=")
argsL <- as.list(as.character(as.data.frame(do.call("rbind", parseArgs(args)))$V2))
names(argsL) <- as.data.frame(do.call("rbind", parseArgs(args)))$V1
args <- argsL
rm(argsL)
if("--help" %in% args | is.null(args$files) | is.null(args$min_reads)) {
cat("
Counts the number of reads in the gzipped FASTQs in a folder, then moves
files below a certain number of reads to a folder called 'less-than-[min_reads]'
--files=string - Full path to folder with raw files
--min_reads=numeric - Minimum number of reads for a file to be included
--n_cores=numeric - Number of cores to use
Example:
Rscript count_and_move_raw_fastqs.R --files=raw/ \\ \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
q(save="no")
}
## Import arguments
raw.path <- args$files
min.reads <- args$min_reads
n.cores <- args$n_cores
# Import libraries
library(foreach)
library(doParallel)
library(tidyverse)
# set wd to raw.path
setwd(raw.path)
# Set up parallelization with n cores
my.cluster <- makePSOCKcluster(nnodes = n.cores, names = n.cores, outfile="")
registerDoParallel(cl = my.cluster)
# Create folder for low read files to go
output_dir <- paste("files-with-less-than", min.reads,"-reads", sep = "")
dir.create(output_dir)
# Create function to count the lines in the file, and move files < min.reads to a different directory
process_file <- function(file){
# Get the base name (without R1) for each file
file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
# Count the reads
message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
# Move files if <= min.reads
if(reads <= min.reads){
# Find all files with the name
files.to.move <- list.files(pattern = file.name)
# Move them
purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
}
# Create final df of reads and names
output_txt <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
cat(paste(file.name, reads, "\n", sep = "\t"), file = output_txt, append = TRUE)
}
# Find the files to count
files.to.count <- list.files(getwd(), pattern = "_R1_001.fastq.gz$")
# Loop over each file in parallel
foreach(file=files.to.count, .combine = c) %dopar%{
process_file(file)
}
stopCluster(my.cluster)
由于您在类 Unix 上运行,请考虑仅使用 parallel 包的
mclapply()
函数:
#! /usr/bin/Rscript
######
## Counting and excluding low read fastqs
# This script counts the number of reads in a fastq and moves those under a certain
# threshold to a separate file
## Collect arguments
args <- commandArgs(TRUE)
## Parse arguments (we expect the form --arg=value)
parseArgs <- function(x) strsplit(sub("^--", "", x), "=")
argsL <- as.list(as.character(as.data.frame(do.call("rbind", parseArgs(args)))$V2))
names(argsL) <- as.data.frame(do.call("rbind", parseArgs(args)))$V1
args <- argsL
rm(argsL)
if("--help" %in% args | is.null(args$files) | is.null(args$min_reads)) {
cat("
Counts the number of reads in the gzipped FASTQs in a folder, then moves
files below a certain number of reads to a folder called 'less-than-[min_reads]'
--files=string - Full path to folder with raw files
--min_reads=numeric - Minimum number of reads for a file to be included
--n_cores=numeric - Number of cores to use
Example:
Rscript count_and_move_raw_fastqs.R --files=raw/ \\ \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
q(save="no")
}
## Import arguments
raw.path <- args$files
min.reads <- args$min_reads
n.cores <- args$n_cores
library(tidyverse)
# set wd to raw.path
setwd(raw.path)
# Create folder for low read files to go
output_dir <- paste("files-with-less-than", min.reads,"-reads", sep = "")
dir.create(output_dir)
# Create function to count the lines in the file, and move files < min.reads to a different directory
process_file <- function(file){
# Get the base name (without R1) for each file
file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
# Count the reads
message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
# Move files if <= min.reads
if(reads <= min.reads){
# Find all files with the name
files.to.move <- list.files(pattern = file.name)
# Move them
purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
}
# Create final list of reads and names
paste(file.name, reads, "\n", sep = "\t")
}
# Find the files to count
files.to.count <- list.files(getwd(), pattern = "_R1_001.fastq.gz$")
# Process each file in parallel and output a list of reads and names
output_txt = parallel::mclapply(files.to.count, process_file, mc.cores = n.cores)
# Serial write of the list of reads and names
outfile <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
lapply(output_txt, cat, file = outfile)
这将减少内存开销,因为分叉的并行 R 会话共享大部分内存空间。通过套接字连接的 R 会话集合则不然。
此外,代码不是使用
cat
进行并行写入,而是将文本行收集到列表中,该列表是用普通的串行“lapply”写入的。不建议并行写入一个文件,除非特别注意写入者不会互相冲突,从而导致乱码文本。
我没有运行你的代码,因为我没有输入文件。让我知道它对您有何作用。