Python threading.Thread只能通过私有方法self.__Thread_stop()停止

6 投票
3 回答
15868 浏览
提问于 2025-04-17 03:49

我有一个函数,它接受一大堆 x,y 对作为输入,然后用 numpy 和 scipy 进行复杂的曲线拟合,最后返回一个值。为了加快速度,我想用两个线程来处理这些数据,使用 Queue.Queue 来传递数据。一旦数据处理完成,我想让这两个线程结束,然后结束调用的过程,把控制权返回给命令行。

我想搞明白为什么我必须使用 threading.Thread 中的一个私有方法来停止我的线程,并把控制权返回给命令行。

使用 self.join() 并不能结束程序。唯一能让程序恢复控制的方法就是使用这个私有的停止方法。

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            # self.join(timeout=None) does not work
            self._Thread__stop()

以下是我代码的大致样子:

    class CalcThread(threading.Thread):
        def __init__(self,in_queue,out_queue,function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.isSet():
                params_for_function = self.in_queue.get()
                try:
                    tm = self.function(paramsforfunction)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError as v:
                    #modify params and reinsert into queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window + 1
                    self.in_queue.put(params_for_function)

    def big_calculation(well_id,window,data_arrays):
            # do some analysis to calculate tm
            return tm

    if __name__ == "__main__":
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue,out_queue,big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
              for i in well_ids:
                  in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"
        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break
# Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method

        for aworker in workers:
            aworker.stop()

3 个回答

0

我试了g.d.d.c的方法,结果挺有意思的。我可以在多个线程之间顺利地进行他所说的x**y计算。

当我在工作线程的while True循环中调用我的函数时,只有在for循环里加上time.sleep(1),我才能在多个线程中进行计算,这个for循环是用来启动线程的。

所以在我的代码中,如果不加time.sleep(1),程序要么干脆退出没有任何输出,要么在某些情况下出现这样的错误:

“线程Thread-2中出现异常(很可能是在解释器关闭时引发的):线程Thread-1中出现异常(很可能是在解释器关闭时引发的)。”

一旦我加上了time.sleep(),一切就正常运行了。

for aworker in range(5):
    t = Thread(target = worker)
    t.daemon = True
    t.start()
    # This sleep was essential or results for my specific function were None
    time.sleep(1)
    print "Started"
4

为了更详细地说明我的观点——如果你创建线程的唯一目的是从队列中取值并对这些值进行处理,那么我认为你最好这样做:

q = Queue()
results = []

def worker():
  while True:
    x, y = q.get()
    results.append(x ** y)
    q.task_done()

for _ in range(workerCount):
  t = Thread(target = worker)
  t.daemon = True
  t.start()

for tup in listOfXYs:
  q.put(tup)

q.join()

# Some more code here with the results list.

q.join() 这个命令会一直等到队列里的东西都被处理完。工作线程会继续尝试从队列中取值,但如果队列空了,它们就会一直等着,不会再找到任何值。当你的脚本执行完毕后,这些工作线程会自动结束,因为它们被标记为守护线程。

5

一般来说,强行结束一个正在修改共享资源的线程是个坏主意。

在Python中,如果你不释放全局解释器锁(GIL),那么多个线程同时进行CPU密集型任务几乎是没什么用的。很多numpy函数确实会释放GIL。

文档中的ThreadPoolExecutor示例

import concurrent.futures # on Python 2.x: pip install futures 

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                           for args in calc_args)

    while future_to_args:
        for future in concurrent.futures.as_completed(dict(**future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,
                                                         future.exception()))
                if isinstance(future.exception(), ValueError):
                    #modify params and resubmit
                    args["window"] += 1
                    future_to_args[executor.submit(big_calculation, args)] = args

            else:
                print('f%r returned %r' % (args, future.result()))

print("ALL work SEEMs TO BE DONE")

如果没有共享状态,你可以用ProcessPoolExecutor来替代ThreadPoolExecutor。把代码放在你的main()函数里。

撰写回答