在Ruffus pipelin中运行函数之前需要生成一组文件

2024-04-20 07:33:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我用鲁弗斯写了一个管道。我有一个被并行调用多次的函数,它创建了几个文件。我想创建一个函数“combineFiles()”,它在所有这些文件生成之后被调用。因为它们在集群上并行运行,所以它们不会一起完成。我编写了一个函数“getFilenames()”,它返回需要创建的一组文件名,但是如何使combineFiles()等待它们出现呢?在

我尝试了以下方法:

@pipelineFunction
@files(getFilenames)
def combineFiles(filenames):
  # I should only be called if every file in the list 'filenames' exists

我也试过装修师:

^{pr2}$

但这也行不通。在生成getFilenames给定的文件之前,combinefile仍然会被错误地调用。我怎样才能使combineFiles以这些文件存在为条件?在

谢谢。在


Tags: 文件方法函数only管道文件名def集群
1条回答
网友
1楼 · 发布于 2024-04-20 07:33:12

我是Ruffus的开发者。我不确定我完全理解你想做什么,但这里是:

等待需要不同时间才能完成的作业,以便运行下一阶段的管道,这正是Ruffus所要做的,所以希望这是简单明了的。在

第一个问题是,您是否知道哪些文件正在预先创建,即在管道运行之前?我们先假设你有。在

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

让我们编写一个虚拟函数,每次调用它时都会创建一个文件。在Ruffus中,任何输入和输出文件名都分别包含在前两个参数中。我们没有输入文件名,因此函数调用应该如下所示:

^{pr2}$

create_file的定义如下所示:

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    open(output_file_name, "w").write("dummy file")

这些文件中的每一个都将在3个单独的调用中创建来创建\u文件。如果你愿意的话,这些可以并行运行。在

pipeline_run([create_file], multiprocess = 5)

现在合并文件。“@Merge”装饰器确实是为此而设置的。我们只需要把它和前面的函数联系起来:

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())

这只会在三次调用create_file()时调用merge_file。在

整个代码如下:

from ruffus import *
filenames = ["one.file", "two.file", "three.file"]

from random import randint
from time import sleep

@files([(None, fn) for fn in filenames])
def create_file(no_input_file_name, output_file_name):
    # simulate create file process of indeterminate complexity
    sleep(randint(1,5))
    open(output_file_name, "w").write("dummy file")

@merge(create_file, "merge.file")
def merge_file(input_file_names, output_file_name):
    output_file = open(output_file_name, "w")
    for i in input_file_names:
        output_file.write(open(i).read())


pipeline_run([merge_file], multiprocess = 5)

结果是:

>>> pipeline_run([merge_file], multiprocess = 5)

    Job = [None -> two.file] completed
    Job = [None -> three.file] completed
    Job = [None -> one.file] completed
Completed Task = create_file
    Job = [[one.file, three.file, two.file] -> merge.file] completed
Completed Task = merge_file

相关问题 更多 >