Python脚本在使用SQLAlchemy和多进程时挂起

7 投票
4 回答
6533 浏览
提问于 2025-04-17 09:50

考虑以下这个Python脚本,它使用了SQLAlchemy和Python的多进程模块。这个例子是在Debian squeeze上运行的,Python版本是2.6.6-8+b1(默认),SQLAlchemy版本是0.6.3-3(默认)。这是一些实际代码的简化版本。

import multiprocessing
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ...
password = ...
dbname = ...
dbstring = "postgresql://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
db.dispose()

for i in range(10):
    make_foo(i)

m.create_all()

def do(kwargs):
    i, dbstring = kwargs['i'], kwargs['dbstring']

    db = create_engine(dbstring)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
    Session.commit()
    db.dispose()

pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i, 'dbstring':dbstring})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
r.get()
r.wait()
pool.close()
pool.join()

这个脚本在运行时会卡住,并显示以下错误信息。

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 259, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 4 arguments (2 given)', <class 'sqlalchemy.exc.ProgrammingError'>, ('(ProgrammingError) syntax error at or near "%"\nLINE 1: COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;\n        ^\n',))

当然,这里的语法错误是TRUNCATE foo%s;。我的问题是,为什么这个进程会卡住?我能否让它在出错时退出,而不需要对我的代码进行大改动?这种行为和我实际的代码非常相似。

注意,如果把这个语句换成print foobarbaz,就不会出现卡住的情况。此外,即使我们把

Session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
Session.commit()
db.dispose()

换成Session.execute("TRUNCATE foo%s;"),依然会卡住。

我使用前一种方式是因为它更接近我实际代码的做法。

另外,如果去掉multiprocessing,然后顺序遍历表,就不会卡住了,而是直接报错退出。

我对这个错误的形式也感到困惑,特别是TypeError: ('__init__() takes at least 4 arguments (2 given)'这一部分。这个错误是从哪里来的呢?看起来很可能是来自multiprocessing的代码。

PostgreSQL的日志没有提供帮助。我看到很多类似的行

2012-01-09 14:16:34.174 IST [7810] 4f0aa96a.1e82/1 12/583 0 ERROR:  syntax error at or near "%" at character 28
2012-01-09 14:16:34.175 IST [7810] 4f0aa96a.1e82/2 12/583 0 STATEMENT:  COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;

但没有其他看起来相关的信息。

更新1:感谢lbolla和他的深刻分析,我能够提交一个Python错误报告。请查看sbt在该报告中的分析,以及这里的内容。还可以查看Python的错误报告修复异常序列化。根据sbt的解释,我们可以用

import sqlalchemy.exc
e = sqlalchemy.exc.ProgrammingError("", {}, None)
type(e)(*e.args)

重现原始错误,结果是

Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
TypeError: __init__() takes at least 4 arguments (2 given)

更新2:这个问题已经被Mike Bayer修复,至少对于SQLAlchemy来说,具体请见错误报告StatementError异常无法序列化。根据Mike的建议,我也向psycopg2报告了一个类似的错误,尽管我没有(也没有)实际的破坏示例。无论如何,他们显然已经修复了这个问题,虽然没有提供修复的细节。请查看psycopg异常无法序列化。为了保险起见,我还提交了一个Python错误ConfigParser异常无法序列化,对应于lbolla提到的SO问题。看起来他们希望对此进行测试。

总之,这个问题在可预见的未来可能会继续存在,因为大多数Python开发者似乎并不意识到这个问题,因此没有采取防范措施。令人惊讶的是,似乎使用多进程的人不够多,以至于这个问题没有被广泛关注,或者他们只是忍受了这个问题。我希望Python开发者能在Python 3中解决这个问题,因为这实在让人烦恼。

我接受了lbolla的回答,因为如果没有他对问题与异常处理关系的解释,我可能根本无法理解这个问题。我也想感谢sbt,他解释了Python无法序列化异常是问题所在。我非常感激他们两个人,请给他们的回答投票支持。谢谢。

更新3:我发布了一个后续问题:捕获无法序列化的异常并重新抛出

4 个回答

1

我不知道最初出现的错误是什么原因。不过,关于多进程处理时“坏”异常的问题,主要是因为序列化(pickling)工作的方式。我觉得sqlalchemy的异常类有点问题。

如果一个异常类的 __init__() 方法没有调用 BaseException.__init__()(无论是直接还是间接),那么 self.args 可能就不会被正确设置。BaseException.__reduce__()(这是序列化协议使用的)假设可以通过简单地执行以下代码来重新创建一个异常 e 的副本:

type(e)(*e.args)

举个例子:

>>> e = ValueError("bad value")
>>> e
ValueError('bad value',)
>>> type(e)(*e.args)
ValueError('bad value',)

如果这个条件不成立,那么序列化/反序列化就会失败。所以,

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

的实例可以被序列化,但结果却无法被反序列化:

>>> from cPickle import loads, dumps
>>> class BadExc(Exception):
...     def __init__(self, a):
...         '''Non-optional param in the constructor.'''
...         self.a = a
...
>>> loads(dumps(BadExc(1)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: ('__init__() takes exactly 2 arguments (1 given)', <class '__main__.BadExc'>, ())

class GoodExc1(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        Exception.__init__(self, a)
        self.a = a

class GoodExc2(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.args = (a,)
        self.a = a

的实例则可以成功地进行序列化和反序列化。

所以你应该去找sqlalchemy的开发者,让他们修复他们的异常类。与此同时,你可以使用 copy_reg.pickle() 来覆盖 BaseException.__reduce__(),解决这些麻烦的类。

2

这个错误信息 TypeError: ('__init__() takes at least 4 arguments (2 given) 并不是因为你执行的 SQL 有问题,而是和你使用 SqlAlchemy 的方式有关。

问题在于,你试图在 session 类上调用 execute,而不是在这个 session 的实例上调用。

你可以试试这个:

session = Session()
session.execute("COMMIT; BEGIN; TRUNCATE foo%s; COMMIT;")
session.commit()

根据 文档

sessionmaker() 函数应该在应用的全局范围内调用,返回的类应该在应用的其他部分可用,作为创建会话的唯一类。

所以 Session = sessionmaker() 会返回一个新的 session 类,而 session = Session() 会返回这个类的一个实例,你可以在这个实例上调用 execute

11

我觉得这个 TypeError 错误是因为 multiprocessingget 方法引起的。

我把你代码里的数据库部分去掉了。你可以看看这个:

import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

使用 r.wait 可以得到预期的结果,但用 r.get 就会出现 TypeError。正如在 Python 的文档中所描述的,使用 r.waitmap_async 之后。

编辑: 我得修改我之前的回答。我现在认为这个 TypeError 是由 SQLAlchemy 引起的。我已经修改了我的脚本来重现这个错误。

编辑 2: 看起来问题在于 multiprocessing.pool 如果有任何工作进程抛出一个需要参数的异常时,就会出现问题(更多信息可以看 这里)。

我已经修改了我的脚本来突出这个问题。

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

在你的情况下,由于你的代码抛出了一个 SQLAlchemy 的异常,我能想到的唯一解决办法就是在 do 函数中捕获所有异常,然后重新抛出一个普通的 Exception。大概是这样的:

import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

编辑 3: 所以,这似乎是一个 Python 的 bug,但在 SQLAlchemy 中使用合适的异常可以解决这个问题:因此,我也 向 SQLAlchemy 提出了这个问题

作为解决这个问题的临时办法,我认为在 编辑 2 的最后提到的解决方案是可行的(在回调中使用 try-except,并重新抛出异常)。

撰写回答