如何加速Python中numpy数组的填充?

8 投票
4 回答
3944 浏览
提问于 2025-04-16 15:50

我正在尝试用以下代码填充一个预先分配的字节数组:

# preallocate a block array
dt = numpy.dtype('u8')
in_memory_blocks = numpy.zeros(_AVAIL_IN_MEMORY_BLOCKS, dt)

...

# write all the blocks out, flushing only as desired
blocks_per_flush_xrange = xrange(0, blocks_per_flush)
for _ in xrange(0, num_flushes):
    for block_index in blocks_per_flush_xrange:
        in_memory_blocks[block_index] = random.randint(0, _BLOCK_MAX)

    print('flushing bytes stored in memory...')

    # commented out for SO; exists in actual code
    # removing this doesn't make an order-of-magnitude difference in time
    # m.update(in_memory_blocks[:blocks_per_flush])

    in_memory_blocks[:blocks_per_flush].tofile(f)

几点说明:

  • num_flushes 的值很小,大约在 4 到 10 之间
  • blocks_per_flush 的值很大,数量级在百万级
  • in_memory_blocks 可以是一个相当大的缓冲区(我设置过最低为 1MB,最高为 100MB),但时间非常一致……
  • _BLOCK_MAX 是一个 8 字节无符号整数的最大值
  • m 是一个 hashilib.md5() 对象

使用上述代码生成 1MB 的数据大约需要 1 秒;生成 500MB 则需要大约 376 秒。相比之下,我的简单 C 程序使用 rand() 可以在 8 秒内创建一个 500MB 的文件。

我该如何提高上述循环的性能?我很确定我忽略了一些明显的东西,导致运行时间差异如此巨大。

4 个回答

0

如果你只是想每次填充一个文件,填入 block_size 字节的数据,这种方法可能比之前的答案更快。这个方法是基于生成器的,完全不需要创建数组:

import numpy as np

def random_block_generator(block_size):
    while True:
        yield np.random.bytes(block_size)

rbg = random_block_generator(BLOCK_SIZE)

然后你的使用方法是:

f = open('testfile.bin','wb')

for _ in xrange(blocks_to_write):
    f.write( rbg.next())

f.close()

Numpy 使用的是确定性的随机数生成(也就是说,序列中的下一个数字总是一样的,只是初始化时从一个随机的地方开始)。如果你需要真正的随机数据(比如用于加密),那么你可以用 import Crypto.Random as cryield cr.get_random_bytes(block_size) 来替代 np。

另外,如果你的 BLOCK_SIZE 是一个固定的常量,你可以像这样使用生成器表达式(这次使用 Crypto 库):

import Crypto.Random as cr
from itertools import repeat

BLOCK_SIZE = 1000

rbg = (cr.get_random_bytes(BLOCK_SIZE) for _ in repeat(0))

f = open('testfile.bin','wb')

for _ in xrange(blocks_to_write):
    f.write( rbg.next())

f.close()

这包括了实现 rbg=... 和执行的部分。这个生成器方法,即使使用稍微慢一点的 Crypto.Random,也会在磁盘输入输出达到极限之前就不会再增加计算量(不过我相信其他答案也是如此)。

更新:

在 Athlon X2 245 上的一些时间测试数据 --

  • Crypto: 生成 500MB,不写入 -- 10.8秒(46 MB/s)
  • Crypto: 生成 500MB 并写入 -- 11.2秒(44.5 MB/s)
  • Numpy: 生成 500MB,不写入 -- 1.4秒(360 MB/s)
  • Numpy: 生成 500MB,并写入 -- 7.1秒(70 MB/s)

所以 Numpy 的版本大约快了 8 倍(足够快,可以让我的老式硬盘达到极限)。我测试了这两种方法,使用的是生成器表达式的形式,而不是生成器函数的形式。

7

因为 0.._BLOCK_MAX 包含了所有可能的 numpy.uint8 的值(我猜 numpy.dtype('u8') 是个笔误,实际上是指 numpy.uint64),你可以使用:

import numpy as np

for _ in xrange(0, num_flushes):
    in_memory_blocks = np.frombuffer(np.random.bytes(blocks_per_flush),
                                     dtype=np.uint8)

    print('flushing bytes stored in memory...')
    # ...

这个方法比 @hgomersall 的方法快大约8倍:

$ python -mtimeit -s'import numpy as np' '
>     np.uint8(np.random.randint(0,256,20000000))'
10 loops, best of 3: 316 msec per loop

$ python -mtimeit -s'import numpy as np' '
>     np.frombuffer(np.random.bytes(20000000), dtype=np.uint8)'
10 loops, best of 3: 38.6 msec per loop

如果 numpy.dtype('u8') 不是笔误,你确实需要 numpy.uint64,那么:

a = np.int64(np.random.random_integers(0, _BLOCK_MAX, blocks_per_flush))
in_memory_blocks = a.view(np.uint64) # unsigned

注意:如果数组的类型已经是 np.int64,那么 np.int64() 不会创建副本。使用 .view(numpy.uint64) 会强制将其视为无符号类型(同样不会创建副本)。

4

因为你在分配连续的内存块,所以你可以这样做(完全去掉内部循环):

for _ in xrange(0, num_flushes):
    in_memory_blocks[:blocks_per_flush] = numpy.random.randint(
            0, _BLOCK_MAX+1, blocks_per_flush)

    print('flushing bytes stored in memory...')

    # commented out for SO; exists in actual code
    # removing this doesn't make an order-of-magnitude difference in time
    # m.update(in_memory_blocks[:blocks_per_flush])

    in_memory_blocks[:blocks_per_flush].tofile(f)

这里使用了 numpy.random.randint 函数,它会分配一整块内存并用随机整数填充(注意下面 J.F. Sebastian 的评论,提到 numpy.random.randintrandom.randint 的区别)。我没有看到有什么方法可以用 numpy 的随机函数填充一个预先分配的数组。另一个问题是,numpy 的 randint 返回的是 int64 类型的数组。如果你需要其他大小的整数,可以使用 numpy 的类型方法,比如 numpy.uint8。如果你想要的随机整数覆盖这个类型的整个范围,那么下面 J.F. Sebastian 提到的使用 numpy.random.bytes 的方法几乎在任何情况下都是最好的选择!

不过,简单的测试显示,运行时间是合理的(和 C 代码差不多)。以下代码测试了使用 numpy 方法分配 20,000,000 个随机整数的 uint8 数组所需的时间:

from timeit import Timer
t = Timer(stmt='a=numpy.uint8(numpy.random.randint(0, 100, 20000000))',
        setup='import numpy')
test_runs = 50
time = t.timeit(test_runs)/test_runs
print time

我在我的一台四年旧的 Core2 笔记本上测试,发现每次分配大约需要 0.7 秒(它运行了 50 次,所以整个测试会花更长时间)。这意味着每次分配 20,000,000 个随机的 uint8 整数大约需要 0.7 秒,所以我预计整个 500MB 的分配时间大约在 20 秒左右。

如果内存更多,你可以一次性分配更大的块,但你仍然在为每个整数分配和写入 64 位的内存,而实际上你只需要 8 位(我还没有量化这个影响)。如果速度还是不够快,你可以通过 numpy 的 ctypes 接口调用你的 C 实现。这其实很简单,而且几乎不会比纯 C 慢。

总的来说,使用 numpy 时,尽量使用现有的 numpy 函数,记住退回到 C 的 ctypes 也不是太麻烦。一般来说,这种方法可以让 Python 在数值处理上非常有效,几乎没有速度损失。

编辑:我刚想到的另一件事是:按照上面的实现方式,我觉得你可能会多做一个不必要的拷贝。如果 in_memory_blocks 的长度是 blocks_per_flush,那么你最好直接把它赋值为 numpy.random.randint 的返回值,而不是分配给某个子数组(在一般情况下这必须是一个拷贝)。所以:

in_memory_blocks = numpy.random.randint(0, _BLOCK_MAX+1, blocks_per_flush)

而不是:

in_memory_blocks[:blocks_per_flush] = numpy.random.randint(
        0, _BLOCK_MAX+1, blocks_per_flush)

不过,经过计时,第一种情况并没有显著提高速度(只有大约 2%),所以可能不值得太担心。我想大部分时间都是花在实际生成随机数上(这也是我预期的)。

撰写回答