python MySQLDB 查询超时

10 投票
7 回答
14368 浏览
提问于 2025-04-15 14:44

我正在尝试在Python的MySQLDB中给查询设置一个时间限制。我遇到的情况是,我无法控制这些查询,但我需要确保它们不会超过设定的时间限制。我试过使用signal.SIGALRM来中断执行的调用,但这似乎不起作用。信号确实发出了,但直到执行完成后才被捕获。

我写了一个测试案例来证明这种行为:

#!/usr/local/bin/python2.6

import time
import signal

from somewhere import get_dbc

class Timeout(Exception):
    """ Time Exceded """

def _alarm_handler(*args):
    raise Timeout

dbc = get_dbc()

signal.signal(signal.SIGALRM, _alarm_handler)
signal.alarm(1)

try:
    print "START:  ", time.time()
    dbc.execute("SELECT SLEEP(10)")
except Timeout:
    print "TIMEOUT!", time.time()'

这里的“SELECT SLEEP(10)”是在模拟一个慢查询,但我发现实际的慢查询也有同样的表现。

结果是:

START:   1254440686.69
TIMEOUT! 1254440696.69

正如你所看到的,它睡了10秒钟,然后我才收到超时异常。

问题:

  1. 为什么我在执行完成后才收到信号?
  2. 有没有其他可靠的方法来限制查询的执行时间?

7 个回答

2

我尝试使用signal.SIGALRM来中断执行的调用,但这似乎不起作用。信号确实被发送了,但在执行调用完成之前并没有被捕获。

mysql库内部处理被中断的系统调用,所以你不会在API调用完成之前看到SIGALRM的副作用(除非你杀掉当前的线程或进程)。

你可以尝试修改MySQL-Python,并使用MYSQL_OPT_READ_TIMEOUT选项(这个选项是在mysql 5.0.25中添加的)。

2

使用 adbapi。这个工具可以让你异步地进行数据库操作。

from twisted.internet import reactor
from twisted.enterprise import adbapi

def bogusQuery():
    return dbpool.runQuery("SELECT SLEEP(10)")

def printResult(l):
    # function that would be called if it didn't time out
    for item in l:
        print item

def handle_timeout():
    # function that will be called when it timeout
    reactor.stop()

dbpool = adbapi.ConnectionPool("MySQLdb", user="me", password="myself", host="localhost", database="async")
bogusQuery().addCallback(printResult)
reactor.callLater(4, handle_timeout)
reactor.run()
8

@nosklo 提出的基于 twisted 的解决方案很优雅,也能正常工作,但如果你想避免依赖 twisted,这个任务还是可以完成的,比如:

import multiprocessing

def query_with_timeout(dbc, timeout, query, *a, **k):
  conn1, conn2 = multiprocessing.Pipe(False)
  subproc = multiprocessing.Process(target=do_query,
                                    args=(dbc, query, conn2)+a, 
                                    kwargs=k)
  subproc.start()
  subproc.join(timeout)
  if conn1.poll():
    return conn1.recv()
  subproc.terminate()
  raise TimeoutError("Query %r ran for >%r" % (query, timeout))

def do_query(dbc, query, conn, *a, **k):
  cu = dbc.cursor()
  cu.execute(query, *a, **k)
  return cu.fetchall()

撰写回答