多线程脚本在结束时挂起

0 投票
3 回答
1216 浏览
提问于 2025-04-18 17:44

你好:我正在尝试让这个脚本正常工作。有时候,根据用户的数量(下面的例子显示的是3,但这个数字可以很容易地增加),脚本不会退出。所有的任务都完成了,但脚本就是卡在那里不退出。我觉得我在worker中的while True退出方式可能有问题,但我不知道有什么替代的方法。有什么想法吗?

import datetime, logging, os.path, queue, random, threading, time

script = os.path.basename(__file__)
logging.basicConfig(level=logging.DEBUG, format="%(asctime)-4s %(thread)6s  %(message)s", datefmt="%m-%d %H:%M:%S",
    filename="%s_%s.log"%(script[:script.find(".")],datetime.datetime.today().strftime("%Y%m%d-%H%M%S")))

class User(object):

    def __init__(self, id, ndelay, mind, maxd):
        self.id = id
        self.numdelay = ndelay #number of delays
        self.mind = mind       #min delay
        self.maxd = maxd       #max delay
        self.currdelaynum = 0  #index for next delay

    def hasDelay(self):
        if self.currdelaynum >= 0 and self.currdelaynum < self.numdelay:
            return True

    def runNextDelay(self):
        delay = round(self.mind + random.random()*(self.maxd - self.mind))
        logging.info("%s beg (delay=%d)"%(self.id,delay))
        time.sleep(delay)
        logging.info("%s end"%self.id)            
        self.currdelaynum += 1


def worker(unext,udone):
    while True:
        if unext.qsize() > 0:
            m = unext.get()
            users_all[m].runNextDelay()
            if users_all[m].hasDelay():
                unext.put(m)
            else:
                udone.put(m)
        else:
            if udone.qsize() >= len(users_all):
                break


if __name__=='__main__':
    random.seed(10)

    #global users_all
    users_all = list()
    users_all.append(User("aa",2,3,9))
    users_all.append(User("bb",3,2,4))
    users_all.append(User("cc",1,4,5))

    users_next = queue.Queue()
    users_done = queue.Queue()
    for n in range(len(users_all)):
        users_next.put(n)

    threads = [threading.Thread(target=worker, args=(users_next,users_done)) for n in range(2)]
    for t in threads: t.start()
    for t in threads: t.join()

大多数多线程的Python示例都有一个提前知道的任务队列。我正在写一个脚本,用来测试在数据库上并行运行查询的响应时间。为了让上面的例子更完整,我把ODBC查询的部分替换成了sleep。如果有更好的实现建议,我也非常欢迎。

根据评论更新的版本

def worker(unext):
    while True:
        try:
            m = unext.get_nowait()
            users_all[m].runNextDelay()
            if users_all[m].hasDelay():
                unext.put(m)
        except queue.Empty:
            break

3 个回答

0

我之前在工作队列上也遇到过类似的问题。我的解决办法是(在上面提到过)用超时时间为0来调用get()函数:

def run(self):
    while not self._stopevent.isSet():
        try:
            self._execute_job_function()
        except queue.Empty:
            pass    #make sure the application doesn't crash when the jobqueue is empty

def _execute_job_function(self):
    job = self._job_list.get(False, 0)  #calling get function with time-out = 0 to prevent hanging

    print("Executing job: {0}".format(job))
    self._results_queue.put("{0} - Done".format(job))
    self._job_list.task_done()

希望这对你有帮助。

0

这是另一个多线程代码的版本。改动如下:

1) 线程有了合适的名字(比如“thread-1”),这些名字会被记录在日志里。

2) 队列中存放的是用户实例,而不是全局数组中的索引。

3) 如果线程在队列中收到一个 None,它们会自己停止。初始化代码会把一些用户放入输入队列,然后在每个线程的末尾添加一个 None,这样每个线程就会收到退出的信号。

4) 工作线程会记录它们的开始和结束时间;用户对象可以直接打印出来。

源代码

import datetime, logging, os.path, random, sys, threading, time
import Queue as queue

script = os.path.basename(__file__)
logging.basicConfig(
    level=logging.DEBUG, 
    format="%(asctime)-4s %(threadName)s  %(message)s", datefmt="%m-%d %H:%M:%S",
    stream=sys.stderr,
    # filename="%s_%s.log"%(script[:script.find(".")],datetime.datetime.today().strftime("%Y%m%d-%H%M%S")))
)

class User(object):

    def __init__(self, id, ndelay, mind, maxd):
        self.id = id
        self.numdelay = ndelay #number of delays
        self.mind = mind       #min delay
        self.maxd = maxd       #max delay
        self.currdelaynum = 0  #index for next delay

    def __repr__(self):
        return '<User: id={}>'.format(self.id)

    def hasDelay(self):
        return (
            self.currdelaynum >= 0 
            and self.currdelaynum < self.numdelay
        )

    def runNextDelay(self):
        delay = round(self.mind + random.random()*(self.maxd - self.mind))
        logging.info("%s beg (delay=%d)", self.id, delay)
        time.sleep(delay)
        logging.info("%s end", self.id)
        self.currdelaynum += 1


def worker(unext, udone):
    logging.info('start')
    for user in iter(unext.get, None):
        while True:
            user.runNextDelay()
            if not user.hasDelay():
                break
            logging.debug('%s: reloop', user)
        udone.put(user)
    logging.info('done')


if __name__=='__main__':
    random.seed(10)

    users_all = list()
    users_all.append(User("aa",2,3,9))
    users_all.append(User("bb",3,2,4))
    users_all.append(User("cc",1,4,5))

    users_next = queue.Queue()
    users_done = queue.Queue()
    for user in users_all:
        users_next.put(user)

    # flag each thread to exit at end
    num_threads = 2
    for _ in range(num_threads):
        users_next.put(None)

    threads = [
        threading.Thread(
            target=worker, 
            args=(users_next,users_done),
            name='thread-{}'.format(n),
        )
        for n in range(num_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

输出结果

08-19 12:29:29 thread-0  start
08-19 12:29:29 thread-0  aa beg (delay=6)
08-19 12:29:29 thread-1  start
08-19 12:29:29 thread-1  bb beg (delay=3)
08-19 12:29:32 thread-1  bb end
08-19 12:29:32 thread-1  <User: id=bb>: reloop
08-19 12:29:32 thread-1  bb beg (delay=3)
08-19 12:29:35 thread-0  aa end
08-19 12:29:35 thread-0  <User: id=aa>: reloop
08-19 12:29:35 thread-0  aa beg (delay=4)
08-19 12:29:35 thread-1  bb end
08-19 12:29:35 thread-1  <User: id=bb>: reloop
08-19 12:29:35 thread-1  bb beg (delay=4)
08-19 12:29:39 thread-1  bb end
08-19 12:29:39 thread-0  aa end
08-19 12:29:39 thread-0  cc beg (delay=5)
08-19 12:29:39 thread-1  done
08-19 12:29:44 thread-0  cc end
08-19 12:29:44 thread-0  done
0

正如univerio所提到的,这里存在竞争条件。一般来说,当你在处理多个线程共享的对象时,要问自己一个问题:如果我的线程在这个时刻被打断,另一个线程开始运行,会发生什么?univerio提到的情况是,线程A调用qsize()时可能返回非零值,然后线程B运行并从同一个队列中取出一个项目。当线程A再次运行并执行get()时,假设队列里还有项目的想法就错了,这时get()可能会被阻塞。

这里有一些未经测试的代码,可以用来指导你最终的实现:

def worker(unext, udone):
    while True:
        try:
            m = unext.get_nowait()
            users_all[m].runNextDelay()
            if users_all[m].hasDelay():
                unext.put(m)
            else:
                udone.put(m)
        except queue.Queue.Empty:
            if udone.qsize() >= len(users_all):
                break

不过这仍然不是一个理想的实现,因为当unext队列为空,但其他线程还没有处理完时,所有线程都会在while循环中疯狂等待,直到最后一个线程完成。

可能更好的做法是让线程完成自己的工作后直接退出,而不是一直等待。主线程可以等到udone.qsize() >= len(users_all)这个条件成立时再继续。

撰写回答