这可能听起来像是一个奇怪的要求,但我有一个在超过 150k 个文件上运行规则的工作流程。它们中的大多数可以并行运行,因此我使用通配符。
现在我正处于工作流程中的某个时刻,我需要通过具有速率限制的 REST API 来运行所有文件。 为了避免打破这些限制,我想逐个查询所有文件的 API。 为此,我创建了一个检查点,它将所有通配符文件作为列表作为输入并查询 REST API:
这是现在的检查点代码:
def get_all_files(wildcards):
split_dir = checkpoints.split.get(**wildcards).output['output_dir']
split_files = glob_wildcards(
os.path.join(split_dir,'{filename}.json')
).filename
return expand('resources/split-files/{filename}.json', filename=split_files)
checkpoint query_api:
input:
input_files=get_all_files
output:
output_dir=directory("resources/enriched-files")
resources:
mem='24GB',
runtime='21d',
script:
"scripts/query_api.py"
这工作正常,但我预计该作业将运行超过 2 周,其中可能会发生许多不好的事情(集群不是很稳定)。
为了避免多次重新启动作业,我想知道是否可以为每个输入文件创建一个作业,但不要并行运行它们,而是一个接一个(顺序)运行它们,以避免命中率限制,但 b) 还能够在任何给定时间重新启动工作流程,而无需再次查询 API 来获取所有文件。
这是你可以用 Snakemake 做的事情吗?
您可以这样做,并希望避免在代码中使用检查点(它很强大,但也很棘手)。
您正在寻找的内容由 Snakemake 中的
resources
处理。您的 query_api 规则已经具有 mem
和 runtime
资源,但您可以添加第三个:
api_call=1
然后在命令行或您的个人资料中说
--resources api_call=1
(或者 --resources api_call=2
,如果您认为可以同时运行两个作业。)
参见: https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#snakefiles-resources
请注意,Snakemake 认为
api_call
是全局资源(除非您另外声明),但 mem
是本地资源。当作业在集群上运行时,这会有所不同,但对于本地执行,它们的处理方式相同。
如果您想进一步限制 API 调用,最好的办法就是在 API 查询后添加短暂的睡眠。