Hi Andreas,
I've recoded much of the scan code. Also I've been having
problems with my git setup with proxy. So I'm just attaching the
source code as is with a sample test code.
I'm of the opinion that a single exclusive scan be part of the
implementation. Simply because the inclusive scan is just a step away
(by reading and adding the nth element).
Regards
Nithin
from pycuda import driver, compiler, gpuarray, tools, reduction
from math import log,ceil,floor
import warnings
exclusive_scan_source = """
#define SUM(a, b) (%(scan_operation)s)
%(preamble)s
__global__ void %(name)s(%(data_type)s *result,const
%(data_type)s *material
%(if_part_sum)s ,%(data_type)s *partial_sum
,const int n
%(if_tail)s ,const int block_index
)
{
extern __shared__ %(data_type)s shared_array[];
int source_node_index = threadIdx.x;
// Size of shared array that can contain input data
%(if_main)s const int full_array_size = n;
%(if_tail)s const int full_array_size = 1<<((int)(floor(log2((float)n))+1));
%(if_main)s const int block_index = blockIdx.x;
const int block_offset = 2*block_index*blockDim.x;
int tree_node_distance = 1;
int target_node_index = threadIdx.x + (n/2);
%(if_tail)s if (source_node_index < n)
{
shared_array[source_node_index +
SHARED_BANK_PADDING(source_node_index)] =
material[source_node_index + block_offset];
}
%(if_tail)s else {shared_array[source_node_index +
SHARED_BANK_PADDING(source_node_index)] = %(neutral)s;}
%(if_tail)s if (target_node_index < n)
{
shared_array[target_node_index +
SHARED_BANK_PADDING(target_node_index)] =
material[target_node_index + block_offset];
}
%(if_tail)s else {shared_array[target_node_index +
SHARED_BANK_PADDING(target_node_index)] = %(neutral)s;}
// Travel upwards, from leaves to the root of the tree
// During each loop time distance between nodes
increases two times,
// and number of nodes decreases two times
for (int number_of_nodes = full_array_size>>1;
number_of_nodes > 0;
number_of_nodes >>= 1)
{
__syncthreads();
if (threadIdx.x < number_of_nodes)
{
int source_node_index =
tree_node_distance*(2*threadIdx.x+1)-1;
int target_node_index =
tree_node_distance*(2*threadIdx.x+2)-1;
source_node_index +=
SHARED_BANK_PADDING(source_node_index);
target_node_index +=
SHARED_BANK_PADDING(target_node_index);
shared_array[target_node_index] =
SUM(shared_array[target_node_index],
shared_array[source_node_index]);
}
tree_node_distance <<= 1;
}
if (threadIdx.x == 0)
{
%(if_part_sum)s partial_sum[block_index] =
shared_array[full_array_size-1 +
SHARED_BANK_PADDING(full_array_size-1)];
shared_array[full_array_size-1 +
SHARED_BANK_PADDING(full_array_size-1)] = %(neutral)s;
}
// Travel downwards, from root to the leaves
// During each loop number of nodes increases two times and
// distance between nodes decreases two times
for (int number_of_nodes = 1; number_of_nodes <
full_array_size; number_of_nodes <<= 1)
{
tree_node_distance >>= 1;
__syncthreads();
if (threadIdx.x < number_of_nodes)
{
int source_node_index =
tree_node_distance*(2*threadIdx.x+1)-1;
int target_node_index =
tree_node_distance*(2*threadIdx.x+2)-1;
source_node_index +=
SHARED_BANK_PADDING(source_node_index);
target_node_index +=
SHARED_BANK_PADDING(target_node_index);
%(data_type)s temp = shared_array[source_node_index];
shared_array[source_node_index] =
shared_array[target_node_index];
shared_array[target_node_index] =
SUM(shared_array[target_node_index], temp);
}
}
__syncthreads();
%(if_tail)s if (source_node_index < n)
{
result[source_node_index + block_offset] =
shared_array[source_node_index +
SHARED_BANK_PADDING(source_node_index)];
}
%(if_tail)s if (target_node_index < n)
{
result[target_node_index + block_offset] =
shared_array[target_node_index +
SHARED_BANK_PADDING(target_node_index)];
}
}
"""
uniform_sum_source = """
#define SUM(a, b) (%(scan_operation)s)
%(preamble)s
__global__ void %(name)s(%(data_type)s
*result,%(data_type)s *partial_sum
,const int n
%(if_tail)s ,const int block_index
)
{
extern __shared__ %(data_type)s shared_array[];
int source_node_index = threadIdx.x;
// Size of shared array that can contain input data
%(if_main)s const int block_index = blockIdx.x+1;
const int block_offset = 2*block_index*blockDim.x;
int target_node_index = threadIdx.x + (n/2);
if (threadIdx.x == 0)
{
shared_array[0] = partial_sum[block_index];
}
__syncthreads();
// is this a bad call??
%(data_type)s prev_block_sum = shared_array[0];
%(if_tail)s if (source_node_index < n)
{
result[source_node_index + block_offset] =
SUM(prev_block_sum,result[source_node_index + block_offset]);
}
%(if_tail)s if (target_node_index < n)
{
result[target_node_index + block_offset] =
SUM(prev_block_sum,result[target_node_index + block_offset]);
}
}
"""
# TODO: check this four .. is it future proof??
padding_preamble = "\n#define SHARED_BANK_PADDING(n) ((n) >> 4)\n"
#padding_preamble = "\n#define SHARED_BANK_PADDING(n) 0\n"
class ExclusivePrefixKernel(object):
def __init__(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
# Determine the size of used data type
self.numpy_type = tools.parse_c_arg("const %s * in" %
data_type).dtype
item_size = self.numpy_type.itemsize
# Determine the number of threads
dev = driver.Context.get_device()
block_size =
dev.get_attribute(driver.device_attribute.MAX_THREADS_PER_BLOCK)
block_dimension =
dev.get_attribute(driver.device_attribute.MAX_BLOCK_DIM_X)
self.block_size = min(block_size, block_dimension)
# Shared memory size: two items per thread, number of threads,
# size of one data item
self.shared_size = self.get_block_size()*item_size
# Padding to avoid bank conflicts
# TODO: is this always 4??
log_num_banks = 4
self.shared_size += ((self.get_block_size() >> log_num_banks) *
item_size)
# Check whether we do not exceed available shared memory size
max_shared_size =
dev.get_attribute(driver.device_attribute.MAX_SHARED_MEMORY_PER_BLOCK)
if self.shared_size > max_shared_size:
warnings.warn ("Changed shared size")
self.shared_size = max_shared_size
self.block_size = self.shared_size//(2*item_size)
self.main_function = self.get_main_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.main_function.prepare("PPi", block=(self.block_size, 1, 1),
shared=self.shared_size)
self.tail_function = self.get_tail_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.tail_function.prepare("PPii", block=(self.block_size, 1, 1),
shared=self.shared_size)
self.main_part_sum_function =
self.get_main_part_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.main_part_sum_function.prepare("PPPi",
block=(self.block_size, 1, 1),
shared=self.shared_size)
self.tail_part_sum_function =
self.get_tail_part_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.tail_part_sum_function.prepare("PPPii",
block=(self.block_size, 1, 1),
shared=self.shared_size)
self.main_uniform_sum_function =
self.get_main_uniform_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.main_uniform_sum_function.prepare("PPi",
block=(self.block_size, 1, 1),
shared=item_size)
self.tail_uniform_sum_function =
self.get_tail_uniform_sum_function(data_type, scan_operation,
neutral, name, keep, options, preamble)
self.tail_uniform_sum_function.prepare("PPii",
block=(self.block_size, 1, 1),
shared=item_size)
# Use maximum available shared memory in 2.x devices
# TODO: is it needed as we are more limited by number of threads?
# Might be needed for large data types (?)
if dev.compute_capability() >= (2, 0):
cache_size = pycuda.driver.func_cache.PREFER_SHARED
self.main_function.set_cache_config(cache_size)
self.tail_function.set_cache_config(cache_size)
self.main_part_sum_function.set_cache_config(cache_size)
self.tail_part_sum_function.set_cache_config(cache_size)
def get_block_size(self):
return 2 * self.block_size
def get_main_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "//",
'if_main': "",
'if_part_sum': "//",
}
return compiler.SourceModule(src, options=options,
keep=keep).get_function(name)
def get_tail_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "",
'if_main': "//",
'if_part_sum': "//",
}
return compiler.SourceModule(src, options=options,
keep=keep).get_function(name)
def get_main_part_sum_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "//",
'if_main': "",
'if_part_sum': "",
}
return compiler.SourceModule(src, options=options,
keep=keep).get_function(name)
def get_tail_part_sum_function(self, data_type, scan_operation, neutral,
name = 'prefix_kernel', keep = False, options = [], preamble = ''):
src = exclusive_scan_source % {
'data_type': data_type,
'name': name,
'neutral': neutral,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "",
'if_main': "//",
'if_part_sum': "",
}
return compiler.SourceModule(src, options=options,
keep=keep).get_function(name)
def get_main_uniform_sum_function(self, data_type, scan_operation, neutral,
name = 'uniform_add_kernel', keep = False, options = [], preamble = ''):
src = uniform_sum_source % {
'data_type': data_type,
'name': name,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "//",
'if_main': "",
}
return compiler.SourceModule(src, options=options,
keep=keep).get_function(name)
def get_tail_uniform_sum_function(self, data_type, scan_operation, neutral,
name = 'uniform_add_kernel', keep = False, options = [], preamble = ''):
src = uniform_sum_source % {
'data_type': data_type,
'name': name,
'scan_operation': scan_operation,
'preamble': preamble+padding_preamble,
'if_tail': "",
'if_main': "//",
}
return compiler.SourceModule(src, options=options,
keep=keep).get_function(name)
def call_final(self,input_size,result,material):
block_count = ((input_size + self.get_block_size() -
1)/(self.get_block_size()))
if input_size != block_count * self.get_block_size():
if block_count > 1:
self.main_function.prepared_call((block_count-1, 1),
result.gpudata, material.gpudata, self.get_block_size())
self.tail_function.prepared_call((1, 1), result.gpudata,
material.gpudata,
input_size - (block_count - 1) *
self.get_block_size(), (block_count - 1))
else:
self.main_function.prepared_call((block_count, 1),
result.gpudata, material.gpudata,
self.get_block_size())
def call_intermediate(self,input_size,result,material,part_sum_buf):
block_count = ((input_size + self.get_block_size() -
1)/(self.get_block_size()))
if input_size != block_count * self.get_block_size():
if block_count > 1:
self.main_part_sum_function.prepared_call((block_count-1, 1),
result.gpudata,
material.gpudata,part_sum_buf.gpudata, self.get_block_size())
self.tail_part_sum_function.prepared_call((1, 1), result.gpudata,
material.gpudata,part_sum_buf.gpudata,
input_size - (block_count - 1) *
self.get_block_size(), (block_count - 1))
else:
self.main_part_sum_function.prepared_call((block_count, 1),
result.gpudata, material.gpudata,part_sum_buf.gpudata,
self.get_block_size())
def call_uniform_add(self,input_size,result,material,part_sum_buf):
block_count = ((input_size + self.get_block_size() -
1)/(self.get_block_size()))
if block_count <= 1:
raise Exception("This should not have happened .. nothing
to be uniform added")
if input_size != block_count * self.get_block_size():
block_count -= 1
if block_count > 1:
self.main_uniform_sum_function.prepared_call((block_count-1, 1),
result.gpudata, part_sum_buf.gpudata, self.get_block_size())
block_count += 1
self.tail_uniform_sum_function.prepared_call((1, 1),
result.gpudata,part_sum_buf.gpudata,
input_size - (block_count - 1) *
self.get_block_size(), (block_count - 1))
else:
block_count -= 1
self.main_uniform_sum_function.prepared_call((block_count, 1),
result.gpudata, part_sum_buf.gpudata,
self.get_block_size())
def __call__(self, *args, **kwargs):
result = kwargs.get('result')
if result is None:
result = args[0]
material = kwargs.get('material')
if material is None:
material = args[1]
input_size = kwargs.get('size')
if input_size is None:
input_size = material.size
part_sum_bufs = [
gpuarray.empty(int(ceil(float(input_size)/pow(2*self.block_size,i))),result.dtype)
for i in
range(1,int(ceil(log(input_size,2*self.block_size)))) ]
callargsets = [ [input_size,result,material] ] + [
[ps_buf.size,ps_buf,ps_buf] for ps_buf in part_sum_bufs]
for i,ps_buf in enumerate(part_sum_bufs):
callargsets[i] += [ps_buf]
for callargset in callargsets[0:-1]:
self.call_intermediate(*callargset)
self.call_final(*callargsets[-1])
callargsets.reverse()
for callargset in callargsets[1:]:
self.call_uniform_add(*callargset)
if __name__=="__main__":
import time
import pycuda.autoinit
import numpy as np
sz = 512*512*512/8
a = gpuarray.empty(sz, dtype=np.int32)
b = a
a.fill(np.int32(1))
krnl = ExclusivePrefixKernel('int', 'a+b', '0')
time_start = time.time()
krnl(a, b, sz)
print "kernel finished in %f seconds "%(time.time() - time_start)
print "a = ",a.get()
print "b = ",b.get()
On 21 February 2011 23:27, Andreas Kloeckner <[email protected]> wrote:
> Hi Nithin,
>
> On Mon, 21 Feb 2011 22:27:04 +0530, nithin s <[email protected]> wrote:
>> I believe there are some errors in the implementation. Im
>> basing my comments only on the exclusive version.
>>
>> The final call to finish adds the "each" of the partial sums to
>> every element of the result. That is to say that if my array size was
>> 1024x1024 and each thread block worked on 1024 elements. My partial
>> sum array would be as large as 1024 and the last(or second to last)
>> block would have to iterate 1024 sums to produce the result.
>>
>> Isn't this wrong? shouldn't the partial sums be prefix scanned
>> and then each block adds the associated partial sum o/p to each of its
>> elements. That way the loop for (int i = 1; i <= blockIdx.x; i++) is
>> not needed.
>
> We know it's broken at the moment--that's why it's currently living on a
> branch and not in mainline PyCUDA yet. Patches welcome.
>
> Andreas
>
>
_______________________________________________
PyCUDA mailing list
[email protected]
http://lists.tiker.net/listinfo/pycuda