This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 50c314eaec Consensus groups for range movements can omit future owners of merging ranges
50c314eaec is described below

commit 50c314eaec2c6e6c196d8dc77607a3a411c53e0a
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Mon Mar 16 15:15:23 2026 +0000

    Consensus groups for range movements can omit future owners of merging ranges
    
    * Expand affected ranges to include removals
    * Don't create waiters for replica groups of future ranges not yet in placements
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
    CASSANDRA-21142
---
 CHANGES.txt                                        |  1 +
 .../tcm/ownership/PlacementTransitionPlan.java     |  5 ++-
 .../cassandra/tcm/sequences/ProgressBarrier.java   |  6 +++
 .../tcm/sequences/ProgressBarrierTest.java         | 49 +++++++++++++++++++++-
 4 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 1c7b5a55a5..d42a73c539 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Improve construction of consensus groups for range movements (CASSANDRA-21142)
  * Support compaction_read_disk_access_mode for cursor-based compaction (CASSANDRA-21147)
  * Allow value/element indexing on frozen collections in SAI (CASSANDRA-18492)
  * Add tool to offline dump cluster metadata and the log (CASSANDRA-21129)
diff --git 
a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java 
b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
index aecab7f445..b012d7ac04 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
@@ -110,29 +110,32 @@ public class PlacementTransitionPlan
 
         toSplit.forEach((replication, delta) -> {
             delta.reads.additions.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
+            delta.reads.removals.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
         });
 
         toMaximal.forEach((replication, delta) -> {
             delta.reads.additions.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
+            delta.reads.removals.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
             addToWrites.put(replication, delta.onlyWrites());
             moveReads.put(replication, delta.onlyReads());
         });
 
         toFinal.forEach((replication, delta) -> {
             delta.reads.additions.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
+            delta.reads.removals.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
             moveReads.put(replication, delta.onlyReads());
             removeFromWrites.put(replication, delta.onlyWrites());
         });
 
         toMerged.forEach((replication, delta) -> {
             delta.reads.additions.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
+            delta.reads.removals.flattenValues().forEach(r -> 
affectedRanges.add(replication, r.range()));
             removeFromWrites.put(replication, delta);
         });
         this.addToWrites = addToWrites.build();
         this.moveReads = moveReads.build();
         this.removeFromWrites = removeFromWrites.build();
         this.affectedRanges = affectedRanges.build();
-
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java 
b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java
index 7b89acceb6..db728ef67e 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ProgressBarrier.java
@@ -172,6 +172,12 @@ public class ProgressBarrier
             {
                 EndpointsForRange writes = 
metadata.placements.get(params).writes.matchRange(range).get().filter(r -> 
filter.test(r.endpoint()));
                 EndpointsForRange reads = 
metadata.placements.get(params).reads.matchRange(range).get().filter(r -> 
filter.test(r.endpoint()));
+                // Affected ranges can contain ranges which are the results of merging or splitting and may not exist
+                // as keys in the existing ReplicaGroups. As such, no replicas will be found for these ranges and so no
+                // WaitFor is necessary.
+                if (reads.isEmpty() && writes.isEmpty())
+                    continue;
+
                 reads.stream().map(Replica::endpoint).forEach(superset::add);
                 writes.stream().map(Replica::endpoint).forEach(superset::add);
 
diff --git 
a/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java 
b/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java
index ba431db848..cb4f15e5ef 100644
--- a/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java
+++ b/test/unit/org/apache/cassandra/tcm/sequences/ProgressBarrierTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.net.ConnectionType;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
@@ -57,12 +58,16 @@ import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.MultiStepOperation;
 import org.apache.cassandra.tcm.membership.Location;
 import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
+import org.apache.cassandra.tcm.transformations.PrepareLeave;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.UnsafeJoin;
 import org.apache.cassandra.utils.concurrent.Future;
 
+import static org.junit.Assert.assertEquals;
+
 public class ProgressBarrierTest extends CMSTestBase
 {
     static
@@ -72,6 +77,48 @@ public class ProgressBarrierTest extends CMSTestBase
         DatabaseDescriptor.setProgressBarrierBackoff(5);
     }
 
+    @Test
+    public void testProgressBarrierWithMergingRanges()
+    {
+        TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(1);
+        try (CMSTestBase.CMSSut sut = new 
CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, rf))
+        {
+            TokenPlacementModel.NodeFactory nodeFactory = 
TokenPlacementModel.nodeFactory();
+            List<TokenPlacementModel.Node> nodes = new ArrayList<>();
+            for (int i = 1; i < 6; i++)
+            {
+                TokenPlacementModel.Node node = nodeFactory.make(i, 1, 
1).overrideToken(i*100);
+                nodes.add(node);
+                sut.service.commit(new Register(new NodeAddresses(node.addr()),
+                                                new Location(node.dc(), 
node.rack()),
+                                                NodeVersion.CURRENT));
+                sut.service.commit(new UnsafeJoin(node.nodeId(),
+                                                  
Collections.singleton(node.longToken()),
+                                                  
ClusterMetadataService.instance().placementProvider()));
+            }
+
+            // 5 node cluster, with a single RF1 keyspace
+            // node2 owns range (100,200] & node3 (200,300]
+            // if node2 leaves node3 will acquire its ranges and will then own (100, 300]
+            // no other peers are involved in this trivial operation
+            // the progress barrier should be looking for consensus from (node2, node3)
+            TokenPlacementModel.Node node2 = nodes.get(1);
+            TokenPlacementModel.Node node3 = nodes.get(2);
+            sut.service.commit(new PrepareLeave(node2.nodeId(),
+                                                true,
+                                                
ClusterMetadataService.instance().placementProvider(),
+                                                
LeaveStreams.Kind.UNBOOTSTRAP));
+            UnbootstrapAndLeave leave = (UnbootstrapAndLeave) 
sut.service.metadata().inProgressSequences.get(node2.nodeId());
+
+            // Internally affectedRanges::toPeers uses the same logic as
+            // the progress barrier does to identify the consensus group
+            Set<NodeId> consensusGroup = 
leave.barrier().affectedRanges.toPeers(ReplicationParams.simple(1),
+                                                                               
 sut.service.metadata().placements,
+                                                                               
 sut.service.metadata().directory);
+            assertEquals(Set.of(node2.nodeId(), node3.nodeId()), 
consensusGroup);
+        }
+    }
+
     @Test
     public void testProgressBarrier() throws Throwable
     {
@@ -143,7 +190,7 @@ public class ProgressBarrierTest extends CMSTestBase
                             if (respond.get())
                             {
                                 responded.add(to);
-                                cb.onResponse((Message<RSP>) 
message.responseWith(message.epoch()));
+                                cb.onResponse((Message<RSP>) 
message.responseWith(message.epoch()).withFrom(to));
                             }
                             else
                             {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to