CPU gets locked up when using multiprocessing in Python

Posted 2024-04-19 04:34:31


My code:

def create_rods(folder="./", kappas=10, allowed_kappa_error=.3,
                radius_correction_ratio=0.1):
    """
    Create one rod for each rod_data and for each file
    returns [RodGroup1, RodGroup2, ...]
    """
    names, files = import_files(folder=folder)
    if len(files) == 0:
        print "No files to import."
        raise ValueError
    states = [None for dummy_ in range(len(files))]
    processes = []
    states_queue = mp.Queue()
    for index in range(len(files)):
        process = mp.Process(target=create_rods_process,
                        args=(kappas, allowed_kappa_error,
                        radius_correction_ratio, names,
                        files, index, states_queue))
        processes.append(process)
    run_processes(processes)        # This part seems to take a lot of time.
    try:
        while True:
            [index, state] = states_queue.get(False)
            states[index] = state
    except Queue.Empty:
        pass    
    return names, states

def create_rods_process(kappas, allowed_kappa_error,
                    radius_correction_ratio, names,
                    files, index, states_queue):
    """
    Process of method.
    """
    state = SystemState(kappas, allowed_kappa_error,
                radius_correction_ratio, names[index])
    data = import_data(files[index])
    for dataline in data:
        parameters = tuple(dataline)
        new_rod = Rod(parameters)
        state.put_rod(new_rod)
    state.check_rods()
    states_queue.put([index, state])

def run_processes(processes, time_out=None):
    """
        Runs all processes using all cores.
    """
    running = []
    cpus = mp.cpu_count()
    try:
        while True:
        #for cpu in range(cpus):
            next_process = processes.pop()
            running.append(next_process)
            next_process.start()
    except IndexError:
        pass
    if not time_out:
        try:
            while True:
                for process in running:
                    if not process.is_alive():
                        running.remove(process)
        except TypeError:
            pass
    else:
        for process in running:
            process.join(time_out)

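I expect the processes to finish, but I end up with a blocked process instead. I don't know whether the problem is in the run_processes() method or in the create_rods() method. Using join releases the CPU, but the program does not continue.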


1 answer
Answer #1 · posted 2024-04-19 04:34:31

From Python's multiprocessing guidelines:

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
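The behaviour described in the quoted passage can be observed with a small experiment. This is only a sketch in Python 3, not the asker's code: `big_producer`, `join_before_drain` and `PAYLOAD_SIZE` are hypothetical names, and the payload size is an assumption chosen to be larger than a typical OS pipe buffer.

```python
import multiprocessing as mp

# Assumption: one megabyte is larger than the OS pipe buffer, so the
# child's feeder thread cannot flush the item until the parent reads it.
PAYLOAD_SIZE = 1000000


def big_producer(queue):
    # Hypothetical worker: puts one large item on the queue, then returns.
    queue.put(b"x" * PAYLOAD_SIZE)


def join_before_drain():
    queue = mp.Queue()
    child = mp.Process(target=big_producer, args=(queue,))
    child.start()

    # Join BEFORE draining, with a timeout so the experiment cannot hang.
    child.join(timeout=1)
    stuck = child.is_alive()  # the child is still waiting on its feeder thread

    # Drain the queue; the feeder thread can now finish and the child can exit.
    payload = queue.get()
    child.join()
    return stuck, child.is_alive(), len(payload)


if __name__ == "__main__":
    print(join_before_drain())
```

Under these assumptions the first returned value should show that the child was still alive after the timed join, and the second that it exited once the item had been read. Without the timeout, the first join would wait indefinitely.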

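Joining a process before its queue has been drained causes a deadlock. You need to make sure the queue is empty before you join the processes.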
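One way to apply this advice is to read exactly one result per worker with blocking get() calls, and only then join. The following is a minimal Python 3 sketch, not the asker's actual code: `worker` and `collect_results` are hypothetical stand-ins for create_rods_process and create_rods, and it assumes that every worker puts exactly one item on the queue.

```python
import multiprocessing as mp


def worker(index, results_queue):
    # Hypothetical stand-in for create_rods_process: one result per worker.
    results_queue.put((index, index * index))


def collect_results(n_workers):
    results_queue = mp.Queue()
    processes = [mp.Process(target=worker, args=(i, results_queue))
                 for i in range(n_workers)]
    for process in processes:
        process.start()

    # Drain first. Each worker puts exactly one item (assumption), so make
    # exactly n_workers blocking get() calls instead of polling get(False).
    results = [None] * n_workers
    for _ in range(n_workers):
        index, value = results_queue.get()
        results[index] = value

    # Only now join: every queued item has already been consumed.
    for process in processes:
        process.join()
    return results


if __name__ == "__main__":
    print(collect_results(4))
```

Blocking get() calls also avoid polling is_alive() in a tight loop, which keeps a core busy while waiting. Note that this sketch waits forever if a worker dies before putting its result; passing a timeout to get() and handling queue.Empty would be one way to guard against that.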
