为什么concurrent.futures不复制参数?
我理解的是,concurrent.futures这个模块依赖于“序列化”参数,以便在不同的进程(或者线程)中运行。序列化应该会创建参数的一个副本吧?在Linux上似乎并没有这样做,也就是说,我必须明确地传递一个副本。
我正在试图理解以下结果:
<0> rands before submission: [17, 72, 97, 8, 32, 15, 63, 97, 57, 60]
<1> rands before submission: [97, 15, 97, 32, 60, 17, 57, 72, 8, 63]
<2> rands before submission: [15, 57, 63, 17, 97, 97, 8, 32, 60, 72]
<3> rands before submission: [32, 97, 63, 72, 17, 57, 97, 8, 15, 60]
in function 0 [97, 15, 97, 32, 60, 17, 57, 72, 8, 63]
in function 1 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
in function 2 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
in function 3 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
这是代码:
from __future__ import print_function
import time
import random
try:
from concurrent import futures
except ImportError:
import futures
def work_with_rands(i, rands):
print('in function', i, rands)
def main():
random.seed(1)
rands = [random.randrange(100) for _ in range(10)]
# sequence 1 and sequence 2 should give the same results but they don't
# only difference is that one uses a copy of rands (i.e., rands.copy())
# sequence 1
with futures.ProcessPoolExecutor() as ex:
for i in range(4):
print("<{}> rands before submission: {}".format(i, rands))
ex.submit(work_with_rands, i, rands)
random.shuffle(rands)
print('-' * 30)
random.seed(1)
rands = [random.randrange(100) for _ in range(10)]
# sequence 2
print("initial sequence: ", rands)
with futures.ProcessPoolExecutor() as ex:
for i in range(4):
print("<{}> rands before submission: {}".format(i, rands))
ex.submit(work_with_rands, i, rands[:])
random.shuffle(rands)
if __name__ == "__main__":
main()
那[97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
到底是从哪里来的?这根本不是传给submit
的任何序列。
在Python 2下,结果稍微有点不同。
2 个回答
你在所有线程中共享同一个列表,并且这个列表会被修改。这让调试变得很困难,因为当你添加打印语句时,程序的表现会不一样。不过,这个 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
必须是在 shuffle
函数内部的一个状态。shuffle 函数持有这个列表(就是在所有线程中都存在的那个列表),并且会多次改变它。当线程被调用时,状态是 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]
。这些值并不是立刻被复制的,而是在另一个线程中被复制,所以你无法保证它们什么时候会被复制。
下面是 shuffle 在完成之前产生的一个例子:
[31, 64, 88, 7, 68, 85, 69, 3, 15, 47] # initial value (rands)
# ex.submit() is called here
# shuffle() is called here
# shuffle starts changing rand to:
[31, 64, 88, 47, 68, 85, 69, 3, 15, 7]
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7]
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7]
[31, 64, 69, 47, 68, 85, 15, 3, 88, 7]
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # threads may be called here
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here
[31, 85, 64, 47, 68, 69, 15, 3, 88, 7]
[85, 31, 64, 47, 68, 69, 15, 3, 88, 7] # value when the shuffle has finished
shuffle 的源代码:
def shuffle(self, x, random=None):
if random is None:
randbelow = self._randbelow
for i in reversed(range(1, len(x))):
# pick an element in x[:i+1] with which to exchange x[i]
j = randbelow(i+1)
x[i], x[j] = x[j], x[i]
# added this print here. that's what prints the output above
# your threads are probably being called when this is still pending
print(x)
... other staff here
所以如果你的输入是 [17, 72, 97, 8, 32, 15, 63, 97, 57, 60]
,而你的输出是 [97, 15, 97, 32, 60, 17, 57, 72, 8, 63]
,那么 shuffle 在这之间有“中间步骤”。你的线程在“中间步骤”中被调用。
这是一个没有修改的例子,通常来说,尽量避免在多个线程之间共享数据,因为这真的很难做到正确:
def work_with_rands(i, rands):
print('in function', i, rands)
def foo(a):
random.seed(random.randrange(999912)/9)
x = [None]*len(a)
for i in a:
_rand = random.randrange(len(a))
while x[_rand] is not None:
_rand = random.randrange(len(a))
x[_rand] = i
return x
def main():
rands = [random.randrange(100) for _ in range(10)]
with futures.ProcessPoolExecutor() as ex:
for i in range(4):
new_rands = foo(rands)
print("<{}> rands before submission: {}".format(i, new_rands ))
ex.submit(work_with_rands, i, new_rands )
<0> rands before submission: [84, 12, 93, 47, 40, 53, 74, 38, 52, 62]
<1> rands before submission: [74, 53, 93, 12, 38, 47, 52, 40, 84, 62]
<2> rands before submission: [84, 12, 93, 38, 62, 52, 53, 74, 47, 40]
<3> rands before submission: [53, 62, 52, 12, 84, 47, 93, 40, 74, 38]
in function 0 [84, 12, 93, 47, 40, 53, 74, 38, 52, 62]
in function 1 [74, 53, 93, 12, 38, 47, 52, 40, 84, 62]
in function 2 [84, 12, 93, 38, 62, 52, 53, 74, 47, 40]
in function 3 [53, 62, 52, 12, 84, 47, 93, 40, 74, 38]
简单来说,ProcessPoolExecutor.submit() 方法会把你要执行的函数和它的参数放到一个叫“工作项”的字典里,这个字典是可以被其他线程共享的。这个线程叫做_queue_management_worker,它会把字典里的工作项传递到一个队列中,实际的工作进程会从这个队列里读取任务。
在源代码中,有一段注释描述了这个并发模块的架构:http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l6
结果发现,在提交任务的过程中,_queue_management_worker 没有足够的时间去接收到新任务的通知。
所以,这个线程一直在这里等待:(http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l226),只有在调用 ProcessPoolExecutor.shutdown(也就是退出 ProcessPoolExecutor 的上下文)时才会醒来。
如果你在你的第一个任务序列中加一点延迟,比如这样:
with futures.ProcessPoolExecutor() as ex:
for i in range(4):
print("<{}> rands before submission: {}".format(i, rands))
ex.submit(work_with_rands, i, rands)
random.shuffle(rands)
time.sleep(0.01)
你会看到,_queue_management_worker 会醒来并把任务传递给工作进程,而 work_with_rands 会打印出不同的值。