I've tried to reproduce my test data and the failing queries with stress.py.

So, I've slightly modified the stress.py and added 2 more indexes for
insertion. The indexrangeslice query is also performed on 3 indexes. The
insert is done using an uniform distribution of values.

Then:

1. python contrib/py_stress/stress.py -r -C 32 -x keys
2. python contrib/py_stress/stress.py -C 32 -o indexedrangeslice -t 3

The queries fails as in the attachment: not on the first query but on the
3rd, 4th ... not allways the same.

Dragos

On Mon, Nov 22, 2010 at 9:39 PM, Jonathan Ellis <jbel...@gmail.com> wrote:

> Let's start wth the low-hanging fruit: can you give steps to reproduce
> queries that fail right away?
>
> On Wed, Nov 17, 2010 at 10:37 AM, dragos cernahoschi
> <dragos.cernahos...@gmail.com> wrote:
> > Back. I've tested the keys index pagination once again. 0.7 head. Smaller
> > data set: 1 million rows. It seems there are still some issues:
> >
> > 1. *test*: query on one column, count: 1000, expected number of distinct
> > results: 48251
> >    *result*: 5 pages of 1000 results, than, after the 6th page, the
> results
> > begin to repeat, I would expect that repetition begins after the 48251-th
> > row
> >
> > 2. *test*: query on 3 columns, count: 10 (count 100, count 1000 failed
> with
> > time out)
> >    *result*: 1 page of 10 results, than second page => time out
> >
> > 3. There are queries with combinations of 2, 3 columns that fail right
> away
> > with time out (count 10, 100).
> >
> > Dragos
> >
> >
> > On Mon, Nov 15, 2010 at 2:29 PM, Jonathan Ellis <jbel...@gmail.com>
> wrote:
> >
> >> On Mon, Nov 15, 2010 at 5:57 AM, dragos cernahoschi
> >> <dragos.cernahos...@gmail.com> wrote:
> >> > I've tested 0.7-beta3 branch index feature without the 1472 patch. The
> >> > queries on more than one column works better than the patched version,
> >> but
> >> > definitely not correctly.
> >>
> >> Please test 0.7 branch head, as you can see from the CHANGES there
> >> have been a lot of fixes.
> >>
> >> > 1.
> >> > 2.
> >> > 4.
> >>
> >> Should be fixed in head.
> >>
> >> > 3. Is there any example on the pagination feature? (without knowing
> the
> >> > expected number of rows).
> >>
> >> Same way you paginate through range slices or columns within a row,
> >> set start to the last result you got w/ previous query.
> >>
> >> > Will the get_indexed_slices return an empty list when there is no more
> >> > results?
> >>
> >> No, all queries are start-inclusive.
> >>
> >> --
> >> Jonathan Ellis
> >> Project Chair, Apache Cassandra
> >> co-founder of Riptano, the source for professional Cassandra support
> >> http://riptano.com
> >>
> >
>
>
>
> --
> Jonathan Ellis
> Project Chair, Apache Cassandra
> co-founder of Riptano, the source for professional Cassandra support
> http://riptano.com
>
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# expects a Cassandra server to be running and listening on port 9160.
# (read tests expect insert tests to have run first too.)

have_multiproc = False
try:
    from multiprocessing import Array as array, Process as Thread
    from uuid import uuid1 as get_ident
    Thread.isAlive = Thread.is_alive
    have_multiproc = True
except ImportError:
    from threading import Thread
    from thread import get_ident
    from array import array
from hashlib import md5
import time, random, sys, os
from random import randint, gauss
from optparse import OptionParser

from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.transport import THttpClient
from thrift.protocol import TBinaryProtocol

try:
    from cassandra import Cassandra
    from cassandra.ttypes import *
except ImportError:
    # add cassandra directory to sys.path
    L = os.path.abspath(__file__).split(os.path.sep)[:-3]
    root = os.path.sep.join(L)
    _ipath = os.path.join(root, 'interface', 'thrift', 'gen-py')
    sys.path.append(os.path.join(_ipath, 'cassandra'))
    import Cassandra
    from ttypes import *
except ImportError:
    print "Cassandra thrift bindings not found, please run 'ant gen-thrift-py'"
    sys.exit(2)

try:
    from thrift.protocol import fastbinary
except ImportError:
    print "WARNING: thrift binary extension not found, benchmark will not be accurate!"

parser = OptionParser()
parser.add_option('-n', '--num-keys', type="int", dest="numkeys",
                  help="Number of keys", default=1000**2)
parser.add_option('-N', '--skip-keys', type="float", dest="skipkeys",
                  help="Fraction of keys to skip initially", default=0)
parser.add_option('-t', '--threads', type="int", dest="threads",
                  help="Number of threads/procs to use", default=50)
parser.add_option('-c', '--columns', type="int", dest="columns",
                  help="Number of columns per key", default=5)
parser.add_option('-S', '--column-size', type="int", dest="column_size",
                  help="Size of column values in bytes", default=34)
parser.add_option('-C', '--cardinality', type="int", dest="cardinality",
                  help="Number of unique values stored in columns", default=50)
parser.add_option('-d', '--nodes', type="string", dest="nodes",
                  help="Host nodes (comma separated)", default="localhost")
parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1,
                  help="standard deviation factor")
parser.add_option('-r', '--random', action="store_true", dest="random",
                  help="use random key generator (stdev will have no effect)")
parser.add_option('-f', '--file', type="string", dest="file", 
                  help="write output to file")
parser.add_option('-p', '--port', type="int", default=9160, dest="port",
                  help="thrift port")
parser.add_option('-m', '--unframed', action="store_true", dest="unframed",
                  help="use unframed transport")
parser.add_option('-o', '--operation', type="choice", dest="operation",
                  default="insert", choices=('insert', 'read', 'rangeslice',
                  'indexedrangeslice', 'multiget'),
                  help="operation to perform")
parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1,
                  help="number of super columns per key")
parser.add_option('-y', '--family-type', type="choice", dest="cftype",
                  choices=('regular','super'), default='regular',
                  help="column family type")
parser.add_option('-k', '--keep-going', action="store_true", dest="ignore",
                  help="ignore errors inserting or reading")
parser.add_option('-i', '--progress-interval', type="int", default=10,
                  dest="interval", help="progress report interval (seconds)")
parser.add_option('-g', '--keys-per-call', type="int", default=1000,
                  dest="rangecount",
                  help="amount of keys to get_range_slices or multiget per call")
parser.add_option('-l', '--replication-factor', type="int", default=1,
                  dest="replication",
                  help="replication factor to use when creating needed column families")
parser.add_option('-e', '--consistency-level', type="str", default='ONE',
                  dest="consistency", help="consistency level to use")
parser.add_option('-x', '--create-index', type="choice",
                  choices=('keys','keys_bitmap', 'none'), default='none',
                  dest="index", help="type of index to create on needed column families")

(options, args) = parser.parse_args()
 
total_keys = options.numkeys
n_threads = options.threads
keys_per_thread = total_keys / n_threads
columns_per_key = options.columns
supers_per_key = options.supers
# this allows client to round robin requests directly for
# simple request load-balancing
nodes = options.nodes.split(',')

# a generator that generates all keys according to a bell curve centered
# around the middle of the keys generated (0..total_keys).  Remember that
# about 68% of keys will be within stdev away from the mean and 
# about 95% within 2*stdev.
stdev = total_keys * options.stdev
mean = total_keys / 2

consistency = getattr(ConsistencyLevel, options.consistency, None)
if consistency is None:
    print "%s is not a valid consistency level" % options.consistency
    sys.exit(3)

# generates a list of unique, deterministic values
def generate_values():
    values = []
    for i in xrange(0, options.cardinality):
        h = md5(str(i)).hexdigest()
        values.append(h * int(options.column_size/len(h)) + h[:options.column_size % len(h)])
    return values

def key_generator_gauss():
    fmt = '%0' + str(len(str(total_keys))) + 'd'
    while True:
        guess = gauss(mean, stdev)
        if 0 <= guess < total_keys:
            return fmt % int(guess)
    
# a generator that will generate all keys w/ equal probability.  this is the
# worst case for caching.
def key_generator_random():
    fmt = '%0' + str(len(str(total_keys))) + 'd'
    return fmt % randint(0, total_keys - 1)

key_generator = key_generator_gauss
if options.random:
    key_generator = key_generator_random


def get_client(host='127.0.0.1', port=9160):
    socket = TSocket.TSocket(host, port)
    if options.unframed:
        transport = TTransport.TBufferedTransport(socket)
    else:
        transport = TTransport.TFramedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    client = Cassandra.Client(protocol)
    client.transport = transport
    return client

def make_keyspaces():
    colms = []
    if options.index == 'keys':
        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS), ColumnDef(name='C2', validation_class='UTF8Type', index_type=IndexType.KEYS), ColumnDef(name='C3', validation_class='UTF8Type', index_type=IndexType.KEYS)]
    elif options.index == 'keys_bitmap':
        colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS_BITMAP), ColumnDef(name='C2', validation_class='UTF8Type', index_type=IndexType.KEYS), ColumnDef(name='C3', validation_class='UTF8Type', index_type=IndexType.KEYS)]
    cfams = [CfDef(keyspace='Keyspace1', name='Standard1', column_metadata=colms),
             CfDef(keyspace='Keyspace1', name='Super1', column_type='Super')]
    keyspace = KsDef(name='Keyspace1', strategy_class='org.apache.cassandra.locator.SimpleStrategy', replication_factor=options.replication, cf_defs=cfams)
    client = get_client(nodes[0], options.port)
    client.transport.open()
    try:
        client.system_add_keyspace(keyspace)
        print "Created keyspaces.  Sleeping %ss for propagation." % len(nodes)
        time.sleep(len(nodes))
    except InvalidRequestException, e:
        print e.why
    client.transport.close()

class Operation(Thread):
    def __init__(self, i, opcounts, keycounts, latencies):
        Thread.__init__(self)
        # generator of the keys to be used
        self.range = xrange(int(keys_per_thread * (i + options.skipkeys)), 
                            keys_per_thread * (i + 1))
        # we can't use a local counter, since that won't be visible to the parent
        # under multiprocessing.  instead, the parent passes a "opcounts" array
        # and an index that is our assigned counter.
        self.idx = i
        self.opcounts = opcounts
        # similarly, a shared array for latency and key totals
        self.latencies = latencies
        self.keycounts = keycounts
        # random host for pseudo-load-balancing
        [hostname] = random.sample(nodes, 1)
        # open client
        self.cclient = get_client(hostname, options.port)
        self.cclient.transport.open()
        self.cclient.set_keyspace('Keyspace1')

class Inserter(Operation):
    def run(self):
        values = generate_values()
        columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)]
        fmt = '%0' + str(len(str(total_keys))) + 'd'
        if 'super' == options.cftype:
            supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)]
        for i in self.range:
            key = fmt % i
            if 'super' == options.cftype:
                cfmap= {key: {'Super1' : [Mutation(ColumnOrSuperColumn(super_column=s)) for s in supers]}}
            else:
                cfmap = {key: {'Standard1': [Mutation(ColumnOrSuperColumn(column=c)) for c in columns]}}
            # set the correct column values for this row
            value = values[i % len(values)]
            for column in columns:
                column.value = value
            start = time.time()
            try:
                self.cclient.batch_mutate(cfmap, consistency)
            except KeyboardInterrupt:
                raise
            except Exception, e:
                if options.ignore:
                    print e
                else:
                    raise
            self.latencies[self.idx] += time.time() - start
            self.opcounts[self.idx] += 1
            self.keycounts[self.idx] += 1


class Reader(Operation):
    def run(self):
        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
        if 'super' == options.cftype:
            for i in xrange(keys_per_thread):
                key = key_generator()
                for j in xrange(supers_per_key):
                    parent = ColumnParent('Super1', 'S' + str(j))
                    start = time.time()
                    try:
                        r = self.cclient.get_slice(key, parent, p, consistency)
                        if not r: raise RuntimeError("Key %s not found" % key)
                    except KeyboardInterrupt:
                        raise
                    except Exception, e:
                        if options.ignore:
                            print e
                        else:
                            raise
                    self.latencies[self.idx] += time.time() - start
                    self.opcounts[self.idx] += 1
                    self.keycounts[self.idx] += 1
        else:
            parent = ColumnParent('Standard1')
            for i in xrange(keys_per_thread):
                key = key_generator()
                start = time.time()
                try:
                    r = self.cclient.get_slice(key, parent, p, consistency)
                    if not r: raise RuntimeError("Key %s not found" % key)
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    if options.ignore:
                        print e
                    else:
                        raise
                self.latencies[self.idx] += time.time() - start
                self.opcounts[self.idx] += 1
                self.keycounts[self.idx] += 1

class RangeSlicer(Operation):
    def run(self):
        begin = self.range[0]
        end = self.range[-1]
        current = begin
        last = current + options.rangecount
        fmt = '%0' + str(len(str(total_keys))) + 'd'
        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
        if 'super' == options.cftype:
            while current < end:
                keyrange = KeyRange(fmt % current, fmt % last, count = options.rangecount)
                res = []
                for j in xrange(supers_per_key):
                    parent = ColumnParent('Super1', 'S' + str(j)) 
                    begin = time.time()
                    try:
                        res = self.cclient.get_range_slices(parent, p, keyrange, consistency)
                        if not res: raise RuntimeError("Key %s not found" % key)
                    except KeyboardInterrupt:
                        raise
                    except Exception, e:
                        if options.ignore:
                            print e
                        else:
                            raise
                    self.latencies[self.idx] += time.time() - begin
                    self.opcounts[self.idx] += 1
                current += len(r) + 1
                last = current + len(r) + 1
                self.keycounts[self.idx] += len(r)
        else:
            parent = ColumnParent('Standard1')
            while current < end:
                start = fmt % current 
                finish = fmt % last
                keyrange = KeyRange(start, finish, count = options.rangecount)
                begin = time.time()
                try:
                    r = self.cclient.get_range_slices(parent, p, keyrange, consistency)
                    if not r: raise RuntimeError("Range not found:", start, finish)
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    if options.ignore:
                        print e
                    else:
                        print start, finish
                        raise
                current += len(r) + 1
                last = current + len(r)  + 1
                self.latencies[self.idx] += time.time() - begin
                self.opcounts[self.idx] += 1
                self.keycounts[self.idx] += len(r)

# Each thread queries for a portion of the unique values
# TODO: all threads start at the same key: implement wrapping, and start
# from the thread's appointed range
class IndexedRangeSlicer(Operation):
    def run(self):
        fmt = '%0' + str(len(str(total_keys))) + 'd'
        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
        values = generate_values()
        parent = ColumnParent('Standard1')
        # the number of rows with a particular value and the number of values we should query for
        expected_per_value = total_keys // len(values)
        valuebegin = self.range[0] // expected_per_value
        valuecount = len(self.range) // expected_per_value
        for valueidx in xrange(valuebegin, valuebegin + valuecount):
            received = 0
            start = fmt % 0
            value = values[valueidx % len(values)]
            expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value), IndexExpression(column_name='C2', op=IndexOperator.EQ, value=value), IndexExpression(column_name='C3', op=IndexOperator.EQ, value=value)]
            while received < expected_per_value:
                clause = IndexClause(start_key=start, count=options.rangecount, expressions=expressions)
                begin = time.time()
                try:
                    r = self.cclient.get_indexed_slices(parent, clause, p, consistency)
                    if not r: raise RuntimeError("No indexed values from offset received:", start)
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    if options.ignore:
                        print e
                        continue
                    else:
                        raise
                received += len(r)
                # convert max key found back to an integer, and increment it
                start = fmt % (1 + max([int(keyslice.key) for keyslice in r]))
                self.latencies[self.idx] += time.time() - begin
                self.opcounts[self.idx] += 1
                self.keycounts[self.idx] += len(r)


class MultiGetter(Operation):
    def run(self):
        p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key))
        offset = self.idx * keys_per_thread
        count = (((self.idx+1) * keys_per_thread) - offset) / options.rangecount
        if 'super' == options.cftype:
            for x in xrange(count):
                keys = [key_generator() for i in xrange(offset, offset + options.rangecount)]
                for j in xrange(supers_per_key):
                    parent = ColumnParent('Super1', 'S' + str(j))
                    start = time.time()
                    try:
                        r = self.cclient.multiget_slice(keys, parent, p, consistency)
                        if not r: raise RuntimeError("Keys %s not found" % keys)
                    except KeyboardInterrupt:
                        raise
                    except Exception, e:
                        if options.ignore:
                            print e
                        else:
                            raise
                    self.latencies[self.idx] += time.time() - start
                    self.opcounts[self.idx] += 1
                    self.keycounts[self.idx] += len(keys)
                    offset += options.rangecount
        else:
            parent = ColumnParent('Standard1')
            for x in xrange(count):
                keys = [key_generator() for i in xrange(offset, offset + options.rangecount)]
                start = time.time()
                try:
                    r = self.cclient.multiget_slice(keys, parent, p, consistency)
                    if not r: raise RuntimeError("Keys %s not found" % keys)
                except KeyboardInterrupt:
                    raise
                except Exception, e:
                    if options.ignore:
                        print e
                    else:
                        raise
                self.latencies[self.idx] += time.time() - start
                self.opcounts[self.idx] += 1
                self.keycounts[self.idx] += len(keys)
                offset += options.rangecount


class OperationFactory:
    @staticmethod
    def create(type, i, opcounts, keycounts, latencies):
        if type == 'read':
            return Reader(i, opcounts, keycounts, latencies)
        elif type == 'insert':
            return Inserter(i, opcounts, keycounts, latencies)
        elif type == 'rangeslice':
            return RangeSlicer(i, opcounts, keycounts, latencies)
        elif type == 'indexedrangeslice':
            return IndexedRangeSlicer(i, opcounts, keycounts, latencies)
        elif type == 'multiget':
            return MultiGetter(i, opcounts, keycounts, latencies)
        else:
            raise RuntimeError, 'Unsupported op!'


class Stress(object):
    opcounts = array('i', [0] * n_threads)
    latencies = array('d', [0] * n_threads)
    keycounts = array('i', [0] * n_threads)

    def create_threads(self,type):
        threads = []
        for i in xrange(n_threads):
            th = OperationFactory.create(type, i, self.opcounts, self.keycounts, self.latencies)
            threads.append(th)
            th.start()
        return threads

    def run_test(self,filename,threads):
        start_t = time.time()
        if filename:
            outf = open(filename,'w')
        else:
            outf = sys.stdout
        outf.write('total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time\n')
        epoch = total = old_total = latency = keycount = old_keycount = old_latency = 0
        epoch_intervals = (options.interval * 10) # 1 epoch = 1 tenth of a second
        terminate = False
        while not terminate:
            time.sleep(0.1)
            if not [th for th in threads if th.isAlive()]:
                terminate = True
            epoch = epoch + 1
            if terminate or epoch > epoch_intervals:
                epoch = 0
                old_total, old_latency, old_keycount = total, latency, keycount
                total = sum(self.opcounts[th.idx] for th in threads)
                latency = sum(self.latencies[th.idx] for th in threads)
                keycount = sum(self.keycounts[th.idx] for th in threads)
                opdelta = total - old_total
                keydelta = keycount - old_keycount
                delta_latency = latency - old_latency
                if opdelta > 0:
                    delta_formatted = (delta_latency / opdelta)
                else:
                    delta_formatted = 'NaN'
                elapsed_t = int(time.time() - start_t)
                outf.write('%d,%d,%d,%s,%d\n' 
                           % (total, opdelta / options.interval, keydelta / options.interval, delta_formatted, elapsed_t))

    def insert(self):
        threads = self.create_threads('insert')
        self.run_test(options.file,threads);

    def read(self):
        threads = self.create_threads('read')
        self.run_test(options.file,threads);
        
    def rangeslice(self):
        threads = self.create_threads('rangeslice')
        self.run_test(options.file,threads);

    def indexedrangeslice(self):
        threads = self.create_threads('indexedrangeslice')
        self.run_test(options.file,threads);

    def multiget(self):
        threads = self.create_threads('multiget')
        self.run_test(options.file,threads);

stresser = Stress()
benchmark = getattr(stresser, options.operation, None)
if not have_multiproc:
    print """WARNING: multiprocessing not present, threading will be used.
        Benchmark may not be accurate!"""
if options.operation == 'insert':
    make_keyspaces()
benchmark()

Reply via email to