我有一个很长的数组
arr = np.array([1,1,2,2,3,3,0,0,2,2])
我想在 numba cuda 中删除该数组的所有零值,因为实际数组非常大并且 numpy 非常慢。
有谁知道如何做到这一点?
正如评论中已经提到的,您正在寻找的算法称为“流压缩”。由于上述算法在numba
中无法开箱即用,因此必须手动实现。
基本上,过程如下:
创建输入值的二进制掩码以识别非零值。
对该掩码计算前缀和(inclusive scan)。
假设输入数组的索引号 5 处有一个非零值。我们选择前缀和数组索引号 5 处的值(假设该值为 3)。使用该值作为输出索引。这意味着输入的索引号 5 处的元素将转到最终输出中的索引号 3。
前缀和的计算是整个过程中的难点。我擅自将 C++ 版本的前缀和算法(改编自 GPU Gems 3 书)移植到了
numba
。如果我们自己实现前缀和,我们可以将步骤 1 和 2 合并到单个 CUDA 内核中。
以下是基于
numba
的流压缩的完整工作示例以及所提供的用例。
import numpy as np
from numba import cuda, int32
# Threads per CUDA block; also the width of the shared-memory scan buffer below.
BLOCK_SIZE = 8
#CUDA kernel to calculate the prefix sum of each block of the input array.
#Fixed relative to the original: cuda.syncthreads() is now reached
#unconditionally by every thread of the block (the original called it inside
#a loop whose trip count depended on threadIdx.x, which is undefined
#behavior), and the global-memory write is guarded by "tid < length".
@cuda.jit('void(int32[:], int32[:], int32[:], int32)')
def prefix_sum_nzmask_block(a, b, s, length):
    """Per-block inclusive scan of the non-zero mask of `a`.

    a      -- device input values
    b      -- device output: per-block inclusive prefix sum of the mask
    s      -- device output: one entry per block holding that block's total
    length -- number of valid elements in `a`
    """
    ab = cuda.shared.array(shape=(BLOCK_SIZE), dtype=int32)

    tid = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x

    # Zero-fill so out-of-range threads contribute nothing to the scan.
    ab[cuda.threadIdx.x] = 0
    if tid < length:
        ab[cuda.threadIdx.x] = int32(a[tid] != 0)  # mask of non-zero inputs

    # Hillis-Steele inclusive scan in shared memory; every thread executes
    # both barriers on every doubling step.
    i = 1
    while i < BLOCK_SIZE:
        cuda.syncthreads()
        temp = ab[cuda.threadIdx.x]
        if i <= cuda.threadIdx.x:
            temp += ab[cuda.threadIdx.x - i]  # perform scan on shared memory
        cuda.syncthreads()
        ab[cuda.threadIdx.x] = temp
        i *= 2

    if tid < length:  # guard: only in-range threads write to global memory
        b[tid] = ab[cuda.threadIdx.x]

    if cuda.threadIdx.x == cuda.blockDim.x - 1:  # last thread of the block
        s[cuda.blockIdx.x] = ab[cuda.threadIdx.x]  # write the block total
#CUDA kernel that folds the per-block totals into every later block of the
#partial prefix-sum array, turning the block-local scans into a global scan.
@cuda.jit('void(int32[:], int32[:], int32)')
def prefix_sum_merge_blocks(b, s, length):
    # Block 0 is already complete, so each thread maps one block ahead.
    idx = (cuda.blockIdx.x + 1) * cuda.blockDim.x + cuda.threadIdx.x
    if idx < length:
        # Accumulate the totals of all preceding blocks.
        for k in range(cuda.blockIdx.x + 1):
            b[idx] += s[k]
#CUDA kernel that scatters every non-zero input element to its slot in the
#compacted output array.
@cuda.jit('void(int32[:], int32[:], int32[:], int32)')
def map_non_zeros(a, prefix_sum, nz, length):
    pos = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
    if pos < length:
        value = a[pos]
        if value != 0:
            # The inclusive scan value at `pos` is 1 + the count of
            # non-zeros strictly before `pos`, hence the "- 1".
            nz[prefix_sum[pos] - 1] = value
#Apply stream compaction to extract only the non-zero entries of the input array.
def get_non_zeros(a):
    """Return a host int32 array holding the non-zero elements of `a`, in order.

    a -- 1-D host numpy array of int32.
    """
    length = a.shape[0]
    block = BLOCK_SIZE
    # Ceil-division in integer math (the original used float division, which
    # can lose precision for very large lengths).
    grid = (length + block - 1) // block

    # Auxiliary array holding the total of each block's partial scan.
    block_sum = cuda.device_array(shape=(grid), dtype=np.int32)

    # Copy the input from host to device once; reused by both kernels below.
    ad = cuda.to_device(a)

    # Prefix-sum output array.
    bd = cuda.device_array_like(ad)

    # Partial scan of each block; per-block totals land in block_sum.
    prefix_sum_nzmask_block[grid, block](ad, bd, block_sum, length)

    # Propagate block totals so bd becomes a full inclusive prefix sum.
    prefix_sum_merge_blocks[grid, block](bd, block_sum, length)

    # The last element of the inclusive prefix sum is the non-zero count.
    non_zero_count = int(bd[bd.shape[0] - 1])

    # Device output array sized to hold exactly the non-zero entries.
    non_zeros = cuda.device_array(shape=(non_zero_count), dtype=np.int32)

    # Fix: pass the device array `ad` (the original passed the host array
    # `a`, forcing an implicit extra host-to-device transfer per call).
    map_non_zeros[grid, block](ad, bd, non_zeros, length)

    # Return the compacted result to the host.
    return non_zeros.copy_to_host()
if __name__ == '__main__':
    # Demo: compact a small array, dropping its zero entries.
    data = np.array([1, 1, 2, 2, 3, 3, 0, 0, 2, 2], dtype=np.int32)
    print(get_non_zeros(data))
在以下环境下测试:虚拟环境中的Python 3.6.7
CUDA 10.0.130
NVIDIA 驱动程序 410.48
Numba 0.42
Ubuntu 18.04
免责声明:该代码仅用于演示目的,尚未针对性能测量进行严格的分析/测试。
存在一些问题,我碰巧仔细查看了,所以我也将在这里分享我的发现。
上述代码至少存在两个问题:

1. 它对 cuda.syncthreads() 的使用是非法的(在条件代码中使用,而该条件并不总是在整个 warp/线程块中评估为相同的值)。

2. 语句
b[tid] = ab[cuda.threadIdx.x]; #Write scanned blocks to global memory
未通过
if tid < length:
正确调节,因此,对于某些输入配置,可能对全局内存空间执行非法写入(可以通过
cuda-memcheck
或
compute-sanitizer
检测到)。
prefix_sum_nzmask_block
内核设计得相当好,但
prefix_sum_merge_blocks
内核则不然,特别是对于较大的测试用例,它将主导执行时间。
$ cat t71.py
import numpy as np
from numba import cuda, int32
BSP2 = 9  # log2 of the block size; possible values are integers from 1 to 10, 9 corresponds to 512 threads per block (2^9)
BLOCK_SIZE = 2**BSP2  # threads per CUDA block and shared-memory scan width
#CUDA kernel to calculate prefix sum of each block of input array
#@cuda.jit('void(int32[:], int32[:], int32[:], int32, int32)')
@cuda.jit
def prefix_sum_nzmask_block(a, b, s, nzm, length):
    """Per-block inclusive scan (Hillis-Steele) in shared memory.

    a      -- device input array
    b      -- device output: per-block inclusive prefix sums
    s      -- device output: one total per block (last scanned value)
    nzm    -- 1: scan the non-zero mask of `a`; otherwise scan `a`'s values
    length -- number of valid elements in `a`
    """
    ab = cuda.shared.array(shape=(BLOCK_SIZE), dtype=int32)
    tid = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x;
    # Zero-fill first so threads past `length` contribute nothing to the scan.
    ab[cuda.threadIdx.x] = 0
    if tid < length:
        if nzm == 1:
            ab[cuda.threadIdx.x] = int32(a[tid] != 0); #Load mask of input data into shared memory
        else:
            ab[cuda.threadIdx.x] = int32(a[tid]); #Load input data into shared memory
    # BSP2 == log2(BLOCK_SIZE) doubling steps. Both cuda.syncthreads() calls
    # sit OUTSIDE the divergent `if`, so every thread of the block reaches
    # each barrier — this is what makes the kernel legal, unlike the
    # original version that synchronized inside thread-dependent code.
    for j in range(0,BSP2):
        i = 2**j
        cuda.syncthreads()
        if i <= cuda.threadIdx.x:
            temp = ab[cuda.threadIdx.x]
            temp += ab[cuda.threadIdx.x - i] #Perform scan on shared memory
        cuda.syncthreads()
        if i <= cuda.threadIdx.x:
            ab[cuda.threadIdx.x] = temp
    # Global write guarded by tid < length (fixes the out-of-bounds write
    # present in the first version of this kernel).
    if tid < length:
        b[tid] = ab[cuda.threadIdx.x]; #Write scanned blocks to global memory
        if(cuda.threadIdx.x == cuda.blockDim.x-1): #Last thread of block
            s[cuda.blockIdx.x] = ab[cuda.threadIdx.x]; #Write last element of shared memory into global memory
#CUDA kernel to merge the prefix sums of individual blocks
@cuda.jit('void(int32[:], int32[:], int32)')
def pref_sum_update(b, s, length):
    # Block 0 needs no correction, so each thread maps one block ahead.
    idx = (cuda.blockIdx.x + 1) * cuda.blockDim.x + cuda.threadIdx.x
    if idx < length:
        # `s` already holds a full scan of the block totals, so one add
        # turns the block-local value into the global prefix sum.
        b[idx] += s[cuda.blockIdx.x]
#CUDA kernel to copy non-zero entries to their slot in the output array
@cuda.jit('void(int32[:], int32[:], int32[:], int32)')
def map_non_zeros(a, prefix_sum, nz, length):
    gid = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
    if gid < length:
        val = a[gid]
        if val != 0:
            # Inclusive scan value = 1 + number of earlier non-zeros,
            # so the destination index is prefix_sum[gid] - 1.
            nz[prefix_sum[gid] - 1] = val
#Recursive prefix sum (inclusive scan).
#a and asum must already be in device memory; nzm selects whether the first
#step scans the non-zero mask of a (nzm == 1) or the values themselves.
def pref_sum(a, asum, nzm):
    """Compute the full inclusive prefix sum of device array `a` into `asum`."""
    block = BLOCK_SIZE
    length = a.shape[0]
    # Ceil-division with integer math (the original used float division,
    # which can lose precision for very large lengths).
    grid = (length + block - 1) // block

    # Auxiliary array to hold the total of each block's partial scan.
    bs = cuda.device_array(shape=(grid), dtype=np.int32)

    # Partial scan of each block; per-block totals land in bs.
    prefix_sum_nzmask_block[grid, block](a, asum, bs, nzm, length)

    if grid > 1:
        # Scan the block totals themselves (no masking, nzm=0), then add
        # each scanned total onto every block after the first.
        bssum = cuda.device_array(shape=(grid), dtype=np.int32)
        pref_sum(bs, bssum, 0)  # recursively build the complete prefix sum
        pref_sum_update[grid - 1, block](asum, bssum, length)
#Apply stream compaction to extract only the non-zero entries of the input array.
def get_non_zeros(a):
    """Return a host int32 array with the non-zero elements of `a`, in order.

    a -- 1-D host numpy array of int32.
    """
    # Copy input array from host to device.
    ad = cuda.to_device(a)

    # Prefix-sum output array.
    bd = cuda.device_array_like(ad)

    # Full inclusive prefix sum of the non-zero mask (nzm = 1).
    pref_sum(ad, bd, int(1))

    # The last element of the inclusive prefix sum is the non-zero count.
    non_zero_count = int(bd[bd.shape[0] - 1])

    # Device output array sized to hold exactly the non-zero entries.
    non_zeros = cuda.device_array(shape=(non_zero_count), dtype=np.int32)

    # Scatter only the non-zero entries to their compacted positions.
    block = BLOCK_SIZE
    length = a.shape[0]
    # Integer ceil-division (original used float division).
    grid = (length + block - 1) // block
    map_non_zeros[grid, block](ad, bd, non_zeros, length)

    # Return the compacted result to the host.
    return non_zeros.copy_to_host()
if __name__ == '__main__':
    # Build a large test vector: zeros everywhere except every third index
    # from 32 onward, which holds its own index value.
    arr_size = 5000000
    arr = np.zeros(arr_size, dtype=np.int32)
    arr[32:arr_size:3] = np.arange(32, arr_size, 3, dtype=np.int32)
    print(arr_size)
    nz = get_non_zeros(arr)
    print(nz)
$ cuda-memcheck python t71.py
========= CUDA-MEMCHECK
5000000
[ 32 35 38 ..., 4999991 4999994 4999997]
========= ERROR SUMMARY: 0 errors
$