我有一个2D UINT8 numpy数组,大小为((149797,64)]。每个元素都是0或1。我想将每行中的这些二进制值打包到UINT64值中,以便得到一个形状为149797的UINT64数组。我使用numpy bitpack函数尝试了以下代码。
test = np.random.randint(0, 2, (149797, 64),dtype=np.uint8)
col_pack=np.packbits(test.reshape(-1, 8, 8)[:, ::-1]).view(np.uint64)
packbits函数大约需要10 ms才能执行。这个数组本身的简单reshaping似乎要花费7 ms左右。我还尝试了通过移位操作在2d numpy数组上进行迭代以达到相同的结果。但速度没有改善。
最后,我也想使用numba进行编译。
@njit
def shifting(bitlist):
x=np.zeros(149797,dtype=np.uint64) #54
rows,cols=bitlist.shape
for i in range(0,rows): #56
out=0
for bit in range(0,cols):
out = (out << 1) | bitlist[i][bit] # If i comment out bitlist, time=190 microsec
x[i]=np.uint64(out) # Reduces time to microseconds if line is commented in njit
return x
使用njit大约需要6毫秒。
这里是并行的njit版本
@njit(parallel=True)
def shifting(bitlist):
rows,cols=149797,64
out=0
z=np.zeros(rows,dtype=np.uint64)
for i in prange(rows):
for bit in range(cols):
z[i] = (z[i] << 1) | bitlist[i,bit] # Time becomes 100 micro if i use 'out' instead of 'z[i] array'
return z
与5.6ms执行时间相比要好一些。当前,使用swapbytes(Paul's)方法的python解决方案似乎是最好的方法,即1.74 ms。
我们如何进一步加快转换速度?是否可以使用任何矢量化(或并行化),位数组等来实现加速?
您可以通过使用byteswap
而不是重新塑形来获得可观的加速:
test = np.random.randint(0, 2, (149797, 64),dtype=np.uint8)
np.packbits(test.reshape(-1, 8, 8)[:, ::-1]).view(np.uint64)
# array([ 1079982015491401631, 246233595099746297, 16216705265283876830,
# ..., 1943876987915462704, 14189483758685514703,
12753669247696755125], dtype=uint64)
np.packbits(test).view(np.uint64).byteswap()
# array([ 1079982015491401631, 246233595099746297, 16216705265283876830,
# ..., 1943876987915462704, 14189483758685514703,
12753669247696755125], dtype=uint64)
timeit(lambda:np.packbits(test.reshape(-1, 8, 8)[:, ::-1]).view(np.uint64),number=100)
# 1.1054180909413844
timeit(lambda:np.packbits(test).view(np.uint64).byteswap(),number=100)
# 0.18370431219227612
有点Numba解决方案(版本0.46 / Windows)。
代码
import numpy as np
import numba as nb
#with memory allocation
@nb.njit(parallel=True)
def shifting(bitlist):
assert bitlist.shape[1]==64
x=np.empty(bitlist.shape[0],dtype=np.uint64)
for i in nb.prange(bitlist.shape[0]):
out=np.uint64(0)
for bit in range(bitlist.shape[1]):
out = (out << 1) | bitlist[i,bit]
x[i]=out
return x
#without memory allocation
@nb.njit(parallel=True)
def shifting_2(bitlist,x):
assert bitlist.shape[1]==64
for i in nb.prange(bitlist.shape[0]):
out=np.uint64(0)
for bit in range(bitlist.shape[1]):
out = (out << 1) | bitlist[i,bit]
x[i]=out
return x
Timings
test = np.random.randint(0, 2, (149797, 64),dtype=np.uint8)
#If you call this function multiple times, only allocating memory
#once may be enough
x=np.empty(test.shape[0],dtype=np.uint64)
#Warmup first call takes significantly longer
res=shifting(test)
res=shifting_2(test,x)
%timeit res=shifting(test)
#976 µs ± 41.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit res=shifting_2(test,x)
#764 µs ± 63 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit np.packbits(test).view(np.uint64).byteswap()
#8.07 ms ± 52.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit np.packbits(test.reshape(-1, 8, 8)[:, ::-1]).view(np.uint64)
#17.9 ms ± 91 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)