处理守护线程中的数据库连接

0 投票
1 回答
948 浏览
提问于 2025-04-15 15:02

我在处理一个守护进程的数据库连接时遇到了问题。首先,我用以下代码连接到我的Postgres数据库:

try:
  psycopg2.apilevel = '2.0'
  psycopg2.threadsafety = 3
  cnx = psycopg2.connect( "host='192.168.10.36' dbname='db' user='vas' password='vas'")
  except Exception, e:
  print "Unable to connect to DB. Error [%s]" % ( e,)
  exit( )

之后,我选择数据库中所有状态为0的行:

try:
  cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
  cursor.execute( "SELECT * FROM table WHERE status = 0")
  rows = cursor.fetchall( )
  cursor.close( )
except Exception, e:
  print "Error on sql query [%s]" % ( e,)

然后,如果选中了行,程序就会分叉成:

while 1:
  try:
    psycopg2.apilevel = '2.0'
    psycopg2.threadsafety = 3
    cnx = psycopg2.connect( "host='192.168.10.36' dbname='sms' user='vas' password='vas'")
  except Exception, e:
    print "Unable to connect to DB. Error [%s]" % ( e,)
    exit( )

  if rows:
    daemonize( )
    for i in rows:
      try:
        global q, l
        q = Queue.Queue( max_threads)
        for i in rows:
          cursor = cnx.cursor( cursor_factory = psycopg2.extras.DictCursor)
          t = threading.Thread( target=sender, args=(i, cursor))
          t.setDaemon( True)
          t.start( )

          for i in rows:
            q.put( i)
            q.join( )
      except Exception, e:
        print "Se ha producido el siguente error [%s]" % ( e,)
        exit( )
  else:
    print "No rows where selected\n"
    time.sleep( 5)

我的守护进程函数看起来是这样的:

def daemonize( ):
  try:
    pid = os.fork()
    if pid > 0:
      sys.exit(0)
  except OSError, e:
    print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror)
    sys.exit(1)

  os.chdir("/")
  os.umask(0)

  try:
    pid = os.fork()
    if pid > 0:
      sys.exit(0)
  except OSError, e:
    print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror)
    sys.exit(1)

线程的目标是发送者函数:

def sender( row, db):
  while 1:
  item = q.get( )
  if send_to( row['to'], row['text']):
    db.execute( "UPDATE table SET status = 1 WHERE id = %d" % sms['id'])
  else:
    print "UPDATE table SET status = 2 WHERE id = %d" % sms['id']
    db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
  db.close( )
  q.task_done( )

send_to函数只是打开一个网址,并根据成功与否返回真或假

从昨天开始,我一直收到这些错误,找不到解决办法:

UPDATE outbox SET status = 2 WHERE id = 36
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.6/threading.py", line 525, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "sender.py", line 30, in sender
    db.execute( "UPDATE table SET status = 2 WHERE id = %d" % sms['id'])
  File "/usr/lib/python2.6/dist-packages/psycopg2/extras.py", line 88, in execute
    return _cursor.execute(self, query, vars, async)
OperationalError: server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.

1 个回答

1

数据库的连接在使用 fork() 这个命令后是无法继续使用的。也就是说,在每个子进程中你需要重新打开一个新的数据库连接,也就是在你调用 daemonize() 之后,要再调用一次 psycopg2.connect 来建立连接。

我没有使用过 Postgres,但我知道这对于 MySQL 是绝对正确的。

撰写回答