Eliminating for loops in Python and NumPy
I'm looking for a way to avoid a for loop by vectorizing the following in Python and/or NumPy:
for i in list_range_values:
    v[list_list_values[i]] += list_comp_values[i]
where:
list_range_values is a Python list of integer values, e.g. [1, 3, 5], drawn from range(0, R-1, 1).
list_comp_values is a Python list of numeric values, e.g. [0.7, 9.8, 1.2, 5, 10, 11.7, 6, 0.2], with len(list_comp_values) = R.
v is a numpy vector of length V, where R can be less than, equal to, or greater than V.
list_list_values is a Python list of lists (each sublist holding a varying number of integers, e.g. [[3, 6, 7], [5, 7, 11, 25, 99], [8, 45], [4, 7, 8], [0, 1], [21, 31, 41], [9, 11, 22, 33, 44], [17, 19]]), whose values are drawn from range(0, V-1, 1), with len(list_list_values) = R.
For example:
for i in list_range_values (= [1, 3, 5]):
    i=1: v[[5, 7, 11, 25, 99]] += list_comp_values[1] (= 9.8)
    i=3: v[[4, 7, 8]] += list_comp_values[3] (= 5)
    i=5: v[[21, 31, 41]] += list_comp_values[5] (= 11.7)
Is there a way to eliminate the for loop?
Cython, SciPy/Weave/Blitz, and a C module are all alternative solutions, but I'd first like to check whether there is a NumPy vectorized solution.
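For reference, the loop runs on the sample data as a short self-contained script (V = 100 is an assumption here, chosen so the largest index, 99, is in range):

```python
import numpy as np

# sample data from the question
list_range_values = [1, 3, 5]
list_list_values = [[3, 6, 7], [5, 7, 11, 25, 99], [8, 45], [4, 7, 8],
                    [0, 1], [21, 31, 41], [9, 11, 22, 33, 44], [17, 19]]
list_comp_values = [0.7, 9.8, 1.2, 5, 10, 11.7, 6, 0.2]

V = 100               # assumed: large enough for the biggest index (99)
v = np.zeros(V)

for i in list_range_values:
    v[list_list_values[i]] += list_comp_values[i]

# v[7] now holds 9.8 + 5, since index 7 appears in sublists 1 and 3
```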
3 Answers
First, set up the variables you gave:
import numpy as np
list_range_values = [1, 3, 5]
list_list_values = [[3, 6, 7], [5, 7, 11, 25, 99], [8, 45],
                    [4, 7, 8], [0, 1], [21, 31, 41]]
list_comp_values = [0.7, 9.8, 1.2, 5, 10, 11.7]
v = np.arange(100, dtype=float)
Next, list_list_values and list_comp_values need to be collected into a single contiguous structure:
import itertools

list_list_lens = np.array([len(x) for x in list_list_values])
comp_vals_expanded = np.repeat(list_comp_values, list_list_lens)
list_vals_flat = np.fromiter(itertools.chain.from_iterable(list_list_values),
                             dtype=int)
Then we need the starting index of each subarray:
list_list_starts = np.concatenate(([0], np.cumsum(list_list_lens)[:-1]))
Now that we have start and end values, we can use the indices function from this question to build an array of selector indices:
def indices(start, end):
    lens = end - start
    np.cumsum(lens, out=lens)
    i = np.ones(lens[-1], dtype=int)
    i[0] = start[0]
    i[lens[:-1]] += start[1:]
    i[lens[:-1]] -= end[:-1]
    np.cumsum(i, out=i)
    return i
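To see what this helper produces, here is a small standalone demo (the start/end offsets below are made up for illustration):

```python
import numpy as np

def indices(start, end):
    # Expand the pairs (start[k], end[k]) into one flat array holding
    # start[k], start[k]+1, ..., end[k]-1 for every k, using two cumsums.
    lens = end - start
    np.cumsum(lens, out=lens)
    i = np.ones(lens[-1], dtype=int)
    i[0] = start[0]
    i[lens[:-1]] += start[1:]
    i[lens[:-1]] -= end[:-1]
    np.cumsum(i, out=i)
    return i

# made-up start/end offsets for illustration
sel = indices(np.array([0, 10, 20]), np.array([3, 12, 24]))
print(sel)  # [ 0  1  2 10 11 20 21 22 23]
```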
toadd = indices(list_list_starts[list_range_values],
                (list_list_starts + list_list_lens)[list_range_values])
With that done, we can perform the addition like this:
v[list_vals_flat[toadd]] += comp_vals_expanded[toadd]
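One caveat about this final step (not part of the original answer): fancy-indexed += is buffered, so when the same target index occurs more than once in list_vals_flat[toadd], only one of the contributions survives. In NumPy 1.8 and later, np.add.at performs the unbuffered equivalent and accumulates duplicates correctly; a minimal sketch of the difference:

```python
import numpy as np

idx = np.array([2, 5, 2, 7])            # note the duplicated index 2
vals = np.array([1.0, 2.0, 3.0, 4.0])

v = np.zeros(10)
v[idx] += vals                          # buffered: the two writes to v[2] collide
print(v[2])                             # 3.0 (only the last write survives)

v = np.zeros(10)
np.add.at(v, idx, vals)                 # unbuffered: duplicates accumulate
print(v[2])                             # 4.0 (1.0 + 3.0)
```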
Using your example input:
>>> list_list_values = [[3, 6, 7], [5, 7, 11, 25, 99], [8, 45], [4, 7, 8],
...                     [0, 1], [21, 31, 41], [9, 11, 22, 33, 44], [17, 19]]
>>> list_comp_values = [0.7, 9.8, 1.2, 5, 10, 11.7, 6, 0.2]
>>> list_range_values = [1, 3, 5]
First, a little generator trickery:
>>> indices_weights = ((list_list_values[i], list_comp_values[i])
...                    for i in list_range_values)
>>> flat_indices_weights = ((i, weight) for indices, weight in indices_weights
...                         for i in indices)
Now we hand the data over to numpy. I don't know how to build a rec.array from an iterator, so I had to convert the generator above into a list. Maybe there's a way to avoid that...
>>> i_w = numpy.rec.array(list(flat_indices_weights),
...                       dtype=[('i', int), ('weight', float)])
>>> numpy.histogram(i_w['i'], bins=range(0, 100), weights=i_w['weight'])
(array([ 0. , 0. , 0. , 0. , 5. , 9.8, 0. , 14.8, 5. ,
0. , 0. , 9.8, 0. , 0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 11.7, 0. , 0. , 0. , 9.8, 0. ,
0. , 0. , 0. , 0. , 11.7, 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. , 11.7, 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 9.8]),
array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67,
68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84,
85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]))
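A side note (not from the original answer): np.bincount with a weights argument computes the same weighted tally more directly than histogram with unit-width bins, assuming the indices are non-negative integers. A sketch on the question's sample data:

```python
import numpy as np

list_range_values = [1, 3, 5]
list_list_values = [[3, 6, 7], [5, 7, 11, 25, 99], [8, 45], [4, 7, 8],
                    [0, 1], [21, 31, 41], [9, 11, 22, 33, 44], [17, 19]]
list_comp_values = [0.7, 9.8, 1.2, 5, 10, 11.7, 6, 0.2]

# flatten the selected sublists and repeat each weight once per index
idx = np.concatenate([list_list_values[i] for i in list_range_values])
w = np.repeat([list_comp_values[i] for i in list_range_values],
              [len(list_list_values[i]) for i in list_range_values])

# weighted count per index; minlength pads the result out to length V
totals = np.bincount(idx, weights=w, minlength=100)
# totals[7] collects 9.8 + 5 from the two selected sublists containing 7
```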
I had time to follow up on JoshAdel's tests with a few of my own. The fastest solution so far uses Bago's setup but replaces the sum_by_group function with the built-in histogram function. Here are the numbers I got (updated):
Method 1 (jterrace): 2.65
Method 2 (for loop): 2.25
Method 3 (Bago): 1.14
Method 4 (histogram): 2.82
Method 5 (3/4 combination): 1.07
Note that, as implemented here, the first method gives incorrect results according to my test. I didn't have time to work out what the problem is. My test code is below; it only gently adapts JoshAdel's original code, but I'm posting it in full for convenience. (Updated to incorporate Bago's comments and tidied up slightly.)
from timeit import Timer
setstr="""import numpy as np
import itertools
import random
Nlists = 1000
total_lists = 5000
outsz = 100
maxsublistsz = 100
# create a random list of lists
list_range_values = random.sample(range(total_lists), Nlists)
list_list_values = [random.sample(range(outsz), np.random.randint(1, maxsublistsz)) for k in range(total_lists)]
list_comp_values = list(10*np.random.uniform(size=(total_lists,)))
v = np.zeros((outsz,))
def indices(start, end):
    lens = end - start
    np.cumsum(lens, out=lens)
    i = np.ones(lens[-1], dtype=int)
    i[0] = start[0]
    i[lens[:-1]] += start[1:]
    i[lens[:-1]] -= end[:-1]
    np.cumsum(i, out=i)
    return i
def sum_by_group(values, groups):
    order = np.argsort(groups)
    groups = groups[order]
    values = values[order]
    values.cumsum(out=values)
    index = np.ones(len(groups), 'bool')
    index[:-1] = groups[1:] != groups[:-1]
    values = values[index]
    groups = groups[index]
    values[1:] = np.diff(values)
    return values, groups
"""
setstr_test = setstr + "\nprint_v = True\n"
method1="""
list_list_lens = np.array([len(x) for x in list_list_values])
comp_vals_expanded = np.repeat(list_comp_values, list_list_lens)
list_vals_flat = np.fromiter(itertools.chain.from_iterable(list_list_values), dtype=int)
list_list_starts = np.concatenate(([0], np.cumsum(list_list_lens)[:-1]))
toadd = indices(list_list_starts[list_range_values], (list_list_starts + list_list_lens)[list_range_values])
v[list_vals_flat[toadd]] += comp_vals_expanded[toadd]
"""
method2="""
for k in list_range_values:
    v[list_list_values[k]] += list_comp_values[k]
"""
method3="""
llv = [np.fromiter(list_list_values[i], 'int') for i in list_range_values]
lcv = [list_comp_values[i] for i in list_range_values]
counts = [len(x) for x in llv]
indices = np.concatenate(llv)
values = np.repeat(lcv, counts)
totals, indices_unique = sum_by_group(values, indices)
v[indices_unique] += totals
"""
method4="""
indices_weights = ((list_list_values[i], list_comp_values[i]) for i in list_range_values)
flat_indices_weights = ((i, weight) for indices, weight in indices_weights for i in indices)
i_w = np.rec.array(list(flat_indices_weights), dtype=[('i', 'i'), ('weight', 'd')])
v += np.histogram(i_w['i'], bins=range(0, outsz + 1), weights=i_w['weight'])[0]
"""
method5="""
llv = [np.fromiter(list_list_values[i], 'int') for i in list_range_values]
lcv = [list_comp_values[i] for i in list_range_values]
counts = [len(x) for x in llv]
indices = np.concatenate(llv)
values = np.repeat(lcv, counts)
v += np.histogram(indices, bins=range(0, outsz + 1), weights=values)[0]
"""
t1 = Timer(method1, setup=setstr).timeit(100)
print(t1)
t2 = Timer(method2, setup=setstr).timeit(100)
print(t2)
t3 = Timer(method3, setup=setstr).timeit(100)
print(t3)
t4 = Timer(method4, setup=setstr).timeit(100)
print(t4)
t5 = Timer(method5, setup=setstr).timeit(100)
print(t5)
exec(setstr_test + method1 + "\nprint(v)\n")
exec("\nv = np.zeros((outsz,))\n" + method2 + "\nprint(v)\n")
exec("\nv = np.zeros((outsz,))\n" + method3 + "\nprint(v)\n")
exec("\nv = np.zeros((outsz,))\n" + method4 + "\nprint(v)\n")
exec("\nv = np.zeros((outsz,))\n" + method5 + "\nprint(v)\n")
While eliminating for loops and leaning on numpy built-ins/vectorization can often give a big speedup, I'd like to point out that this isn't always the case. Timing the simple for loop against the far more involved vectorization shows no significant improvement, and the code is considerably more complex. Something to keep in mind:
from timeit import Timer
setstr="""import numpy as np
import itertools
import random
Nlists = 1000
total_lists = 5000
outsz = 100
maxsublistsz = 100
# create a random list of lists
list_range_values = random.sample(range(total_lists), Nlists)
list_list_values = [random.sample(range(outsz), np.random.randint(1, maxsublistsz)) for k in range(total_lists)]
list_comp_values = 10*np.random.uniform(size=(total_lists,))
v = np.zeros((outsz,))
def indices(start, end):
    lens = end - start
    np.cumsum(lens, out=lens)
    i = np.ones(lens[-1], dtype=int)
    i[0] = start[0]
    i[lens[:-1]] += start[1:]
    i[lens[:-1]] -= end[:-1]
    np.cumsum(i, out=i)
    return i
def sum_by_group(values, groups):
    order = np.argsort(groups)
    groups = groups[order]
    values = values[order]
    values.cumsum(out=values)
    index = np.ones(len(groups), 'bool')
    index[:-1] = groups[1:] != groups[:-1]
    values = values[index]
    groups = groups[index]
    values[1:] = np.diff(values)
    return values, groups
"""
method1="""
list_list_lens = np.array([len(x) for x in list_list_values])
comp_vals_expanded = np.repeat(list_comp_values, list_list_lens)
list_vals_flat = np.fromiter(itertools.chain.from_iterable(list_list_values), dtype=int)
list_list_starts = np.concatenate(([0], np.cumsum(list_list_lens)[:-1]))
toadd = indices(list_list_starts[list_range_values], (list_list_starts + list_list_lens)[list_range_values])
v[list_vals_flat[toadd]] += comp_vals_expanded[toadd]
"""
method2="""
for k in list_range_values:
    v[list_list_values[k]] += list_comp_values[k]
"""
method3="""
llv = [list_list_values[i] for i in list_range_values]
lcv = [list_comp_values[i] for i in list_range_values]
counts = [len(x) for x in llv]
indices = np.concatenate(llv)
values = np.repeat(lcv, counts)
totals, indices_unique = sum_by_group(values, indices)
v[indices_unique] += totals
"""
t1 = Timer(method1, setup=setstr).timeit(100)
print(t1)
t2 = Timer(method2, setup=setstr).timeit(100)
print(t2)
t3 = Timer(method3, setup=setstr).timeit(100)
print(t3)
For a large number of elements in the lists:
Method 1 (no for loop - jterrace): 1.43 s
Method 2 (for loop): 4.62 s
Method 3 (no for loop - bago): 2.99 s
And for a small number of lists (changing Nlists to 10), the for loop is significantly faster than jterrace's solution:
Method 1 (no for loop - jterrace): 1.05 s
Method 2 (for loop): 0.045 s
Method 3 (no for loop - bago): 0.041 s
This is not to say that @jterrace's or @bago's solutions are bad; they're quite elegant. The point is just that, quite often, a simple for loop doesn't perform that badly.