如果其中一个线程结束firs,则结束python多线程处理

2024-06-16 11:45:09 发布

您现在位置:Python中文网/ 问答频道 /正文

因此,我使用以下代码同时在多个函数中运行任务:

        if __name__ == '__main__':
            po = Pool(processes = 10)
            resultslist = []
            i = 1
            while i <= 2:
                arg = [i]
                result = po.apply_async(getAllTimes, arg)
                resultslist.append(result)
                i += 1

            feedback = []
            for res in resultslist:
                multipresults = res.get()
                feedback.append(multipresults)

matchesBegin, matchesEnd = feedback[0][0], feedback[0][1]
TheTimes = feedback[1]

这对我很有效。我目前正在使用它同时运行两个作业

但问题是,在进入脚本的下一阶段之前,我并不总是需要同时完成所有两个正在运行的作业。有时,如果第一个作业成功完成,并且我能够通过验证matchesbegen,matchesEnd中的内容来确认它,我希望能够继续并终止另一个作业

我的问题是,我不知道怎么做

作业1通常比作业2完成的快得多。所以,我在这里尝试做的是,如果作业1在作业2之前完成,并且作业1中变量的内容(matchesbegen,matchesEnd)为真,那么,我希望作业2被吹走,因为我不再需要它了。如果我不把它吹走,只会延长剧本的完成时间。只有当来自作业1的变量的结果不为True时,才应允许作业2继续运行


Tags: 函数代码内容if作业argresresult
1条回答
网友
1楼 · 发布于 2024-06-16 11:45:09

我不知道您的用例的所有细节,但我希望这能为您提供一些指导。从本质上讲,您从apply_async()开始的工作可以完成这项工作,但是您还需要使用它的callback参数并评估传入的结果,以查看它是否满足您的条件,如果满足,则采取相应的操作。我已经破解了你的代码,得到了这个:

class ParallelCall:
    def __init__(self, jobs=None, check_done=lambda res: None):
        self.pool = Pool(processes=jobs)
        self.pending_results = []
        self.return_results = []
        self.check_done = check_done

    def _callback(self, incoming_result):
        self.return_results.append(incoming_result)
        if self.check_done(incoming_result):
            self.pool.terminate()
        return incoming_result

    def run_fce(self, fce, *args, **kwargs):
        self.pending_results.append(self.pool.apply_async(fce,
                                                          *args, **kwargs,
                                                          callback=self._callback))

    def collect(self):
        self.pool.close()
        self.pool.join()
        return self.return_results

你可以这样使用:

def final_result(result_to_check):
    return result_to_check[0] == result_to_check[1]

if __name__ == '__main__':
    runner = ParallelCall(jobs=2, check_done=final_result)
    for i in range(1,3):
        arg = [i]
        runner.run_fce(getAllTimes, arg)

    feedback = runner.collect()

    TheTimes = feedback[-1]  # last completed getAllTimes call

它有什么作用runnerParallelCall(注意:我只使用了两个worker,因为您似乎只运行了两个作业)的一个实例,它使用final_result()函数来评估结果是否是有效最终结果的合适候选。在这种情况下,第一项和第二项相等

我们用它来启动getAllTimes两次,就像上面的例子一样。它和您一样使用apply_async(),但是我们现在也注册了一个回调,当结果可用时,我们通过它传递结果。我们还将它传递给用check_done注册的函数,以查看是否得到了可接受的最终结果,如果是这样(返回值的计算结果为True),则停止所有工作进程

免责声明:这并不完全是您的示例所做的,因为返回列表不是按函数调用发生的顺序排列的,而是按结果可用的顺序排列的

然后我们将collect()可用结果转换成feedback。此方法关闭池以不接受任何进一步的任务(close()),然后等待辅助进程完成(wait())(如果传入的结果之一与注册的条件匹配,则可以停止它们)。然后我们返回所有的结果(或者直到匹配的结果或者直到所有的工作都完成)

我已经把它放到了ParallelCall类中,这样我就可以方便地跟踪挂起的和完成的结果,并知道我的池是什么。默认的check_done基本上是一个(可调用的)nop

相关问题 更多 >