处理守护线程中的数据库连接
我在处理一个守护进程的数据库连接时遇到了问题。首先,我用以下代码连接到我的Postgres数据库:
try:
psycopg2.apilevel = '2.0'
psycopg2.threadsafety = 3
cnx = psycopg2.connect( "host='192.168.10.36' dbname='db' user='vas' password='vas'")
except Exception, e:
print "Unable to connect to DB. Error [%s]" % ( e,)
exit( )
之后,我选择数据库中所有状态为0的行:
try:
cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
cursor.execute( "SELECT * FROM table WHERE status = 0")
rows = cursor.fetchall( )
cursor.close( )
except Exception, e:
print "Error on sql query [%s]" % ( e,)
然后,如果选中了行,程序就会分叉成:
while 1:
try:
psycopg2.apilevel = '2.0'
psycopg2.threadsafety = 3
cnx = psycopg2.connect( "host='192.168.10.36' dbname='sms' user='vas' password='vas'")
except Exception, e:
print "Unable to connect to DB. Error [%s]" % ( e,)
exit( )
if rows:
daemonize( )
for i in rows:
try:
global q, l
q = Queue.Queue( max_threads)
for i in rows:
cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
t = threading.Thread( target=sender, args=(i, cursor))
t.setDaemon( True)
t.start( )
for i in rows:
q.put( i)
q.join( )
except Exception, e:
print "Se ha producido el siguente error [%s]" % ( e,)
exit( )
else:
print "No rows where selected\n"
time.sleep( 5)
我的守护进程函数看起来是这样的:
def daemonize( ):
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
os.chdir("/")
os.umask(0)
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError, e:
print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
sys.exit(1)
线程的目标是发送者函数:
def sender( row, db):
while 1:
item = q.get( )
if send_to( row['to'], row['text']):
db.execute( "UPDATE table SET status = 1 WHERE id = %d" % sms['id'])
else:
print "UPDATE table SET status = 2 WHERE id = %d" % sms['id']
db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
db.close( )
q.task_done( )
send_to
函数只是打开一个网址,并根据成功与否返回真或假
从昨天开始,我一直收到这些错误,找不到解决办法:
UPDATE outbox SET status = 2 WHERE id = 36
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
self.run()
File "/usr/lib/python2.6/threading.py", line 477, in run
self.__target(*self.__args, **self.__kwargs)
File "sender.py", line 30, in sender
db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
File "/usr/lib/python2.6/dist-packages/psycopg2/extras.py", line 88, in execute
return _cursor.execute(self, query, vars, async)
OperationalError: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
1 个回答
1
数据库的连接在使用 fork()
这个命令后是无法继续使用的。也就是说,在每个子进程中你需要重新打开一个新的数据库连接,也就是在你调用 daemonize()
之后,要再调用一次 psycopg2.connect
来建立连接。
我没有使用过 Postgres,但我知道这对于 MySQL 是绝对正确的。