Python线程池 - 创建子任务并等待它们

2024-03-28 06:00:22 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我有一个线程池执行器,最多有10个线程,我向它提交一个任务,这个任务本身会创建另一个任务,然后依次等待它完成,递归地直到达到11个深度。在

Python中的示例代码:

import concurrent.futures

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        f = e.submit(task, depth + 1)
        concurrent.futures.wait([f])


f = e.submit(task, 0)
print f.result()

以上代码输出:

^{pr2}$

以及僵局。在

有没有办法在不创建额外的线程和执行器的情况下解决这个问题?在

换句话说,一种让工作线程在等待时处理其他任务的方法?在


Tags: 代码import示例taskdef线程concurrentmax
3条回答

不,如果要避免死锁,就不能等待任务中同一个执行器的未来。在

在本例中,您只能返回future,然后递归地处理结果:

import concurrent.futures
import time

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        f = e.submit(task, depth + 1)
        return f


f = e.submit(task, 0)
while isinstance(f.result(), concurrent.futures.Future):
    f = f.result()

print f.result()

但是,最好首先避免这种递归执行。在

使用协同程序,您的代码可以重写为:

import asyncio

@asyncio.coroutine
def task(depth):
    print('started depth %d' % (depth, ))
    if depth > 10:
        return depth
    else:
        # create new task
        t = asyncio.async(task(depth + 1))
        # wait for task to complete
        yield from t
        # get the result of the task
        return t.result()

loop = asyncio.get_event_loop()
result = loop.run_until_complete(task(1))
print(result)
loop.close()

但是,我很难理解为什么需要这些额外的代码。在示例代码中,您总是直接等待任务的结果,因此如果没有执行器,您的代码将不会有任何不同的运行。例如,下面将产生相同的结果

^{pr2}$

我认为文档中的这个例子更好地展示了异步协程如何能够并行化任务。这个例子创建了3个任务,每个任务都计算不同的阶乘。请注意,当每个任务产生另一个协程(在本例中是async.sleep)时,如何允许另一个任务继续执行。在

import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

输出:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

你在这里所经历的,就是你已经正确地称之为deadlock。启动下一个线程并等待它的第一个线程持有一个lock,在等待同一个{}被释放时,所有后续任务都将在该线程上死锁(在您的情况下永远不会)。我建议您在任务中启动自己的线程,而不是使用池,类似于:

import concurrent.futures
import threading


class TaskWrapper(threading.Thread):

    def __init__(self, depth, *args, **kwargs):
        self._depth = depth
        self._result = None
        super(TaskWrapper, self).__init__(*args, **kwargs)

    def run(self):
        self._result = task(self._depth)

    def get(self):
        self.join()
        return self._result

e = concurrent.futures.ThreadPoolExecutor(max_workers=10)


def task(depth):
    print 'started depth %d' % (depth, )
    if depth > 10:
        return depth
    else:
        t = TaskWrapper(depth + 1)
        t.start()
        return t.get()

f = e.submit(task, 0)
print f.result()

相关问题 更多 >