This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 50c314eaec Consensus groups for range movements can omit future owners
of merging ranges
50c314eaec is described below
commit 50c314eaec2c6e6c196d8dc77607a3a411c53e0a
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Mon Mar 16 15:15:23 2026 +0000
Consensus groups for range movements can omit future owners of merging
ranges
* Expand affected ranges to include removals
* Don't create waiters for replica groups of future ranges not yet in
placements
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-21142
---
CHANGES.txt | 1 +
.../tcm/ownership/PlacementTransitionPlan.java | 5 ++-
.../cassandra/tcm/sequences/ProgressBarrier.java | 6 +++
.../tcm/sequences/ProgressBarrierTest.java | 49 +++++++++++++++++++++-
4 files changed, 59 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c7b5a55a5..d42a73c539 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Improve construction of consensus groups for range movements
(CASSANDRA-21142)
* Support compaction_read_disk_access_mode for cursor-based compaction
(CASSANDRA-21147)
* Allow value/element indexing on frozen collections in SAI (CASSANDRA-18492)
* Add tool to offline dump cluster metadata and the log (CASSANDRA-21129)
diff --git
a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
index aecab7f445..b012d7ac04 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
@@ -110,29 +110,32 @@ public class PlacementTransitionPlan
toSplit.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
+ delta.reads.removals.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
});
toMaximal.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
+ delta.reads.removals.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
addToWrites.put(replication, delta.onlyWrites());
moveReads.put(replication, delta.onlyReads());
});
toFinal.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
+ delta.reads.removals.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
moveReads.put(replication, delta.onlyReads());
removeFromWrites.put(replication, delta.onlyWrites());
});
toMerged.forEach((replication, delta) -> {
delta.reads.additions.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
+ delta.reads.removals.flattenValues().forEach(r ->
affectedRanges.add(replication, r.range()));
removeFromWrites.put(replication, delta);
});
this.addToWrites = addToWrites.build();
this.moveReads = moveReads.build();
this.removeFromWrites = removeFromWrites.build();
this.affectedRanges = affectedRanges.build();
-
}
@Override
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java
b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java
index 7b89acceb6..db728ef67e 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java
@@ -172,6 +172,12 @@ public class ProgressBarrier
{
EndpointsForRange writes =
metadata.placements.get(params).writes.matchRange(range).get().filter(r ->
filter.test(r.endpoint()));
EndpointsForRange reads =
metadata.placements.get(params).reads.matchRange(range).get().filter(r ->
filter.test(r.endpoint()));
+ // Affected ranges can contain ranges which are the results of
merging or splitting and may not exist
+ // as keys in the existing ReplicaGroups. As such, no replicas
will be found for these ranges and so no
+ // WaitFor is necessary.
+ if (reads.isEmpty() && writes.isEmpty())
+ continue;
+
reads.stream().map(Replica::endpoint).forEach(superset::add);
writes.stream().map(Replica::endpoint).forEach(superset::add);
diff --git
a/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java
b/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java
index ba431db848..cb4f15e5ef 100644
--- a/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java
+++ b/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
@@ -57,12 +58,16 @@ import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.transformations.PrepareJoin;
+import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.tcm.transformations.UnsafeJoin;
import org.apache.cassandra.utils.concurrent.Future;
+import static org.junit.Assert.assertEquals;
+
public class ProgressBarrierTest extends CMSTestBase
{
static
@@ -72,6 +77,48 @@ public class ProgressBarrierTest extends CMSTestBase
DatabaseDescriptor.setProgressBarrierBackoff(5);
}
+ @Test
+ public void testProgressBarrierWithMergingRanges()
+ {
+ TokenPlacementModel.ReplicationFactor rf = new
TokenPlacementModel.SimpleReplicationFactor(1);
+ try (CMSTestBase.CMSSut sut = new
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, rf))
+ {
+ TokenPlacementModel.NodeFactory nodeFactory =
TokenPlacementModel.nodeFactory();
+ List<TokenPlacementModel.Node> nodes = new ArrayList<>();
+ for (int i = 1; i < 6; i++)
+ {
+ TokenPlacementModel.Node node = nodeFactory.make(i, 1,
1).overrideToken(i*100);
+ nodes.add(node);
+ sut.service.commit(new Register(new NodeAddresses(node.addr()),
+ new Location(node.dc(),
node.rack()),
+ NodeVersion.CURRENT));
+ sut.service.commit(new UnsafeJoin(node.nodeId(),
+
Collections.singleton(node.longToken()),
+
ClusterMetadataService.instance().placementProvider()));
+ }
+
+ // 5 node cluster, with a single RF1 keyspace
+ // node2 owns range (100,200] & node3 (200,300]
+ // if node2 leaves node3 will acquire its ranges and will then own
(100, 300]
+ // no other peers are involved in this trivial operation
+ // the progress barrier should be looking for consensus from
(node2, node3)
+ TokenPlacementModel.Node node2 = nodes.get(1);
+ TokenPlacementModel.Node node3 = nodes.get(2);
+ sut.service.commit(new PrepareLeave(node2.nodeId(),
+ true,
+
ClusterMetadataService.instance().placementProvider(),
+
LeaveStreams.Kind.UNBOOTSTRAP));
+ UnbootstrapAndLeave leave = (UnbootstrapAndLeave)
sut.service.metadata().inProgressSequences.get(node2.nodeId());
+
+ // Internally affectedRanges::toPeers uses the same logic as
+ // the progress barrier does to identify the consensus group
+ Set<NodeId> consensusGroup =
leave.barrier().affectedRanges.toPeers(ReplicationParams.simple(1),
+
sut.service.metadata().placements,
+
sut.service.metadata().directory);
+ assertEquals(Set.of(node2.nodeId(), node3.nodeId()),
consensusGroup);
+ }
+ }
+
@Test
public void testProgressBarrier() throws Throwable
{
@@ -143,7 +190,7 @@ public class ProgressBarrierTest extends CMSTestBase
if (respond.get())
{
responded.add(to);
- cb.onResponse((Message<RSP>)
message.responseWith(message.epoch()));
+ cb.onResponse((Message<RSP>)
message.responseWith(message.epoch()).withFrom(to));
}
else
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]