如何矢量化包含if语句的函数?

19 投票
7 回答
25238 浏览
提问于 2025-04-18 12:36

假设我们有一个这样的函数:

def f(x, y):
    if y == 0:
        return 0
    return x/y

这个函数在处理单个数值时运行得很好。但是,当我尝试用numpy数组作为xy时,比较y == 0就变成了数组操作,这样会导致错误:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-13-9884e2c3d1cd> in <module>()
----> 1 f(np.arange(1,10), np.arange(10,20))

<ipython-input-10-fbd24f17ea07> in f(x, y)
      1 def f(x, y):
----> 2     if y == 0:
      3         return 0
      4     return x/y

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

我试过使用np.vectorize,但没有任何改善,代码还是报同样的错。其实np.vectorize是一个选择,它能给我想要的结果。

我能想到的唯一解决办法是对y数组使用np.where,像这样:

def f(x, y):
    np.where(y == 0, 0, x/y)

不过这样对单个数值是无效的。

有没有更好的方法来写一个包含if语句的函数?这个函数应该能同时处理单个数值和数组。

7 个回答

0

你可以用Heaviside函数 np.heaviside 来代替if语句。

6

一种有点笨重但有效的方法就是先对数据进行预处理:

def f(x, y):
    if type(x) == int and type(y) == int: return x/y # Will it ever be used for this?

    # Change scalars to arrays
    if type(x) == int: x = np.full(y.shape, x, dtype=y.dtype)
    if type(y) == int: y = np.full(x.shape, y, dtype=x.dtype)

    # Change all divide by zero operations to 0/1
    div_zero_idx = (y==0)
    x[div_zero_idx] = 0
    y[div_zero_idx] = 1

    return x/y

我对所有不同的方法进行了计时:

def f_mask(x, y):
    x = np.ma.array(x, mask=(y==0))
    y = np.array(y)
    ans = x/y
    ans[ans.mask]=0
    return np.asarray(ans)

def f_where(x, y):
    x = np.array(x)
    y = np.array(y)
    return np.where(y == 0, 0, x/y)

def f_vect(x, y):
    if y == 0:
        return 0
    return x/y

vf = np.vectorize(f_vect)

print timeit.timeit('f(np.random.randint(10, size=array_length), np.random.randint(10, size=array_length))', number=10000, setup="from __main__ import f; import numpy as np; array_length=1000")
print timeit.timeit('f_mask(np.random.randint(10, size=array_length), np.random.randint(10, size=array_length))', number=10000, setup="from __main__ import f_mask; import numpy as np; array_length=1000")
print timeit.timeit('f_where(np.random.randint(10, size=array_length), np.random.randint(10, size=array_length))', number=10000, setup="from __main__ import f_where; import numpy as np; array_length=1000")
print timeit.timeit('vf(np.random.randint(10, size=array_length), np.random.randint(10, size=array_length))', number=10000, setup="from __main__ import vf; import numpy as np; array_length=(1000)")

# f
# 0.760189056396

# f_mask
# 2.24414896965

# f_where
# RuntimeWarning: divide by zero encountered in divide return np.where(y == 0, 0, x/y)
# 1.08176398277

# f_vect
# 3.45374488831

第一个函数是最快的,而且没有任何警告。如果x或y是单个数字(标量),时间比率是相似的。对于更高维度的数组,使用掩码数组的方法相对会更快一些(不过它还是最慢的)。

7

你可以使用一个“掩码数组”,这样只有在y!=0的情况下才会进行除法运算:

def f(x, y):
    x = np.atleast_1d(np.array(x))
    y = np.atleast_1d(np.ma.array(y, mask=(y==0)))
    ans = x/y
    ans[ans.mask]=0
    return np.asarray(ans)
8

我想知道你在使用 np.vectorize 时遇到了什么问题。在我的系统上,它运行得很好:

In [145]: def f(x, y):
     ...:     if y == 0:
     ...:         return 0
     ...:     return x/y

In [146]: vf = np.vectorize(f)

In [147]: vf([[3],[10]], [0,1,2,0])
Out[147]: 
array([[ 0,  3,  1,  0],
       [ 0, 10,  5,  0]])

请注意,结果的 dtype 是由第一个元素的结果决定的。你也可以自己设置想要的输出:

In [148]: vf = np.vectorize(f, otypes=[np.float])

In [149]: vf([[3],[10]], [0,1,2,0])
Out[149]: 
array([[  0. ,   3. ,   1.5,   0. ],
       [  0. ,  10. ,   5. ,   0. ]])

文档 中还有更多示例。

21

一种方法是在你的函数里把 xy 转换成 numpy 数组:

def f(x, y):
    x = np.array(x)
    y = np.array(y)
    return np.where(y == 0, 0, x/y)

这样做的时候,如果 x 或者 y 其中一个是单个数字(标量),另一个是 numpy 数组,这个方法就能正常工作。如果它们都是可以进行广播的数组,也可以正常使用。不过,如果它们是形状不兼容的数组(比如,长度不同的一维数组),这个方法就不行了。不过在这种情况下,实际上也不太清楚你想要的结果是什么。

撰写回答