Wow... that's quite a bit of work. Though I must admit that on first
inspection of the kernel code it was evident that as the kernel goes
up and down the tree there is a great deal of waiting.
Anyhow, I don't quite have the energy to get another version working
in the near future. I'm leaving it to the community to use the code or
not.
Regards
Nithin
@Andreas: I've attached an improved version with a few bug fixes.
On 22 February 2011 23:24, Bryan Catanzaro <[email protected]> wrote:
> There has been lots of work on Parallel Prefix Sum on GPUs. The
> difference between implementations of Inclusive and Exclusive prefix
> sum is very minor, and both can be implemented efficiently.
> Focusing on work efficiency for GPUs is actually a distracting red
> herring, especially when you're using a fully parallel, recursive
> prefix sum, as you are doing. This is for two reasons:
> 1. GPUs are SIMD machines, so work efficiency as counted by the PRAM
> model does not always correlate to saving work on the actual hardware.
> 2. The work involved in synchronizing and coordinating threads is not
> counted as "work" in traditional analyses. On GPUs, floating-point
> operations are essentially free. Communication and coordination
> between parallel threads, however, is very expensive. Instead of
> doing a fully parallel, recursive prefix sum, it's much more
> efficient to sequentialize the sum as much as possible, expressing
> only enough parallelism to fill the chip.
>
> It's also worth taking advantage of commutativity - have you
> defined whether your scan implementation accepts associative but
> non-commutative operators? If you require operators to be both
> associative and commutative, you can improve performance
> significantly. I wrote up some things about reductions (which are
> related to scan) here:
> http://developer.amd.com/documentation/articles/pages/opencl-optimization-case-study-simple-reductions.aspx
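A minimal NumPy sketch of the structure described above: each of p workers serially reduces a strided subsequence (the reordering relative to a left-to-right sum is why commutativity matters), and only the p partial results need any coordination. Names here are illustrative:

import numpy as np

def two_phase_reduce(x, p):
    # Phase 1: p independent serial reductions. Each "worker" walks a
    # strided subsequence (strided so that, on a GPU, the loads would
    # coalesce); no trees and no synchronization inside a phase.
    partials = np.array([x[i::p].sum() for i in range(p)])
    # Phase 2: a single small reduction over the p partial sums.
    return partials.sum()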
>
> As you can see, the "2-phase" reduction kernels are much faster than
> the "recursive multi-phase" kernels. Your implementation uses the
> recursive multi-phase approach. There are two analogues of the
> 2-phase reduction kernel for scan on an n element array with p
> processors (thread blocks), both of which require exactly 3 kernel
> launches, regardless of n or p:
>
> * Reduce, Scan, Scan: Divide the input into blocks of n/p elements.
> Perform reductions on these blocks in parallel, write out the sum of
> each block to a temporary array. Perform a scan on the temporary
> array (this should only require 1 thread block and one kernel launch, since
> there are only p elements in the temporary array). Perform a scan on
> the input, this time starting each block with the appropriate result
> from the scan.
> * Scan, Scan, Add: Scan the blocks in parallel, write the results out.
> Perform a scan on the sum produced by each block (this should only
> require 1 thread block and one kernel launch, since there are only p
> blocks). Broadcast the appropriate result from the partial scan and
> perform vector addition in each block (see the sketch below).
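A minimal NumPy sketch of the Scan, Scan, Add variant, assuming plain addition (function and variable names are illustrative):

import numpy as np

def scan_scan_add(x, p):
    blocks = np.array_split(x, p)
    # Kernel 1: scan each block independently (inclusive scan here).
    scanned = [np.cumsum(b) for b in blocks]
    # Kernel 2: scan the per-block sums; only p values, so one small block.
    block_sums = np.array([s[-1] for s in scanned])
    offsets = np.concatenate(([0], np.cumsum(block_sums)[:-1]))
    # Kernel 3: broadcast each block's offset and do a vector add.
    return np.concatenate([s + off for s, off in zip(scanned, offsets)])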
>
> If you sequentialize these operations as much as possible, and do
> parallel reduction/scan trees only when absolutely necessary, you will
> save a lot of synchronization and communication cost. You can take a
> look at Thrust [1] to see an example of how this can be done.
>
> Best of luck,
> bryan
>
> [1] Thrust: http://code.google.com/p/thrust
>
>
>
> On Tue, Feb 22, 2011 at 1:47 AM, nithin s <[email protected]> wrote:
>> Hi Andreas
>>
>> On 22 February 2011 05:45, Andreas Kloeckner <[email protected]> wrote:
>>> Hi Nithin,
>>>
>>> two requests:
>>>
>>> - can you please resend this as an attachment? It's hard to fish out of
>>> the text of an email.
>> Done
>>>
>>> - please avoid using floating point functions (log, ceil, floor) in
>>> integer contexts. PyCUDA comes with a bitlog2 function that does what
>>> you need, I think.
>>
>> bitlog2 alone doesn't cut it. This is because the log is taken to
>> base 2*block_size, and block_size need not be a power of 2 in a few
>> rare cases: if shared memory is limited, then block_size =
>> shared_mem/item_size, and the item size need not be a power of 2 (if
>> we are willing to support arbitrary types.. though there is a
>> limitation, since the dtype needs to be known for the partial sum
>> array allocations, which is presumably a numpy.dtype).
>>
>> This messes up the estimate. I could recode it as a routine that
>> repeatedly divides and computes the necessary integer ceiling, but I
>> feel the current expression is cleaner and more concise. Let me know
>> if you still feel otherwise.
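A minimal sketch of that repeated-division routine (essentially what get_num_levels in the attached code does):

def num_levels(n, chunk_size):
    # Each scan pass collapses chunk_size elements into one partial
    # sum; count the passes until a single value remains.
    levels = 0
    while n > 1:
        n = (n + chunk_size - 1) // chunk_size  # integer ceil-divide
        levels += 1
    return levels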
>>
>>>
>>> Once I get the file posted on the PyCUDA branch, I'll write a more
>>> complete review. I agree with your assessment of inclusive vs exclusive
>>> scan. I'd say go ahead and kill the inclusive version.
>>>
>>> Tomasz, what's your take here?
>>>
>>> Andreas
>>>
>>>
>>
>> Regards
>>
>> Nithin
>>
>>
>> @Bryan: Tomasz's original inclusive scan impl was based on the naive
>> prefix scan algorithm at http://en.wikipedia.org/wiki/Prefix_sum. This
>> is not particularly work-efficient. I don't (yet) see a neat way to
>> convert the exclusive Mark Harris version to an inclusive one, so I
>> thought it better to maintain a single exclusive prefix scan.
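If an inclusive result is ever needed, one standard fallback (an extra elementwise pass rather than a kernel-level conversion) is to combine the exclusive scan with the input, since inclusive[i] = SUM(exclusive[i], input[i]). In NumPy terms:

import numpy as np

x = np.array([3, 1, 4, 1, 5])
exclusive = np.concatenate(([0], np.cumsum(x)[:-1]))  # [0 3 4 8 9]
inclusive = exclusive + x                             # [3 4 8 9 14]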
>>
>
from pycuda import driver, compiler, gpuarray, tools
from warnings import warn
exclusive_scan_source = """
%(preamble)s
#define SUM(a, b) (%(scan_operation)s)
__device__ unsigned int %(name)s_next_power_of_2(unsigned int v)
{
v -= 1;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
return v + 1;
}
extern "C"
{
__global__ void %(name)s(const %(data_type)s *i_data,%(data_type)s *o_data
%(if_part_sum)s ,%(data_type)s *partial_sum
,const int n
%(if_tail)s ,const int block_index
)
{
extern __shared__ %(data_type)s shared_array[];
int source_node_index = threadIdx.x;
// Size of shared array that can contain input data
%(if_main)s const int full_array_size = n;
%(if_tail)s const int full_array_size = %(name)s_next_power_of_2(n);
%(if_main)s const int block_index = blockIdx.x;
const int block_offset = 2*block_index*blockDim.x;
int tree_node_distance = 1;
int target_node_index = threadIdx.x + (n/2);
%(if_tail)s if (source_node_index < n)
{
shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] =
i_data[source_node_index + block_offset];
}
%(if_tail)s else {shared_array[source_node_index + SHARED_BANK_PADDING(source_node_index)] = %(neutral)s;}
%(if_tail)s if (target_node_index < n)
{
shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] =
i_data[target_node_index + block_offset];
}
%(if_tail)s else {shared_array[target_node_index + SHARED_BANK_PADDING(target_node_index)] = %(neutral)s;}
// Travel upwards, from leaves to the root of the tree
// During each loop time distance between nodes increases two times,
// and number of nodes decreases two times
for (int number_of_nodes = full_array_size>>1; number_of_nodes > 0;
number_of_nodes >>= 1)
{
__syncthreads();
if (threadIdx.x < number_of_nodes)
{
int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1;
int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1;
source_node_index += SHARED_BANK_PADDING(source_node_index);
target_node_index += SHARED_BANK_PADDING(target_node_index);
shared_array[target_node_index] =
SUM(shared_array[target_node_index],
shared_array[source_node_index]);
}
tree_node_distance <<= 1;
}
if (threadIdx.x == 0)
{
%(if_part_sum)s partial_sum[block_index] = shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)];
shared_array[full_array_size-1 + SHARED_BANK_PADDING(full_array_size-1)] = %(neutral)s;
}
// Travel downwards, from root to the leaves
// During each loop number of nodes increases two times and
// distance between nodes decreases two times
for (int number_of_nodes = 1; number_of_nodes < full_array_size; number_of_nodes <<= 1)
{
tree_node_distance >>= 1;
__syncthreads();
if (threadIdx.x < number_of_nodes)
{
int source_node_index = tree_node_distance*(2*threadIdx.x+1)-1;
int target_node_index = tree_node_distance*(2*threadIdx.x+2)-1;
source_node_index += SHARED_BANK_PADDING(source_node_index);
target_node_index += SHARED_BANK_PADDING(target_node_index);
%(data_type)s temp = shared_array[source_node_index];
shared_array[source_node_index] = shared_array[target_node_index];
shared_array[target_node_index] =
SUM(shared_array[target_node_index], temp);
}
}
__syncthreads();
%(if_tail)s if (source_node_index < n)
{
o_data[source_node_index + block_offset] =
shared_array[source_node_index +
SHARED_BANK_PADDING(source_node_index)];
}
%(if_tail)s if (target_node_index < n)
{
o_data[target_node_index + block_offset] =
shared_array[target_node_index +
SHARED_BANK_PADDING(target_node_index)];
}
}
}
"""
uniform_sum_source = """
%(preamble)s
#define SUM(a, b) (%(scan_operation)s)
extern "C"
{
__global__ void %(name)s(%(data_type)s *o_data,%(data_type)s *partial_sum
,const int n
%(if_tail)s ,const int block_index
)
{
extern __shared__ %(data_type)s shared_array[];
int source_node_index = threadIdx.x;
// Size of shared array that can contain input data
%(if_main)s const int block_index = blockIdx.x+1;
const int block_offset = 2*block_index*blockDim.x;
int target_node_index = threadIdx.x + ((n==1)?(1):(n/2));
if (threadIdx.x == 0)
{
shared_array[0] = partial_sum[block_index];
}
__syncthreads();
%(data_type)s prev_block_sum = shared_array[0];
%(if_tail)s if (source_node_index < n)
{
o_data[source_node_index + block_offset] = SUM(prev_block_sum,o_data[source_node_index + block_offset]);
}
%(if_tail)s if (target_node_index < n)
{
o_data[target_node_index + block_offset] = SUM(prev_block_sum,o_data[target_node_index + block_offset]);
}
}
}
"""
def _div_ceil(nr, dr):
    # Integer ceiling division
    return (nr + dr - 1) // dr
def get_num_levels(n, c):
    # Count scan levels: each level collapses chunks of c elements
    # into a single partial sum.
    levels = 0
    while n != 1:
        levels += 1
        n = _div_ceil(n, c)
    return levels
def get_num_chunks(n, c, i):
    # Number of chunks remaining after i levels of chunking
    return _div_ceil(n, c**i)
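# For example, with n = 1<<20 and a chunk size of 2048:
#   get_num_chunks(n, 2048, 1) == 512 partial sums after one pass,
#   get_num_chunks(n, 2048, 2) == 1, so get_num_levels(n, 2048) == 2.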
# One pad element per 16 entries matches the 16 shared memory banks of
# compute 1.x devices; 2.x devices have 32 banks, so 5 would fit there.
padding_preamble = "\n#define SHARED_BANK_PADDING(n) ((n) >> 4)\n"
#padding_preamble = "\n#define SHARED_BANK_PADDING(n) 0\n"
class ExclusivePrefixKernel(object):
def __init__(self, data_type, scan_operation, neutral,item_size = None,
keep = False, options = [], preamble = '',name='prefix_kernel'):
self.item_size = item_size
if self.item_size is None:
# Determine the size of used data type
self.numpy_type = tools.parse_c_arg("const %s * in" %
data_type).dtype
self.item_size = self.numpy_type.itemsize
# Determine the number of threads
dev = driver.Context.get_device()
block_size = dev.get_attribute(driver.device_attribute.MAX_THREADS_PER_BLOCK)
block_dimension = dev.get_attribute(driver.device_attribute.MAX_BLOCK_DIM_X)
self.block_size = min(block_size, block_dimension)
# Shared memory size: two items per thread, number of threads,
# size of one data item
self.shared_size = self.get_chunk_size()*self.item_size
# Padding to avoid bank conflicts
# log2 of the number of shared memory banks: 16 on compute 1.x
# devices; 2.x devices have 32 banks (i.e. 5 there)
log_num_banks = 4
self.shared_size += ((self.get_chunk_size() >> log_num_banks) *
self.item_size)
# Check whether we do not exceed available shared memory size
max_shared_size = dev.get_attribute(driver.device_attribute.MAX_SHARED_MEMORY_PER_BLOCK)
if self.shared_size > max_shared_size:
    warn("Requested shared memory exceeds the device limit; "
         "shrinking the block size to compensate")
    self.shared_size = max_shared_size
    self.block_size = self.shared_size // (2 * self.item_size)
self.main_function = self.get_main_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.main_function.prepare("PPi", block=(self.block_size, 1, 1),
shared=self.shared_size)
self.tail_function = self.get_tail_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.tail_function.prepare("PPii", block=(self.block_size, 1, 1),
shared=self.shared_size)
self.main_part_sum_function = self.get_main_part_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.main_part_sum_function.prepare("PPPi", block=(self.block_size, 1, 1),
shared=self.shared_size)
self.tail_part_sum_function = self.get_tail_part_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.tail_part_sum_function.prepare("PPPii", block=(self.block_size, 1, 1),
shared=self.shared_size)
self.main_uniform_sum_function = self.get_main_uniform_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.main_uniform_sum_function.prepare("PPi", block=(self.block_size, 1, 1),
shared=self.item_size)
self.tail_uniform_sum_function = self.get_tail_uniform_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.tail_uniform_sum_function.prepare("PPii", block=(self.block_size, 1, 1),
shared=self.item_size)
# Use maximum available shared memory in 2.x devices
# TODO: is it needed as we are more limited by number of threads?
# Might be needed for large data types (?)
if dev.compute_capability() >= (2, 0):
cache_config = driver.func_cache.PREFER_SHARED
self.main_function.set_cache_config(cache_config)
self.tail_function.set_cache_config(cache_config)
self.main_part_sum_function.set_cache_config(cache_config)
self.tail_part_sum_function.set_cache_config(cache_config)
def get_chunk_size(self):
return 2 * self.block_size
def get_main_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "//",
'if_main': "",
'if_part_sum': "//",
}
return compiler.SourceModule(src, options=options, keep=keep,no_extern_c=True).get_function(name)
def get_tail_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "",
'if_main': "//",
'if_part_sum': "//",
}
return compiler.SourceModule(src, options=options, keep=keep,no_extern_c=True).get_function(name)
def get_main_part_sum_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "//",
'if_main': "",
'if_part_sum': "",
}
return compiler.SourceModule(src, options=options, keep=keep,no_extern_c=True).get_function(name)
def get_tail_part_sum_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "",
'if_main': "//",
'if_part_sum': "",
}
return compiler.SourceModule(src, options=options, keep=keep,no_extern_c=True).get_function(name)
def get_main_uniform_sum_function(self, data_type, scan_operation, neutral,
name = 'uniform_add_kernel', keep = False, options = [], preamble = ''):
src = uniform_sum_source % {
'data_type': data_type,
'name': name,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "//",
'if_main': "",
}
return compiler.SourceModule(src, options=options, keep=keep,no_extern_c=True).get_function(name)
def get_tail_uniform_sum_function(self, data_type, scan_operation, neutral,
name = 'uniform_add_kernel', keep = False, options = [], preamble = ''):
src = uniform_sum_source % {
'data_type': data_type,
'name': name,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "",
'if_main': "//",
}
return compiler.SourceModule(src, options=options, keep=keep,no_extern_c=True).get_function(name)
def call_final(self, input_size, i_data, o_data):
    block_count = _div_ceil(input_size, self.get_chunk_size())
    if input_size != block_count * self.get_chunk_size():
        # Ragged input: full chunks go to the main kernel, the
        # remainder to the tail kernel, which pads to a power of 2.
        if block_count > 1:
            self.main_function.prepared_call((block_count - 1, 1),
                i_data, o_data, self.get_chunk_size())
        self.tail_function.prepared_call((1, 1), i_data, o_data,
            input_size - (block_count - 1) * self.get_chunk_size(),
            block_count - 1)
    else:
        self.main_function.prepared_call((block_count, 1),
            i_data, o_data, self.get_chunk_size())
def call_intermediate(self, input_size, i_data, o_data, part_sum_buf):
    block_count = _div_ceil(input_size, self.get_chunk_size())
    if input_size != block_count * self.get_chunk_size():
        # Same split as call_final, but every block also writes its
        # total into part_sum_buf for the next level of the scan.
        if block_count > 1:
            self.main_part_sum_function.prepared_call((block_count - 1, 1),
                i_data, o_data, part_sum_buf, self.get_chunk_size())
        self.tail_part_sum_function.prepared_call((1, 1), i_data, o_data,
            part_sum_buf,
            input_size - (block_count - 1) * self.get_chunk_size(),
            block_count - 1)
    else:
        self.main_part_sum_function.prepared_call((block_count, 1),
            i_data, o_data, part_sum_buf, self.get_chunk_size())
def call_uniform_add(self, input_size, i_data, o_data, part_sum_buf):
    block_count = _div_ceil(input_size, self.get_chunk_size())
    assert block_count > 1
    # Block 0 needs no offset added; the main kernel therefore starts
    # at blockIdx.x + 1 and its grid is one block smaller.
    if input_size != block_count * self.get_chunk_size():
        if block_count > 2:
            self.main_uniform_sum_function.prepared_call((block_count - 2, 1),
                o_data, part_sum_buf, self.get_chunk_size())
        self.tail_uniform_sum_function.prepared_call((1, 1), o_data,
            part_sum_buf,
            input_size - (block_count - 1) * self.get_chunk_size(),
            block_count - 1)
    else:
        self.main_uniform_sum_function.prepared_call((block_count - 1, 1),
            o_data, part_sum_buf, self.get_chunk_size())
def __call__(self, i_data, o_data, n=None):
    if n is None:
        n = min(i_data.size, o_data.size)
    # One partial-sum buffer per recursion level
    num_levels = get_num_levels(n, self.get_chunk_size())
    part_sum_buf_szs = [get_num_chunks(n, self.get_chunk_size(), l)
                        for l in range(1, num_levels)]
    part_sum_bufs = [driver.mem_alloc(sz * self.item_size)
                     for sz in part_sum_buf_szs]
    # Level 0 scans the input; every further level scans the previous
    # level's partial sums in place.
    callargsets = [[n, i_data.gpudata, o_data.gpudata]] + \
        [[sz, ps_buf, ps_buf]
         for sz, ps_buf in zip(part_sum_buf_szs, part_sum_bufs)]
    for i, ps_buf in enumerate(part_sum_bufs):
        callargsets[i] += [ps_buf]
    # Downward pass: scan each level and collect its partial sums
    for callargset in callargsets[0:-1]:
        self.call_intermediate(*callargset)
    self.call_final(*callargsets[-1])
    # Upward pass: add the scanned partial sums back into each level
    for callargset in reversed(callargsets[0:-1]):
        self.call_uniform_add(*callargset)
if __name__=='__main__':
from pycuda import driver, compiler, gpuarray, tools
from scan import *
import numpy as np
import pycuda.autoinit
# Sample usage for vector types and, in general, arbitrary types.
# Note: the size in bytes must be specified for non-trivial types.
n = 1024*1024
i_data = gpuarray.empty([n,3],dtype=np.int32)
i_data.fill(np.int32(1))
preamble = """
inline __device__ int3 operator+(int3 a, int3 b)
{
return make_int3(a.x + b.x, a.y + b.y, a.z + b.z);
}
"""
scan_kern = ExclusivePrefixKernel('int3','a+b','make_int3(0,0,0)',item_size=4*3,preamble=preamble,)
print i_data.get()
scan_kern(i_data,i_data,n=n)
print i_data.get()
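# A quick correctness check (a sketch, assuming the class above): run a
# plain int32 exclusive scan and compare with numpy's cumsum shifted
# into exclusive form.
x = gpuarray.to_gpu(np.random.randint(0, 10, 1024*1024).astype(np.int32))
out = gpuarray.empty_like(x)
int_scan = ExclusivePrefixKernel('int', 'a+b', '0')
int_scan(x, out)
expected = np.concatenate(([0], np.cumsum(x.get())[:-1])).astype(np.int32)
assert (out.get() == expected).all()
print "int32 scan matches numpy.cumsum"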
_______________________________________________
PyCUDA mailing list
[email protected]
http://lists.tiker.net/listinfo/pycuda