Python脚本在多线程处理后挂起

2024-04-19 18:34:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我知道在Python中有一些与挂起线程相关的问题和答案,但我的情况略有不同,因为脚本在所有线程都完成后挂起。线程脚本如下所示,但显然前两个函数被大量简化

当我运行显示的脚本时,它工作正常。当我使用真正的函数时,脚本挂在最后一行之后。因此,所有的场景都会被处理(并打印一条消息来确认),logStudyData()然后整理所有结果并写入csv"Script Complete"已打印。然后就挂了

删除线程功能的脚本运行良好

我尝试过将主脚本封装在try...except中,但没有记录任何异常。如果我使用一个在最后一个print上有断点的调试器,然后向前推进,它就会挂起

我知道这里没什么可谈的,但除了包括整个1500行的剧本,我不知道还能做什么。欢迎任何建议

def runScenario(scenario):
    # Do a bunch of stuff
    with lock:
        # access global variables
        pass
    pass

def logStudyData():
    # Combine results from all scenarios into a df and write to csv
    pass

def worker():
    global q
    while True:
        next_scenario = q.get() 
        if next_scenario is None:
            break
        runScenario(next_scenario)
        print(next_scenario , " is complete")
        q.task_done() 

import threading
from queue import Queue 
global q, lock
q = Queue()  
threads = []

scenario_list = ['s1','s2','s3','s4','s5','s6','s7','s8','s9','s10','s11','s12']
num_worker_threads = 6  
    lock = threading.Lock()
    for i in range(num_worker_threads):
        print("Thread number ",i)
        this_thread = threading.Thread(target=worker)
        this_thread.start()
        threads.append(this_thread)
    for scenario_name in scenario_list:
        q.put(scenario_name)  
    q.join()
    print("q.join completed")
    logStudyData() 
    print("script complete")

Tags: 脚本lockdefpassthis线程globalthread
1条回答
网友
1楼 · 发布于 2024-04-19 18:34:25

正如^{}的文档所说:

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

换言之,get不可能返回None,除非在主线程上调用q.put(None),否则您不会这样做

请注意,这些文档下面的示例是这样做的:

for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

第二种方法在技术上是必要的,但你通常不做它就能逃脱惩罚

但第一个是绝对必要的。你要么这样做,要么想出一些其他机制来告诉你的员工辞职。否则,您的主线程只会尝试退出,这意味着它会尝试加入每个工作线程,但这些工作线程在get上永远被阻止,这永远不会发生,因此您的程序永远挂起


构建线程池可能不是火箭科学(如果只是因为火箭科学家往往需要他们的计算具有确定性和硬实时性……),但它也不是微不足道的,而且有很多事情你可能会出错。您可能需要考虑使用Python标准库中已经构建的两个线程池之一^{}^{}。这将使您的整个计划减少到:

import concurrent.futures

def work(scenario):
    runScenario(scenario)
    print(scenario , " is complete")

scenario_list = ['s1','s2','s3','s4','s5','s6','s7','s8','s9','s10','s11','s12']
with concurrent.futures.ThreadPoolExecutor(max_workers=6) as x:
    results = list(x.map(work, scenario_list))
print("q.join completed")
logStudyData() 
print("script complete")

显然,您仍然需要在runScenario内更改的任何可变变量周围设置一个锁—尽管如果您只是在那里使用可变变量,因为您不知道如何将值返回到主线程,那么使用Executor就很简单了:只需return来自work的值,然后您就可以这样使用它们:

for result in x.map(work, scenario_list):
    do_something(result)

相关问题 更多 >