我的main方法(在导入时受到适当保护,不会运行)如下所示:
def main():
json_file, csv_out = get_user_input()
sessions = get_sessions(json_file)
if sessions:
pool = multiprocessing.Pool()
pool.map_async(process_one_item, sessions, 1)
pool.close()
pool.join()
write_csv(csv_out, json_file, sessions)
它处理每一个会话(超过1000个),这只是写入磁盘之前处理的时间戳列表,但当它完成最后一个会话时,它就挂起。它永远不会进入“write_csv”,即读取已写入磁盘的内容。在
我错过了什么?在
更新1:
^{pr2}$这个也挂了。在
如果给定的项目少于200个,程序可以运行并完成。但当我超过1000的时候,它就挂了。。。在
更新2:
我添加了一些打印语句,以便更好地知道它挂在哪里。在
def main():
json_file, csv_out = get_user_input()
sessions = get_sessions(json_file)
pool = multiprocessing.Pool()
for results in pool.imap_unordered(func=process_one_item,
iterable=sessions):
store_results(results)
print('...now back to main')
print('Terminate NOW.')
pool.terminate()
print('Join NOW.')
pool.join()
write_csv(csv_out, json_file, sessions)
“…现在回到主菜单”总是打印。在
“现在就终止。”从不打印。在
更新3:
这是当它挂起来的时候,我杀了它。在
Process ForkPoolWorker-1:
Process ForkPoolWorker-13:
Process ForkPoolWorker-23:
Process ForkPoolWorker-29:
Process ForkPoolWorker-27:
Process ForkPoolWorker-30:
Process ForkPoolWorker-31:
Process ForkPoolWorker-26:
Traceback (most recent call last):
File , line 759, in next
item = self._items.popleft()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "main.py", line 227, in <module>
main()
File "main.py", line 214, in main
for results in pool.imap_unordered(func=process_one_item, iterable=sessions):
File , line 763, in next
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
File , line 297, in _bootstrap
self.run()
File , line 297, in _bootstrap
self.run()
File , line 297, in _bootstrap
self.run()
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 110, in worker
task = get()
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 351, in get
with self._rlock:
File , line 110, in worker
task = get()
File , line 110, in worker
task = get()
File , line 351, in get
with self._rlock:
File ", line 95, in __enter__
return self._semlock.__enter__()
File , line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
File , line 297, in _bootstrap
self.run()
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 110, in worker
task = get()
File , line 351, in get
with self._rlock:
File , line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File , line 351, in get
with self._rlock:
File , line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File , line 297, in _bootstrap
self.run()
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 110, in worker
task = get()
File , line 351, in get
with self._rlock:
File , line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File , line 297, in _bootstrap
self.run()
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 110, in worker
task = get()
File , line 352, in get
res = self._reader.recv_bytes()
File , line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File , line 407, in _recv_bytes
buf = self._recv(4)
File , line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
File , line 297, in _bootstrap
self.run()
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 110, in worker
task = get()
File , line 351, in get
with self._rlock:
File , line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
self._cond.wait(timeout)
File , line 296, in wait
Traceback (most recent call last):
File , line 297, in _bootstrap
self.run()
File , line 99, in run
self._target(*self._args, **self._kwargs)
File , line 110, in worker
task = get()
File , line 351, in get
with self._rlock:
File , line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
waiter.acquire()
KeyboardInterrupt
Process finished with exit code 1
我删除了文件名以保护无辜者(路径中的用户名)。在
更新4:
我添加了错误处理并设置了超时。在
if __name__ == '__main__':
faulthandler.enable()
faulthandler.dump_traceback_later(timeout=60, repeat=True, exit=True)
start_time = time.perf_counter()
main()
print('\nDone!\n\nThis code ran in {:.1f} seconds'.format(time.perf_counter()-start_time))
当我这样做时,我得到了一个超时和堆栈跟踪:
Thread 0x00001094 (most recent call first): File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 306 in _recv_bytes File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250 in recv File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 489 in _handle_results File "C:\ProgramData\Anaconda3\lib\threading.py", line 864 in run File "C:\ProgramData\Anaconda3\lib\threading.py", line 916 in _bootstrap_inner File "C:\ProgramData\Anaconda3\lib\threading.py", line 884 in _bootstrapThread 0x0000476c (most recent call first): File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 284 in _send_bytes File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 206 in send File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 450 in _handle_tasks File "C:\ProgramData\Anaconda3\lib\threading.py", line 864 in run File "C:\ProgramData\Anaconda3\lib\threading.py", line 916 in _bootstrap_inner File "C:\ProgramData\Anaconda3\lib\threading.py", line 884 in _bootstrapThread 0x00002d80 (most recent call first): File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 432 in _handle_workers File "C:\ProgramData\Anaconda3\lib\threading.py", line 864 in run File "C:\ProgramData\Anaconda3\lib\threading.py", line 916 in _bootstrap_inner File "C:\ProgramData\Anaconda3\lib\threading.py", line 884 in _bootstrapThread 0x000038a8 (most recent call first): File "C:\ProgramData\Anaconda3\lib\threading.py", line 295 in wait File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 750 in next File "C:/Users/mtanner/IdeaProjects/cti_trending_analysis/main.py", line 215 in main File "C:/Users/mtanner/IdeaProjects/cti_trending_analysis/main.py", line 231 in <module>Process SpawnPoolWorker-36:Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker put((job, i, result))
File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put self._writer.send_bytes(obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes self._send_bytes(m[offset:offset + size])
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closedDuring handling of the above exception, another exception occurred:Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 258, in _bootstrap self.run()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 130, in worker put((job, i, (False, wrapped)))
File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put self._writer.send_bytes(obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes self._send_bytes(m[offset:offset + size])
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closedProcess SpawnPoolWorker-28:Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker put((job, i, result))
File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put self._writer.send_bytes(obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes self._send_bytes(m[offset:offset + size])
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closedDuring handling of the above exception, another exception occurred:Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 258, in _bootstrap self.run()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 130, in worker put((job, i, (False, wrapped)))
File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put self._writer.send_bytes(obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes self._send_bytes(m[offset:offset + size])
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closedProcess SpawnPoolWorker-26:Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 125, in worker put((job, i, result))
File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put self._writer.send_bytes(obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes self._send_bytes(m[offset:offset + size])
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closedDuring handling of the above exception, another exception occurred:Traceback (most recent call last): File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 258, in _bootstrap self.run()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 93, in run self._target(*self._args, **self._kwargs)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\pool.py", line 130, in worker put((job, i, (False, wrapped)))
File "C:\ProgramData\Anaconda3\lib\multiprocessing\queues.py", line 344, in put self._writer.send_bytes(obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 200, in send_bytes self._send_bytes(m[offset:offset + size])
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 280, in _send_bytes ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
这对任何人都有意义吗?在
更新5:
在我的函数process_one_item
函数中,我使用了sys.exit(-1)
调用,试图在发生代码中断时结束运行。(此程序最初不是多进程的)。我认为多重处理隐藏了这种效果。我删除了这个调用,程序现在运行到完成不管会话计数。在
如果有人能解释这一切,我很高兴把它算作解决办法!在
目前没有回答
相关问题 更多 >
编程相关推荐