Multiprocessing pool hangs after processing (on close or join)

Posted 2024-03-29 13:44:10


My main method (properly guarded so that it does not run on import) looks like this:

def main():
    json_file, csv_out = get_user_input()
    sessions = get_sessions(json_file)

    if sessions:
        pool = multiprocessing.Pool()
        pool.map_async(process_one_item, sessions, 1)
        pool.close()
        pool.join()

    write_csv(csv_out, json_file, sessions)

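It processes every session (more than 1000 of them), each of which is just a list of timestamps that gets processed before being written to disk, but once it finishes the last session it hangs. It never reaches write_csv, which reads back what was written to disk.

What am I missing?

Update 1: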

^{pr2}$

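This one hangs too.

If I give it fewer than 200 items, the program runs to completion. But once I go past 1000, it hangs...

Update 2:

I added some print statements to get a better idea of where it hangs.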

def main():
    json_file, csv_out = get_user_input()
    sessions = get_sessions(json_file)

    pool = multiprocessing.Pool()

    for results in pool.imap_unordered(func=process_one_item, 
                                       iterable=sessions):
        store_results(results)
        print('...now back to main')

    print('Terminate NOW.')
    pool.terminate()
    print('Join NOW.')
    pool.join()

    write_csv(csv_out, json_file, sessions)

'...now back to main' is always printed.

'Terminate NOW.' is never printed.

Update 3:

This is the output from when it hung and I killed it.

Process ForkPoolWorker-1:
Process ForkPoolWorker-13:
Process ForkPoolWorker-23:
Process ForkPoolWorker-29:
Process ForkPoolWorker-27:
Process ForkPoolWorker-30:
Process ForkPoolWorker-31:
Process ForkPoolWorker-26:
Traceback (most recent call last):
  File , line 759, in next
    item = self._items.popleft()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "main.py", line 227, in <module>
    main()
  File "main.py", line 214, in main
    for results in pool.imap_unordered(func=process_one_item, iterable=sessions):
  File , line 763, in next
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File , line 297, in _bootstrap
    self.run()
  File , line 297, in _bootstrap
    self.run()
  File , line 297, in _bootstrap
    self.run()
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 110, in worker
    task = get()
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 351, in get
    with self._rlock:
  File , line 110, in worker
    task = get()
  File , line 110, in worker
    task = get()
  File , line 351, in get
    with self._rlock:
  File , line 95, in __enter__
    return self._semlock.__enter__()
  File , line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
  File , line 297, in _bootstrap
    self.run()
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 110, in worker
    task = get()
  File , line 351, in get
    with self._rlock:
  File , line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
  File , line 351, in get
    with self._rlock:
  File , line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File , line 297, in _bootstrap
    self.run()
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 110, in worker
    task = get()
  File , line 351, in get
    with self._rlock:
  File , line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File , line 297, in _bootstrap
    self.run()
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 110, in worker
    task = get()
  File , line 352, in get
    res = self._reader.recv_bytes()
  File , line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File , line 407, in _recv_bytes
    buf = self._recv(4)
  File , line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File , line 297, in _bootstrap
    self.run()
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 110, in worker
    task = get()
  File , line 351, in get
    with self._rlock:
  File , line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
    self._cond.wait(timeout)
  File , line 296, in wait
Traceback (most recent call last):
  File , line 297, in _bootstrap
    self.run()
  File , line 99, in run
    self._target(*self._args, **self._kwargs)
  File , line 110, in worker
    task = get()
  File , line 351, in get
    with self._rlock:
  File , line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
    waiter.acquire()
KeyboardInterrupt
Process finished with exit code 1

I removed the file names to protect the innocent (usernames in the paths).

Update 4:

I added fault handling and set a timeout.

if __name__ == '__main__':
    faulthandler.enable()
    faulthandler.dump_traceback_later(timeout=60, repeat=True, exit=True)
    start_time = time.perf_counter()
    main()
    print('\nDone!\n\nThis code ran in {:.1f} seconds'.format(time.perf_counter()-start_time))

When I do this, I get a timeout and a stack trace:

Thread 0x00001094 (most recent call first):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 306 in _recv_bytes
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250 in recv
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 489 in _handle_results
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 864 in run
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 916 in _bootstrap_inner
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 884 in _bootstrap
Thread 0x0000476c (most recent call first):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 284 in _send_bytes
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 206 in send
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 450 in _handle_tasks
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 864 in run
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 916 in _bootstrap_inner
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 884 in _bootstrap
Thread 0x00002d80 (most recent call first):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 432 in _handle_workers
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 864 in run
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 916 in _bootstrap_inner
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 884 in _bootstrap
Thread 0x000038a8 (most recent call first):
  File "C:\ProgramData\Anaconda3\lib\threading.py", line 295 in wait
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 750 in next
  File "C:/Users/mtanner/IdeaProjects/cti_trending_analysis/main.py", line 215 in main
  File "C:/Users/mtanner/IdeaProjects/cti_trending_analysis/main.py", line 231 in <module>
Process SpawnPoolWorker-36:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    put((job, i, result))
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put
    self._writer.send_bytes(obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 130, in worker
    put((job, i, (False, wrapped)))
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put
    self._writer.send_bytes(obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
Process SpawnPoolWorker-28:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    put((job, i, result))
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put
    self._writer.send_bytes(obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 130, in worker
    put((job, i, (False, wrapped)))
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put
    self._writer.send_bytes(obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
Process SpawnPoolWorker-26:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker
    put((job, i, result))
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put
    self._writer.send_bytes(obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 130, in worker
    put((job, i, (False, wrapped)))
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put
    self._writer.send_bytes(obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed

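Does this make sense to anyone?

Update 5:

Inside my process_one_item function I was calling sys.exit(-1), in an attempt to end the run whenever a code-breaking condition occurred. (This program was not originally multiprocess.) I think multiprocessing was hiding the effect of that call. I removed it, and the program now runs to completion regardless of the session count.

If anyone can explain all of this, I'll happily mark it as the solution!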
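
One plausible explanation, offered as a sketch rather than a confirmed diagnosis: sys.exit() raises SystemExit, which derives from BaseException and not from Exception. A pool worker only reports ordinary exceptions back to the parent, so a SystemExit ends the worker process and the result for that session never arrives, leaving imap_unordered waiting for it. The example below shows the alternative of raising an ordinary exception and returning failures as data. ItemError, safe_process_one_item, run_all and the "empty session" condition are hypothetical names and logic invented for illustration; only process_one_item comes from the post.

```python
import multiprocessing


class ItemError(Exception):
    """Hypothetical error type for a session that cannot be processed."""


def process_one_item(session):
    # Stand-in for the real per-session work (hypothetical logic):
    # an empty session plays the role of the "code-breaking" condition.
    if not session:
        # Raise an ordinary exception instead of calling sys.exit(-1),
        # so the failure can travel back to the parent process.
        raise ItemError("empty session")
    return sorted(session)


def safe_process_one_item(session):
    # Turn failures into data so the parent loop receives exactly one
    # result for every session it submitted.
    try:
        return True, process_one_item(session)
    except Exception as exc:
        return False, repr(exc)


def run_all(sessions, processes=2):
    # Collect successes and failures separately; the pool is cleaned up
    # when the with-block exits, after every result has been consumed.
    good, bad = [], []
    with multiprocessing.Pool(processes) as pool:
        for ok, payload in pool.imap_unordered(safe_process_one_item, sessions):
            (good if ok else bad).append(payload)
    return good, bad
```

As in the original main(), run_all should be called from under the `if __name__ == '__main__':` guard. If the whole run really must stop on the first failure, the parent can break out of the loop when it sees ok == False, which keeps the decision to exit in the main process.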

