在Cython数组中输入Python序列(然后返回)

2024-06-16 10:21:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我第一次成功地使用Cython显著加快了将半字节从一个整数列表(字节)打包到另一个(参见Faster bit-level data packing),例如将两个连续字节0x0A和{}打包到0xAB。在

def pack(it):
    """Cythonize python nibble packing loop, typed"""
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    return [ (it[i*2]//16)<<4 | it[i*2+1]//16 for i in range(n) ]

虽然得到的速度令人满意,但我很好奇,通过更好地使用输入和输出列表,是否可以进一步提高速度。在

cython3 -a pack.cyx生成一个非常“cythonic”的HTML报告,不幸的是,我没有足够的经验从中得出任何有用的结论。在

从C的角度来看,循环应该“简单地”访问两个无符号整型数组。可能,使用更广泛的数据类型(16/32位)可以进一步按比例加快速度。在

问题是:(如何)对于Cython,Python [binary/immutable] sequence types可以被类型化为unsigned int array?在


使用How to convert python array to cython array?中建议的数组似乎并不能加快速度(数组需要事先从bytes对象创建),也不能将参数键入list而不是object(与no type相同)或使用for loop代替list comprehension:

^{pr2}$

我想我之前的测试只是指定了一个数组.array作为输入,但是根据评论我现在已经尝试过了

from cpython cimport array
import array

def packa(array.array a):
    """Cythonize python nibble packing loop, typed"""
    cdef unsigned int n = len(a)//2
    cdef unsigned int i
    cdef unsigned int b[256*64/2]
    for i in range(n):
        b[i] = (a[i*2]//16)<<4 | a[i*2+1]//16;

    cdef array.array c = array.array("B", b)
    return c

它编译但是

ima = array.array("B", imd) # unsigned char (1 Byte)
pa = packa(ima)
packed = pa.tolist()

分段断层。 我发现文档有点少,所以这里有什么问题以及如何为输出数据分配数组的任何提示都是值得赞赏的。在


采用@ead的第一种方法,加上合并除法和移位(似乎可以节省一微秒:
#cython: boundscheck=False, wraparound=False

def packa(char[::1] a):
    """Cythonize python nibble packing loop, typed with array"""

    cdef unsigned int n = len(a)//2
    cdef unsigned int i

    # cdef unsigned int b[256*64/2]
    cdef array.array res = array.array('B', [])
    array.resize(res, n)

    for i in range(n):
        res.data.as_chars[i] = ( a[i*2] & 0xF0 ) | (a[i*2+1] >> 4);

    return res

编译时间更长,但运行速度更快:

python3 -m timeit -s 'from pack import packa; import array; data = array.array("B", bytes([0]*256*64))' 'packa(data)'
1000 loops, best of 3: 236 usec per loop

太棒了!但是,使用额外的字节到数组和数组到列表的转换

ima = array.array("B", imd) # unsigned char (1 Byte)
pa = packa(ima)
packed = pa.tolist() # bytes would probably also do

现在只需1.7毫秒-非常酷!在


下降至150 us定时或约0.4 ms实际值:

from cython cimport boundscheck, wraparound
from cpython cimport array
import array

@boundscheck(False)
@wraparound(False)
def pack(const unsigned char[::1] di):
    cdef:
        unsigned int i, n = len(di)
        unsigned char h, l, r
        array.array do = array.array('B')

    array.resize(do, n>>1)

    for i in range(0, n, 2):
        h = di[i] & 0xF0
        l = di[i+1] >> 4
        r = h | l
        do.data.as_uchars[i>>1] = r

    return do

当我把这个数组转换成10兆赫时,结果就不一样了。在


Tags: loopfordata字节def数组arraydo
1条回答
网友
1楼 · 发布于 2024-06-16 10:21:45

如果你想和C一样快,你不应该在里面使用python整数的list,而应该使用array.array。使用cython+array.array,python+列表代码的速度可能提高140左右。在

下面是一些如何使用cython使代码更快的方法。作为基准,我选择一个包含1000个元素的列表(足够大,缓存未命中还没有影响):

import random
l=[random.randint(0,15) for _ in range(1000)]

作为基线,您的python实现包含以下列表:

^{pr2}$

顺便说一下,我使用%而不是//,这可能是您想要的,否则您将只得到0s(只有较低的比特在您的描述中有数据)。在

在使用相同的函数(使用%%cython-magic)后,我们得到大约2:

%timeit packx(l)
77.6 µs ± 1.28 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

让我们看看由选项-a生成的html,我们看到了与for-循环对应的行的以下内容:

..... 
__pyx_t_2 = PyNumber_Multiply(__pyx_v_i, __pyx_int_2); if (unlikely(!__pyx_t_2)) __PYX_ERR(0, 6, __pyx_L1_error)
 __Pyx_GOTREF(__pyx_t_2);
 __pyx_t_5 = PyObject_GetItem(__pyx_v_it, __pyx_t_2); if (unlikely(!__pyx_t_5)) __PYX_ERR(0, 6, __pyx_L1_error)
 __Pyx_GOTREF(__pyx_t_5);
 __Pyx_DECREF(__pyx_t_2); __pyx_t_2 = 0;
 __pyx_t_2 = __Pyx_PyInt_RemainderObjC(__pyx_t_5, __pyx_int_16, 16, 0); if (unlikely(!__pyx_t_2)) __PYX_ERR(0, 6, __pyx_L1_error)
...

Py_NumberMultiply意味着我们使用慢python乘法,Pyx_DECREF-所有的临时对象都是慢python对象。我们需要改变这种状况!在

让我们将一个array.array字节传递给函数,并返回array.array字节。列表中有完整的python对象,array.array速度更快的低级原始c-data:

%%cython
from cpython cimport array
def cy_apackx(char[::1] it):
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    cdef array.array res = array.array('b', [])
    array.resize(res, n)
    for i in range(n):
        res.data.as_chars[i] = (it[i*2]%16)<<4 | it[i*2+1]%16
    return res

import array
a=array.array('B', l)
%timeit cy_apackx(a)
19.2 µs ± 316 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

更好,但是让我们看看生成的html,仍然有一些缓慢的python代码:

 __pyx_t_2 = __Pyx_PyInt_From_long(((__Pyx_mod_long((*((char *) ( /* dim=0 */ ((char *) (((char *) __pyx_v_it.data) + __pyx_t_7)) ))), 16) << 4) | __Pyx_mod_long((*((char *) ( /* dim=0 */ ((char *) (((char *) __pyx_v_it.data) + __pyx_t_8)) ))), 16))); if (unlikely(!__pyx_t_2)) __PYX_ERR(0, 9, __pyx_L1_error)
__Pyx_GOTREF(__pyx_t_2);
 if (unlikely(__Pyx_SetItemInt(((PyObject *)__pyx_v_res), __pyx_v_i, __pyx_t_2, unsigned int, 0, __Pyx_PyInt_From_unsigned_int, 0, 0, 1) < 0)) __PYX_ERR(0, 9, __pyx_L1_error)
 __Pyx_DECREF(__pyx_t_2); __pyx_t_2 = 0;

对于数组(__Pax_SetItemInt),我们仍然使用python setter,为此需要一个python objecct __pyx_t_2,为了避免这种情况,我们使用array.data.as_chars

%%cython
from cpython cimport array
def cy_apackx(char[::1] it):
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    cdef array.array res = array.array('B', [])
    array.resize(res, n)
    for i in range(n):
        res.data.as_chars[i] = (it[i*2]%16)<<4 | it[i*2+1]%16  ##HERE!
return res

%timeit cy_apackx(a)
1.86 µs ± 30.5 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

好多了,但是让我们再看一次html,我们看到一些调用__Pyx_RaiseBufferIndexError-这种安全性需要一些时间,所以让我们关闭它:

%%cython
from cpython cimport array    
import cython
@cython.boundscheck(False) # switch of safety-checks
@cython.wraparound(False) # switch of safety-checks
def cy_apackx(char[::1] it):
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    cdef array.array res = array.array('B', [])
    array.resize(res, n)
    for i in range(n):
        res.data.as_chars[i] = (it[i*2]%16)<<4 | it[i*2+1]%16  ##HERE!
    return res

%timeit cy_apackx(a)
1.53 µs ± 11.5 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

当我们查看生成的html时,我们看到:

__pyx_t_7 = (__pyx_v_i * 2);
__pyx_t_8 = ((__pyx_v_i * 2) + 1);
(__pyx_v_res->data.as_chars[__pyx_v_i]) = ((__Pyx_mod_long((*((char *) ( /* dim=0 */ ((char *) (((char *) __pyx_v_it.data) + __pyx_t_7)) ))), 16) << 4) | __Pyx_mod_long((*((char *) ( /* dim=0 */ ((char *) (((char *) __pyx_v_it.data) + __pyx_t_8)) ))), 16));

没有Python的东西!到目前为止还不错。但是,我不确定__Pyx_mod_long,它的定义是:

static CYTHON_INLINE long __Pyx_mod_long(long a, long b) {
   long r = a % b;
   r += ((r != 0) & ((r ^ b) < 0)) * b;
   return r;
}

因此,C和Python在负数mod上存在差异,必须加以考虑。这个函数定义,尽管是内联的,但会阻止C编译器将a%16优化为a&15。我们只有正数,所以不需要关心它们,因此我们需要自己做a&15-技巧:

%%cython
from cpython cimport array
import cython
@cython.boundscheck(False)
@cython.wraparound(False)
def cy_apackx(char[::1] it):
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    cdef array.array res = array.array('B', [])
    array.resize(res, n)
    for i in range(n):
        res.data.as_chars[i] = (it[i*2]&15)<<4 | (it[i*2+1]&15)
    return res

%timeit cy_apackx(a)
1.02 µs ± 8.63 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

我也很满意得到的C-code/html(只有一行):

(__pyx_v_res->data.as_chars[__pyx_v_i]) = ((((*((char *) ( /* dim=0 */ ((char *) (((char *) __pyx_v_it.data) + __pyx_t_7)) ))) & 15) << 4) | ((*((char *) ( /* dim=0 */ ((char *) (((char *) __pyx_v_it.data) + __pyx_t_8)) ))) & 15));

结论:总而言之,这意味着速度提高了140(140µs vs 1.02µs)-不错!另一个有趣的地方是:计算本身大约需要2µs(这包括不太理想的边界检查和除法)——138µs用于创建、注册和删除临时python对象。在


如果您需要高位,并且可以假定低位没有污垢(否则&250可以帮助),您可以使用:

from cpython cimport array
import cython
@cython.boundscheck(False)
@cython.wraparound(False)
def cy_apackx(char[::1] it):
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    cdef array.array res = array.array('B', [])
    array.resize(res, n)
    for i in range(n):
        res.data.as_chars[i] = it[i*2] | (it[i*2+1]>>4)
    return res

%timeit cy_apackx(a)
819 ns ± 8.24 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

另一个有趣的问题是,如果使用list,哪些成本具有操作性。如果我们从“改进”版本开始:

%%cython
def cy_packx(it):
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    res=[0]*n
    for i in range(n):
        res[i] = it[i*2] | (it[i*2+1]>>4))
    return res

%timeit cy_packx(l)
20.7 µs ± 450 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

我们看到,减少整数运算的数量会导致很大的加速。这是因为python整数是不可变的,并且每个操作都会创建一个新的临时对象,这是非常昂贵的。消除操作也意味着消除昂贵的临时性。在

但是,it[i*2] | (it[i*2+1]>>4)是用python integer完成的,下一步我们将其设为cdef-操作:

%%cython   
def cy_packx(it):
    cdef unsigned int n = len(it)//2
    cdef unsigned int i
    cdef unsigned char a,b
    res=[0]*n
    for i in range(n):
        a=it[i*2]
        b=it[i*2+1]  # ensures next operations are fast
        res[i]= a | (b>>4)
    return res   

    %timeit cy_packx(l)
    7.3 µs ± 880 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

我不知道如何进一步改进它,因此list的值为7.3µs,而array.array为1µs。在


最后一个问题,列表版本的成本明细是什么?为了避免被C编译器优化,我们使用了一个稍微不同的基线函数:

%%cython
def cy_packx(it):
        cdef unsigned int n = len(it)//2
        cdef unsigned int i
        cdef unsigned char a,b
        cdef unsigned char s = 0
        res=[0]*n
        for i in range(n):
            a=it[i*2]
            b=it[i*2+1]  # ensures next operations are fast
            s+=a | (b>>4)
            res[i]= s
        return res
%timeit cy_packx(l)

In [79]: %timeit cy_packx(l)
7.67 µs ± 106 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

使用s变量意味着,它在第二个版本中不会得到优化:

%%cython   
def cy_packx(it):
        cdef unsigned int n = len(it)//2
        cdef unsigned int i
        cdef unsigned char a,b
        cdef unsigned char s = 0
        res=[0]*n
        for i in range(n):
            a=it[i*2]
            b=it[i*2+1]  # ensures next operations are fast
            s+=a | (b>>4)
        res[0]=s
        return res 

In [81]: %timeit cy_packx(l)
5.46 µs ± 72.7 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

创建新整数对象的成本约为2µs或约30%。内存分配的成本是多少?在

%%cython   
def cy_packx(it):
        cdef unsigned int n = len(it)//2
        cdef unsigned int i
        cdef unsigned char a,b
        cdef unsigned char s = 0
        for i in range(n):
            a=it[i*2]
            b=it[i*2+1]  # ensures next operations are fast
            s+=a | (b>>4)
        return s 

In [84]: %timeit cy_packx(l)
3.84 µs ± 43.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

这将导致以下列表版本的性能细分:

                    Time(in µs)      Percentage(in %)
     all                7.7                 100
     calculation          1                  12
     alloc memory       1.6                  21
     create ints        2.2                  29
     access data/cast   2.6                  38

我必须承认,我期望create ints要扮演更大的角色,而不是访问列表中的数据并将其强制转换为chars,这将花费大量成本。在

相关问题 更多 >