线程与队列对比串行性能

0 投票
5 回答
2465 浏览
提问于 2025-04-15 15:58

我觉得看看线程和队列会很有意思,所以我写了两个脚本。一个是把文件分成小块,然后在一个线程里加密每一块,另一个则是一个接一个地处理。我对Python还很陌生,不太明白为什么使用线程的脚本反而要花更多时间。

使用线程的脚本:

#!/usr/bin/env python

from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue


BLOCK_SIZE = 32 #32 = 256-bit | 16 = 128-bit
TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048
KEY = os.urandom(32)

class DataSplit():
    def __init__(self,fileObj, chunkSize):

        self.fileObj = fileObj
        self.chunkSize = chunkSize

    def split(self):
        while True:
            data = self.fileObj.read(self.chunkSize)
            if not data:
                break
            yield data

class encThread(threading.Thread):
    def __init__(self, seg_queue,result_queue, cipher):
        threading.Thread.__init__(self)
        self.seg_queue = seg_queue
        self.result_queue = result_queue
        self.cipher = cipher

    def run(self):
        while True:
            #Grab a data segment from the queue
            data = self.seg_queue.get()
            encSegment = []           
            for lines in data:
            encSegment.append(self.cipher.encrypt(lines))
            self.result_queue.put(encSegment)
            print "Segment Encrypted"
            self.seg_queue.task_done()

start = time.time()
def main():
    seg_queue = Queue.Queue()
    result_queue = Queue.Queue()
    estSegCount = (os.path.getsize(TFILE)/CHUNK_SIZE)+1
    cipher = AES.new(KEY, AES.MODE_CFB)
    #Spawn threads (one for each segment at the moment)
    for i in range(estSegCount):
        eT = encThread(seg_queue, result_queue, cipher)
        eT.setDaemon(True)
        eT.start()
        print ("thread spawned")

    fileObj = open(TFILE, "rb")
    splitter = DataSplit(fileObj, CHUNK_SIZE)
    for data in splitter.split():
        seg_queue.put(data)
        print ("Data sent to thread")

    seg_queue.join()
    #result_queue.join()
    print ("Seg Q: {0}".format(seg_queue.qsize()))
    print ("Res Q: {0}".format(result_queue.qsize()))



main()
print ("Elapsed Time: {0}".format(time.time()-start))

串行处理的脚本:

#!/usr/bin/env python

from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue

TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048

class EncSeries():
    def __init(self):
        pass

    def loadFile(self,path):
        openFile = open(path, "rb")
        #fileData = openFile.readlines()
        fileData = openFile.read(CHUNK_SIZE)
        openFile.close()
        return fileData

    def encryptData(self,key, data):
        cipher = AES.new(key, AES.MODE_CFB)
        newData = []
        for lines in data:
            newData.append(cipher.encrypt(lines))
        return newData


start = time.time()
def main():
    print ("Start")
    key = os.urandom(32)
    run = EncSeries()
    fileData = run.loadFile(TFILE)

    encFileData=run.encryptData(key, fileData)
    print("Finish")

main()
print ("Elapsed Time: {0}".format(time.time()-start))

使用readlines()而不是read,似乎也让串行版本的速度快了很多,但它已经比线程版本快得多了。

5 个回答

1

线程并不是让程序变快的神奇方法——把工作分成多个线程通常会让程序变慢,除非这个程序大部分时间都在等待输入输出(I/O)。每增加一个新线程,代码中就会增加更多的开销,因为要分配工作,同时操作系统在不同线程之间切换时也会增加开销。

理论上,如果你在一个多处理器的CPU上运行,那么这些线程可以在不同的处理器上同时运行,这样工作就能并行完成,但即便如此,线程的数量也不应该超过处理器的数量。

实际上,情况就不一样了,至少对于C版本的Python来说。全局解释器锁(GIL)在多个处理器上表现得并不好。想了解原因,可以看看David Beazley的这个演讲。而IronPython和Jython就没有这个问题。

如果你真的想让工作并行处理,最好是启动多个进程,把工作分配给它们,但要注意,进程间传递大量数据的开销可能会抵消并行处理带来的好处。

1

我看了Dave Kirby分享的演示,并尝试了一个示例计数器,结果发现用两个线程运行的时间比预期的要长超过两倍:

import time
from threading import Thread

countmax=100000000

def count(n):
    while n>0:
        n-=1

def main1():
    count(countmax)
    count(countmax)

def main2():
    t1=Thread(target=count,args=(countmax,))
    t2=Thread(target=count,args=(countmax,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

def timeit(func):
    start = time.time()
    func()
    end=time.time()-start
    print ("Elapsed Time: {0}".format(end))

if __name__ == '__main__':
    timeit(main1)
    timeit(main2)

输出结果:

Elapsed Time: 21.5470001698
Elapsed Time: 55.3279998302

但是,如果我把线程改成进程:

from multiprocessing import Process

然后

t1=Process(target ....

等等,我得到了这个输出:

Elapsed Time: 20.5
Elapsed Time: 10.4059998989

现在感觉就像我的奔腾CPU有两个核心,我猜这可能是超线程的原因。有没有人能在他们的双核或四核机器上试试,运行2个或4个线程?

可以查看Python 2.6.4的文档,了解多进程处理的相关内容。

1
  1. 看起来你的第二个版本只读取了一个数据块,而第一个版本是读取了整个文件,这就解释了速度提升这么大。补充:还有一个问题:我刚注意到你在用 for lines in data,其实没必要这样做——这会让每个字符单独加密,速度会慢很多。你只需要直接把数据传给 encrypt 就可以了。

  2. 启动的线程数量没有必要超过你的处理器核心数。

  3. 线程只有在调用一个可以释放全局解释器锁(GIL)的扩展模块时才能并行工作。我觉得 PyCrypto 并没有做到这一点,所以在这里你无法实现真正的并行处理。

  4. 如果瓶颈在于磁盘性能,那么你在这里也不会看到太大的改进——在这种情况下,最好是有一个线程负责磁盘输入输出,另一个线程负责加密。因为在进行磁盘输入输出时,GIL 会被释放,所以就不会有问题。

撰写回答