Python线程池执行器终止所有线程

2024-04-20 12:00:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在运行一段python代码,其中多个线程通过threadpool executor运行。每个线程都应该执行一项任务(例如获取网页)。我希望能够终止所有线程,即使其中一个线程失败。例如:

with ThreadPoolExecutor(self._num_threads) as executor:
    jobs = []
    for path in paths:
        kw = {"path": path}
        jobs.append(executor.submit(start,**kw))
    for job in futures.as_completed(jobs):
        result = job.result()
        print(result)
def start(*args,**kwargs):
    #fetch the page
    if(success):
        return True
    else:
        #Signal all threads to stop

有可能吗?除非所有线程都成功,否则线程返回的结果对我来说是无用的,因此,如果其中一个线程失败,我希望节省其余线程的一些执行时间,并立即终止它们。实际的代码显然在执行相对较长的任务,但有几个失败点


Tags: path代码in网页forasjobsjob
3条回答

我会这样做:

import concurrent.futures

def start(*args,**kwargs):
    #fetch the page
    if(success):
        return True
    else:
        return False

with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(start, {"path": path}) for path in paths]
    concurrent.futures.wait(results, timeout=10, return_when=concurrent.futures.FIRST_COMPLETED)
    for f in concurrent.futures.as_completed(results):
        f_success = f.result()
        if not f_success:
            executor.shutdown(wait=False, cancel_futures=True) # shutdown if one fails
        else:
            #do stuff here

如果任何结果不属实,一切都将立即关闭

如果您已经完成了线程的工作,并且希望研究进程,那么这里的代码和平看起来非常有希望和简单,几乎与线程的语法相同,但是使用了多处理模块

当超时标志过期时,进程终止,非常方便

import multiprocessing

def get_page(*args, **kwargs):
    # your web page downloading code goes here

def start_get_page(timeout, *args, **kwargs):
    p = multiprocessing.Process(target=get_page, args=args, kwargs=kwargs)
    p.start()
    p.join(timeout)
    if p.is_alive():
        # stop the downloading 'thread'
        p.terminate()
        # and then do any post-error processing here

if __name__ == "__main__":
    start_get_page(timeout, *args, **kwargs)

我已经为我的一个类似问题创建了一个答案,我认为这将对这个问题有效

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

NUM_REQUESTS = 100


def long_request(id):
    sleep(1)

    # Simulate bad response
    if id == 10:
        return {"data": {"valid": False}}
    else:
        return {"data": {"valid": True}}


def check_results(results):
    valid = True
    for result in results:
        valid = result["data"]["valid"]

    return valid


def main():
    futures = []
    responses = []
    num_requests = 0

    with ThreadPoolExecutor(max_workers=10) as executor:
        for request_index in range(NUM_REQUESTS):
            future = executor.submit(long_request, request_index)

            # Future list
            futures.append(future)

        for future in as_completed(futures):

            is_responses_valid = check_results(responses)

            # Cancel all future requests if one invalid
            if not is_responses_valid:
                executor.shutdown(wait=False)
            else:
                # Append valid responses
                num_requests += 1
                responses.append(future.result())

    return num_requests


if __name__ == "__main__":
    requests = main()
    print("Num Requests: ", requests)

相关问题 更多 >