线程与队列对比串行性能
我觉得看看线程和队列会很有意思,所以我写了两个脚本。一个是把文件分成小块,然后在一个线程里加密每一块,另一个则是一个接一个地处理。我对Python还很陌生,不太明白为什么使用线程的脚本反而要花更多时间。
使用线程的脚本:
#!/usr/bin/env python
from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue
BLOCK_SIZE = 32 #32 = 256-bit | 16 = 128-bit
TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048
KEY = os.urandom(32)
class DataSplit():
def __init__(self,fileObj, chunkSize):
self.fileObj = fileObj
self.chunkSize = chunkSize
def split(self):
while True:
data = self.fileObj.read(self.chunkSize)
if not data:
break
yield data
class encThread(threading.Thread):
def __init__(self, seg_queue,result_queue, cipher):
threading.Thread.__init__(self)
self.seg_queue = seg_queue
self.result_queue = result_queue
self.cipher = cipher
def run(self):
while True:
#Grab a data segment from the queue
data = self.seg_queue.get()
encSegment = []
for lines in data:
encSegment.append(self.cipher.encrypt(lines))
self.result_queue.put(encSegment)
print "Segment Encrypted"
self.seg_queue.task_done()
start = time.time()
def main():
seg_queue = Queue.Queue()
result_queue = Queue.Queue()
estSegCount = (os.path.getsize(TFILE)/CHUNK_SIZE)+1
cipher = AES.new(KEY, AES.MODE_CFB)
#Spawn threads (one for each segment at the moment)
for i in range(estSegCount):
eT = encThread(seg_queue, result_queue, cipher)
eT.setDaemon(True)
eT.start()
print ("thread spawned")
fileObj = open(TFILE, "rb")
splitter = DataSplit(fileObj, CHUNK_SIZE)
for data in splitter.split():
seg_queue.put(data)
print ("Data sent to thread")
seg_queue.join()
#result_queue.join()
print ("Seg Q: {0}".format(seg_queue.qsize()))
print ("Res Q: {0}".format(result_queue.qsize()))
main()
print ("Elapsed Time: {0}".format(time.time()-start))
串行处理的脚本:
#!/usr/bin/env python
from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue
TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048
class EncSeries():
def __init(self):
pass
def loadFile(self,path):
openFile = open(path, "rb")
#fileData = openFile.readlines()
fileData = openFile.read(CHUNK_SIZE)
openFile.close()
return fileData
def encryptData(self,key, data):
cipher = AES.new(key, AES.MODE_CFB)
newData = []
for lines in data:
newData.append(cipher.encrypt(lines))
return newData
start = time.time()
def main():
print ("Start")
key = os.urandom(32)
run = EncSeries()
fileData = run.loadFile(TFILE)
encFileData=run.encryptData(key, fileData)
print("Finish")
main()
print ("Elapsed Time: {0}".format(time.time()-start))
使用readlines()而不是read,似乎也让串行版本的速度快了很多,但它已经比线程版本快得多了。
5 个回答
线程并不是让程序变快的神奇方法——把工作分成多个线程通常会让程序变慢,除非这个程序大部分时间都在等待输入输出(I/O)。每增加一个新线程,代码中就会增加更多的开销,因为要分配工作,同时操作系统在不同线程之间切换时也会增加开销。
理论上,如果你在一个多处理器的CPU上运行,那么这些线程可以在不同的处理器上同时运行,这样工作就能并行完成,但即便如此,线程的数量也不应该超过处理器的数量。
实际上,情况就不一样了,至少对于C版本的Python来说。全局解释器锁(GIL)在多个处理器上表现得并不好。想了解原因,可以看看David Beazley的这个演讲。而IronPython和Jython就没有这个问题。
如果你真的想让工作并行处理,最好是启动多个进程,把工作分配给它们,但要注意,进程间传递大量数据的开销可能会抵消并行处理带来的好处。
我看了Dave Kirby分享的演示,并尝试了一个示例计数器,结果发现用两个线程运行的时间比预期的要长超过两倍:
import time
from threading import Thread
countmax=100000000
def count(n):
while n>0:
n-=1
def main1():
count(countmax)
count(countmax)
def main2():
t1=Thread(target=count,args=(countmax,))
t2=Thread(target=count,args=(countmax,))
t1.start()
t2.start()
t1.join()
t2.join()
def timeit(func):
start = time.time()
func()
end=time.time()-start
print ("Elapsed Time: {0}".format(end))
if __name__ == '__main__':
timeit(main1)
timeit(main2)
输出结果:
Elapsed Time: 21.5470001698
Elapsed Time: 55.3279998302
但是,如果我把线程改成进程:
from multiprocessing import Process
然后
t1=Process(target ....
等等,我得到了这个输出:
Elapsed Time: 20.5
Elapsed Time: 10.4059998989
现在感觉就像我的奔腾CPU有两个核心,我猜这可能是超线程的原因。有没有人能在他们的双核或四核机器上试试,运行2个或4个线程?
可以查看Python 2.6.4的文档,了解多进程处理的相关内容。
看起来你的第二个版本只读取了一个数据块,而第一个版本是读取了整个文件,这就解释了速度提升这么大。补充:还有一个问题:我刚注意到你在用
for lines in data
,其实没必要这样做——这会让每个字符单独加密,速度会慢很多。你只需要直接把数据传给encrypt
就可以了。启动的线程数量没有必要超过你的处理器核心数。
线程只有在调用一个可以释放全局解释器锁(GIL)的扩展模块时才能并行工作。我觉得 PyCrypto 并没有做到这一点,所以在这里你无法实现真正的并行处理。
如果瓶颈在于磁盘性能,那么你在这里也不会看到太大的改进——在这种情况下,最好是有一个线程负责磁盘输入输出,另一个线程负责加密。因为在进行磁盘输入输出时,GIL 会被释放,所以就不会有问题。