This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4454ab8699 Improve writePlacementsAllSettled performance in large
clusters with many ongoing range movements
4454ab8699 is described below
commit 4454ab86994b486a399937158ffbaa2837846af9
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Tue Mar 17 11:11:22 2026 +0000
Improve writePlacementsAllSettled performance in large clusters with many
ongoing range movements
Only inflight MSOs which affect the local node need to be
applied to determine the settled local ranges
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-21144
---
CHANGES.txt | 1 +
.../apache/cassandra/db/DiskBoundaryManager.java | 13 +-
.../apache/cassandra/service/StorageService.java | 2 +-
.../service/paxos/cleanup/PaxosCleanup.java | 2 +-
.../org/apache/cassandra/tcm/ClusterMetadata.java | 79 +++++-
.../apache/cassandra/tcm/MultiStepOperation.java | 43 +++
.../org/apache/cassandra/tcm/ownership/Delta.java | 10 +
.../cassandra/tcm/ownership/PlacementDeltas.java | 11 +
.../apache/cassandra/tcm/sequences/AddToCMS.java | 7 +
.../cassandra/tcm/sequences/BootstrapAndJoin.java | 12 +
.../tcm/sequences/BootstrapAndReplace.java | 11 +
.../cassandra/tcm/sequences/DropAccordTable.java | 9 +
.../org/apache/cassandra/tcm/sequences/Move.java | 12 +
.../cassandra/tcm/sequences/ReconfigureCMS.java | 7 +
.../tcm/sequences/UnbootstrapAndLeave.java | 14 +
.../tcm/transformations/ApplyPlacementDeltas.java | 10 +
.../test/log/ClusterMetadataTestHelper.java | 18 +-
.../test/log/MetadataChangeSimulationTest.java | 19 +-
.../microbench/LocalRangesAllSettledBench.java | 143 ++++++++++
.../CASSANDRA-21144_clustermetadata.gz | Bin 0 -> 1402477 bytes
...NewTransformationVersionCompatibilityTest.java} | 63 +----
.../tcm/ownership/LocalRangesAllSettledTest.java | 287 +++++++++++++++++++++
.../cassandra/tcm/ownership/OwnershipUtils.java | 17 ++
.../cassandra/tcm/ownership/RangeSetMap.java | 101 ++++++++
.../tcm/sequences/DropAccordTableTest.java | 8 +-
25 files changed, 805 insertions(+), 94 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c0ff70a6e..f9c6004d36 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Improve performance when calculating settled placements during range
movements (CASSANDRA-21144)
* Make shadow gossip round parameters configurable for testing
(CASSANDRA-21149)
* Avoid potential gossip thread deadlock during decommission (CASSANDRA-21143)
* Improve construction of consensus groups for range movements
(CASSANDRA-21142)
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 1b35423fe9..e1059f485e 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -36,8 +36,6 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
-import org.apache.cassandra.utils.FBUtilities;
public class DiskBoundaryManager
{
@@ -143,22 +141,17 @@ public class DiskBoundaryManager
private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs,
ClusterMetadata metadata)
{
- RangesAtEndpoint localRanges;
- DataPlacement placement;
- if (StorageService.instance.isBootstrapMode()
- && !StorageService.isReplacingSameAddress()) // When replacing
same address, the node marks itself as UN locally
+ if (StorageService.instance.isBootstrapMode() &&
!StorageService.isReplacingSameAddress()) // When replacing same address, the
node marks itself as UN locally
{
- placement =
metadata.placements.get(cfs.keyspace.getMetadata().params.replication);
+ return metadata.localWriteRanges(cfs.keyspace.getMetadata());
}
else
{
// Reason we use the future settled metadata is that if we
decommission a node, we want to stream
// from that node to the correct location on disk, if we didn't,
we would put new files in the wrong places.
// We do this to minimize the amount of data we need to move in
rebalancedisks once everything settled
- placement =
metadata.writePlacementAllSettled(cfs.keyspace.getMetadata());
+ return
metadata.localWriteRangesAllSettled(cfs.keyspace.getMetadata());
}
- localRanges =
placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort());
- return localRanges;
}
/**
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index bac2342c12..424df75b97 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -418,7 +418,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
public Collection<Range<Token>> getLocalAndPendingRanges(String ks)
{
- return
ClusterMetadata.current().localWriteRanges(Keyspace.open(ks).getMetadata());
+ return
ClusterMetadata.current().localWriteRanges(Keyspace.open(ks).getMetadata()).ranges();
}
public OwnedRanges getNormalizedLocalRanges(String keyspaceName,
InetAddressAndPort broadcastAddress)
diff --git
a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
index fb92038cbf..660dca619b 100644
--- a/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
+++ b/src/java/org/apache/cassandra/service/paxos/cleanup/PaxosCleanup.java
@@ -119,7 +119,7 @@ public class PaxosCleanup extends AsyncFuture<Void>
implements Runnable
private static boolean isOutOfRange(SharedContext ctx, String ksName,
Collection<Range<Token>> repairRanges)
{
Keyspace keyspace = Keyspace.open(ksName);
- Collection<Range<Token>> localRanges =
Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(),
ctx.broadcastAddressAndPort()));
+ Collection<Range<Token>> localRanges =
Range.normalize(ClusterMetadata.current().writeRanges(keyspace.getMetadata(),
ctx.broadcastAddressAndPort()).ranges());
for (Range<Token> repairRange : Range.normalize(repairRanges))
{
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index fa2d8f5ee5..4a5f67c1fc 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -32,6 +32,7 @@ import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -52,6 +53,7 @@ import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Locator;
import org.apache.cassandra.locator.MetaStrategy;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.CMSIdentifierMismatchException;
import org.apache.cassandra.schema.DistributedSchema;
@@ -73,7 +75,6 @@ import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
@@ -119,7 +120,8 @@ public class ClusterMetadata
private EndpointsForRange fullCMSReplicas;
private Set<InetAddressAndPort> fullCMSEndpoints;
private Set<NodeId> fullCMSIds;
- private DataPlacements writePlacementAllSettled;
+ private volatile Map<ReplicationParams, RangesAtEndpoint>
localRangesAllSettled = null;
+ private static final RangesAtEndpoint EMPTY_LOCAL_RANGES =
RangesAtEndpoint.empty(FBUtilities.getBroadcastAddressAndPort());
public ClusterMetadata(IPartitioner partitioner)
{
@@ -317,21 +319,72 @@ public class ClusterMetadata
return epoch.nextEpoch();
}
- public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm)
+ public RangesAtEndpoint localWriteRangesAllSettled(KeyspaceMetadata ksm)
{
- if (writePlacementAllSettled == null)
+ // Local strategy ranges are constant
+ if (ksm.params.replication.isLocal())
+ return localWriteRanges(ksm);
+
+ if (localRangesAllSettled != null)
+ return localRangesAllSettled.getOrDefault(ksm.params.replication,
EMPTY_LOCAL_RANGES);
+
+ NodeId localId = myNodeId();
+ synchronized (this)
{
- ClusterMetadata metadata = this;
- Iterator<MultiStepOperation<?>> iter =
metadata.inProgressSequences.iterator();
- while (iter.hasNext())
+ if (localRangesAllSettled != null)
+ return
localRangesAllSettled.getOrDefault(ksm.params.replication, EMPTY_LOCAL_RANGES);
+
+ Map<ReplicationParams, RangesAtEndpoint> builder =
Maps.newHashMapWithExpectedSize(this.placements.size());
+ DataPlacements settled = placementsAllSettledForNode(localId);
+ settled.forEach((replication, placement) -> {
+ builder.put(replication,
placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()));
+ });
+ localRangesAllSettled = builder;
+ }
+ return localRangesAllSettled.getOrDefault(ksm.params.replication,
EMPTY_LOCAL_RANGES);
+ }
+
+ /**
+ * Run through all inflight MultiStepOperations and if any impact the
specified node, apply their metadata
+ * transformations. Only used outside of tests by {@link
localWriteRangesAllSettled} to identify how placements for
+ * the local node will be affected by in flight operations. In that case,
the result is cached so this should be
+ * called at most once for a given ClusterMetadata instance.
+ */
+ @VisibleForTesting
+ public DataPlacements placementsAllSettledForNode(NodeId peer)
+ {
+ Iterator<MultiStepOperation<?>> iter = inProgressSequences.iterator();
+ ClusterMetadata metadata = this;
+ while (iter.hasNext())
+ {
+ MultiStepOperation<?> operation = iter.next();
+ // Check whether the MSO materially affects the local ranges of
the target node.
+ boolean isRelevantOperation =
operationAffectsLocalRangesOfPeer(peer,
+
operation,
+
metadata.directory);
+ if (isRelevantOperation)
{
- Transformation.Result result = iter.next().applyTo(metadata);
+ logger.debug("Operation {} affects node {}, calculating local
ranges after application",
+ operation.sequenceKey(), peer);
+ Transformation.Result result = operation.applyTo(metadata);
assert result.isSuccess();
metadata = result.success().metadata;
}
- writePlacementAllSettled = metadata.placements;
}
- return writePlacementAllSettled.get(ksm.params.replication);
+ return metadata.placements;
+ }
+
+ public static boolean operationAffectsLocalRangesOfPeer(NodeId peer,
+
MultiStepOperation<?> operation,
+ Directory
directory)
+ {
+ return operation.affectedPeers(directory).contains(peer);
+ }
+
+ @VisibleForTesting
+ public void unsafeClearLocalRangesAllSettled()
+ {
+ localRangesAllSettled = null;
}
// TODO Remove this as it isn't really an equivalent to the previous
concept of pending ranges
@@ -352,14 +405,14 @@ public class ClusterMetadata
return
!writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint));
}
- public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata)
+ public RangesAtEndpoint localWriteRanges(KeyspaceMetadata metadata)
{
return writeRanges(metadata, FBUtilities.getBroadcastAddressAndPort());
}
- public Collection<Range<Token>> writeRanges(KeyspaceMetadata metadata,
InetAddressAndPort peer)
+ public RangesAtEndpoint writeRanges(KeyspaceMetadata metadata,
InetAddressAndPort peer)
{
- return
placements.get(metadata.params.replication).writes.byEndpoint().get(peer).ranges();
+ return
placements.get(metadata.params.replication).writes.byEndpoint().get(peer);
}
// TODO Remove this as it isn't really an equivalent to the previous
concept of pending ranges
diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
index 7083f27b64..03164243b7 100644
--- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
+++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
@@ -19,9 +19,17 @@
package org.apache.cassandra.tcm;
import java.util.List;
+import java.util.Set;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.sequences.AddToCMS;
import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
@@ -57,6 +65,8 @@ import
org.apache.cassandra.tcm.serialization.MetadataSerializer;
*/
public abstract class MultiStepOperation<CONTEXT>
{
+ private static final Logger logger =
LoggerFactory.getLogger(MultiStepOperation.class);
+
public enum Kind
{
@Deprecated(since = "CEP-21")
@@ -155,6 +165,39 @@ public abstract class MultiStepOperation<CONTEXT>
*/
public abstract Transformation.Result applyTo(ClusterMetadata metadata);
+ /**
+ * Return the id of any peer known to be involved in the execution of the
operation.
+ * For example, in the case of a new node bootstrapping this would include
all current and proposed replicas of the
+ * affected ranges.
+ * Important: this currently requires a Directory to be supplied as many
MSO implementations are endpoint-centric
+ * The directory is used to convert endpoints to node ids, but this will
become unnecessary as placements & deltas
+ * evolve away from endpoints to use ids directly.
+ * @return Node ids of the peers involved in the operation
+ */
+ public abstract Set<NodeId> affectedPeers(Directory directory);
+
+ /**
+ * Helper method for affectedPeers implementations to convert from
endpoints to node ids
+ * @return set of node ids which map to the supplied endpoints using the
directory. Any endpoints without a
+ * corresponding id are ignored
+ */
+ protected Set<NodeId> endpointsToIds(Set<InetAddressAndPort> endpoints,
Directory directory)
+ {
+ Set<NodeId> affectedNodes =
Sets.newHashSetWithExpectedSize(endpoints.size());
+ for (InetAddressAndPort endpoint : endpoints)
+ {
+ NodeId id = directory.peerId(endpoint);
+ // TODO should we error here?
+ if (id == null)
+ logger.warn("No node id found for endpoint {} in directory
with epoch {} " +
+ "by MultiStepOperation {} with sequence key {}",
+ endpoint, directory.lastModified().getEpoch(),
kind(), sequenceKey());
+ else
+ affectedNodes.add(id);
+ }
+ return affectedNodes;
+ }
+
/**
* Helper method for the standard applyTo implementations where we just
execute a list of transformations, starting at `next`
* @return
diff --git a/src/java/org/apache/cassandra/tcm/ownership/Delta.java
b/src/java/org/apache/cassandra/tcm/ownership/Delta.java
index 9660bd255a..38fcdfde96 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/Delta.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/Delta.java
@@ -19,10 +19,13 @@
package org.apache.cassandra.tcm.ownership;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
@@ -52,6 +55,13 @@ public class Delta
return new Delta(removals, RangesByEndpoint.EMPTY);
}
+ public Set<InetAddressAndPort> allEndpoints()
+ {
+ Set<InetAddressAndPort> endpoints = new HashSet<>(removals.keySet());
+ endpoints.addAll(additions.keySet());
+ return endpoints;
+ }
+
/**
* Merges this delta with `other`
*
diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java
b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java
index 6ba80a854b..760cd0243f 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.tcm.ownership;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
@@ -152,6 +155,14 @@ public class PlacementDeltas extends
ReplicationMap<PlacementDeltas.PlacementDel
return new PlacementDelta(reads.onlyRemovals(),
writes.onlyRemovals());
}
+ // TODO deltas (& placements in general) should deal in node ids, not
endpoints.
+ public Set<InetAddressAndPort> affectedEndpoints()
+ {
+ Set<InetAddressAndPort> affectedEndpoints = new
HashSet<>(reads.allEndpoints());
+ affectedEndpoints.addAll(writes.allEndpoints());
+ return affectedEndpoints;
+ }
+
public DataPlacement apply(Epoch epoch, DataPlacement placement)
{
DataPlacement.Builder builder = placement.unbuild();
diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
index 0d5ee2f067..ec516854a0 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
@@ -125,6 +126,12 @@ public class AddToCMS extends MultiStepOperation<Epoch>
return finishJoin.execute(metadata);
}
+ @Override
+ public Set<NodeId> affectedPeers(Directory directory)
+ {
+ return Set.of();
+ }
+
@Override
public SequenceState executeNext()
{
diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
index b410097f84..d2909bd873 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.sequences;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -56,6 +57,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.ownership.DataPlacement;
@@ -185,6 +187,16 @@ public class BootstrapAndJoin extends
MultiStepOperation<Epoch>
return applyMultipleTransformations(metadata, next, of(startJoin,
midJoin, finishJoin));
}
+ @Override
+ public Set<NodeId> affectedPeers(Directory directory)
+ {
+ Set<InetAddressAndPort> affectedEndpoints = new HashSet<>();
+ affectedEndpoints.addAll(startJoin.affectedEndpoints());
+ affectedEndpoints.addAll(midJoin.affectedEndpoints());
+ affectedEndpoints.addAll(finishJoin.affectedEndpoints());
+ return endpointsToIds(affectedEndpoints, directory);
+ }
+
@Override
public SequenceState executeNext()
{
diff --git
a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
index cb34179d44..32afe23491 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndReplace.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.ownership.DataPlacement;
@@ -180,6 +181,16 @@ public class BootstrapAndReplace extends
MultiStepOperation<Epoch>
return applyMultipleTransformations(metadata, next, of(startReplace,
midReplace, finishReplace));
}
+ @Override
+ public Set<NodeId> affectedPeers(Directory directory)
+ {
+ Set<InetAddressAndPort> affectedEndpoints = new HashSet<>();
+ affectedEndpoints.addAll(startReplace.affectedEndpoints());
+ affectedEndpoints.addAll(midReplace.affectedEndpoints());
+ affectedEndpoints.addAll(finishReplace.affectedEndpoints());
+ return endpointsToIds(affectedEndpoints, directory);
+ }
+
@Override
public SequenceState executeNext()
{
diff --git a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
index 0a3512296a..c95fe94435 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.sequences;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
@@ -39,6 +40,8 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
@@ -129,6 +132,12 @@ public class DropAccordTable extends
MultiStepOperation<Epoch>
return FINISH_DROP_ACCORD_TABLE;
}
+ @Override
+ public Set<NodeId> affectedPeers(Directory directory)
+ {
+ return Set.of();
+ }
+
@Override
public SequenceState executeNext()
{
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java
b/src/java/org/apache/cassandra/tcm/sequences/Move.java
index a26d6ded45..a7f316e38f 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/Move.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.RangesByEndpoint;
import org.apache.cassandra.locator.Replica;
@@ -61,6 +62,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.ownership.DataPlacements;
@@ -194,6 +196,16 @@ public class Move extends MultiStepOperation<Epoch>
return applyMultipleTransformations(metadata, next, of(startMove,
midMove, finishMove));
}
+ @Override
+ public Set<NodeId> affectedPeers(Directory directory)
+ {
+ Set<InetAddressAndPort> affectedEndpoints = new HashSet<>();
+ affectedEndpoints.addAll(startMove.affectedEndpoints());
+ affectedEndpoints.addAll(midMove.affectedEndpoints());
+ affectedEndpoints.addAll(finishMove.affectedEndpoints());
+ return endpointsToIds(affectedEndpoints, directory);
+ }
+
@Override
public SequenceState executeNext()
{
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
index ce2daf6b46..39613ce7b8 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.tcm.MetadataKey;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Retry;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.MovementMap;
import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
@@ -145,6 +146,12 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
return next.kind();
}
+ @Override
+ public Set<NodeId> affectedPeers(Directory directory)
+ {
+ return Set.of();
+ }
+
@Override
public Transformation.Result applyTo(ClusterMetadata metadata)
{
diff --git
a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
index 83d543a8dd..da8ae74e6a 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
@@ -19,7 +19,9 @@
package org.apache.cassandra.tcm.sequences;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import com.google.common.annotations.VisibleForTesting;
@@ -31,12 +33,14 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
@@ -156,6 +160,16 @@ public class UnbootstrapAndLeave extends
MultiStepOperation<Epoch>
return applyMultipleTransformations(metadata, next, of(startLeave,
midLeave, finishLeave));
}
+ @Override
+ public Set<NodeId> affectedPeers(Directory directory)
+ {
+ Set<InetAddressAndPort> affectedEndpoints = new HashSet<>();
+ affectedEndpoints.addAll(startLeave.affectedEndpoints());
+ affectedEndpoints.addAll(midLeave.affectedEndpoints());
+ affectedEndpoints.addAll(finishLeave.affectedEndpoints());
+ return endpointsToIds(affectedEndpoints, directory);
+ }
+
@Override
public SequenceState executeNext()
{
diff --git
a/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java
b/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java
index 2a1eaba7e0..10f48609d4 100644
---
a/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java
+++
b/src/java/org/apache/cassandra/tcm/transformations/ApplyPlacementDeltas.java
@@ -19,11 +19,14 @@
package org.apache.cassandra.tcm.transformations;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Objects;
+import java.util.Set;
import org.apache.cassandra.exceptions.ExceptionCode;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.NodeId;
@@ -60,6 +63,13 @@ public abstract class ApplyPlacementDeltas implements
Transformation
return delta;
}
+ public Set<InetAddressAndPort> affectedEndpoints()
+ {
+ Set<InetAddressAndPort> affectedEndpoints = new HashSet<>();
+ delta.forEach((replication, d) ->
affectedEndpoints.addAll(d.affectedEndpoints()));
+ return affectedEndpoints;
+ }
+
@Override
public final Result execute(ClusterMetadata prev)
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 063f94857c..1598c5ca89 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -1018,6 +1018,12 @@ public class ClusterMetadataTestHelper
{
return prepareLeave(nodeId(idx));
}
+
+ public static PrepareLeave prepareLeave(int idx, boolean force)
+ {
+ return new PrepareLeave(nodeId(idx), force, new
UniformRangePlacement(), LeaveStreams.Kind.UNBOOTSTRAP);
+ }
+
public static PrepareLeave prepareLeave(NodeId nodeId)
{
return new PrepareLeave(nodeId,
@@ -1026,10 +1032,15 @@ public class ClusterMetadataTestHelper
LeaveStreams.Kind.UNBOOTSTRAP);
}
+ public static PrepareMove prepareMove(int idx, Token newToken)
+ {
+ return prepareMove(nodeId(idx), newToken);
+ }
+
public static PrepareMove prepareMove(NodeId id, Token newToken)
{
return new PrepareMove(id,
-
Collections.singleton(Murmur3Partitioner.instance.getRandomToken()),
+ Collections.singleton(newToken),
new UniformRangePlacement(),
false);
}
@@ -1103,6 +1114,11 @@ public class ClusterMetadataTestHelper
return (BootstrapAndReplace) metadata.inProgressSequences.get(nodeId);
}
+ public static Move getMovePlan(int peer)
+ {
+ return getMovePlan(addr(peer));
+ }
+
public static Move getMovePlan(InetAddressAndPort addr)
{
return getMovePlan(ClusterMetadata.current().directory.peerId(addr));
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
index 82e864b3ef..b13cab2660 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.harry.model.TokenPlacementModel;
import org.apache.cassandra.locator.CMSPlacementStrategy;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.ReplicationParams;
@@ -66,6 +67,7 @@ import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.OwnershipUtils;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.tcm.transformations.Register;
@@ -930,14 +932,27 @@ public class MetadataChangeSimulationTest extends
CMSTestBase
state = SimulatedOperation.leave(sut, state, toLeave);
KeyspaceMetadata ksm =
sut.service.metadata().schema.getKeyspaces().get("test").get();
- DataPlacement allSettled =
sut.service.metadata().writePlacementAllSettled(ksm);
+ // Calculate the full set of data placements when all in flight
operations have been completed
+ DataPlacement allSettled =
OwnershipUtils.placementsAllSettled(sut.service.metadata()).get(ksm.params.replication);
Assert.assertEquals(4, state.inFlightOperations.size()); // make
sure none was rejected
while (!state.inFlightOperations.isEmpty())
{
state =
state.inFlightOperations.get(random.nextInt(state.inFlightOperations.size())).advance(state);
-
Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().writePlacementAllSettled(ksm)));
+ // for every node, ask ClusterMetadata for local ranges after
all operations are complete. These
+ // should not change as we progress
+ for (Node n : state.currentNodes)
+ {
+ RangesAtEndpoint localRanges = sut.service.metadata()
+
.placementsAllSettledForNode(n.nodeId())
+
.get(ksm.params.replication)
+ .writes
+ .byEndpoint()
+ .get(n.addr());
+ Assert.assertEquals(localRanges,
allSettled.writes.byEndpoint().get(n.addr()));
+ }
validatePlacements(sut, state);
}
+ // Finally verify that the predicted placements match the actual
ones
Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().placements.get(ksm.params.replication)));
}
}
diff --git
a/test/microbench/org/apache/cassandra/test/microbench/LocalRangesAllSettledBench.java
b/test/microbench/org/apache/cassandra/test/microbench/LocalRangesAllSettledBench.java
new file mode 100644
index 0000000000..8b5917b420
--- /dev/null
+++
b/test/microbench/org/apache/cassandra/test/microbench/LocalRangesAllSettledBench.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.FileInputStream;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPInputStream;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.OwnershipUtils;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(value = 1)
+@Warmup(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000)
+@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000)
+public class LocalRangesAllSettledBench
+{
+ static ClusterMetadata metadata;
+ @Setup(Level.Trial)
+ public void setup() throws Exception
+ {
+ DatabaseDescriptor.daemonInitialization();
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+
ClusterMetadataService.setInstance(ClusterMetadataTestHelper.syncInstanceForTest());
+ metadata = metadata();
+ }
+
+ @Benchmark
+ public void benchLocalRangesOnlyWithRelevantMSOs()
+ {
+ Map<KeyspaceMetadata, RangesAtEndpoint> settledByKeyspace = new
HashMap<>();
+ metadata.unsafeClearLocalRangesAllSettled();
+ // This peer is involved in a MOVE operation
+ InetAddressAndPort local =
InetAddressAndPort.getByNameUnchecked("10.10.9.129:7000");
+ FBUtilities.setBroadcastInetAddressAndPort(local);
+ for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces())
+ settledByKeyspace.put(ksm,
metadata.localWriteRangesAllSettled(ksm));
+ }
+
+ @Benchmark
+ public void benchLocalRangesOnlyNoRelevantMSOs()
+ {
+ Map<KeyspaceMetadata, RangesAtEndpoint> settledByKeyspace = new
HashMap<>();
+ metadata.unsafeClearLocalRangesAllSettled();
+ // This peer has no involvement in any in-flight MSOs
+ InetAddressAndPort local =
InetAddressAndPort.getByNameUnchecked("10.10.14.13:7000");
+ FBUtilities.setBroadcastInetAddressAndPort(local);
+ for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces())
+ settledByKeyspace.put(ksm,
metadata.localWriteRangesAllSettled(ksm));
+ }
+
+ @Benchmark
+ public void benchPlacementsAllSettled()
+ {
+ // Emulates the previous implementation of
ClusterMetadata::writePlacementsAllSettled
+ // which would be lazily computed during on first access.
+ // As this fully applies all in-flight MSOs to derive the final
settled placements,
+ // the local broadcast address is not significant.
+ DataPlacements placementAllSettled =
OwnershipUtils.placementsAllSettled(metadata);
+ Map<KeyspaceMetadata, RangesAtEndpoint> settledByKeyspace = new
HashMap<>();
+ for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces())
+ {
+ DataPlacement placement =
placementAllSettled.get(ksm.params.replication);
+ settledByKeyspace.put(ksm,
placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()));
+ }
+ }
+
+ public ClusterMetadata metadata() throws Exception
+ {
+ Path p =
Path.of(this.getClass().getClassLoader().getResource("cluster_metadata/CASSANDRA-21144_clustermetadata.gz").toURI());
+ try (DataInputStreamPlus in = Util.DataInputStreamPlusImpl.wrap(new
GZIPInputStream(new FileInputStream(p.toFile()))))
+ {
+ ClusterMetadata metadata =
VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, in);
+ return metadata;
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ Options options = new OptionsBuilder()
+
.include(LocalRangesAllSettledBench.class.getSimpleName())
+ .build();
+ new Runner(options).run();
+ }
+/*
+$ ant microbench -Dbenchmark.name=LocalRangesAllSettledBench
+
+ [java] Benchmark
Mode Cnt Score Error Units
+ [java] LocalRangesAllSettledBench.benchLocalRangesOnlyNoRelevantMSOs
avgt 5 18.214 ± 4.350 ms/op
+ [java] LocalRangesAllSettledBench.benchLocalRangesOnlyWithRelevantMSOs
avgt 5 274.931 ± 14.193 ms/op
+ [java] LocalRangesAllSettledBench.benchPlacementsAllSettled
avgt 5 11465.778 ± 370.754 ms/op
+
+ */
+}
diff --git a/test/resources/cluster_metadata/CASSANDRA-21144_clustermetadata.gz
b/test/resources/cluster_metadata/CASSANDRA-21144_clustermetadata.gz
new file mode 100644
index 0000000000..0bc749ed5b
Binary files /dev/null and
b/test/resources/cluster_metadata/CASSANDRA-21144_clustermetadata.gz differ
diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
b/test/unit/org/apache/cassandra/tcm/NewTransformationVersionCompatibilityTest.java
similarity index 63%
rename from test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
rename to
test/unit/org/apache/cassandra/tcm/NewTransformationVersionCompatibilityTest.java
index 56af302c23..67b696e4aa 100644
--- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
+++
b/test/unit/org/apache/cassandra/tcm/NewTransformationVersionCompatibilityTest.java
@@ -30,17 +30,12 @@ import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.distributed.test.log.CMSTestBase;
import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.harry.model.TokenPlacementModel;
-import org.apache.cassandra.schema.KeyspaceMetadata;
-import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.SchemaTransformation;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
-import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.LockedRanges;
-import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.AlterSchema;
import org.apache.cassandra.tcm.transformations.CustomTransformation;
@@ -48,11 +43,10 @@ import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.utils.CassandraVersion;
import static
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr;
-import static
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getLeavePlan;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class ClusterMetadataTest
+public class NewTransformationVersionCompatibilityTest
{
@BeforeClass
public static void beforeClass()
@@ -67,61 +61,6 @@ public class ClusterMetadataTest
new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, new
TokenPlacementModel.SimpleReplicationFactor(3));
}
- @Test
- public void testWritePlacementAllSettledLeaving()
- {
- for (int i = 1; i <= 4; i++)
- {
- ClusterMetadataTestHelper.register(i);
- ClusterMetadataTestHelper.join(i, i);
- }
-
ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareLeave(3));
- UnbootstrapAndLeave plan = getLeavePlan(3);
-
- ClusterMetadataService.instance().commit(plan.startLeave);
- KeyspaceMetadata ksm = KeyspaceMetadata.create("ks",
KeyspaceParams.simple(3));
-
- DataPlacement writeAllSettled =
ClusterMetadata.current().writePlacementAllSettled(ksm);
- ClusterMetadataService.instance().commit(plan.midLeave);
- ClusterMetadataService.instance().commit(plan.finishLeave);
-
- DataPlacement actualFinishedWritePlacements =
ClusterMetadata.current().placements.get(ksm.params.replication);
-
-
assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.removals.isEmpty());
-
assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.additions.isEmpty());
- }
-
- @Test
- public void testWritePlacementAllSettledJoining()
- {
- for (int i = 1; i <= 4; i++)
- {
- ClusterMetadataTestHelper.register(i);
- ClusterMetadataTestHelper.join(i, i);
- }
-
- ClusterMetadataTestHelper.register(10);
-
ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareJoin(10));
-
- BootstrapAndJoin plan = ClusterMetadataTestHelper.getBootstrapPlan(10);
- ClusterMetadataService.instance().commit(plan.startJoin);
- KeyspaceMetadata ksm = KeyspaceMetadata.create("ks",
KeyspaceParams.simple(3));
- DataPlacement writeAllSettled =
ClusterMetadata.current().writePlacementAllSettled(ksm);
-
- ClusterMetadataService.instance().commit(plan.midJoin);
- ClusterMetadataService.instance().commit(plan.finishJoin);
-
- DataPlacement actualFinishedWritePlacements =
ClusterMetadata.current().placements.get(ksm.params.replication);
-
assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.removals.isEmpty());
-
assertTrue(actualFinishedWritePlacements.difference(writeAllSettled).writes.additions.isEmpty());
- }
-
- @Test
- public void testWritePlacementAllSettledMoving()
- {
- // todo
- }
-
@Test
public void testNewTransformationCommit()
{
diff --git
a/test/unit/org/apache/cassandra/tcm/ownership/LocalRangesAllSettledTest.java
b/test/unit/org/apache/cassandra/tcm/ownership/LocalRangesAllSettledTest.java
new file mode 100644
index 0000000000..f9e8104ca9
--- /dev/null
+++
b/test/unit/org/apache/cassandra/tcm/ownership/LocalRangesAllSettledTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.harry.model.TokenPlacementModel;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
+import org.apache.cassandra.tcm.sequences.Move;
+import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr;
+import static
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getLeavePlan;
+import static
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.getMovePlan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class LocalRangesAllSettledTest
+{
+ private static final Logger logger =
LoggerFactory.getLogger(LocalRangesAllSettledTest.class);
+
+ private static final int[] INITIAL_NODES = new int[]{1, 2, 3, 4};
+
+ @BeforeClass
+ public static void beforeClass()
+ {
+ ServerTestUtils.prepareServerNoRegister();
+ }
+
+ @Before
+ public void before() throws ExecutionException, InterruptedException
+ {
+ ClusterMetadataService.unsetInstance();
+ new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, new
TokenPlacementModel.SimpleReplicationFactor(3));
+
+ // Join the first 4 nodes
+ for (int i : INITIAL_NODES)
+ {
+ ClusterMetadataTestHelper.register(i, "dc" + i % 3, "rack0");
+ ClusterMetadataTestHelper.join(i, i);
+ }
+
+ // Create keyspaces with various replication settings
+ for (int i = 1; i <= 3; i++)
+ {
+ ClusterMetadataTestHelper.createKeyspace("simple_" + i,
KeyspaceParams.simple(i));
+ ClusterMetadataTestHelper.createKeyspace("nts_" + i,
KeyspaceParams.nts("dc0", i, "dc1", i, "dc2", i));
+ }
+
+ }
+
+ @Test
+ public void testLeaving()
+ {
+ // Verify proposed ranges without any in flight operations
+ AllLocalRanges initial =
snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES);
+ AllLocalRanges proposed =
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES);
+ assertEquals(initial, proposed);
+ // Check against the actual write placements
+ assertLocalRangesMatchPlacements(ClusterMetadata.current().placements,
initial, INITIAL_NODES);
+
+ // Initiate an operation which affects ownership. This will add the
MultiStepOperation which encodes any
+ // necessary range movements so subsequent calls to
ClusterMetadata::localRangesAllSettled
+ // should return the expected local ranges after the operation has
completed
+ // pick a random node to leave (but not the CMS node (1), for
simplicity's sake)
+ int leaving = INITIAL_NODES[Math.max(1, new Random().nextInt(4))];
+ logger.info("Selected node {} to leave", leaving);
+
ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareLeave(leaving,
true));
+ proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED,
INITIAL_NODES);
+ assertNotEquals(initial, proposed);
+
+ // Step through execution of the MSO, verifying after each step that
the proposed ranges don't change
+ UnbootstrapAndLeave plan = getLeavePlan(leaving);
+ ClusterMetadataService.instance().commit(plan.startLeave);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES));
+ ClusterMetadataService.instance().commit(plan.midLeave);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES));
+ ClusterMetadataService.instance().commit(plan.finishLeave);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES));
+
+ // Verify that the final local ranges match what was proposed
+ AllLocalRanges finalized =
snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES);
+ assertEquals(proposed, finalized);
+
+ // Finally, check against the actual write placements
+ assertLocalRangesMatchPlacements(ClusterMetadata.current().placements,
finalized, INITIAL_NODES);
+ }
+
+ @Test
+ public void testJoining()
+ {
+ // Verify proposed ranges without any in flight operations
+ AllLocalRanges initial =
snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES);
+ AllLocalRanges proposed =
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES);
+ assertEquals(initial, proposed);
+ // Check against the actual write placements
+ assertLocalRangesMatchPlacements(ClusterMetadata.current().placements,
initial, INITIAL_NODES);
+
+ // Initiate an operation which affects ownership. This will add the
MultiStepOperation which encodes any
+ // necessary range movements so subsequent calls to
ClusterMetadata::localRangesAllSettled
+ // should return the expected local ranges after the operation has
completed
+ int newNode = 10;
+ ClusterMetadataTestHelper.register(newNode);
+ int[] expandedNodes = Arrays.copyOf(INITIAL_NODES,
INITIAL_NODES.length + 1);
+ expandedNodes[expandedNodes.length - 1] = newNode;
+
ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareJoin(newNode));
+ proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED,
expandedNodes);
+ assertNotEquals(initial, proposed);
+
+ // Step through execution of the MSO, verifying after each step that
the proposed ranges don't change
+ BootstrapAndJoin plan =
ClusterMetadataTestHelper.getBootstrapPlan(newNode);
+ ClusterMetadataService.instance().commit(plan.startJoin);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, expandedNodes));
+ ClusterMetadataService.instance().commit(plan.midJoin);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, expandedNodes));
+ ClusterMetadataService.instance().commit(plan.finishJoin);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, expandedNodes));
+
+ // Verify that the final local ranges match what was proposed
+ AllLocalRanges finalized =
snapshotAllLocalRanges(LocalRangeStatus.CURRENT, expandedNodes);
+ assertEquals(proposed, finalized);
+
+ // Finally, check against the actual write placements
+ assertLocalRangesMatchPlacements(ClusterMetadata.current().placements,
finalized, expandedNodes);
+ }
+
+ @Test
+ public void testMoving()
+ {
+ // Verify proposed ranges without any in flight operations
+ AllLocalRanges initial =
snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES);
+ AllLocalRanges proposed =
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES);
+ assertEquals(initial, proposed);
+ // Check against the actual write placements
+ assertLocalRangesMatchPlacements(ClusterMetadata.current().placements,
initial, INITIAL_NODES);
+
+ // Initiate an operation which affects ownership. This will add the
MultiStepOperation which encodes any
+ // necessary range movements so subsequent calls to
ClusterMetadata::localRangesAllSettled
+ // should return the expected local ranges after the operation has
completed
+ // pick a random node to leave (but not the CMS node (1), for
simplicity's sake)
+ int moving = INITIAL_NODES[Math.max(1, new Random().nextInt(4))];
+ Token newToken =
ClusterMetadata.current().partitioner.getRandomToken();
+ while (ClusterMetadata.current().tokenMap.tokens().contains(newToken))
+ newToken = ClusterMetadata.current().partitioner.getRandomToken();
+ logger.info("Selected node {} to move to token {} ", moving, newToken);
+
ClusterMetadataService.instance().commit(ClusterMetadataTestHelper.prepareMove(moving,
newToken));
+ proposed = snapshotAllLocalRanges(LocalRangeStatus.SETTLED,
INITIAL_NODES);
+ assertNotEquals(initial, proposed);
+
+ // Step through execution of the MSO, verifying after each step that
the proposed ranges don't change
+ Move plan = getMovePlan(moving);
+ ClusterMetadataService.instance().commit(plan.startMove);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES));
+ ClusterMetadataService.instance().commit(plan.midMove);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES));
+ ClusterMetadataService.instance().commit(plan.finishMove);
+ assertEquals(proposed,
snapshotAllLocalRanges(LocalRangeStatus.SETTLED, INITIAL_NODES));
+
+ // Verify that the final local ranges match what was proposed
+ AllLocalRanges finalized =
snapshotAllLocalRanges(LocalRangeStatus.CURRENT, INITIAL_NODES);
+ assertEquals(proposed, finalized);
+
+ // Finally, check against the actual write placements
+ assertLocalRangesMatchPlacements(ClusterMetadata.current().placements,
finalized, INITIAL_NODES);
+ }
+
+ private void assertLocalRangesMatchPlacements(DataPlacements placements,
+ AllLocalRanges
allLocalRanges,
+ int... nodes)
+ {
+ for (int id : nodes)
+ {
+ RangeSetMap localRanges = allLocalRanges.get(id);
+ InetAddressAndPort endpoint = addr(id);
+ placements.forEach((replication, placement) -> {
+ Set<Range<Token>> ranges = localRanges.get(replication);
+ Set<Range<Token>> fromPlacement =
placement.writes.byEndpoint().get(endpoint).ranges();
+ assertEquals(ranges, fromPlacement);
+ });
+ }
+ }
+
+ private enum LocalRangeStatus { CURRENT, SETTLED }
+ private static AllLocalRanges snapshotAllLocalRanges(LocalRangeStatus
status, int... nodes)
+ {
+ InetAddressAndPort realLocalAddress =
FBUtilities.getBroadcastAddressAndPort();
+ AllLocalRanges snapshot = new AllLocalRanges();
+ ClusterMetadata metadata = ClusterMetadata.current();
+ for (int id : nodes)
+ {
+ InetAddressAndPort address = addr(id);
+ // clear cached settled local ranges
+ metadata.unsafeClearLocalRangesAllSettled();
+ // temporarily set broadcast address to infer which node is "local"
+ FBUtilities.setBroadcastInetAddressAndPort(address);
+ RangeSetMap.Builder localRanges = RangeSetMap.builder();
+ for (KeyspaceMetadata ksm : metadata.schema.getKeyspaces())
+ {
+ Set<Range<Token>> ranges = status == LocalRangeStatus.SETTLED
+ ?
metadata.localWriteRangesAllSettled(ksm).ranges()
+ :
metadata.localWriteRanges(ksm).ranges();
+ localRanges.put(ksm.params.replication, ranges);
+ }
+ snapshot.put(id, localRanges.build());
+ }
+ // restore local address
+ FBUtilities.setBroadcastInetAddressAndPort(realLocalAddress);
+ metadata.unsafeClearLocalRangesAllSettled();
+ return snapshot;
+ }
+
+ // A snapshot of the local ranges for each replication setting for each
node
+ private static class AllLocalRanges
+ {
+ Map<Integer, RangeSetMap> localWriteRanges = new HashMap<>();
+
+ void put(int nodeId, RangeSetMap ranges)
+ {
+ localWriteRanges.put(nodeId, ranges);
+ }
+
+ RangeSetMap get(int nodeId)
+ {
+ return localWriteRanges.get(nodeId);
+ }
+
+ public final boolean equals(Object o)
+ {
+ if (!(o instanceof AllLocalRanges)) return false;
+
+ return Objects.equals(localWriteRanges,
((AllLocalRanges)o).localWriteRanges);
+ }
+
+ public int hashCode()
+ {
+ return Objects.hashCode(localWriteRanges);
+ }
+
+ public String toString()
+ {
+ return "AllLocalRanges{" +
+ "localWriteRanges=" + localWriteRanges +
+ '}';
+ }
+ }
+
+}
diff --git a/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java
b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java
index 0f29c46099..897dea5e5e 100644
--- a/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java
+++ b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -41,7 +42,10 @@ import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MultiStepOperation;
+import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.utils.ByteBufferUtil;
import static
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.broadcastAddress;
@@ -239,4 +243,17 @@ public class OwnershipUtils
tokens.add(partitioner.getRandomToken(random));
return tokens;
}
+
+ public static DataPlacements placementsAllSettled(ClusterMetadata metadata)
+ {
+ ClusterMetadata workingMetadata = metadata;
+ Iterator<MultiStepOperation<?>> iter =
metadata.inProgressSequences.iterator();
+ while (iter.hasNext())
+ {
+ Transformation.Result result =
iter.next().applyTo(workingMetadata);
+ assert result.isSuccess();
+ workingMetadata = result.success().metadata;
+ }
+ return workingMetadata.placements;
+ }
}
diff --git a/test/unit/org/apache/cassandra/tcm/ownership/RangeSetMap.java
b/test/unit/org/apache/cassandra/tcm/ownership/RangeSetMap.java
new file mode 100644
index 0000000000..a7357ebd90
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/ownership/RangeSetMap.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.ReplicationParams;
+
+public class RangeSetMap extends ReplicationMap<Set<Range<Token>>>
+{
+ public RangeSetMap()
+ {
+ super(new HashMap<>());
+ }
+
+ public RangeSetMap(Map<ReplicationParams, Set<Range<Token>>> map)
+ {
+ super(map);
+ }
+
+ @Override
+ protected Set<Range<Token>> defaultValue()
+ {
+ return Set.of();
+ }
+
+ @Override
+ protected Set<Range<Token>> localOnly()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Builder unbuild()
+ {
+ return new Builder(map);
+ }
+
+ public void clear()
+ {
+ map.clear();
+ }
+
+ public String toString()
+ {
+ return "RangeSetMap{" +
+ "map=" + map +
+ '}';
+ }
+
+ public static Builder builder()
+ {
+ return new Builder(new HashMap<>());
+ }
+
+ public static Builder builder(int expectedSize)
+ {
+ return new Builder(Maps.newHashMapWithExpectedSize(expectedSize));
+ }
+
+ public static class Builder
+ {
+ private final Map<ReplicationParams, Set<Range<Token>>> map;
+ private Builder(Map<ReplicationParams, Set<Range<Token>>> map)
+ {
+ this.map = map;
+ }
+
+ public Builder put(ReplicationParams params, Set<Range<Token>> ranges)
+ {
+ map.put(params, ranges);
+ return this;
+ }
+
+ public RangeSetMap build()
+ {
+ return new RangeSetMap(map);
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java
b/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java
index 470198cd5b..364b212f6c 100644
--- a/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java
+++ b/test/unit/org/apache/cassandra/tcm/sequences/DropAccordTableTest.java
@@ -96,11 +96,11 @@ public class DropAccordTableTest
cms.commit(new PrepareDropAccordTable(table));
- // This is only here because "applyTo" is not touched without it...
- for (KeyspaceMetadata ks : cms.metadata().schema.getKeyspaces())
- cms.metadata().writePlacementAllSettled(ks);
-
Assertions.assertThat(cms.metadata().inProgressSequences.isEmpty()).isFalse();
+ MultiStepOperation<?> operation =
cms.metadata().inProgressSequences.get(table);
+ Assertions.assertThat(operation).isNotNull();
+ Assertions.assertThat(operation.kind() ==
MultiStepOperation.Kind.DROP_ACCORD_TABLE).isTrue();
+
InProgressSequences.finishInProgressSequences(table);
Assertions.assertThat(cms.metadata().inProgressSequences.isEmpty()).isTrue();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]