Python:使用sqlite3进行多处理

2024-06-16 11:10:37 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个sqlite3db。我需要分析10000个文件。我从每个文件中读取一些数据,然后使用这些数据查询数据库以获得结果。我的代码在单进程环境中运行良好。但是我在尝试使用mulitprocessing池时遇到了一个错误。在

My approach without multiprocessing (works OK):
1. Open DB connection object
2. for f in files: 
     foo(f, x1=x1, x2=x2, ..., db=DB)
3. Close DB

My approach with multiprocessing (does NOT work):
1. Open DB
2. pool = multiprocessing.Pool(processes=4) 
3. pool.map(functools.partial(foo, x1=x1, x2=x2, ..., db=DB), [files])
4. pool.close()
5. Close DB 

我得到以下错误:sqlite3.ProgrammingError:Base Cursor.\uu init\uu未调用。

我的DB类实现如下:

^{pr2}$

函数foo()如下所示:

def foo(f, x1, x2, db):

  extract some data from file f
  r1 = db.get_driver_net(...)
  r2 = db.get_cell_id(...)

总体执行情况如下:

mapdb = MapDB(sqlite_file)

log.info('Create NetInfo objects')
pool = multiprocessing.Pool(processes=4)
files = [get list of files to process]                 
pool.map(functools.partial(foo, x1=x1, x2=x2, db=mapdb), files)    
pool.close()
mapdb.close()

为了解决这个问题,我想我需要在每个池工作线程内创建MapDB()对象(因此有4个并行/独立连接)。但我不知道该怎么做。有人能给我演示一下如何用Pool来完成这个任务吗?在


Tags: 文件数据closedbgetfoo错误files
1条回答
网友
1楼 · 发布于 2024-06-16 11:10:37

像这样定义foo怎么样:

def foo(f, x1, x2, db_path):
    mapdb = MapDB(db_path)
    ... open mapdb
    ... process data ...
    ... close mapdb

然后把你的池.map呼叫:

^{2}$

更新

另一个选择是自己处理工作线程并通过Queue分发工作。在

from Queue import Queue
from threading import Thread

q = Queue()

def worker():
  mapdb = ...open the sqlite database
  while True:
    item = q.get()
    if item[0] == "file":
      file = item[1]
      ... process file ...
      q.task_done()
    else:
      q.task_done()
      break
  ...close sqlite connection...

# Start up the workers

nworkers = 4

for i in range(nworkers):
  worker = Thread(target=worker)
  worker.daemon = True
  worker.start()

# Place work on the Queue

for x in ...list of files...:
  q.put(("file",x))

# Place termination tokens onto the Queue

for i in range(nworkers):
  q.put(("end",))

# Wait for all work to be done.

q.join()

终止令牌用于确保sqlite连接是关闭的-以防发生问题。在

相关问题 更多 >