使用cx_Oracle和多进程并发查询数据

1 投票
2 回答
3035 浏览
提问于 2025-04-18 06:02

大家好,

我正在尝试从一个Oracle数据库中访问和处理大量数据。为此,我使用了多进程模块,启动了50个进程来访问数据库。为了避免打开50个实际的连接,我尝试使用cx_Oracle的会话池。下面是我的代码。不过,我总是遇到一个解包错误。我知道cx_Oracle有解包的问题,但我想通过使用全局变量来绕过这个问题。有没有人能帮帮我呢?

import sys
import cx_Oracle
import os
from multiprocessing import Pool

 # Read a list of ids from the input file
 def ReadList(inputFile):
        ............


def GetText(applId):
        global sPool
        connection = sPool.acquire()
        cur = connection.cursor()
        cur.prepare('Some Query')
        cur.execute(None, appl_id = applId)
        result = cur.fetchone()
        title = result[0]
        abstract = result[2].read()
        sa = result[3].read()
        cur.close()
        sPool.release(connection)
        return (title, abstract, sa)
if __name__=='__main__':
        inputFile = sys.argv[1]
        ids = ReadList(inputFile)
        dsn = cx_Oracle.makedsn('xxx', ...)
        sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1)
        pool = Pool(10)
        results = pool.map(GetText, ids)


Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 282, in _handle_results
task = get()
UnpicklingError: NEWOBJ class argument has NULL tp_new

2 个回答

0

首先,你的代码出现了“NameError: global name 'sPool' is not defined”这个错误,这意味着 sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1) 这行代码必须放在 def GetText(applId): 之前。

对我来说,这段代码在我把 from multiprocessing import Pool 改成 from multiprocessing.dummy import Pool 之后开始正常工作,并且在调用 cx_Oracle.SessionPool 时添加了参数 threaded=True,变成 sPool=cx_Oracle.SessionPool(....., min=1, max=10, increment=1, threaded=True)

1

你怎么能指望50个进程使用同一个由进程内部管理的数据库连接(连接池)呢?

撰写回答