Python/multiprocessing: processes do not seem to start

1 vote
2 answers
1047 views
Asked 2025-04-18 16:43

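I have a function that reads a binary file and converts each byte into a corresponding sequence of characters. For example, 0x05 becomes 'AACC', 0x2A becomes 'AGGG', and so on. The function that reads the file and converts the bytes is currently linear, and since the files to convert are between 25 KB and 2 MB, it can take quite a while.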
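To make the mapping concrete: both examples are consistent with reading each byte as four 2-bit digits, high bits first, over a four-letter alphabet. The sketch below is only an illustration of that reading, not the actual base10ToRNA. The examples pin down A=0, C=1 and G=2; the fourth letter (T here) and the name byte_to_bases are assumptions.

```python
ALPHABET = "ACGT"  # assumption: A=0, C=1, G=2 follow from the examples; T=3 is a guess

def byte_to_bases(value):
    """Render one byte (0-255) as four letters, two bits per letter, high bits first."""
    return "".join(ALPHABET[(value >> shift) & 0b11] for shift in (6, 4, 2, 0))

print(byte_to_bases(0x05))  # AACC
print(byte_to_bases(0x2A))  # AGGG
```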

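So I tried to use multiprocessing to split the work and hopefully speed it up, but I cannot get it to work. Below is the linear function, which works but is slow: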

def fileToRNAString(_file):

    if (_file and os.path.isfile(_file)):
        rnaSequences = []
        blockCount = 0
        blockSize = 2048
        printAndLog("!", "Converting %s into RNA string (%d bytes/block)" % (_file, blockSize))
        with open(_file, "rb") as hFile:
            buf = hFile.read(blockSize)
            while buf:
                decSequenceToRNA(blockCount, buf, rnaSequences)
                blockCount = blockCount + 1
                buf = hFile.read(blockSize)
    else:
        printAndLog("-", "Could not find the specified file. Please verify that the file exists:" + _file)
    return rnaSequences

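Note: the function decSequenceToRNA reads the buffer and converts each byte into the required string. When it finishes, it appends a tuple containing the block number and the string, e.g. (1, 'ACCGTAGATTA...'), to the list, so in the end I have a list of these tuples.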

I tried to convert this function to use Python's multiprocessing:

def fileToRNAString(_file):
    rnaSequences = []
    if (_file and os.path.isfile(_file)):
        blockCount = 0
        blockSize = 2048
        printAndLog("!", "Converting %s into RNA string (%d bytes/block)" % (_file, blockSize))
        workers = []
        with open(_file, "rb") as hFile:
            buf = hFile.read(blockSize)
            while buf:
                p = Process(target=decSequenceToRNA, args=(blockCount, buf, rnaSequences))
                p.start()
                workers.append(p)
                blockCount = blockCount + 1
                buf = hFile.read(blockSize)
        for p in workers:
            p.join()
    else:
        printAndLog("-", "Could not find the specified file. Please verify that the file exists:" + _file)
    return rnaSequences

However, no process seems to start: when the function runs, it returns an empty list, and nothing that decSequenceToRNA prints to the console shows up:

>>> fileToRNAString(testfile)
[!] Converting /root/src/amino56/M1H2.bin into RNA string (2048 bytes/block).

Unlike in this question, I am running on Linux (shiva 3.14-kali1-amd64 #1 SMP Debian 3.14.5-1kali1 (2014-06-07) x86_64 GNU/Linux) and testing with PyCrust on Python 2.7.3. I am using the following packages:

import os
import re
import sys
import urllib2
import requests
import logging
import hashlib
import argparse
import tempfile
import shutil
import feedparser
from multiprocessing import Process

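I would appreciate help figuring out why my code does not work, or whether I am missing something elsewhere that the processes need in order to run. Suggestions for improving the code are also welcome. For reference, here is decSequenceToRNA: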

def decSequenceToRNA(_idxSeq, _byteSequence, _rnaSequences):
    rnaSequence = ''
    printAndLog("!", "Processing block %d (%d bytes)" % (_idxSeq, len(_byteSequence)))
    for b in _byteSequence:
        rnaSequence = rnaSequence + base10ToRNA(ord(b))
    printAndLog("+", "Block %d completed. RNA of %d nucleotides generated." % (_idxSeq, len(rnaSequence)))
    _rnaSequences.append((_idxSeq, rnaSequence))

2 Answers

-1

Try writing it like this (add a comma at the end of the argument list):

p = Process(target=decSequenceToRNA, args=(blockCount, buf, rnaSequences,))

1

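decSequenceToRNA runs in its own process, which means it gets its own copy of every data structure, separate from the copies in the main process. So when you append to _rnaSequences inside decSequenceToRNA, it has no effect on rnaSequences in the main process. That is why an empty list is returned.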

You have two options to fix this. The first is to use a list that is shared between processes through multiprocessing.Manager. For example:

import multiprocessing

def f(shared_list):
    shared_list.append(1)

if __name__ == "__main__":
    normal_list = []
    p = multiprocessing.Process(target=f, args=(normal_list,))
    p.start()
    p.join()
    print(normal_list)

    m = multiprocessing.Manager()
    shared_list = m.list()
    p = multiprocessing.Process(target=f, args=(shared_list,))
    p.start()
    p.join()
    print(shared_list)

Output:

[]   # Normal list didn't work, the appended '1' didn't make it to the main process
[1]  # multiprocessing.Manager() list works fine

To apply this to your code, just replace

rnaSequences = []

with

m = multiprocessing.Manager()
rnaSequences = m.list()
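One caveat with the Manager approach: each worker appends whenever it happens to finish, so the shared list may end up in completion order rather than file order. Since every tuple carries its block index, the parent can restore the order before joining. A minimal sketch, where assemble is a hypothetical helper name and a plain list stands in for list(rnaSequences):

```python
def assemble(results):
    """Join (block_index, sequence) tuples into one string in file order."""
    ordered = sorted(results, key=lambda item: item[0])
    return "".join(sequence for _, sequence in ordered)

# Blocks as they might arrive if block 1 finished before block 0.
out_of_order = [(1, "AGGG"), (0, "AACC"), (2, "ACCG")]
print(assemble(out_of_order))  # AACCAGGGACCG
```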

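Alternatively, you can (and probably should) use a multiprocessing.Pool instead of creating a separate Process for each block. I'm not sure how big hFile is or how large the blocks you read are, but if there are more blocks than multiprocessing.cpu_count(), spawning a process for every block will hurt performance. With a Pool you keep the number of processes constant, and you can easily build your rnaSequences list: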

import multiprocessing

def decSequenceToRNA(_idxSeq, _byteSequence):
    rnaSequence = ''
    printAndLog("!", "Processing block %d (%d bytes)" % (_idxSeq, len(_byteSequence)))
    for b in _byteSequence:
        rnaSequence = rnaSequence + base10ToRNA(ord(b))
    printAndLog("+", "Block %d completed. RNA of %d nucleotides generated." % (_idxSeq, len(rnaSequence)))
    return _idxSeq, rnaSequence

def fileToRNAString(_file):
    rnaSequences = []
    if (_file and os.path.isfile(_file)):
        blockCount = 0
        blockSize = 2048
        printAndLog("!", "Converting %s into RNA string (%d bytes/block)" % (_file, blockSize))
        results = []
        pool = multiprocessing.Pool()  # Creates a pool of cpu_count() processes
        with open(_file, "rb") as hFile:
            buf = hFile.read(blockSize)
            while buf:
                # apply_async takes the positional arguments as a single tuple
                result = pool.apply_async(decSequenceToRNA, (blockCount, buf))
                results.append(result)
                blockCount = blockCount + 1
                buf = hFile.read(blockSize)
        rnaSequences = [r.get() for r in results]
        pool.close()
        pool.join()
    else:
        printAndLog("-", "Could not find the specified file. Please verify that the file exists:" + _file)
    return rnaSequences

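Note that we no longer pass the rnaSequences list to the child. Instead, we simply return the result to the parent (which you can't do with a plain Process, since the return value of its target is discarded) and build the list in the parent.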
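As a possible further simplification (a sketch under stated assumptions, not the asker's actual code): Pool.map returns results in input order, so the apply_async bookkeeping can be dropped. The version below is written for Python 3 rather than 2.7, replaces base10ToRNA with a hypothetical 256-entry lookup table (A, C and G follow the question's examples, T is a guess), and builds each block with str.join instead of repeated concatenation. The names read_blocks, convert_block and file_to_rna are illustrative.

```python
import multiprocessing
import os
import tempfile

ALPHABET = "ACGT"  # assumption: 2 bits per letter; the fourth letter is a guess
# Precompute the 4-letter string for every possible byte value.
TABLE = ["".join(ALPHABET[(v >> s) & 0b11] for s in (6, 4, 2, 0)) for v in range(256)]

def read_blocks(path, block_size=2048):
    """Yield (block_index, bytes) pairs until the file is exhausted."""
    with open(path, "rb") as handle:
        index = 0
        while True:
            buf = handle.read(block_size)
            if not buf:
                break
            yield index, buf
            index += 1

def convert_block(block):
    """Worker: turn one (index, bytes) pair into (index, sequence)."""
    index, buf = block
    # Iterating over bytes yields ints in Python 3, so no ord() is needed.
    return index, "".join(TABLE[b] for b in buf)

def file_to_rna(path, block_size=2048):
    """Convert a file in a pool of worker processes, keeping file order."""
    with multiprocessing.Pool() as pool:
        return pool.map(convert_block, read_blocks(path, block_size))

if __name__ == "__main__":
    # Throwaway demo file; a real caller would pass its own path.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"\x05\x2a" * 3000)
    try:
        result = file_to_rna(tmp.name)
        print(len(result), result[0][1][:8])
    finally:
        os.remove(tmp.name)
```

A 2 MB file read in 2048-byte blocks gives 1024 tasks, so per-task overhead may still dominate. A larger block size, or the chunksize argument of map, is probably worth measuring before concluding that multiprocessing helps here.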
