I've tried to reproduce my test data and the failing queries with stress.py.
So, I've slightly modified the stress.py and added 2 more indexes for insertion. The indexedrangeslice query is also performed on 3 indexes. The insert is done using a uniform distribution of values. Then: 1. python contrib/py_stress/stress.py -r -C 32 -x keys 2. python contrib/py_stress/stress.py -C 32 -o indexedrangeslice -t 3 The queries fail as in the attachment: not on the first query but on the 3rd, 4th ... not always the same. Dragos On Mon, Nov 22, 2010 at 9:39 PM, Jonathan Ellis <jbel...@gmail.com> wrote: > Let's start with the low-hanging fruit: can you give steps to reproduce > queries that fail right away? > > On Wed, Nov 17, 2010 at 10:37 AM, dragos cernahoschi > <dragos.cernahos...@gmail.com> wrote: > > Back. I've tested the keys index pagination once again. 0.7 head. Smaller > > data set: 1 million rows. It seems there are still some issues: > > > > 1. *test*: query on one column, count: 1000, expected number of distinct > > results: 48251 > > *result*: 5 pages of 1000 results, then, after the 6th page, the > results > > begin to repeat, I would expect that repetition begins after the 48251-th > > row > > > > 2. *test*: query on 3 columns, count: 10 (count 100, count 1000 failed > with > > time out) > > *result*: 1 page of 10 results, then second page => time out > > > > 3. There are queries with combinations of 2, 3 columns that fail right > away > > with time out (count 10, 100). > > > > Dragos > > > > > > On Mon, Nov 15, 2010 at 2:29 PM, Jonathan Ellis <jbel...@gmail.com> > wrote: > > > >> On Mon, Nov 15, 2010 at 5:57 AM, dragos cernahoschi > >> <dragos.cernahos...@gmail.com> wrote: > >> > I've tested 0.7-beta3 branch index feature without the 1472 patch. The > >> > queries on more than one column work better than the patched version, > >> but > >> > definitely not correctly. > >> > >> Please test 0.7 branch head, as you can see from the CHANGES there > >> have been a lot of fixes. > >> > >> > 1. > >> > 2. > >> > 4. 
> >> > >> Should be fixed in head. > >> > >> > 3. Is there any example on the pagination feature? (without knowing > the > >> > expected number of rows). > >> > >> Same way you paginate through range slices or columns within a row, > >> set start to the last result you got w/ previous query. > >> > >> > Will the get_indexed_slices return an empty list when there is no more > >> > results? > >> > >> No, all queries are start-inclusive. > >> > >> -- > >> Jonathan Ellis > >> Project Chair, Apache Cassandra > >> co-founder of Riptano, the source for professional Cassandra support > >> http://riptano.com > >> > > > > > > -- > Jonathan Ellis > Project Chair, Apache Cassandra > co-founder of Riptano, the source for professional Cassandra support > http://riptano.com >
#!/usr/bin/python # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # expects a Cassandra server to be running and listening on port 9160. # (read tests expect insert tests to have run first too.) have_multiproc = False try: from multiprocessing import Array as array, Process as Thread from uuid import uuid1 as get_ident Thread.isAlive = Thread.is_alive have_multiproc = True except ImportError: from threading import Thread from thread import get_ident from array import array from hashlib import md5 import time, random, sys, os from random import randint, gauss from optparse import OptionParser from thrift.transport import TTransport from thrift.transport import TSocket from thrift.transport import THttpClient from thrift.protocol import TBinaryProtocol try: from cassandra import Cassandra from cassandra.ttypes import * except ImportError: # add cassandra directory to sys.path L = os.path.abspath(__file__).split(os.path.sep)[:-3] root = os.path.sep.join(L) _ipath = os.path.join(root, 'interface', 'thrift', 'gen-py') sys.path.append(os.path.join(_ipath, 'cassandra')) import Cassandra from ttypes import * except ImportError: print "Cassandra thrift bindings not found, please run 'ant gen-thrift-py'" sys.exit(2) try: from thrift.protocol import fastbinary 
except ImportError: print "WARNING: thrift binary extension not found, benchmark will not be accurate!" parser = OptionParser() parser.add_option('-n', '--num-keys', type="int", dest="numkeys", help="Number of keys", default=1000**2) parser.add_option('-N', '--skip-keys', type="float", dest="skipkeys", help="Fraction of keys to skip initially", default=0) parser.add_option('-t', '--threads', type="int", dest="threads", help="Number of threads/procs to use", default=50) parser.add_option('-c', '--columns', type="int", dest="columns", help="Number of columns per key", default=5) parser.add_option('-S', '--column-size', type="int", dest="column_size", help="Size of column values in bytes", default=34) parser.add_option('-C', '--cardinality', type="int", dest="cardinality", help="Number of unique values stored in columns", default=50) parser.add_option('-d', '--nodes', type="string", dest="nodes", help="Host nodes (comma separated)", default="localhost") parser.add_option('-s', '--stdev', type="float", dest="stdev", default=0.1, help="standard deviation factor") parser.add_option('-r', '--random', action="store_true", dest="random", help="use random key generator (stdev will have no effect)") parser.add_option('-f', '--file', type="string", dest="file", help="write output to file") parser.add_option('-p', '--port', type="int", default=9160, dest="port", help="thrift port") parser.add_option('-m', '--unframed', action="store_true", dest="unframed", help="use unframed transport") parser.add_option('-o', '--operation', type="choice", dest="operation", default="insert", choices=('insert', 'read', 'rangeslice', 'indexedrangeslice', 'multiget'), help="operation to perform") parser.add_option('-u', '--supercolumns', type="int", dest="supers", default=1, help="number of super columns per key") parser.add_option('-y', '--family-type', type="choice", dest="cftype", choices=('regular','super'), default='regular', help="column family type") parser.add_option('-k', '--keep-going', 
action="store_true", dest="ignore", help="ignore errors inserting or reading") parser.add_option('-i', '--progress-interval', type="int", default=10, dest="interval", help="progress report interval (seconds)") parser.add_option('-g', '--keys-per-call', type="int", default=1000, dest="rangecount", help="amount of keys to get_range_slices or multiget per call") parser.add_option('-l', '--replication-factor', type="int", default=1, dest="replication", help="replication factor to use when creating needed column families") parser.add_option('-e', '--consistency-level', type="str", default='ONE', dest="consistency", help="consistency level to use") parser.add_option('-x', '--create-index', type="choice", choices=('keys','keys_bitmap', 'none'), default='none', dest="index", help="type of index to create on needed column families") (options, args) = parser.parse_args() total_keys = options.numkeys n_threads = options.threads keys_per_thread = total_keys / n_threads columns_per_key = options.columns supers_per_key = options.supers # this allows client to round robin requests directly for # simple request load-balancing nodes = options.nodes.split(',') # a generator that generates all keys according to a bell curve centered # around the middle of the keys generated (0..total_keys). Remember that # about 68% of keys will be within stdev away from the mean and # about 95% within 2*stdev. 
stdev = total_keys * options.stdev mean = total_keys / 2 consistency = getattr(ConsistencyLevel, options.consistency, None) if consistency is None: print "%s is not a valid consistency level" % options.consistency sys.exit(3) # generates a list of unique, deterministic values def generate_values(): values = [] for i in xrange(0, options.cardinality): h = md5(str(i)).hexdigest() values.append(h * int(options.column_size/len(h)) + h[:options.column_size % len(h)]) return values def key_generator_gauss(): fmt = '%0' + str(len(str(total_keys))) + 'd' while True: guess = gauss(mean, stdev) if 0 <= guess < total_keys: return fmt % int(guess) # a generator that will generate all keys w/ equal probability. this is the # worst case for caching. def key_generator_random(): fmt = '%0' + str(len(str(total_keys))) + 'd' return fmt % randint(0, total_keys - 1) key_generator = key_generator_gauss if options.random: key_generator = key_generator_random def get_client(host='127.0.0.1', port=9160): socket = TSocket.TSocket(host, port) if options.unframed: transport = TTransport.TBufferedTransport(socket) else: transport = TTransport.TFramedTransport(socket) protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport) client = Cassandra.Client(protocol) client.transport = transport return client def make_keyspaces(): colms = [] if options.index == 'keys': colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS), ColumnDef(name='C2', validation_class='UTF8Type', index_type=IndexType.KEYS), ColumnDef(name='C3', validation_class='UTF8Type', index_type=IndexType.KEYS)] elif options.index == 'keys_bitmap': colms = [ColumnDef(name='C1', validation_class='UTF8Type', index_type=IndexType.KEYS_BITMAP), ColumnDef(name='C2', validation_class='UTF8Type', index_type=IndexType.KEYS), ColumnDef(name='C3', validation_class='UTF8Type', index_type=IndexType.KEYS)] cfams = [CfDef(keyspace='Keyspace1', name='Standard1', column_metadata=colms), CfDef(keyspace='Keyspace1', 
name='Super1', column_type='Super')] keyspace = KsDef(name='Keyspace1', strategy_class='org.apache.cassandra.locator.SimpleStrategy', replication_factor=options.replication, cf_defs=cfams) client = get_client(nodes[0], options.port) client.transport.open() try: client.system_add_keyspace(keyspace) print "Created keyspaces. Sleeping %ss for propagation." % len(nodes) time.sleep(len(nodes)) except InvalidRequestException, e: print e.why client.transport.close() class Operation(Thread): def __init__(self, i, opcounts, keycounts, latencies): Thread.__init__(self) # generator of the keys to be used self.range = xrange(int(keys_per_thread * (i + options.skipkeys)), keys_per_thread * (i + 1)) # we can't use a local counter, since that won't be visible to the parent # under multiprocessing. instead, the parent passes a "opcounts" array # and an index that is our assigned counter. self.idx = i self.opcounts = opcounts # similarly, a shared array for latency and key totals self.latencies = latencies self.keycounts = keycounts # random host for pseudo-load-balancing [hostname] = random.sample(nodes, 1) # open client self.cclient = get_client(hostname, options.port) self.cclient.transport.open() self.cclient.set_keyspace('Keyspace1') class Inserter(Operation): def run(self): values = generate_values() columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)] fmt = '%0' + str(len(str(total_keys))) + 'd' if 'super' == options.cftype: supers = [SuperColumn('S' + str(j), columns) for j in xrange(supers_per_key)] for i in self.range: key = fmt % i if 'super' == options.cftype: cfmap= {key: {'Super1' : [Mutation(ColumnOrSuperColumn(super_column=s)) for s in supers]}} else: cfmap = {key: {'Standard1': [Mutation(ColumnOrSuperColumn(column=c)) for c in columns]}} # set the correct column values for this row value = values[i % len(values)] for column in columns: column.value = value start = time.time() try: self.cclient.batch_mutate(cfmap, 
consistency) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e else: raise self.latencies[self.idx] += time.time() - start self.opcounts[self.idx] += 1 self.keycounts[self.idx] += 1 class Reader(Operation): def run(self): p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) if 'super' == options.cftype: for i in xrange(keys_per_thread): key = key_generator() for j in xrange(supers_per_key): parent = ColumnParent('Super1', 'S' + str(j)) start = time.time() try: r = self.cclient.get_slice(key, parent, p, consistency) if not r: raise RuntimeError("Key %s not found" % key) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e else: raise self.latencies[self.idx] += time.time() - start self.opcounts[self.idx] += 1 self.keycounts[self.idx] += 1 else: parent = ColumnParent('Standard1') for i in xrange(keys_per_thread): key = key_generator() start = time.time() try: r = self.cclient.get_slice(key, parent, p, consistency) if not r: raise RuntimeError("Key %s not found" % key) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e else: raise self.latencies[self.idx] += time.time() - start self.opcounts[self.idx] += 1 self.keycounts[self.idx] += 1 class RangeSlicer(Operation): def run(self): begin = self.range[0] end = self.range[-1] current = begin last = current + options.rangecount fmt = '%0' + str(len(str(total_keys))) + 'd' p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) if 'super' == options.cftype: while current < end: keyrange = KeyRange(fmt % current, fmt % last, count = options.rangecount) res = [] for j in xrange(supers_per_key): parent = ColumnParent('Super1', 'S' + str(j)) begin = time.time() try: res = self.cclient.get_range_slices(parent, p, keyrange, consistency) if not res: raise RuntimeError("Key %s not found" % key) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e else: raise 
self.latencies[self.idx] += time.time() - begin self.opcounts[self.idx] += 1 current += len(r) + 1 last = current + len(r) + 1 self.keycounts[self.idx] += len(r) else: parent = ColumnParent('Standard1') while current < end: start = fmt % current finish = fmt % last keyrange = KeyRange(start, finish, count = options.rangecount) begin = time.time() try: r = self.cclient.get_range_slices(parent, p, keyrange, consistency) if not r: raise RuntimeError("Range not found:", start, finish) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e else: print start, finish raise current += len(r) + 1 last = current + len(r) + 1 self.latencies[self.idx] += time.time() - begin self.opcounts[self.idx] += 1 self.keycounts[self.idx] += len(r) # Each thread queries for a portion of the unique values # TODO: all threads start at the same key: implement wrapping, and start # from the thread's appointed range class IndexedRangeSlicer(Operation): def run(self): fmt = '%0' + str(len(str(total_keys))) + 'd' p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) values = generate_values() parent = ColumnParent('Standard1') # the number of rows with a particular value and the number of values we should query for expected_per_value = total_keys // len(values) valuebegin = self.range[0] // expected_per_value valuecount = len(self.range) // expected_per_value for valueidx in xrange(valuebegin, valuebegin + valuecount): received = 0 start = fmt % 0 value = values[valueidx % len(values)] expressions = [IndexExpression(column_name='C1', op=IndexOperator.EQ, value=value), IndexExpression(column_name='C2', op=IndexOperator.EQ, value=value), IndexExpression(column_name='C3', op=IndexOperator.EQ, value=value)] while received < expected_per_value: clause = IndexClause(start_key=start, count=options.rangecount, expressions=expressions) begin = time.time() try: r = self.cclient.get_indexed_slices(parent, clause, p, consistency) if not r: raise RuntimeError("No 
indexed values from offset received:", start) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e continue else: raise received += len(r) # convert max key found back to an integer, and increment it start = fmt % (1 + max([int(keyslice.key) for keyslice in r])) self.latencies[self.idx] += time.time() - begin self.opcounts[self.idx] += 1 self.keycounts[self.idx] += len(r) class MultiGetter(Operation): def run(self): p = SlicePredicate(slice_range=SliceRange('', '', False, columns_per_key)) offset = self.idx * keys_per_thread count = (((self.idx+1) * keys_per_thread) - offset) / options.rangecount if 'super' == options.cftype: for x in xrange(count): keys = [key_generator() for i in xrange(offset, offset + options.rangecount)] for j in xrange(supers_per_key): parent = ColumnParent('Super1', 'S' + str(j)) start = time.time() try: r = self.cclient.multiget_slice(keys, parent, p, consistency) if not r: raise RuntimeError("Keys %s not found" % keys) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e else: raise self.latencies[self.idx] += time.time() - start self.opcounts[self.idx] += 1 self.keycounts[self.idx] += len(keys) offset += options.rangecount else: parent = ColumnParent('Standard1') for x in xrange(count): keys = [key_generator() for i in xrange(offset, offset + options.rangecount)] start = time.time() try: r = self.cclient.multiget_slice(keys, parent, p, consistency) if not r: raise RuntimeError("Keys %s not found" % keys) except KeyboardInterrupt: raise except Exception, e: if options.ignore: print e else: raise self.latencies[self.idx] += time.time() - start self.opcounts[self.idx] += 1 self.keycounts[self.idx] += len(keys) offset += options.rangecount class OperationFactory: @staticmethod def create(type, i, opcounts, keycounts, latencies): if type == 'read': return Reader(i, opcounts, keycounts, latencies) elif type == 'insert': return Inserter(i, opcounts, keycounts, latencies) elif type == 
'rangeslice': return RangeSlicer(i, opcounts, keycounts, latencies) elif type == 'indexedrangeslice': return IndexedRangeSlicer(i, opcounts, keycounts, latencies) elif type == 'multiget': return MultiGetter(i, opcounts, keycounts, latencies) else: raise RuntimeError, 'Unsupported op!' class Stress(object): opcounts = array('i', [0] * n_threads) latencies = array('d', [0] * n_threads) keycounts = array('i', [0] * n_threads) def create_threads(self,type): threads = [] for i in xrange(n_threads): th = OperationFactory.create(type, i, self.opcounts, self.keycounts, self.latencies) threads.append(th) th.start() return threads def run_test(self,filename,threads): start_t = time.time() if filename: outf = open(filename,'w') else: outf = sys.stdout outf.write('total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time\n') epoch = total = old_total = latency = keycount = old_keycount = old_latency = 0 epoch_intervals = (options.interval * 10) # 1 epoch = 1 tenth of a second terminate = False while not terminate: time.sleep(0.1) if not [th for th in threads if th.isAlive()]: terminate = True epoch = epoch + 1 if terminate or epoch > epoch_intervals: epoch = 0 old_total, old_latency, old_keycount = total, latency, keycount total = sum(self.opcounts[th.idx] for th in threads) latency = sum(self.latencies[th.idx] for th in threads) keycount = sum(self.keycounts[th.idx] for th in threads) opdelta = total - old_total keydelta = keycount - old_keycount delta_latency = latency - old_latency if opdelta > 0: delta_formatted = (delta_latency / opdelta) else: delta_formatted = 'NaN' elapsed_t = int(time.time() - start_t) outf.write('%d,%d,%d,%s,%d\n' % (total, opdelta / options.interval, keydelta / options.interval, delta_formatted, elapsed_t)) def insert(self): threads = self.create_threads('insert') self.run_test(options.file,threads); def read(self): threads = self.create_threads('read') self.run_test(options.file,threads); def rangeslice(self): threads = 
self.create_threads('rangeslice') self.run_test(options.file,threads); def indexedrangeslice(self): threads = self.create_threads('indexedrangeslice') self.run_test(options.file,threads); def multiget(self): threads = self.create_threads('multiget') self.run_test(options.file,threads); stresser = Stress() benchmark = getattr(stresser, options.operation, None) if not have_multiproc: print """WARNING: multiprocessing not present, threading will be used. Benchmark may not be accurate!""" if options.operation == 'insert': make_keyspaces() benchmark()