Python threading.Thread只能通过私有方法self.__Thread_stop()停止
我有一个函数,它接受一大堆 x,y 对作为输入,然后用 numpy 和 scipy 进行复杂的曲线拟合,最后返回一个值。为了加快速度,我想用两个线程来处理这些数据,使用 Queue.Queue 来传递数据。一旦数据处理完成,我想让这两个线程结束,然后结束调用的过程,把控制权返回给命令行。
我想搞明白为什么我必须使用 threading.Thread 中的一个私有方法来停止我的线程,并把控制权返回给命令行。
使用 self.join() 并不能结束程序。唯一能让程序恢复控制的方法就是使用这个私有的停止方法。
def stop(self):
print "STOP CALLED"
self.finished.set()
print "SET DONE"
# self.join(timeout=None) does not work
self._Thread__stop()
以下是我代码的大致样子:
class CalcThread(threading.Thread):
def __init__(self,in_queue,out_queue,function):
threading.Thread.__init__(self)
self.in_queue = in_queue
self.out_queue = out_queue
self.function = function
self.finished = threading.Event()
def stop(self):
print "STOP CALLED"
self.finished.set()
print "SET DONE"
self._Thread__stop()
def run(self):
while not self.finished.isSet():
params_for_function = self.in_queue.get()
try:
tm = self.function(paramsforfunction)
self.in_queue.task_done()
self.out_queue.put(tm)
except ValueError as v:
#modify params and reinsert into queue
window = params_for_function["window"]
params_for_function["window"] = window + 1
self.in_queue.put(params_for_function)
def big_calculation(well_id,window,data_arrays):
# do some analysis to calculate tm
return tm
if __name__ == "__main__":
NUM_THREADS = 2
workers = []
in_queue = Queue()
out_queue = Queue()
for i in range(NUM_THREADS):
w = CalcThread(in_queue,out_queue,big_calculation)
w.start()
workers.append(w)
if options.analyze_all:
for i in well_ids:
in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))
in_queue.join()
print "ALL THREADS SEEM TO BE DONE"
# gather data and report it from out_queue
for i in well_ids:
p = out_queue.get()
print p
out_queue.task_done()
# I had to do this to get the out_queue to proceed
if out_queue.qsize() == 0:
out_queue.join()
break
# Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method
for aworker in workers:
aworker.stop()
3 个回答
我试了g.d.d.c的方法,结果挺有意思的。我可以在多个线程之间顺利地进行他所说的x**y计算。
当我在工作线程的while True循环中调用我的函数时,只有在for循环里加上time.sleep(1),我才能在多个线程中进行计算,这个for循环是用来启动线程的。
所以在我的代码中,如果不加time.sleep(1),程序要么干脆退出没有任何输出,要么在某些情况下出现这样的错误:
“线程Thread-2中出现异常(很可能是在解释器关闭时引发的):线程Thread-1中出现异常(很可能是在解释器关闭时引发的)。”
一旦我加上了time.sleep(),一切就正常运行了。
for aworker in range(5):
t = Thread(target = worker)
t.daemon = True
t.start()
# This sleep was essential or results for my specific function were None
time.sleep(1)
print "Started"
为了更详细地说明我的观点——如果你创建线程的唯一目的是从队列中取值并对这些值进行处理,那么我认为你最好这样做:
q = Queue()
results = []
def worker():
while True:
x, y = q.get()
results.append(x ** y)
q.task_done()
for _ in range(workerCount):
t = Thread(target = worker)
t.daemon = True
t.start()
for tup in listOfXYs:
q.put(tup)
q.join()
# Some more code here with the results list.
q.join()
这个命令会一直等到队列里的东西都被处理完。工作线程会继续尝试从队列中取值,但如果队列空了,它们就会一直等着,不会再找到任何值。当你的脚本执行完毕后,这些工作线程会自动结束,因为它们被标记为守护线程。
一般来说,强行结束一个正在修改共享资源的线程是个坏主意。
在Python中,如果你不释放全局解释器锁(GIL),那么多个线程同时进行CPU密集型任务几乎是没什么用的。很多numpy
函数确实会释放GIL。
文档中的ThreadPoolExecutor示例
import concurrent.futures # on Python 2.x: pip install futures
calc_args = []
if options.analyze_all:
calc_args.extend(dict(well_id=i,...) for i in well_ids)
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
future_to_args = dict((executor.submit(big_calculation, args), args)
for args in calc_args)
while future_to_args:
for future in concurrent.futures.as_completed(dict(**future_to_args)):
args = future_to_args.pop(future)
if future.exception() is not None:
print('%r generated an exception: %s' % (args,
future.exception()))
if isinstance(future.exception(), ValueError):
#modify params and resubmit
args["window"] += 1
future_to_args[executor.submit(big_calculation, args)] = args
else:
print('f%r returned %r' % (args, future.result()))
print("ALL work SEEMs TO BE DONE")
如果没有共享状态,你可以用ProcessPoolExecutor
来替代ThreadPoolExecutor
。把代码放在你的main()
函数里。