Creating a database connection and maintaining it across multiple processes
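This is related to an earlier post of mine. It is mostly a follow-up to that thread, but it also raises a new question.
A quick recap: I need to update every record in a spatial database in which a data set of points overlays a data set of polygons. For each point, I want to assign a key that relates it to the polygon it lies within. For example, if my point 'New York City' lies within the polygon 'USA', and the 'USA' polygon has 'GID = 1', then I assign 'gid_fkey = 1' to my point 'New York City'.
Okay, this has now been achieved using multiprocessing. I measured a 150% increase in speed with this approach, so it does work. But I think there is a lot of unnecessary overhead, because a database connection is opened for every single record.
Here is the code: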
import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # A None task is the signal to shut down
                print('Tasks Complete')
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self):
        # A new connection is opened for every task (the overhead in question)
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()
        procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s'
        # Let psycopg2 bind the parameters instead of formatting the string
        pyCursor1.execute(procQuery, (self.a, self.a))
        print('What is self?')
        print(self.a)
        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print('IN')

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    # Count the cities that still need a key
    pyCursorX.execute('SELECT count(*) FROM city WHERE gid_fkey IS NULL')
    temp = pyCursorX.fetchall()
    num_job = temp[0]
    num_jobs = num_job[0]

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
    cityIdListTuple = pyCursorX.fetchall()

    # Flatten the one-column rows into a plain list of ids
    cityIdList = []
    for x in cityIdListTuple:
        cityIdList.append(x[0])

    for i in range(num_jobs):
        tasks.put(Task(cityIdList[i]))

    # One None per consumer so that every process shuts down
    for i in range(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print(result)
        num_jobs -= 1
Measured with the 'time' module, each connection takes roughly 0.3 to 1.5 seconds to establish.
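A measurement like this could be taken with a small wrapper around the connect call. The sketch below is only an assumption about how such timing might be done under Python 3, not the code that produced the numbers above; `timed_call` is a hypothetical helper name.

```python
import time


def timed_call(func, *args, **kwargs):
    """Call func and return (result, elapsed_seconds). Hypothetical helper."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Usage sketch (needs psycopg2 and a reachable database, so it is left commented out):
# import psycopg2
# conn, seconds = timed_call(psycopg2.connect, "dbname='geobase_1' host='localhost'")
# print('connect took %.3f s' % seconds)
```

Timing the connect call separately from the UPDATE would show how much of each task is spent on connection setup rather than on the query itself.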
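Is there a way to open one database connection per process and then pass the city_id into the query as a variable? That way I could start, say, four processes, each holding its own database connection, and feed the city_id values to them for processing.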
1 Answer
Try creating the connection in the Consumer's constructor, then pass it to the task being executed:
import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        # One connection per consumer, reused for every task it executes
        self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        self.pyConn.set_isolation_level(0)

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # A None task is the signal to shut down
                print('Tasks Complete')
                self.task_queue.task_done()
                break
            # Hand the consumer's connection to the task
            answer = next_task(connection=self.pyConn)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return

class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self, connection=None):
        pyConn = connection
        pyCursor1 = pyConn.cursor()
        procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s'
        # Let psycopg2 bind the parameters instead of formatting the string
        pyCursor1.execute(procQuery, (self.a, self.a))
        print('What is self?')
        print(self.a)
        return self.a

    def __str__(self):
        return 'ARC'

    def run(self):
        print('IN')
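A variation worth considering: the constructor runs in the parent process, before the child starts, and the psycopg2 documentation advises creating connections after the fork rather than carrying them into forked processes. The sketch below opens the connection at the top of `run()`, which executes in the child, and puts bare `city_id` values on the queue instead of `Task` objects. `DSN`, `UPDATE_SQL`, `open_connection`, `update_city`, and the `connect` parameter are hypothetical names introduced for illustration; the connection string and the query are taken from the question.

```python
import multiprocessing

DSN = "dbname='geobase_1' host='localhost'"  # connection string from the question

UPDATE_SQL = (
    "UPDATE city SET gid_fkey = gid FROM country "
    "WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), "
    "country.the_geom) AND city_id = %s"
)


def open_connection():
    """Open one autocommit connection (hypothetical default factory)."""
    import psycopg2  # imported lazily so the module loads even where the driver is absent
    conn = psycopg2.connect(DSN)
    conn.set_isolation_level(0)
    return conn


def update_city(connection, city_id):
    """Run the UPDATE for one city on an already open connection."""
    cursor = connection.cursor()
    cursor.execute(UPDATE_SQL, (city_id, city_id))
    cursor.close()
    return city_id


class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue, connect=open_connection):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.connect = connect  # assumption: any zero-argument connection factory

    def run(self):
        # run() executes in the child process, so the connection is created there
        connection = self.connect()
        try:
            while True:
                city_id = self.task_queue.get()
                if city_id is None:  # sentinel: no more work
                    self.task_queue.task_done()
                    break
                answer = update_city(connection, city_id)
                self.task_queue.task_done()
                self.result_queue.put(answer)
        finally:
            connection.close()
```

With this layout the main block would call `tasks.put(city_id)` for each id and then put one `None` per consumer, as in the question. Keeping `connect` a module-level function also keeps the `Consumer` picklable, which matters on platforms where multiprocessing spawns rather than forks.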