Problems speeding up a Python application with multiprocessing + threading
I have a mostly CPU-bound application that I want to speed up using multiprocessing plus threading, rather than a pure threaded version. I wrote a simple application to test my approach, and the results surprised me: the multiprocessing version and the multiprocessing+threading version both performed worse than the threaded version and even the serial version.
In my application there is a work queue that holds all of the work. A thread pops one work item at a time off the queue and processes it, either directly (threaded version) or by handing it to a process. The thread then has to wait for the result before taking on the next work item. The reason I pop items one at a time is that the work is dynamic (this is not the case in the prototype code pasted below), so I cannot partition the work up front and assign a chunk to each thread/process at creation time.
I would like to know what I am doing wrong, and how I can speed up my application.
Here are the execution times from a run on a 16-core machine:
Version : 2.7.2
Compiler : GCC 4.1.2 20070925 (Red Hat 4.1.2-33)
Platform : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf
Processor : x86_64
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 3505.97214699 ms
mainPureMultiprocessing exec time: 2241.89805984 ms
mainPureThreaded exec time: 309.767007828 ms
mainSerial exec time: 52.3412227631 ms
Terminating
Here is the code I used:
import threading
import multiprocessing
import time
import platform
class ConcurrentQueue:
def __init__(self):
self.data = []
self.lock = threading.Lock()
def push(self, item):
self.lock.acquire()
try:
self.data.append(item)
finally:
self.lock.release()
return
def pop(self):
self.lock.acquire()
result = None
try:
length = len(self.data)
if length > 0:
result = self.data.pop()
finally:
self.lock.release()
return result
    def isEmpty(self):
        self.lock.acquire()
        try:
            length = len(self.data)
        finally:
            self.lock.release()
        return length == 0
def timeFunc(passedFunc):
def wrapperFunc(*arg):
startTime = time.time()
result = passedFunc(*arg)
endTime = time.time()
elapsedTime = (endTime - startTime) * 1000
print passedFunc.__name__, 'exec time:', elapsedTime, " ms"
return result
return wrapperFunc
def checkPrime(candidate):
# dummy process to do some work
for k in xrange(3, candidate, 2):
if candidate % k:
return False
return True
def fillQueueWithWork(itemQueue, numItems):
for item in xrange(numItems, 2 * numItems):
itemQueue.push(item)
@timeFunc
def mainSerial(numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
while True:
dataItem = jobQueue.pop()
if dataItem is None:
break
# do work with dataItem
result = checkPrime(dataItem)
return
# Start: Implement a pure threaded version
def pureThreadFunc(jobQueue):
curThread = threading.currentThread()
while True:
dataItem = jobQueue.pop()
if dataItem is None:
break
# do work with dataItem
result = checkPrime(dataItem)
return
@timeFunc
def mainPureThreaded(numThreads, numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
workers = []
for index in xrange(numThreads):
loopName = "Thread-" + str(index)
loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue, ))
loopThread.start()
workers.append(loopThread)
for worker in workers:
worker.join()
return
# End: Implement a pure threaded version
# Start: Implement a pure multiprocessing version
def pureMultiprocessingFunc(jobQueue, resultQueue):
while True:
dataItem = jobQueue.get()
if dataItem is None:
break
# do work with dataItem
result = checkPrime(dataItem)
resultQueue.put_nowait(result)
return
@timeFunc
def mainPureMultiprocessing(numProcesses, numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
workers = []
queueSize = (numItems/numProcesses) + 10
for index in xrange(numProcesses):
jobs = multiprocessing.Queue(queueSize)
results = multiprocessing.Queue(queueSize)
loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results, ))
loopProcess.start()
workers.append((loopProcess, jobs, results))
processIndex = 0
while True:
dataItem = jobQueue.pop()
if dataItem is None:
break
workers[processIndex][1].put_nowait(dataItem)
processIndex += 1
if numProcesses == processIndex:
processIndex = 0
for worker in workers:
worker[1].put_nowait(None)
for worker in workers:
worker[0].join()
return
# End: Implement a pure multiprocessing version
# Start: Implement a threaded+multiprocessing version
def mpFunc(processName, jobQueue, resultQueue):
while True:
dataItem = jobQueue.get()
if dataItem is None:
break
result = checkPrime(dataItem)
resultQueue.put_nowait(result)
return
def mpThreadFunc(jobQueue):
curThread = threading.currentThread()
threadName = curThread.getName()
jobs = multiprocessing.Queue()
results = multiprocessing.Queue()
myProcessName = "Process-" + threadName
myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results, ))
myProcess.start()
while True:
dataItem = jobQueue.pop()
# put item to allow process to start
jobs.put_nowait(dataItem)
# terminate loop if work queue is empty
if dataItem is None:
break
# wait to get result from process
result = results.get()
# do something with result
return
@timeFunc
def mainMultiprocessAndThreaded(numThreads, numItems):
jobQueue = ConcurrentQueue()
fillQueueWithWork(jobQueue, numItems)
workers = []
for index in xrange(numThreads):
loopName = "Thread-" + str(index)
loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue, ))
loopThread.start()
workers.append(loopThread)
for worker in workers:
worker.join()
return
# End: Implement a threaded+multiprocessing version
if __name__ == '__main__':
print 'Version :', platform.python_version()
print 'Compiler :', platform.python_compiler()
print 'Platform :', platform.platform()
print 'Processor :', platform.processor()
numThreads = 8
numItems = 16000 #200000
print "Num Threads/Processes:", numThreads, "; Num Items:", numItems
mainMultiprocessAndThreaded(numThreads, numItems)
mainPureMultiprocessing(numThreads, numItems)
mainPureThreaded(numThreads, numItems)
mainSerial(numItems)
print "Terminating"
Update: I suspect one reason for the slowness is that Queue.put() busy-waits instead of releasing the GIL. If so, are there any suggestions for an alternative data structure I should be using?
2 Answers
It seems that your function is not computationally intensive enough to outweigh the overheads of multiprocessing. (Note that in Python, multithreading does not add computational resources because of the GIL.)
Your function (checkPrime) does not actually verify that a number is prime; it returns very quickly. Replacing it with a simple (and naive) primality checker makes the results come out the way you expected.
Still, have a look at how to use Python's pool.map to have multiple processes operate over a list; it shows the simplest way to use multiprocessing (a minimal sketch follows the corrected checker below). Note that there are built-in types that do the job of your queue, e.g. Queue; see http://docs.python.org/library/multiprocessing.html#multiprocessing-managers.
The corrected checker:
def checkPrime(candidate):
    # naive trial division starting at 3; not a fully correct primality
    # test (even numbers slip through), but it does real CPU work
    for k in xrange(3, candidate):
        if not candidate % k:
            return False
    return True
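As a rough illustration of the pool.map approach (mainPoolMap is a made-up name for this sketch; it reuses the timeFunc decorator and the corrected checkPrime above):
@timeFunc
def mainPoolMap(numProcesses, numItems):
    # passing no argument to Pool() would default to cpu_count() workers
    pool = multiprocessing.Pool(numProcesses)
    # map blocks until every item is processed and returns results in order;
    # the Pool replaces the hand-rolled ConcurrentQueue entirely
    results = pool.map(checkPrime, xrange(numItems, 2 * numItems))
    pool.close()
    pool.join()
    return results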
And an example of a "speedy" implementation:
@timeFunc
def speedy(numThreads, numItems):
    # note: Pool() with no argument defaults to the optimal number of workers
    pool = multiprocessing.Pool(numThreads)
    for i in xrange(numItems, 2 * numItems):
        pool.apply_async(checkPrime, (i,))  # args must be a tuple
    pool.close()
    pool.join()
This implementation is almost twice as fast!
wdolphin@Cory-linuxlaptop:~$ python test.py
Version : 2.6.6
Compiler : GCC 4.4.5
Platform : Linux-2.6.35-32-generic-x86_64-with-Ubuntu-10.10-maverick
Processor :
Num Threads/Processes: 8 ; Num Items: 16000
mainSerial exec time: 5555.76992035 ms
mainMultiprocessAndThreaded exec time: 4721.43602371 ms
mainPureMultiprocessing exec time: 4440.83094597 ms
mainPureThreaded exec time: 10829.3449879 ms
speedy exec time: 1898.72503281 ms
It looks like the computational cost of each job does not outweigh the overhead associated with dispatching the work to another thread/process. For example, here are the results I see when running your test application on my machine (very similar to your results):
Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 1134.00006294 ms
mainPureMultiprocessing exec time: 917.000055313 ms
mainPureThreaded exec time: 111.000061035 ms
mainSerial exec time: 41.0001277924 ms
Terminating
If I change the work being performed to something more computationally expensive, e.g.:
def checkPrime(candidate):
    # ignore the candidate and just burn some CPU
    i = 0
    for k in xrange(1, 10000):
        i += k
    return i < 5000
then I see results much closer to what you expected:
Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 16000
mainMultiprocessAndThreaded exec time: 2190.99998474 ms
mainPureMultiprocessing exec time: 2154.99997139 ms
mainPureThreaded exec time: 16170.0000763 ms
mainSerial exec time: 9143.00012589 ms
Terminating
You may also want to look at multiprocessing.Pool. It provides a model similar to the one you describe (multiple worker processes pulling jobs off a common queue). For your example, an implementation might look something like:
@timeFunc
def mainPool(numThreads, numItems):
    jobQueue = ConcurrentQueue()
    fillQueueWithWork(jobQueue, numItems)
    pool = multiprocessing.Pool(processes=numThreads)
    results = []
    while True:
        dataItem = jobQueue.pop()
        if dataItem is None:
            break
        results.append(pool.apply_async(checkPrime, (dataItem,)))  # args must be a tuple
    pool.close()
    pool.join()
On my machine, using the alternative checkPrime implementation, I see:
Version : 2.7.1
Compiler : MSC v.1500 32 bit (Intel)
Platform : Windows-7-6.1.7601-SP1
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel
Num Threads/Processes: 8 ; Num Items: 1600
mainPool exec time: 1530.99989891 ms
Terminating
Since multiprocessing.Pool already provides a safe way to insert work, you can probably eliminate your ConcurrentQueue and insert your dynamic work directly into the Pool.
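For instance, a minimal sketch of that idea; getNextWorkItem() is a hypothetical stand-in for wherever your dynamic work actually comes from:
def mainDynamicPool(numProcesses):
    pool = multiprocessing.Pool(processes=numProcesses)
    pending = []
    while True:
        # getNextWorkItem() is hypothetical: it should return the next
        # dynamically generated work item, or None when no work remains
        dataItem = getNextWorkItem()
        if dataItem is None:
            break
        pending.append(pool.apply_async(checkPrime, (dataItem,)))
    pool.close()
    pool.join()
    # get() re-raises any exception raised in the worker, so failures surface here
    return [job.get() for job in pending]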