Python中使用多进程反而比不使用慢
我花了很多时间试图理解多进程的概念,最后写出了这段代码,用来做基准测试:
示例 1:
from multiprocessing import Process
class Alter(Process):
def __init__(self, word):
Process.__init__(self)
self.word = word
self.word2 = ''
def run(self):
# Alter string + test processing speed
for i in range(80000):
self.word2 = self.word2 + self.word
if __name__=='__main__':
# Send a string to be altered
thread1 = Alter('foo')
thread2 = Alter('bar')
thread1.start()
thread2.start()
# wait for both to finish
thread1.join()
thread2.join()
print(thread1.word2)
print(thread2.word2)
这个测试用时2秒(是多线程的一半时间)。出于好奇,我决定接着运行这个:
示例 2:
word2 = 'foo'
word3 = 'bar'
word = 'foo'
for i in range(80000):
word2 = word2 + word
word = 'bar'
for i in range(80000):
word3 = word3 + word
print(word2)
print(word3)
让我惊讶的是,这个测试竟然在不到半秒内完成!
这是怎么回事呢?我本以为多进程会更快——既然示例 1 是示例 2 分成两个进程,难道不应该用时是示例 2 的一半吗?
更新:
在考虑了Chris的反馈后,我加入了消耗最多处理时间的“实际”代码,这让我开始考虑使用多进程:
self.ListVar = [[13379+ strings],[13379+ strings],
[13379+ strings],[13379+ strings]]
for b in range(len(self.ListVar)):
self.list1 = []
self.temp = []
for n in range(len(self.ListVar[b])):
if not self.ListVar[b][n] in self.temp:
self.list1.insert(n, self.ListVar[b][n] + '(' +
str(self.ListVar[b].count(self.ListVar[b][n])) +
')')
self.temp.insert(0, self.ListVar[b][n])
self.ListVar[b] = list(self.list1)
4 个回答
这个例子太简单了,不需要用到多进程。
启动一个新进程需要消耗很多资源。如果处理的任务很复杂,这些消耗就不那么明显了。但是你的例子其实并没有那么复杂,所以你会感觉到这些额外的消耗。
如果用真正的线程,你可能会看到更明显的差别,不过可惜的是,Python(特别是CPython)在处理CPU密集型任务时,线程方面有一些问题。
多进程可能对你正在做的事情有帮助,但不是你想的那样。因为你基本上是在对列表中的每个成员进行一些计算,你可以使用 multiprocessing.Pool.map
方法,这样就可以并行地对列表中的成员进行计算。
下面是一个例子,展示了你的代码在使用单个进程和使用 multiprocessing.Pool.map
时的性能表现:
from multiprocessing import Pool
from random import choice
from string import printable
from time import time
def build_test_list():
# Builds a test list consisting of 5 sublists of 10000 strings each.
# each string is 20 characters long
testlist = [[], [], [], [], []]
for sublist in testlist:
for _ in xrange(10000):
sublist.append(''.join(choice(printable) for _ in xrange(20)))
return testlist
def process_list(l):
# the time-consuming code
result = []
tmp = []
for n in range(len(l)):
if l[n] not in tmp:
result.insert(n, l[n]+' ('+str(l.count(l[n]))+')')
tmp.insert(0, l[n])
return result
def single(l):
# process the test list elements using a single process
results = []
for sublist in l:
results.append(process_list(sublist))
return results
def multi(l):
# process the test list elements in parallel
pool = Pool()
results = pool.map(process_list, l)
return results
print "Building the test list..."
testlist = build_test_list()
print "Processing the test list using a single process..."
starttime = time()
singleresults = single(testlist)
singletime = time() - starttime
print "Processing the test list using multiple processes..."
starttime = time()
multiresults = multi(testlist)
multitime = time() - starttime
# make sure they both return the same thing
assert singleresults == multiresults
print "Single process: {0:.2f}sec".format(singletime)
print "Multiple processes: {0:.2f}sec".format(multitime)
输出:
Building the test list...
Processing the test list using a single process...
Processing the test list using multiple processes...
Single process: 34.73sec
Multiple processes: 24.97sec
补充说明:现在你已经贴出了你的代码,我可以告诉你,有一种简单的方法可以让你做的事情快得多(快超过100倍)。
我看到你在做的事情是给每个字符串列表中的项目加上一个括号里的频率。与其每次都去计算所有元素的数量(你可以用cProfile确认,这确实是你代码中最大的瓶颈),不如直接创建一个字典,把每个元素和它的频率对应起来。这样,你只需要遍历列表两次——第一次是创建频率字典,第二次是用这个字典来加上频率。
在这里,我会展示我的新方法,给它计时,并用一个生成的测试案例来和旧方法进行比较。测试案例甚至显示新结果和旧结果是完全相同的。注意:下面你只需要关注新方法。
import random
import time
import collections
import cProfile
LIST_LEN = 14000
def timefunc(f):
t = time.time()
f()
return time.time() - t
def random_string(length=3):
"""Return a random string of given length"""
return "".join([chr(random.randint(65, 90)) for i in range(length)])
class Profiler:
def __init__(self):
self.original = [[random_string() for i in range(LIST_LEN)]
for j in range(4)]
def old_method(self):
self.ListVar = self.original[:]
for b in range(len(self.ListVar)):
self.list1 = []
self.temp = []
for n in range(len(self.ListVar[b])):
if not self.ListVar[b][n] in self.temp:
self.list1.insert(n, self.ListVar[b][n] + '(' + str(self.ListVar[b].count(self.ListVar[b][n])) + ')')
self.temp.insert(0, self.ListVar[b][n])
self.ListVar[b] = list(self.list1)
return self.ListVar
def new_method(self):
self.ListVar = self.original[:]
for i, inner_lst in enumerate(self.ListVar):
freq_dict = collections.defaultdict(int)
# create frequency dictionary
for e in inner_lst:
freq_dict[e] += 1
temp = set()
ret = []
for e in inner_lst:
if e not in temp:
ret.append(e + '(' + str(freq_dict[e]) + ')')
temp.add(e)
self.ListVar[i] = ret
return self.ListVar
def time_and_confirm(self):
"""
Time the old and new methods, and confirm they return the same value
"""
time_a = time.time()
l1 = self.old_method()
time_b = time.time()
l2 = self.new_method()
time_c = time.time()
# confirm that the two are the same
assert l1 == l2, "The old and new methods don't return the same value"
return time_b - time_a, time_c - time_b
p = Profiler()
print p.time_and_confirm()
当我运行这个时,得到的时间是(15.963812112808228, 0.05961179733276367),这意味着它快了大约250倍,虽然这个优势取决于列表的长度和每个列表中的频率分布。我相信你会同意,凭借这个速度优势,你可能不需要使用多线程处理了 :)
(我原来的回答保留在下面以供后人参考)
补充说明:顺便提一下,这个算法的性能大致是线性的,而你使用的代码是平方级别的。这意味着元素数量越多,它的表现优势越明显。例如,如果你把每个列表的长度增加到1000000,运行只需要5秒。根据推算,旧代码可能需要超过一天的时间 :)
这取决于你正在执行的操作。例如:
import time
NUM_RANGE = 100000000
from multiprocessing import Process
def timefunc(f):
t = time.time()
f()
return time.time() - t
def multi():
class MultiProcess(Process):
def __init__(self):
Process.__init__(self)
def run(self):
# Alter string + test processing speed
for i in xrange(NUM_RANGE):
a = 20 * 20
thread1 = MultiProcess()
thread2 = MultiProcess()
thread1.start()
thread2.start()
thread1.join()
thread2.join()
def single():
for i in xrange(NUM_RANGE):
a = 20 * 20
for i in xrange(NUM_RANGE):
a = 20 * 20
print timefunc(multi) / timefunc(single)
在我的机器上,多进程操作的时间大约只占单线程操作的60%。