Python thread pool that handles exceptions

27 votes
5 answers
28229 views
Asked 2025-04-17 18:47

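I have been looking for a simple Python thread pool implementation, but I can't find anything that fits my needs. I am using Python 2.7, and the modules I have found either don't work or don't handle exceptions raised in worker threads well. Does anyone know of a library that provides the behavior I need? Any help is greatly appreciated.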

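multiprocessing

My first attempt was the built-in multiprocessing module, but it uses subprocesses rather than threads, so we ran into the problem that the objects cannot be pickled. No go here.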

from multiprocessing import Pool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
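
Editor's note: the PicklingError comes from multiprocessing.Pool having to pickle the bound method before sending it to a child process. The standard library also provides a thread-backed pool with the same interface, multiprocessing.pool.ThreadPool, which shares memory with the caller and so needs no pickling. The following is a minimal sketch under that assumption, written for Python 3 (hence print() as a function), not a tested replacement for the code above. The pool.close() call is needed before pool.join().

```python
from multiprocessing.pool import ThreadPool


class Sample(object):
    def compute_fib(self, n):
        # Binet's closed form, as in the question.
        phi = (1 + 5 ** 0.5) / 2
        self.fib = int(round((phi ** n - (1 - phi) ** n) / 5 ** 0.5))


samples = [Sample() for _ in range(8)]
pool = ThreadPool(processes=8)
results = [pool.apply_async(s.compute_fib, [20]) for s in samples]
pool.close()   # no more tasks will be submitted
pool.join()    # wait for the worker threads to finish
for r in results:
    r.get()    # re-raises in this thread any exception raised in a worker
fibs = [s.fib for s in samples]
print(fibs)
```

Because the workers are threads, each Sample instance is mutated in place and s.fib is visible afterwards. Whether the re-raised exception keeps the worker's frames on Python 2.7 is not something this sketch verifies.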

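futures

I found a library (linked in the original post) that backports some of the cool concurrency features of Python 3.2. It looks perfect and is simple to use. The problem is that when an exception occurs in a worker, you only get the exception type, such as "ZeroDivisionError", with no traceback, so you cannot tell which line caused it. That makes the code very hard to debug. No go either.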

from concurrent import futures

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib


#    futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
#    354     def __get_result(self):
#    355         if self._exception:
#--> 356             raise self._exception
#    357         else:
#    358             return self._result
#
# ZeroDivisionError: integer division or modulo by zero

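workerpool

I found another library implementing this pattern (linked in the original post). This time the exception is printed when it occurs, but my IPython interactive interpreter is left hanging and has to be killed from another terminal. No go.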

import workerpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783

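threadpool

Yet another implementation (linked in the original post). This time the exception is printed to stderr when it occurs, but the script is not interrupted and keeps running, which defeats the purpose of exceptions and can lead to unsafe situations. Still not usable.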

import threadpool

class Sample(object):
    def compute_fib(self, n):
        phi = (1 + 5**0.5) / 2
        1/0
        self.fib = int(round((phi**n - (1-phi)**n) / 5**0.5))

samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib

# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'
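
Editor's note: the behavior asked for here (stop the main script when a worker fails, and keep the worker's traceback) can be sketched with the standard library alone. The class below, RaisingThreadPool, is a hypothetical helper invented for illustration, not part of any library mentioned in this thread. It assumes Python 3, where an exception object carries its traceback in __traceback__, so re-raising it in the calling thread preserves the worker's frames.

```python
import queue
import threading
import traceback


class RaisingThreadPool(object):
    """Hypothetical minimal pool: wait() re-raises the first worker exception."""

    def __init__(self, size):
        self._tasks = queue.Queue()
        self._errors = queue.Queue()
        for _ in range(size):
            t = threading.Thread(target=self._run)
            t.daemon = True  # do not keep the interpreter alive on exit
            t.start()

    def _run(self):
        while True:
            func, args = self._tasks.get()
            try:
                func(*args)
            except Exception as exc:
                self._errors.put(exc)  # keep the exception for wait()
            finally:
                self._tasks.task_done()

    def put(self, func, *args):
        self._tasks.put((func, args))

    def wait(self):
        self._tasks.join()  # block until every queued task has run
        if not self._errors.empty():
            raise self._errors.get()


def boom(x):
    return x / 0


pool = RaisingThreadPool(4)
for i in range(4):
    pool.put(boom, i)
try:
    pool.wait()
except ZeroDivisionError as exc:
    # The traceback still contains the frame of boom().
    frame_names = [f.name for f in traceback.extract_tb(exc.__traceback__)]
    print(frame_names)
```

The design choice is to store exceptions rather than print them, so the caller decides whether to abort. Only the first stored exception is raised; the others stay in the queue.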

- Update -

It appears that, with regard to the futures library, Python 3 does not behave the same way as Python 2.

futures_exceptions.py:

from concurrent.futures import ThreadPoolExecutor, as_completed

def div_zero(x):
    return x / 0

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = executor.map(div_zero, range(4))
    for future in as_completed(futures): print(future)

Python 2.7.6 output:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 12, in <module>
    for future in as_completed(futures):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
    with _AcquireFutures(fs):
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
    self.futures = sorted(futures, key=id)
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
    yield future.result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
    return self.__get_result()
  File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
    raise self._exception
ZeroDivisionError: integer division or modulo by zero

Python 3.3.2 output:

Traceback (most recent call last):
  File "...futures_exceptions.py", line 11, in <module>
    for future in as_completed(futures):
  File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
    with _AcquireFutures(fs):
  File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
    self.futures = sorted(futures, key=id)
  File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
    yield future.result()
  File "...python3.3/concurrent/futures/_base.py", line 392, in result
    return self.__get_result()
  File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
    raise self._exception
  File "...python3.3/concurrent/futures/thread.py", line 54, in run
    result = self.fn(*self.args, **self.kwargs)
  File "...futures_exceptions.py", line 7, in div_zero
    return x / 0
ZeroDivisionError: division by zero
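
Editor's note: in both tracebacks the exception surfaces while as_completed() sorts its argument. That is because executor.map() returns a lazy iterator of results, not Future objects, so consuming it calls future.result() right away. The Python 3 output additionally shows the worker frames (thread.py and div_zero), which the 2.7 output lacks. A minimal sketch of the same test using submit(), which does return futures, assuming Python 3:

```python
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed


def div_zero(x):
    return x / 0


reports = []
with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns Future objects, which is what as_completed() expects.
    futures = [executor.submit(div_zero, x) for x in range(4)]
    for future in as_completed(futures):
        exc = future.exception()  # returns the exception instead of raising it
        if exc is not None:
            # On Python 3 the exception object carries the worker's traceback.
            reports.append("".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)))

print(reports[0])
```

Each entry in reports is a formatted traceback that names div_zero and the failing line, without interrupting the loop over the remaining futures.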

5 Answers

1

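The simple solution: use whichever alternative suits you best and implement a try-except block yourself inside your workers. If needed, wrap the root call as well.

I would not say these libraries handle exceptions "wrongly". They have a default behavior, albeit a simple one. If that default does not fit your needs, you have to handle it yourself.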
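
Editor's note: one way to read this suggestion is a decorator that wraps the worker function in try-except, logs the full traceback while still inside the worker thread, and then re-raises. The name log_exceptions is hypothetical and the sketch is only an illustration of the idea, not the answerer's code. It relies on logging.exception(), which records the active traceback.

```python
import functools
import logging
from concurrent.futures import ThreadPoolExecutor


def log_exceptions(func):
    """Hypothetical decorator: log the traceback in the worker, then re-raise."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Logged here, the traceback still points at the failing line.
            logging.exception("worker %s failed", func.__name__)
            raise
    return wrapper


@log_exceptions
def compute(n):
    return 1 / n


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(compute, n) for n in (1, 0)]

# The pool has shut down, so every future is finished at this point.
outcomes = [f.exception() for f in futures]
print(outcomes)
```

Because the wrapper re-raises, the pool's own behavior is unchanged: the future still holds the exception, and the log output supplies the line number that the question found missing.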

3

If you want information about unhandled exceptions when using a thread pool (ThreadPoolExecutor), you can do this:

import time
import traceback

from concurrent.futures import ThreadPoolExecutor


def worker():
    a = 2 / 0


def worker_callbacks(f):
    e = f.exception()

    if e is None:
        return

    trace = []
    tb = e.__traceback__
    while tb is not None:
        trace.append({
            "filename": tb.tb_frame.f_code.co_filename,
            "name": tb.tb_frame.f_code.co_name,
            "lineno": tb.tb_lineno
        })
        tb = tb.tb_next
    print(str({
        'type': type(e).__name__,
        'message': str(e),
        'trace': trace
    }))


executor = ThreadPoolExecutor(max_workers=1)
executor.submit(worker).add_done_callback(worker_callbacks)
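
Editor's note: the code above reads e.__traceback__, which exists only on Python 3, so it does not apply directly to the Python 2.7 setup in the question. On Python 3 the manual walk over tb_next can probably be shortened with traceback.extract_tb(), which returns the same filename, function name and line number for each frame. A minimal sketch, with describe_failure as a hypothetical helper name:

```python
import traceback
from concurrent.futures import ThreadPoolExecutor


def worker():
    return 2 / 0


def describe_failure(future):
    """Hypothetical helper: summarize a failed future as a dict, or None."""
    exc = future.exception()
    if exc is None:
        return None
    frames = traceback.extract_tb(exc.__traceback__)
    return {
        "type": type(exc).__name__,
        "message": str(exc),
        "trace": [
            {"filename": f.filename, "name": f.name, "lineno": f.lineno}
            for f in frames
        ],
    }


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(worker)

# Called directly here; inside a done-callback the return value would be ignored.
report = describe_failure(future)
print(report)
```

The last entry of report["trace"] is the frame of worker(), which is the line that raised.
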
5

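I personally use concurrent.futures because its interface is very simple. As for the traceback problem, I found a workaround that preserves that information. See my answer to this other question:

Getting the original line number for an exception in concurrent.futures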
