Bootstrapping data from a Cassandra 2.2.5 datacenter to a 3.0.8 datacenter fails because of streaming errors
Hi Cassandra users,

We are trying to upgrade our Cassandra version from 2.2.5 to 3.0.8 (running on Mesos, but that's beside the point). We have two datacenters, so in order to preserve our data, we are trying to upgrade one datacenter at a time. Initially both DCs (dc1 and dc2) are running 2.2.5. The idea is to tear down dc1 completely (delete all the data in it), bring it up with 3.0.8, let data replicate from dc2 to dc1, and then tear down dc2, bring it up with 3.0.8 and replicate data back from dc1.

I am able to reproduce the problem on a bare metal cluster of 3 nodes. I am using Oracle's server-jre-8u74-linux-x64 JRE.

*Node A*: Downloaded 2.2.5-bin.tar.gz, changed the seeds to include its own IP address, changed listen_address and rpc_address to its own IP, and changed endpoint_snitch to GossipingPropertyFileSnitch. I changed conf/cassandra-rackdc.properties to

dc=dc2
rack=rack2

This node started up fine and shows as UN in nodetool status in dc2. I used CQL shell to create a table and insert 3 rows:

verma@x:~/apache-cassandra-2.2.5$ bin/cqlsh $HOSTNAME
Connected to Test Cluster at x:9042.
[cqlsh 5.0.1 | Cassandra 2.2.5 | CQL spec 3.3.1 | Native protocol v4]
Use HELP for help.
cqlsh> desc tmp

CREATE KEYSPACE tmp WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': '1', 'dc2': '1'} AND durable_writes = true;

CREATE TABLE tmp.map (
    key text PRIMARY KEY,
    value text
)...;

cqlsh> select * from tmp.map;

 key | value
-----+-------
  k1 |    v1
  k3 |    v3
  k2 |    v2

*Node B*: Downloaded 3.0.8-bin.tar.gz, changed the seeds to include itself and node A, changed listen_address and rpc_address to its own IP, and changed endpoint_snitch to GossipingPropertyFileSnitch. I did not change conf/cassandra-rackdc.properties; its contents are

dc=dc1
rack=rack1

In the logs, I see:

INFO [main] 2016-10-10 22:42:42,850 MessagingService.java:557 - Starting Messaging Service on /10.164.32.29:7000 (eth0)
INFO [main] 2016-10-10 22:42:42,864 StorageService.java:784 - This node will not auto bootstrap because it is configured to be a seed node.

So I start a third node:

*Node C*: Downloaded 3.0.8-bin.tar.gz, changed the seeds to include node A and node B, changed listen_address and rpc_address to its own IP, and changed endpoint_snitch to GossipingPropertyFileSnitch. I did not change conf/cassandra-rackdc.properties.

Now, nodetool status shows:

verma@xxx:~/apache-cassandra-3.0.8$ bin/nodetool status
Datacenter: dc1
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address  Load       Tokens  Owns (effective)  Host ID                               Rack
UJ           87.81 KB   256     ?                 9064832d-ed5c-4c42-ad5a-f754b52b670c  rack1
UN           107.72 KB  256     100.0%            28b1043f-115b-46a5-b6b6-8609829cde76  rack1
Datacenter: dc2
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address  Load       Tokens  Owns (effective)  Host ID                               Rack
UN           73.2 KB    256     100.0%            09cc542c-2299-45a5-a4d1-159c239ded37  rack2

nodetool describecluster shows:

verma@xxx:~/apache-cassandra-3.0.8$ bin/nodetool describecluster
Cluster Information:
    Name: Test Cluster
    Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
    Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
    Schema versions:
        c2a2bb4f-7d31-3fb8-a216-00b41a643650: [, ]
        9770e3c5-3135-32e2-b761-65a0f6d8824e: []

Note that there are two schema versions and they don't match.
I see the following in the system.log:

INFO [InternalResponseStage:1] 2016-10-10 22:48:36,055 ColumnFamilyStore.java:390 - Initializing system_auth.roles
INFO [main] 2016-10-10 22:48:36,316 StorageService.java:1149 - JOINING: waiting for schema information to complete
INFO [main] 2016-10-10 22:48:36,316 StorageService.java:1149 - JOINING: schema complete, ready to bootstrap
INFO [main] 2016-10-10 22:48:36,316 StorageService.java:1149 - JOINING: waiting for pending range calculation
INFO [main] 2016-10-10 22:48:36,317 StorageService.java:1149 - JOINING: calculation complete, ready to bootstrap
INFO [main] 2016-10-10 22:48:36,319 StorageService.java:1149 - JOINING: getting bootstrap token
INFO [main] 2016-10-10 22:48:36,357 StorageService.java:1149 - JOINING: sleeping 3 ms for pending range setup
INFO [main] 2016-10-10 22:49:06,358 StorageService.java:1149 - JOINING: Starting to bootstrap...
INFO [main] 2016-10-10 22:49:06,494 StreamResultFuture.java:87 - [Stream #bfb5e470-8f3b-11e6-b69a-1b451159408e] Executing streaming plan for Bootstrap
INFO [StreamConnectionEstablisher:1] 2016-10-10 22:49:06,495 StreamSession.java:242 - [Stream #bfb5e470-8f3b-11e6-b69a-1b451159408e] Starting streaming to /
INFO [StreamConnectionEstablisher:2] 2016-10-10 22:49:06,495 StreamSession.java:242 - [Stream #bfb5e470-8f3b-11e6-b69a-1b451159408e] Starting streaming to /
INFO [StreamConnectionEstablisher:2] 2016-10-10 22:49:06,500 StreamCoordinator.java:213 - [Stream #bfb5e470-8f3b-11e6-b69a-1b4511
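In case it helps with reproduction, here is a consolidated sketch of the only settings I touched on each node, as described above (the IP addresses are placeholders, not the real ones):

Node A (apache-cassandra-2.2.5), conf/cassandra.yaml:
    seed_provider:
        - class_name: org.apache.cassandra.locator.SimpleSeedProvider
          parameters:
              - seeds: "<node A IP>"
    listen_address: <node A IP>
    rpc_address: <node A IP>
    endpoint_snitch: GossipingPropertyFileSnitch
Node A, conf/cassandra-rackdc.properties:
    dc=dc2
    rack=rack2

Node B (apache-cassandra-3.0.8): the same cassandra.yaml changes, except seeds: "<node B IP>,<node A IP>" and listen_address/rpc_address set to node B's IP; conf/cassandra-rackdc.properties left at the default dc=dc1, rack=rack1.

Node C (apache-cassandra-3.0.8): the same cassandra.yaml changes, except seeds: "<node A IP>,<node B IP>" and listen_address/rpc_address set to node C's IP; conf/cassandra-rackdc.properties left at the default dc=dc1, rack=rack1.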
Pluggable throttling of read and write queries
Cassandra is being used on a large scale at Uber. We usually create dedicated clusters for each of our internal use cases; however, that is difficult to scale and manage. We are investigating the approach of using a single shared cluster with 100s of nodes that handles 10s to 100s of different use cases for different products. We can define different keyspaces for each of them, but that does not help in the case of noisy neighbors.

Does anybody in the community have similar large shared clusters and/or face noisy neighbor issues?

Is there a way to throttle read and write queries in Cassandra currently? If not, what would be the right place in the code to implement a pluggable interface for doing it? I have briefly considered using triggers, but they are invoked only in the write path. The initial goal is to have a custom pluggable class which would be a no-op. We would like to enforce these rate limits per table and for different query types (point reads, range queries, or LWTs) separately.

Thank you in advance.
-Abhishek.
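P.S. To make the ask more concrete, below is a very rough sketch (in Java) of the shape of the pluggable hook we have in mind. This is purely illustrative: none of these names exist in Cassandra today, the default implementation would be a no-op, and the open question is where in the read/write/LWT paths such a hook would best be invoked.

// Hypothetical interface, not an existing Cassandra API.
public interface QueryThrottler
{
    enum QueryType { POINT_READ, RANGE_READ, WRITE, LWT }

    /**
     * Consulted before a query is executed against the given keyspace/table.
     * Implementations could block, reject, or simply meter the request.
     */
    void acquire(String keyspace, String table, QueryType type);

    /** Default behaviour: no throttling at all. */
    QueryThrottler NO_OP = (keyspace, table, type) -> {};
}

An operator would then point Cassandra at an implementation class (for example via a cassandra.yaml setting) and that class would hold the per-table, per-query-type quotas.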
Re: Pluggable throttling of read and write queries
We have lots of dedicated Cassandra clusters for large use cases, but we have a long tail (~100) of internal customers who want to store < 200 GB of non-critical data at < 5k qps. It does not make sense to create a dedicated 3-node cluster for each of these small use cases, so we have a shared cluster onto which we onboard these users. But once in a while, one of the customers will run an ingest job from HDFS which pounds the shared cluster and breaks our SLA for all the other customers on that cluster. Currently, I don't see any way to signal backpressure to the ingestion jobs or throttle their requests. Another example is one customer issuing a large number of range queries, which has the same effect. A simple way to avoid this is to throttle read or write requests based on quota limits for each keyspace or user.

Please see replies inlined:

On Mon, Feb 20, 2017 at 11:46 PM, vincent gromakowski <vincent.gromakow...@gmail.com> wrote:
> Aren't you using the mesos Cassandra framework to manage your multiple clusters? (Seen a presentation in cass summit)

Yes, we are using https://github.com/mesosphere/dcos-cassandra-service and contribute heavily to it. I am aware of the presentation (https://www.youtube.com/watch?v=4Ap-1VT2ChU) at the Cassandra summit, as I was the one who gave it :) This has helped us automate the creation and management of these clusters.

> What's wrong with your current mesos approach?

Hardware efficiency: spinning up dedicated clusters for each use case wastes a lot of hardware resources. One of the approaches we have taken is spinning up multiple Cassandra nodes belonging to different clusters on the same physical machine. However, we still have the overhead of managing these separate multi-tenant clusters.

> I am also thinking it's better to split a large cluster into smaller ones, except if you also manage the client layer that queries Cassandra and can put some backpressure or rate limiting in it.

We have an internal storage API layer that some of the clients use, but there are many customers who use the vanilla DataStax Java or Python driver. Implementing throttling in each of those clients does not seem like a viable approach.

> On Feb 21, 2017 at 2:46 AM, "Edward Capriolo" wrote:
>> Older versions had a request scheduler api.

I am not aware of the history behind it. Can you please point me to the JIRA tickets and/or the reasons why it was removed?

>> On Monday, February 20, 2017, Ben Slater wrote:
>>> We've actually had several customers where we've done the opposite - split large clusters apart to separate use cases. We found that this allowed us to better align hardware with use case requirements (for example using AWS c3.2xlarge for very hot data at low latency, m4.xlarge for more general purpose data). We can also tune JVM settings, etc. to meet those use cases.

There have been several instances where we have moved customers out of the shared cluster to their own dedicated clusters because they outgrew our limitations. But I don't think it makes sense to move all the small use cases into their own separate clusters.

>>> On Mon, 20 Feb 2017 at 22:21 Oleksandr Shulgin wrote:
>>>> On Sat, Feb 18, 2017 at 3:12 AM, Abhishek Verma wrote:
>>>>> Cassandra is being used on a large scale at Uber. We usually create dedicated clusters for each of our internal use cases; however, that is difficult to scale and manage.
>>>>>
>>>>> We are investigating the approach of using a single shared cluster with 100s of nodes that handles 10s to 100s of different use cases for different products. We can define different keyspaces for each of them, but that does not help in the case of noisy neighbors.
>>>>>
>>>>> Does anybody in the community have similar large shared clusters and/or face noisy neighbor issues?
>>>>
>>>> Hi,
>>>>
>>>> We've never tried this approach and, given my limited experience, I would find this a terrible idea from the perspective of maintenance (remember the old saying about eggs and baskets?)

What if you have a limited number of baskets and many eggs that are not critical if a few of them break once in a while?

>>>> What potential benefits do you see?

The main benefit of sharing a single cluster among several small use cases is increasing hardware efficiency and decreasing the management overhead of a large number of clusters.

Thanks everyone for your replies and questions.
-Abhishek.
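P.S. To make the client-side option concrete: for the traffic that does go through our internal storage API layer, the only lever we have today is something like the sketch below, a per-tenant rate limit wrapped around the DataStax Java driver using Guava's RateLimiter (class names and numbers are illustrative, not our actual code). It only covers the clients we control, which is exactly why it does not solve the noisy-neighbor problem for customers using the vanilla drivers directly.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.RateLimiter;

// Illustrative client-side throttle: every query issued through this wrapper
// shares one qps budget, enforced before the request reaches Cassandra.
public class ThrottledSession {
    private final Session session;
    private final RateLimiter limiter;

    public ThrottledSession(Session session, double permitsPerSecond) {
        this.session = session;
        this.limiter = RateLimiter.create(permitsPerSecond); // e.g. 5000.0 for a 5k qps tenant
    }

    public ResultSet execute(Statement statement) {
        limiter.acquire();                 // blocks the caller once the budget is exhausted
        return session.execute(statement);
    }

    public static ThrottledSession connect(String contactPoint, String keyspace, double qps) {
        Cluster cluster = Cluster.builder().addContactPoint(contactPoint).build();
        return new ThrottledSession(cluster.connect(keyspace), qps);
    }
}

A server-side, per-keyspace hook (like the no-op pluggable class discussed earlier in the thread) would apply the same idea to all clients regardless of which driver they use.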