Python / multiprocessing: processes don't seem to start
I have a function that reads a binary file and converts each byte into a corresponding character sequence. For example, 0x05 becomes 'AACC', 0x2A becomes 'AGGG', and so on. The function that reads the file and converts the bytes is currently linear, and since the files to convert range from 25 KB to 2 MB, the process can take quite a while.
So I would like to try multiprocessing to split the work and hopefully speed things up, but I just can't get it working. Below is the linear function; it works, but it is slow:
def fileToRNAString(_file):
    if (_file and os.path.isfile(_file)):
        rnaSequences = []
        blockCount = 0
        blockSize = 2048
        printAndLog("!", "Converting %s into RNA string (%d bytes/block)" % (_file, blockSize))
        with open(_file, "rb") as hFile:
            buf = hFile.read(blockSize)
            while buf:
                decSequenceToRNA(blockCount, buf, rnaSequences)
                blockCount = blockCount + 1
                buf = hFile.read(blockSize)
    else:
        printAndLog("-", "Could not find the specified file. Please verify that the file exists:" + _file)
    return rnaSequences
Note: the function 'decSequenceToRNA' reads the buffer and converts each byte into the required string. After it runs, it produces a tuple containing the block number and the string, e.g. (1, 'ACCGTAGATTA...'), so in the end I get an array of these tuples.
I tried converting the function to use Python's multiprocessing:
def fileToRNAString(_file):
    rnaSequences = []
    if (_file and os.path.isfile(_file)):
        blockCount = 0
        blockSize = 2048
        printAndLog("!", "Converting %s into RNA string (%d bytes/block)" % (_file, blockSize))
        workers = []
        with open(_file, "rb") as hFile:
            buf = hFile.read(blockSize)
            while buf:
                p = Process(target=decSequenceToRNA, args=(blockCount, buf, rnaSequences))
                p.start()
                workers.append(p)
                blockCount = blockCount + 1
                buf = hFile.read(blockSize)
        for p in workers:
            p.join()
    else:
        printAndLog("-", "Could not find the specified file. Please verify that the file exists:" + _file)
    return rnaSequences
However, none of the processes seem to start: when the function runs, an empty array is returned, and nothing that 'decSequenceToRNA' prints to the console ever shows up:
>>>fileToRNAString(testfile)
[!] Converting /root/src/amino56/M1H2.bin into RNA string (2048 bytes/block).
Unlike this question, I'm running on Linux shiva 3.14-kali1-amd64 #1 SMP Debian 3.14.5-1kali1 (2014-06-07) x86_64 GNU/Linux, and checking with PyCrust reports Python version 2.7.3. These are the packages I'm using:
import os
import re
import sys
import urllib2
import requests
import logging
import hashlib
import argparse
import tempfile
import shutil
import feedparser
from multiprocessing import Process
I'd appreciate help figuring out why my code isn't working, or whether I'm missing something else needed to get the processes running. Suggestions for improving the code are also welcome. Here is 'decSequenceToRNA' for reference:
def decSequenceToRNA(_idxSeq, _byteSequence, _rnaSequences):
    rnaSequence = ''
    printAndLog("!", "Processing block %d (%d bytes)" % (_idxSeq, len(_byteSequence)))
    for b in _byteSequence:
        rnaSequence = rnaSequence + base10ToRNA(ord(b))
    printAndLog("+", "Block %d completed. RNA of %d nucleotides generated." % (_idxSeq, len(rnaSequence)))
    _rnaSequences.append((_idxSeq, rnaSequence))
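(base10ToRNA isn't included above; a minimal sketch that matches the examples at the top, assuming each byte is split into four 2-bit pairs mapped as 00->A, 01->C, 10->G, 11->T with the most significant pair first, would look like this:)

def base10ToRNA(_byte):
    # Assumed mapping, consistent with 0x05 -> 'AACC' and 0x2A -> 'AGGG':
    # each 2-bit pair of the byte becomes one nucleotide, most significant pair first.
    nucleotides = ('A', 'C', 'G', 'T')
    return ''.join(nucleotides[(_byte >> shift) & 0x3] for shift in (6, 4, 2, 0))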
2 Answers
Try writing it like this (with a comma at the end of the argument list):
p = Process(target=decSequenceToRNA, args=(blockCount, buf, rnaSequences,))
decSequenceToRNA runs in its own process, which means it gets its own copy of every data structure, separate from the copy in the main process. So when you append to _rnaSequences inside decSequenceToRNA, it has no effect on rnaSequences in the main process, which is why an empty list is returned.
You have two options to fix this. The first is to share a list between the processes via multiprocessing.Manager. For example:
import multiprocessing

def f(shared_list):
    shared_list.append(1)

if __name__ == "__main__":
    normal_list = []
    p = multiprocessing.Process(target=f, args=(normal_list,))
    p.start()
    p.join()
    print(normal_list)

    m = multiprocessing.Manager()
    shared_list = m.list()
    p = multiprocessing.Process(target=f, args=(shared_list,))
    p.start()
    p.join()
    print(shared_list)
Output:
[] # Normal list didn't work, the appended '1' didn't make it to the main process
[1] # multiprocessing.Manager() list works fine
To apply this to your code, just replace
rnaSequences = []
with
m = multiprocessing.Manager()
rnaSequences = m.list()
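One caveat (this part is my assumption about how the result is used afterwards): the Manager runs its own server process, which shuts down once m is garbage-collected, so it is safer to copy the proxied data back into an ordinary list before fileToRNAString returns, e.g. right after joining the workers:

for p in workers:
    p.join()
# Snapshot the shared (proxied) list into a plain Python list before the
# Manager object, and the server process it started, go out of scope.
rnaSequences = list(rnaSequences)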
Alternatively, you can (and probably should) use a multiprocessing.Pool instead of creating a separate Process for each block. I'm not sure how large hFile is, or how big the blocks you're reading are, but if the number of blocks is larger than multiprocessing.cpu_count(), spawning a process per block will hurt performance. With a Pool you keep the number of processes constant, and it makes building your rnaSequence list easy:
def decSequenceToRNA(_idxSeq, _byteSequence):
    rnaSequence = ''
    printAndLog("!", "Processing block %d (%d bytes)" % (_idxSeq, len(_byteSequence)))
    for b in _byteSequence:
        rnaSequence = rnaSequence + base10ToRNA(ord(b))
    printAndLog("+", "Block %d completed. RNA of %d nucleotides generated." % (_idxSeq, len(rnaSequence)))
    return _idxSeq, rnaSequence

def fileToRNAString(_file):
    rnaSequences = []
    if (_file and os.path.isfile(_file)):
        blockCount = 0
        blockSize = 2048
        printAndLog("!", "Converting %s into RNA string (%d bytes/block)" % (_file, blockSize))
        results = []
        pool = multiprocessing.Pool()  # Creates a pool of cpu_count() processes
        with open(_file, "rb") as hFile:
            buf = hFile.read(blockSize)
            while buf:
                result = pool.apply_async(decSequenceToRNA, args=(blockCount, buf))
                results.append(result)
                blockCount = blockCount + 1
                buf = hFile.read(blockSize)
        rnaSequences = [r.get() for r in results]
        pool.close()
        pool.join()
    else:
        printAndLog("-", "Could not find the specified file. Please verify that the file exists:" + _file)
    return rnaSequences
Note that we no longer pass the rnaSequences list to the child processes. Instead, we simply return the result to the parent (which you can't do with a Process) and build the list there.
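For completeness, a small usage sketch (the entry point below is hypothetical, and the file path is just the one from the example output above): the apply_async results are collected in submission order, so the returned list is already ordered by block index, but sorting on the index keeps the reassembly robust either way:

if __name__ == "__main__":
    rnaSequences = fileToRNAString("/root/src/amino56/M1H2.bin")
    # Each entry is (blockIndex, rnaString); sort on the index and join the
    # pieces to rebuild the RNA string for the whole file.
    fullRNA = ''.join(seq for _idx, seq in sorted(rnaSequences))
    print("Total RNA length: %d nucleotides" % len(fullRNA))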