Bootstrapping data from Cassandra 2.2.5 datacenter to 3.0.8 datacenter fails because of streaming errors

2016-10-10 Thread Abhishek Verma
Hi Cassandra users,

We are trying to upgrade our Cassandra version from 2.2.5 to 3.0.8 (running
on Mesos, but that's beside the point). We have two datacenters, so in
order to preserve our data, we are trying to upgrade one datacenter at a
time.

Initially both DCs (dc1 and dc2) are running 2.2.5. The idea is to tear
down dc1 completely (delete all the data in it), bring it up with 3.0.8,
let data replicate from dc2 to dc1, and then tear down dc2, bring it up
with 3.0.8 and replicate data from dc1.

I am able to reproduce the problem on a bare-metal cluster of 3 nodes. I am
using Oracle's server-jre-8u74-linux-x64 JRE.

*Node A*: Downloaded 2.2.5-bin.tar.gz, changed the seeds to include its own
IP address, changed listen_address and rpc_address to its own IP and
changed endpoint_snitch to GossipingPropertyFileSnitch. I
changed conf/cassandra-rackdc.properties to
dc=dc2
rack=rack2
This node started up fine and is UN in nodetool status in dc2.
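
For reference, the relevant cassandra.yaml changes on this node look roughly
like this (the IP address is a placeholder):

seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "<node A IP>"
listen_address: <node A IP>
rpc_address: <node A IP>
endpoint_snitch: GossipingPropertyFileSnitch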

I used CQL shell to create a table and insert 3 rows:
verma@x:~/apache-cassandra-2.2.5$ bin/cqlsh $HOSTNAME
Connected to Test Cluster at x:9042.
[cqlsh 5.0.1 | Cassandra 2.2.5 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> desc tmp

CREATE KEYSPACE tmp WITH replication = {'class': 'NetworkTopologyStrategy',
'dc1': '1', 'dc2': '1'}  AND durable_writes = true;

CREATE TABLE tmp.map (
key text PRIMARY KEY,
value text
)...;
cqlsh> select * from tmp.map;

 key | value
-----+-------
  k1 |    v1
  k3 |    v3
  k2 |    v2

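For completeness, the keyspace, table, and rows above were created with
statements along these lines (reconstructed from the output shown):

cqlsh> CREATE KEYSPACE tmp WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '1', 'dc2': '1'};
cqlsh> CREATE TABLE tmp.map (key text PRIMARY KEY, value text);
cqlsh> INSERT INTO tmp.map (key, value) VALUES ('k1', 'v1');
cqlsh> INSERT INTO tmp.map (key, value) VALUES ('k2', 'v2');
cqlsh> INSERT INTO tmp.map (key, value) VALUES ('k3', 'v3');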

*Node B:* Downloaded 3.0.8-bin.tar.gz, changed the seeds to include itself
and node A, changed listen_address and rpc_address to its own IP, changed
endpoint_snitch to GossipingPropertyFileSnitch. I did not change
conf/cassandra-rackdc.properties and its contents are
dc=dc1
rack=rack1

In the logs, I see:
INFO  [main] 2016-10-10 22:42:42,850 MessagingService.java:557 - Starting
Messaging Service on /10.164.32.29:7000 (eth0)
INFO  [main] 2016-10-10 22:42:42,864 StorageService.java:784 - This node
will not auto bootstrap because it is configured to be a seed node.

So I start a third node:
*Node C:* Downloaded 3.0.8-bin.tar.gz, changed the seeds to include node A
and node B, changed listen_address and rpc_address to its own IP, changed
endpoint_snitch to GossipingPropertyFileSnitch. I did not change
conf/cassandra-rackdc.properties.
Now, nodetool status shows:

verma@xxx:~/apache-cassandra-3.0.8$ bin/nodetool status
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address  Load       Tokens  Owns (effective)  Host ID                               Rack
UJ           87.81 KB   256     ?                 9064832d-ed5c-4c42-ad5a-f754b52b670c  rack1
UN           107.72 KB  256     100.0%            28b1043f-115b-46a5-b6b6-8609829cde76  rack1
Datacenter: dc2
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address  Load       Tokens  Owns (effective)  Host ID                               Rack
UN           73.2 KB    256     100.0%            09cc542c-2299-45a5-a4d1-159c239ded37  rack2

Nodetool describe cluster shows:
verma@xxx:~/apache-cassandra-3.0.8$ bin/nodetool describecluster
Cluster Information:
Name: Test Cluster
Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
c2a2bb4f-7d31-3fb8-a216-00b41a643650: [, ]

9770e3c5-3135-32e2-b761-65a0f6d8824e: []

Note that there are two schema versions and they don't match.

I see the following in the system.log:

INFO  [InternalResponseStage:1] 2016-10-10 22:48:36,055
ColumnFamilyStore.java:390 - Initializing system_auth.roles
INFO  [main] 2016-10-10 22:48:36,316 StorageService.java:1149 - JOINING:
waiting for schema information to complete
INFO  [main] 2016-10-10 22:48:36,316 StorageService.java:1149 - JOINING:
schema complete, ready to bootstrap
INFO  [main] 2016-10-10 22:48:36,316 StorageService.java:1149 - JOINING:
waiting for pending range calculation
INFO  [main] 2016-10-10 22:48:36,317 StorageService.java:1149 - JOINING:
calculation complete, ready to bootstrap
INFO  [main] 2016-10-10 22:48:36,319 StorageService.java:1149 - JOINING:
getting bootstrap token
INFO  [main] 2016-10-10 22:48:36,357 StorageService.java:1149 - JOINING:
sleeping 3 ms for pending range setup
INFO  [main] 2016-10-10 22:49:06,358 StorageService.java:1149 - JOINING:
Starting to bootstrap...
INFO  [main] 2016-10-10 22:49:06,494 StreamResultFuture.java:87 - [Stream
#bfb5e470-8f3b-11e6-b69a-1b451159408e] Executing streaming plan for
Bootstrap
INFO  [StreamConnectionEstablisher:1] 2016-10-10 22:49:06,495
StreamSession.java:242 - [Stream #bfb5e470-8f3b-11e6-b69a-1b451159408e]
Starting streaming to /
INFO  [StreamConnectionEstablisher:2] 2016-10-10 22:49:06,495
StreamSession.java:242 - [Stream #bfb5e470-8f3b-11e6-b69a-1b451159408e]
Starting streaming to /
INFO  [StreamConnectionEstablisher:2] 2016-10-10 22:49:06,500
StreamCoordinator.java:213 - [Stream #bfb5e470-8f3b-11e6-b69a-1b4511

Pluggable throttling of read and write queries

2017-02-17 Thread Abhishek Verma
Cassandra is being used on a large scale at Uber. We usually create
dedicated clusters for each of our internal use cases; however, that is
difficult to scale and manage.

We are investigating the approach of using a single shared cluster with
100s of nodes handling 10s to 100s of different use cases for different
products in the same cluster. We can define different keyspaces for each of
them, but that does not help in case of noisy neighbors.

Does anybody in the community have similar large shared clusters and/or
face noisy neighbor issues?

Is there a way to throttle read and write queries in Cassandra currently?
If not, what would be the right place in the code to implement a pluggable
interface for doing it? I have briefly considered using triggers, but those
are invoked only on the write path. The initial goal is to have a custom
pluggable class which would be a no-op.

We would like to enforce these rate limits per table and for different
query types (point or range queries, or LWT) separately.
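
To make the idea concrete, here is a rough sketch (in Java, using Guava's
RateLimiter, which Cassandra already bundles) of the kind of hook I have in
mind. None of these class names exist in Cassandra today; they are purely
illustrative:

import com.google.common.util.concurrent.RateLimiter;

// Hypothetical hook consulted before executing a query. The enum and
// interface are illustrative only; they are not part of the Cassandra
// code base.
enum QueryKind { POINT_READ, RANGE_READ, WRITE, LWT }

interface QueryThrottler {
    // Returns true if the query may proceed, false if it should be
    // rejected (or delayed) because the quota has been exceeded.
    boolean tryAcquire(String keyspace, String table, QueryKind kind);
}

// Default implementation: a no-op that never throttles anything.
class NoOpThrottler implements QueryThrottler {
    public boolean tryAcquire(String keyspace, String table, QueryKind kind) {
        return true;
    }
}

// Example implementation: a single fixed rate limit applied to all queries,
// regardless of table or query kind.
class FixedRateThrottler implements QueryThrottler {
    private final RateLimiter limiter;

    FixedRateThrottler(double permitsPerSecond) {
        this.limiter = RateLimiter.create(permitsPerSecond);
    }

    public boolean tryAcquire(String keyspace, String table, QueryKind kind) {
        return limiter.tryAcquire();
    }
}

The no-op class is what we would ship as the default; deployments that need
throttling would plug in something like FixedRateThrottler (or a smarter
per-table variant) via configuration.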

Thank you in advance.

-Abhishek.


Re: Pluggable throttling of read and write queries

2017-02-22 Thread Abhishek Verma
We have lots of dedicated Cassandra clusters for large use cases, but we
have a long tail (~100) of internal customers who want to store < 200 GB of
non-critical data at < 5k qps. It does not make sense to create a 3-node
dedicated cluster for each of these small use cases, so we have a shared
cluster onto which we onboard these users.

But once in a while, one of the customers will run an ingest job from HDFS
which will pound the shared cluster and break our SLA for all the other
customers. Currently, I don't see any way to signal backpressure to the
ingestion jobs or to throttle their requests. Another example is one
customer issuing a large number of range queries, which has the same
effect.

A simple way to avoid this is to throttle the read or write requests based
on some quota limits for each keyspace or user.
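
As a rough illustration of that (again hypothetical, and again using Guava's
RateLimiter rather than anything that exists in Cassandra today), per-keyspace
quotas could be as simple as a map of rate limiters consulted on every request:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.util.concurrent.RateLimiter;

// Hypothetical per-keyspace throttler: each keyspace gets its own rate
// limit, with a shared default rate for keyspaces that have no explicit
// quota configured.
class PerKeyspaceThrottler {
    private final Map<String, RateLimiter> limiters = new ConcurrentHashMap<>();
    private final double defaultPermitsPerSecond;

    PerKeyspaceThrottler(double defaultPermitsPerSecond) {
        this.defaultPermitsPerSecond = defaultPermitsPerSecond;
    }

    // Assign an explicit quota (queries per second) to a keyspace.
    void setQuota(String keyspace, double permitsPerSecond) {
        limiters.put(keyspace, RateLimiter.create(permitsPerSecond));
    }

    // Returns true if the request is within the keyspace's quota.
    boolean tryAcquire(String keyspace) {
        RateLimiter limiter = limiters.computeIfAbsent(
                keyspace, k -> RateLimiter.create(defaultPermitsPerSecond));
        return limiter.tryAcquire();
    }
}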

Please see replies inlined:

On Mon, Feb 20, 2017 at 11:46 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Aren't you using the Mesos Cassandra framework to manage your multiple
> clusters? (Saw a presentation at the Cassandra summit)
>
Yes we are using https://github.com/mesosphere/dcos-cassandra-service and
contribute heavily to it. I am aware of the presentation (
https://www.youtube.com/watch?v=4Ap-1VT2ChU) at the Cassandra summit as I
was the one who gave it :)
This has helped us automate the creation and management of these clusters.

> What's wrong with your current mesos approach ?
>
Hardware efficiency: Spinning up dedicated clusters for each use case
wastes a lot of hardware resources. One of the approaches we have taken is
spinning up multiple Cassandra nodes belonging to different clusters on the
same physical machine. However, we still have the overhead of managing all
of these separate clusters.

> I am also thinking it's better to split a large cluster into smaller ones,
> unless you also manage the client layer that queries Cassandra and can put
> some backpressure or rate limiting in it.
>
We have an internal storage API layer that some of the clients use, but
there are many customers who use the vanilla DataStax Java or Python
driver. Implementing throttling in each of those clients does not seem like
a viable approach.

On Feb 21, 2017 at 2:46 AM, "Edward Capriolo"  wrote:
>
>> Older versions had a request scheduler api.
>
I am not aware of the history behind it. Can you please point me to the
JIRA tickets and/or why it was removed?
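
(For context, as far as I can tell the old request scheduler was configured in
cassandra.yaml roughly like the snippet below, and if I remember correctly it
only applied to Thrift requests, not to the native protocol:)

request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
request_scheduler_id: keyspace
request_scheduler_options:
    throttle_limit: 80
    default_weight: 5
    weights:
        Keyspace1: 1
        Keyspace2: 5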

On Monday, February 20, 2017, Ben Slater  wrote:
>>
>>> We’ve actually had several customers where we’ve done the opposite -
>>> split large clusters apart to separate use cases. We found that this
>>> allowed us to better align hardware with use case requirements (for example
>>> using AWS c3.2xlarge for very hot data at low latency, m4.xlarge for more
>>> general-purpose data); we can also tune JVM settings, etc., to meet those
>>> use cases.
>>>
There have been several instances where we have moved customers out of
the shared cluster to their own dedicated clusters because they outgrew our
limitations. But I don't think it makes sense to move every small use case
into a separate cluster of its own.

On Mon, 20 Feb 2017 at 22:21 Oleksandr Shulgin  wrote:
>>>
>>>> On Sat, Feb 18, 2017 at 3:12 AM, Abhishek Verma  wrote:
>>>>
>>>>> Cassandra is being used on a large scale at Uber. We usually create
>>>>> dedicated clusters for each of our internal use cases, however that is
>>>>> difficult to scale and manage.
>>>>>
>>>>> We are investigating the approach of using a single shared cluster
>>>>> with 100s of nodes and handle 10s to 100s of different use cases for
>>>>> different products in the same cluster. We can define different keyspaces
>>>>> for each of them, but that does not help in case of noisy neighbors.
>>>>>
>>>>> Does anybody in the community have similar large shared clusters
>>>>> and/or face noisy neighbor issues?
>>>>>
>>>>
>>>> Hi,
>>>>
>>>> We've never tried this approach and given my limited experience I would
>>>> find this a terrible idea from the perspective of maintenance (remember the
>>>> old saying about basket and eggs?)
>>>>
What if you have a limited number of baskets, and several eggs which are
not critical if they occasionally break?


>>>> What potential benefits do you see?
>>>>
The main benefit of sharing a single cluster among several small use
cases is increasing hardware efficiency and decreasing the management
overhead of running a large number of clusters.

Thanks everyone for your replies and questions.

-Abhishek.