简单的Python挑战:数据缓冲区上的最快位运算XOR
挑战:
对两个大小相同的缓冲区进行按位异或操作。这些缓冲区需要是Python中的str
类型,因为在Python中,通常使用这种类型来处理数据缓冲区。最后将结果以str
的形式返回。尽量做到快速。
输入是两个1兆字节(2的20次方字节)的字符串。
这个挑战的目标是大幅度超过我现有的低效算法,可以使用Python或者现有的第三方Python模块(规则放宽:也可以自己创建模块)。微小的改进是没有意义的。
from os import urandom
from numpy import frombuffer,bitwise_xor,byte
def slow_xor(aa,bb):
a=frombuffer(aa,dtype=byte)
b=frombuffer(bb,dtype=byte)
c=bitwise_xor(a,b)
r=c.tostring()
return r
aa=urandom(2**20)
bb=urandom(2**20)
def test_it():
for x in xrange(1000):
slow_xor(aa,bb)
11 个回答
这是我在使用cython时的结果
slow_xor 0.456888198853
faster_xor 0.400228977203
cython_xor 0.232881069183
cython_xor_vectorised 0.171468019485
在cython中进行向量化处理,可以让我电脑上的for循环速度提高大约25%。不过,大部分时间还是花在构建Python字符串上(也就是return
语句的部分)——我觉得这个额外的复制是无法避免的(合法情况下),因为数组可能包含空字节。
如果用不合法的方式,可以直接传入一个Python字符串并在原地修改,这样函数的速度会翻倍。
xor.py
from time import time
from os import urandom
from numpy import frombuffer,bitwise_xor,byte,uint64
import pyximport; pyximport.install()
import xor_
def slow_xor(aa,bb):
a=frombuffer(aa,dtype=byte)
b=frombuffer(bb,dtype=byte)
c=bitwise_xor(a,b)
r=c.tostring()
return r
def faster_xor(aa,bb):
a=frombuffer(aa,dtype=uint64)
b=frombuffer(bb,dtype=uint64)
c=bitwise_xor(a,b)
r=c.tostring()
return r
aa=urandom(2**20)
bb=urandom(2**20)
def test_it():
t=time()
for x in xrange(100):
slow_xor(aa,bb)
print "slow_xor ",time()-t
t=time()
for x in xrange(100):
faster_xor(aa,bb)
print "faster_xor",time()-t
t=time()
for x in xrange(100):
xor_.cython_xor(aa,bb)
print "cython_xor",time()-t
t=time()
for x in xrange(100):
xor_.cython_xor_vectorised(aa,bb)
print "cython_xor_vectorised",time()-t
if __name__=="__main__":
test_it()
xor_.pyx
cdef char c[1048576]
def cython_xor(char *a,char *b):
cdef int i
for i in range(1048576):
c[i]=a[i]^b[i]
return c[:1048576]
def cython_xor_vectorised(char *a,char *b):
cdef int i
for i in range(131094):
(<unsigned long long *>c)[i]=(<unsigned long long *>a)[i]^(<unsigned long long *>b)[i]
return c[:1048576]
性能比较:numpy vs. Cython vs. C vs. Fortran vs. Boost.Python (pyublas)
| function | time, usec | ratio | type |
|------------------------+------------+-------+--------------|
| slow_xor | 2020 | 1.0 | numpy |
| xorf_int16 | 1570 | 1.3 | fortran |
| xorf_int32 | 1530 | 1.3 | fortran |
| xorf_int64 | 1420 | 1.4 | fortran |
| faster_slow_xor | 1360 | 1.5 | numpy |
| inline_xor | 1280 | 1.6 | C |
| cython_xor | 1290 | 1.6 | cython |
| xorcpp_inplace (int32) | 440 | 4.6 | pyublas |
| cython_xor_vectorised | 325 | 6.2 | cython |
| inline_xor_nocopy | 172 | 11.7 | C |
| xorcpp | 144 | 14.0 | boost.python |
| xorcpp_inplace | 122 | 16.6 | boost.python |
#+TBLFM: $3=@2$2/$2;%.1f
要复现这些结果,首先下载 http://gist.github.com/353005,然后输入 make
(如果需要安装依赖,可以输入:sudo apt-get install build-essential python-numpy python-scipy cython gfortran
,注意 Boost.Python
和 pyublas
的依赖没有包含,因为它们需要手动设置才能正常工作)
这里提到的内容:
slow_xor()
是来自提问者的问题faster_slow_xor()
、inline_xor()
、inline_xor_nocopy()
是来自 @Torsten Marek的回答cython_xor()
和cython_vectorised()
是来自 @gnibbler的回答
而 xor_$type_sig()
是:
! xorf.f90.template
subroutine xor_$type_sig(a, b, n, out)
implicit none
integer, intent(in) :: n
$type, intent(in), dimension(n) :: a
$type, intent(in), dimension(n) :: b
$type, intent(out), dimension(n) :: out
integer i
forall(i=1:n) out(i) = ieor(a(i), b(i))
end subroutine xor_$type_sig
在Python中使用的方法如下:
import xorf # extension module generated from xorf.f90.template
import numpy as np
def xor_strings(a, b, type_sig='int64'):
assert len(a) == len(b)
a = np.frombuffer(a, dtype=np.dtype(type_sig))
b = np.frombuffer(b, dtype=np.dtype(type_sig))
return getattr(xorf, 'xor_'+type_sig)(a, b).tostring()
xorcpp_inplace()
(Boost.Python, pyublas):
#include <inttypes.h>
#include <algorithm>
#include <boost/lambda/lambda.hpp>
#include <boost/python.hpp>
#include <pyublas/numpy.hpp>
namespace {
namespace py = boost::python;
template<class InputIterator, class InputIterator2, class OutputIterator>
void
xor_(InputIterator first, InputIterator last,
InputIterator2 first2, OutputIterator result) {
// `result` migth `first` but not any of the input iterators
namespace ll = boost::lambda;
(void)std::transform(first, last, first2, result, ll::_1 ^ ll::_2);
}
template<class T>
py::str
xorcpp_str_inplace(const py::str& a, py::str& b) {
const size_t alignment = std::max(sizeof(T), 16ul);
const size_t n = py::len(b);
const char* ai = py::extract<const char*>(a);
char* bi = py::extract<char*>(b);
char* end = bi + n;
if (n < 2*alignment)
xor_(bi, end, ai, bi);
else {
assert(n >= 2*alignment);
// applying Marek's algorithm to align
const ptrdiff_t head = (alignment - ((size_t)bi % alignment))% alignment;
const ptrdiff_t tail = (size_t) end % alignment;
xor_(bi, bi + head, ai, bi);
xor_((const T*)(bi + head), (const T*)(end - tail),
(const T*)(ai + head),
(T*)(bi + head));
if (tail > 0) xor_(end - tail, end, ai + (n - tail), end - tail);
}
return b;
}
template<class Int>
pyublas::numpy_vector<Int>
xorcpp_pyublas_inplace(pyublas::numpy_vector<Int> a,
pyublas::numpy_vector<Int> b) {
xor_(b.begin(), b.end(), a.begin(), b.begin());
return b;
}
}
BOOST_PYTHON_MODULE(xorcpp)
{
py::def("xorcpp_inplace", xorcpp_str_inplace<int64_t>); // for strings
py::def("xorcpp_inplace", xorcpp_pyublas_inplace<int32_t>); // for numpy
}
在Python中使用的方法如下:
import os
import xorcpp
a = os.urandom(2**20)
b = os.urandom(2**20)
c = xorcpp.xorcpp_inplace(a, b) # it calls xorcpp_str_inplace()
第一次尝试
使用 scipy.weave
和 SSE2 的一些功能,效果稍微好了一点。第一次运行的时候稍微慢一些,因为代码需要从硬盘加载并缓存,之后再运行就快了:
import numpy
import time
from os import urandom
from scipy import weave
SIZE = 2**20
def faster_slow_xor(aa,bb):
b = numpy.fromstring(bb, dtype=numpy.uint64)
numpy.bitwise_xor(numpy.frombuffer(aa,dtype=numpy.uint64), b, b)
return b.tostring()
code = """
const __m128i* pa = (__m128i*)a;
const __m128i* pend = (__m128i*)(a + arr_size);
__m128i* pb = (__m128i*)b;
__m128i xmm1, xmm2;
while (pa < pend) {
xmm1 = _mm_loadu_si128(pa); // must use unaligned access
xmm2 = _mm_load_si128(pb); // numpy will align at 16 byte boundaries
_mm_store_si128(pb, _mm_xor_si128(xmm1, xmm2));
++pa;
++pb;
}
"""
def inline_xor(aa, bb):
a = numpy.frombuffer(aa, dtype=numpy.uint64)
b = numpy.fromstring(bb, dtype=numpy.uint64)
arr_size = a.shape[0]
weave.inline(code, ["a", "b", "arr_size"], headers = ['"emmintrin.h"'])
return b.tostring()
第二次尝试
根据大家的评论,我重新检查了代码,想看看能不能避免复制的过程。结果发现我之前理解字符串对象的文档有误,所以这是我的第二次尝试:
support = """
#define ALIGNMENT 16
static void memxor(const char* in1, const char* in2, char* out, ssize_t n) {
const char* end = in1 + n;
while (in1 < end) {
*out = *in1 ^ *in2;
++in1;
++in2;
++out;
}
}
"""
code2 = """
PyObject* res = PyString_FromStringAndSize(NULL, real_size);
const ssize_t tail = (ssize_t)PyString_AS_STRING(res) % ALIGNMENT;
const ssize_t head = (ALIGNMENT - tail) % ALIGNMENT;
memxor((const char*)a, (const char*)b, PyString_AS_STRING(res), head);
const __m128i* pa = (__m128i*)((char*)a + head);
const __m128i* pend = (__m128i*)((char*)a + real_size - tail);
const __m128i* pb = (__m128i*)((char*)b + head);
__m128i xmm1, xmm2;
__m128i* pc = (__m128i*)(PyString_AS_STRING(res) + head);
while (pa < pend) {
xmm1 = _mm_loadu_si128(pa);
xmm2 = _mm_loadu_si128(pb);
_mm_stream_si128(pc, _mm_xor_si128(xmm1, xmm2));
++pa;
++pb;
++pc;
}
memxor((const char*)pa, (const char*)pb, (char*)pc, tail);
return_val = res;
Py_DECREF(res);
"""
def inline_xor_nocopy(aa, bb):
real_size = len(aa)
a = numpy.frombuffer(aa, dtype=numpy.uint64)
b = numpy.frombuffer(bb, dtype=numpy.uint64)
return weave.inline(code2, ["a", "b", "real_size"],
headers = ['"emmintrin.h"'],
support_code = support)
不同之处在于字符串是在C代码内部分配的。由于SSE2指令要求内存地址要对齐到16字节的边界,所以在开始和结束的地方有些不对齐的内存区域需要用逐字节的方式来复制。
输入数据是通过numpy数组传入的,因为 weave
强制要求将Python的 str
对象复制到 std::string
中。使用 frombuffer
不会进行复制,所以这样是可以的,但内存并没有对齐到16字节,因此我们需要用 _mm_loadu_si128
而不是更快的 _mm_load_si128
。
我们用 _mm_stream_si128
代替 _mm_store_si128
,这样可以确保任何写入操作尽快写入主内存——这样输出数组就不会占用宝贵的缓存行。
时间测试
关于时间测试,slow_xor
在第一次编辑中提到的是我改进后的版本(内联位运算异或,uint64
),我去掉了这个混淆。slow_xor
指的是原始问题中的代码。所有的时间测试都是进行1000次运行得出的。
slow_xor
: 1.85秒 (1x)faster_slow_xor
: 1.25秒 (1.48x)inline_xor
: 0.95秒 (1.95x)inline_xor_nocopy
: 0.32秒 (5.78x)
这段代码是用gcc 4.4.3编译的,我已经确认编译器确实使用了SSE指令。