This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cep-58-satellite-datacenters
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-58-satellite-datacenters
by this push:
new 9ef1e8e192 CEP-58: Add satellite replication strategy stub
9ef1e8e192 is described below
commit 9ef1e8e1924101d87e4c1dd0ffdcb2dc257d5356
Author: Blake Eggleston <[email protected]>
AuthorDate: Wed Feb 4 10:23:16 2026 -0800
CEP-58: Add satellite replication strategy stub
Patch by Blake Eggleston; Reviewed by Caleb Rackliffe for CASSANDRA-21105
---
.../locator/AbstractReplicationStrategy.java | 4 +-
.../cassandra/locator/ReplicationFactor.java | 4 +-
.../locator/SatelliteReplicationStrategy.java | 516 +++++++++++++++++++
...atelliteReplicationStrategyEquivalenceTest.java | 554 +++++++++++++++++++++
.../locator/SatelliteReplicationStrategyTest.java | 402 +++++++++++++++
5 files changed, 1477 insertions(+), 3 deletions(-)
diff --git
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 93b0d26fa6..0860692901 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -369,9 +369,11 @@ public abstract class AbstractReplicationStrategy
try
{
ReplicationFactor rf = ReplicationFactor.fromString(s);
-
+
if (rf.hasTransientReplicas())
{
+ if (rf.fullReplicas == 0)
+ throw new ConfigurationException("Replication factor must
have at least one full replica, got " + s);
if (DatabaseDescriptor.getNumTokens() > 1)
throw new ConfigurationException("Transient replication is
not supported with vnodes yet");
if (!replicationType.isTracked())
diff --git a/src/java/org/apache/cassandra/locator/ReplicationFactor.java
b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
index ee971d900d..254f8d58af 100644
--- a/src/java/org/apache/cassandra/locator/ReplicationFactor.java
+++ b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
@@ -65,8 +65,8 @@ public class ReplicationFactor
"Transient replication is not enabled on
this node");
Preconditions.checkArgument(totalRF >= 0,
"Replication factor must be non-negative,
found %s", totalRF);
- Preconditions.checkArgument(transientRF == 0 || transientRF < totalRF,
- "Transient replicas must be zero, or less
than total replication factor. For %s/%s", totalRF, transientRF);
+ Preconditions.checkArgument(transientRF <= totalRF,
+ "Transient replicas must be less than or
equal to total replication factor. For %s/%s", totalRF, transientRF);
if (transientRF > 0)
{
Preconditions.checkArgument(DatabaseDescriptor.getNumTokens() == 1,
diff --git
a/src/java/org/apache/cassandra/locator/SatelliteReplicationStrategy.java
b/src/java/org/apache/cassandra/locator/SatelliteReplicationStrategy.java
new file mode 100644
index 0000000000..ce351a1236
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/SatelliteReplicationStrategy.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import
org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
+import
org.apache.cassandra.service.replication.migration.MutationTrackingMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+
+import static java.lang.String.format;
+
+/**
+ * Replication strategy for CEP-58: Satellite Datacenters.
+ * Allows pairing full datacenters with smaller satellite DCs that use witness replicas for mutation tracking and
+ * enable transactionally consistent failover without requiring 3 full datacenters.
+ *
+ * Configuration syntax uses dot notation:
+ *
+ * CREATE KEYSPACE ks WITH replication = {
+ * 'class': 'SatelliteReplicationStrategy',
+ * 'DC1': '3', // Full datacenter with RF=3
+ * 'DC1.satellite.ST1': '3/3', // Satellite ST1 for DC1 with 3 witness
replicas
+ * 'DC2': '3', // Second full DC
+ * 'DC2.satellite.ST2': '3/3', // Satellite ST2 for DC2
+ * 'primary': 'DC1', // Primary datacenter designation
+ * 'DC2.disabled': 'true' // Replication to DC2 disabled (a disabled DC can't be primary; use for down DCs)
+ * } AND replication_type = tracked;
+ *
+ * Requirements:
+ * - Keyspace must use tracked replication (replication_type = tracked)
+ * - Cluster must have transient replication enabled
(transient_replication_enabled: true)
+ * - Satellites must use witness replica format 'N/N' where all replicas are
transient
+ *
+ * Uses NetworkTopologyStrategy replica selection algorithm for compatibility
with existing NTS keyspaces
+ */
+public class SatelliteReplicationStrategy extends AbstractReplicationStrategy
+{
+ private static final Logger logger =
LoggerFactory.getLogger(SatelliteReplicationStrategy.class);
+
+ // Option key whose value names the primary datacenter (matched case-insensitively when parsing).
+ private static final String PRIMARY_DC_KEY = "primary";
+ // Infix that marks a satellite option key of the form '<DC>.satellite.<SAT>'.
+ private static final String SATELLITE_KEY_PATTERN = ".satellite.";
+ // Suffix that marks a '<DC>.disabled' option key.
+ private static final String DISABLED_KEY_SUFFIX = ".disabled";
+
+ // Replication factor of every full datacenter, keyed by datacenter name.
+ private final Map<String, ReplicationFactor> fullDCs;
+
+ // Parsed satellite definitions, keyed by satellite datacenter name.
+ private final Map<String, SatelliteInfo> satellites;
+
+ // Full datacenters plus satellites; this is the map handed to the NetworkTopologyStrategy placement routine.
+ private final Map<String, ReplicationFactor> allDCs;
+
+ // Value of the 'primary' option; validate() rejects a missing value.
+ private final String primaryDC;
+
+ // Names of the datacenters whose '<DC>.disabled' option is 'true'.
+ private final Set<String> disabledDCs;
+
+ // Sum of the full datacenters' replication factors, computed in the constructor (satellites are not included).
+ private final ReplicationFactor aggregateRf;
+
+ private static class SatelliteInfo
+ {
+ public final String parentDC;
+
+ public final String name;
+
+ public final ReplicationFactor rf;
+
+ SatelliteInfo(String parentDC, String name, ReplicationFactor rf)
+ {
+ this.parentDC = parentDC;
+ this.name = name;
+ this.rf = rf;
+ }
+ }
+
+    /**
+     * Builds a message from the given format string and arguments and throws it as a
+     * {@link ConfigurationException}. This method never returns normally.
+     */
+    private static void cfgEx(String format, Object... args)
+    {
+        String message = String.format(format, args);
+        throw new ConfigurationException(message);
+    }
+
+    /**
+     * Parses the raw replication option map into full datacenters, satellites, disabled
+     * datacenters and the primary datacenter.
+     *
+     * Each option key is classified in this order:
+     *   1. 'primary' (case-insensitive)  - the primary datacenter name
+     *   2. '<DC>.satellite.<SAT>'        - a satellite, value must be 'N/N' with N > 0
+     *   3. '<DC>.disabled'               - 'true' or 'false'
+     *   4. anything else                 - a full datacenter name, which may not contain dots
+     *
+     * Any malformed entry is reported through a {@link ConfigurationException}.
+     */
+    private static class StrategyOptions
+    {
+        Map<String, ReplicationFactor> fullDatacenters = new HashMap<>();
+        Map<String, SatelliteInfo> satellites = new HashMap<>();
+        Map<String, ReplicationFactor> allDatacenters = new HashMap<>();
+        Set<String> disabledDCs = new HashSet<>();
+        String primary = null;
+
+        StrategyOptions(Map<String, String> configOptions)
+        {
+            if (configOptions != null)
+            {
+                for (Map.Entry<String, String> entry : configOptions.entrySet())
+                {
+                    String key = entry.getKey();
+                    String value = entry.getValue();
+
+                    if (setPrimary(key, value))
+                        continue;
+
+                    if (addSatellite(key, value))
+                        continue;
+
+                    if (addDisabled(key, value))
+                        continue;
+
+                    if (key.contains("."))
+                        cfgEx("Datacenter names cannot contain dots: '%s'. Use '<DC>.satellite.<SAT>' for satellites", key);
+
+                    ReplicationFactor rf = ReplicationFactor.fromString(value);
+                    fullDatacenters.put(key, rf);
+                }
+            }
+
+            // the combined view contains the full datacenters and every satellite
+            allDatacenters.putAll(fullDatacenters);
+            satellites.forEach((dc, info) -> allDatacenters.put(dc, info.rf));
+        }
+
+        /**
+         * Handles the 'primary' option.
+         *
+         * @return true if the key was the primary option and has been consumed
+         */
+        boolean setPrimary(String key, String value)
+        {
+            if (!key.equalsIgnoreCase(PRIMARY_DC_KEY))
+                return false;
+
+            // reachable when the key appears more than once with different casing
+            if (primary != null)
+                cfgEx("'primary' option specified multiple times");
+
+            primary = value;
+            return true;
+        }
+
+        /**
+         * Handles a '<DC>.satellite.<SAT>' option.
+         *
+         * @return true if the key was a satellite option and has been consumed
+         */
+        boolean addSatellite(String key, String value)
+        {
+            if (!key.contains(SATELLITE_KEY_PATTERN))
+                return false;
+
+            // Split on the literal separator. Building the regex by hand left the trailing '.' unescaped,
+            // so a key such as 'A.satellite.B.satelliteX' was accepted as satellite 'B' of 'A'.
+            String[] parts = key.split(java.util.regex.Pattern.quote(SATELLITE_KEY_PATTERN));
+            if (parts.length != 2)
+                cfgEx("Invalid satellite configuration key '%s'. Expected format: '<DC>.satellite.<SAT>'", key);
+
+            String parentDc = parts[0];
+            String satelliteName = parts[1];
+
+            if (parentDc.contains(".") || satelliteName.contains("."))
+                cfgEx("Datacenter names cannot contain dots: '%s'", key);
+
+            // satellites are keyed by name only, so a second definition would silently replace the first
+            // and the surviving parent would depend on map iteration order
+            SatelliteInfo existing = satellites.get(satelliteName);
+            if (existing != null)
+                cfgEx("Satellite '%s' is configured more than once (parents '%s' and '%s')",
+                      satelliteName, existing.parentDC, parentDc);
+
+            // Parse RF - satellites must use witness format (e.g., '3/3')
+            if (!value.contains("/"))
+                cfgEx("Satellite replicas must be specified as witness replicas using format 'N/N' (e.g., '3/3'). Got '%s'", value);
+
+            String[] rfParts = value.split("/");
+            if (rfParts.length != 2)
+                cfgEx("Invalid replication factor format for satellite '%s': '%s'", satelliteName, value);
+
+            try
+            {
+                int total = Integer.parseInt(rfParts[0]);
+                int transientCount = Integer.parseInt(rfParts[1]);
+
+                if (transientCount != total)
+                    cfgEx("Satellite replicas must all be witnesses. Expected '%d/%d' but got '%s'", total, total, value);
+
+                if (total <= 0)
+                    cfgEx("Satellite replication factor must be positive. Got '%s'", value);
+
+                satellites.put(satelliteName, new SatelliteInfo(parentDc, satelliteName,
+                                                                ReplicationFactor.withTransient(total, total)));
+            }
+            catch (NumberFormatException e)
+            {
+                cfgEx("Invalid replication factor format for satellite '%s': '%s'", satelliteName, value);
+            }
+            return true;
+        }
+
+        /**
+         * Handles a '<DC>.disabled' option. Only 'true' adds the datacenter to the disabled set;
+         * 'false' is accepted and ignored, and any other value is rejected.
+         *
+         * @return true if the key was a disabled option and has been consumed
+         */
+        boolean addDisabled(String key, String value)
+        {
+            if (!key.endsWith(DISABLED_KEY_SUFFIX))
+                return false;
+
+            String dcName = key.substring(0, key.length() - DISABLED_KEY_SUFFIX.length());
+            if (dcName.isEmpty())
+                cfgEx("Invalid disabled configuration key '%s'", key);
+            if (dcName.contains("."))
+                cfgEx("Datacenter names cannot contain dots: '%s'", dcName);
+
+            if ("true".equalsIgnoreCase(value))
+                disabledDCs.add(dcName);
+            else if (!"false".equalsIgnoreCase(value))
+                cfgEx("Invalid value for '%s': expected 'true' or 'false', got '%s'", key, value);
+
+            return true;
+        }
+    }
+
+    /**
+     * Parses and validates the replication options, then derives the aggregate replication
+     * factor from the full datacenters and logs the resulting configuration.
+     *
+     * @throws ConfigurationException if the options are malformed or inconsistent
+     */
+    public SatelliteReplicationStrategy(String keyspaceName,
+                                        Map<String, String> configOptions,
+                                        ReplicationType replicationType) throws ConfigurationException
+    {
+        super(keyspaceName, configOptions, replicationType);
+
+        StrategyOptions parsed = new StrategyOptions(configOptions);
+        this.fullDCs = Collections.unmodifiableMap(parsed.fullDatacenters);
+        this.satellites = Collections.unmodifiableMap(parsed.satellites);
+        this.allDCs = Collections.unmodifiableMap(parsed.allDatacenters);
+        this.primaryDC = parsed.primary;
+        this.disabledDCs = Collections.unmodifiableSet(parsed.disabledDCs);
+
+        validate();
+
+        // only the full datacenters contribute to the aggregate replication factor
+        Collection<ReplicationFactor> fullFactors = fullDCs.values();
+        int replicaSum = fullFactors.stream().mapToInt(rf -> rf.allReplicas).sum();
+        int transientSum = fullFactors.stream().mapToInt(rf -> rf.transientReplicas()).sum();
+        this.aggregateRf = ReplicationFactor.withTransient(replicaSum, transientSum);
+
+        if (!disabledDCs.isEmpty())
+        {
+            logger.info("Configured satellite datacenter replication for keyspace {} with full datacenters {} (primary: {}), satellites {}, disabled {}",
+                        keyspaceName, fullDCs, primaryDC, satellites, disabledDCs);
+        }
+        else
+        {
+            logger.info("Configured satellite datacenter replication for keyspace {} with full datacenters {} (primary: {}), satellites {}",
+                        keyspaceName, fullDCs, primaryDC, satellites);
+        }
+    }
+
+    /**
+     * Cross-checks the parsed options: a primary must be named and be a full datacenter, every
+     * satellite parent and every disabled datacenter must be a full datacenter, and the primary
+     * may not be disabled. The first violation found is raised as a configuration error.
+     */
+    private void validate()
+    {
+        if (primaryDC == null)
+            cfgEx("'primary' option is required");
+
+        boolean primaryIsFullDc = fullDCs.containsKey(primaryDC);
+        if (!primaryIsFullDc)
+            cfgEx("Primary datacenter '%s' must be defined as a full datacenter", primaryDC);
+
+        for (SatelliteInfo satellite : satellites.values())
+        {
+            if (fullDCs.containsKey(satellite.parentDC))
+                continue;
+            cfgEx("Satellite '%s' references non-existent full datacenter '%s'", satellite.name, satellite.parentDC);
+        }
+
+        for (String dc : disabledDCs)
+        {
+            if (fullDCs.containsKey(dc))
+                continue;
+            cfgEx("Disabled datacenter '%s' is not defined as a full datacenter", dc);
+        }
+
+        if (disabledDCs.contains(primaryDC))
+            cfgEx("Primary datacenter '%s' cannot be disabled", primaryDC);
+    }
+
+    /**
+     * Delegates replica selection to the NetworkTopologyStrategy algorithm, passing the combined
+     * map of full and satellite datacenters.
+     */
+    @Override
+    public EndpointsForRange calculateNaturalReplicas(Token searchToken, ClusterMetadata metadata)
+    {
+        Range<Token> owningRange = TokenRingUtils.getRange(metadata.tokenMap.tokens(), searchToken);
+        return NetworkTopologyStrategy.calculateNaturalReplicas(searchToken,
+                                                                owningRange,
+                                                                metadata.directory,
+                                                                metadata.tokenMap,
+                                                                allDCs);
+    }
+
+    /**
+     * Builds one replica group per range, using the natural replicas of the range's right token,
+     * and returns a placement that uses the same groups for both of its components.
+     */
+    @Override
+    public DataPlacement calculateDataPlacement(Epoch epoch, List<Range<Token>> ranges, ClusterMetadata metadata)
+    {
+        ReplicaGroups.Builder groups = ReplicaGroups.builder();
+        ranges.forEach(range -> {
+            EndpointsForRange replicas = calculateNaturalReplicas(range.right, metadata);
+            groups.withReplicaGroup(VersionedEndpoints.forRange(epoch, replicas));
+        });
+
+        ReplicaGroups placement = groups.build();
+        return new DataPlacement(placement, placement);
+    }
+
+ /**
+ * Returns the aggregate replication factor computed in the constructor by summing the total and
+ * transient replica counts of the full datacenters; satellite replicas are not part of that sum.
+ */
+ @Override
+ public ReplicationFactor getReplicationFactor()
+ {
+ return aggregateRf;
+ }
+
+ /**
+ * Returns {@code null} instead of a list of option names: the option keys of this strategy embed
+ * datacenter and satellite names and are checked when the strategy is constructed.
+ * NOTE(review): presumably callers treat {@code null} as "any option accepted" - confirm against the base class.
+ */
+ @Override
+ public Collection<String> recognizedOptions(ClusterMetadata metadata)
+ {
+ // accept anything - validation happens in constructor
+ return null;
+ }
+
+ @Override
+ public void validateExpectedOptions(ClusterMetadata metadata) throws
ConfigurationException
+ {
+ // must use tracked replication type
+ if (!replicationType.isTracked())
+ {
+ throw new ConfigurationException(
+ getClass().getSimpleName() + " requires tracked replication. "
+
+ "Use 'AND replication_type = tracked' when creating/updating
the keyspace");
+ }
+
+ // must have witness replicas enabled
+ if (!DatabaseDescriptor.isTransientReplicationEnabled())
+ {
+ throw new ConfigurationException(
+ getClass().getSimpleName() + " requires transient replication
to be enabled. " +
+ "Set 'transient_replication_enabled: true' in cassandra.yaml");
+ }
+
+ // must have at least one full DC
+ if (fullDCs.isEmpty())
+ {
+ throw new ConfigurationException(
+ "Configuration for at least one full datacenter must be
present");
+ }
+
+ // validate datacenter names against topology
+ Set<String> knownDcs = metadata.directory.knownDatacenters();
+ for (String dc : fullDCs.keySet())
+ {
+ if (!knownDcs.contains(dc))
+ {
+ throw new ConfigurationException(format(
+ "Unrecognized datacenter '%s'. Known DCs: %s", dc,
knownDcs));
+ }
+ }
+
+ // validate satellite DCs exist and don't overlap with full DCs
+ for (SatelliteInfo satInfo : satellites.values())
+ {
+ if (!knownDcs.contains(satInfo.name))
+ {
+ throw new ConfigurationException(format(
+ "Unrecognized satellite datacenter '%s'. Known DCs: %s",
satInfo.name, knownDcs));
+ }
+
+ if (fullDCs.containsKey(satInfo.name))
+ {
+ throw new ConfigurationException(format(
+ "Datacenter '%s' cannot be both a full datacenter and a
satellite",
+ satInfo.name));
+ }
+ }
+
+ // check for pending mutation tracking migrations
+ MutationTrackingMigrationState migrationState =
metadata.mutationTrackingMigrationState;
+ KeyspaceMigrationInfo migrationInfo =
migrationState.getKeyspaceInfo(keyspaceName);
+
+ if (migrationInfo != null && !migrationInfo.isComplete())
+ {
+ throw new ConfigurationException(format(
+ "Cannot alter replication strategy for keyspace %s while
mutation tracking migration is in progress",
+ keyspaceName));
+ }
+ }
+
+ @Override
+ public void validateOptions() throws ConfigurationException
+ {
+ // validate full DC replication factors
+ for (Map.Entry<String, String> entry : this.configOptions.entrySet())
+ {
+ String key = entry.getKey();
+ if (key.equalsIgnoreCase(PRIMARY_DC_KEY) ||
key.contains(SATELLITE_KEY_PATTERN) || key.endsWith(DISABLED_KEY_SUFFIX))
+ continue;
+
+ validateReplicationFactor(entry.getValue());
+ }
+
+ // satellites are validated in constructor
+ }
+
+    /**
+     * Applies the replication factor guardrails to every enabled full datacenter and warns, to
+     * both the client and the log, when a datacenter or satellite is asked for more replicas
+     * than it currently has nodes. Datacenters with no nodes at all are not warned about, and
+     * satellites of a disabled parent are skipped. Does nothing for system keyspaces.
+     */
+    @Override
+    public void maybeWarnOnOptions(ClientState state)
+    {
+        if (SchemaConstants.isSystemKeyspace(keyspaceName))
+            return;
+
+        Directory directory = ClusterMetadata.current().directory;
+        Multimap<String, InetAddressAndPort> endpointsByDc = directory.allDatacenterEndpoints();
+
+        for (String dc : fullDCs.keySet())
+        {
+            if (disabledDCs.contains(dc))
+                continue;
+
+            int fullReplicas = fullDCs.get(dc).fullReplicas;
+
+            // apply guardrails
+            Guardrails.minimumReplicationFactor.guard(fullReplicas, keyspaceName, false, state);
+            Guardrails.maximumReplicationFactor.guard(fullReplicas, keyspaceName, false, state);
+
+            int nodes = 0;
+            if (endpointsByDc.containsKey(dc))
+                nodes = endpointsByDc.get(dc).size();
+
+            // warn if RF > node count
+            if (nodes != 0 && nodes < fullReplicas)
+            {
+                String warning = format("Your replication factor %d for keyspace %s is higher than the number of nodes %d for datacenter %s",
+                                        fullReplicas, keyspaceName, nodes, dc);
+                ClientWarn.instance.warn(warning);
+                logger.warn(warning);
+            }
+        }
+
+        for (SatelliteInfo satellite : satellites.values())
+        {
+            if (disabledDCs.contains(satellite.parentDC))
+                continue;
+
+            String satelliteDc = satellite.name;
+            int replicas = satellite.rf.allReplicas;
+
+            int nodes = 0;
+            if (endpointsByDc.containsKey(satelliteDc))
+                nodes = endpointsByDc.get(satelliteDc).size();
+
+            if (nodes != 0 && nodes < replicas)
+            {
+                String warning = format("Your replication factor %d for keyspace %s is higher than the number of nodes %d for satellite datacenter %s",
+                                        replicas, keyspaceName, nodes, satelliteDc);
+                ClientWarn.instance.warn(warning);
+                logger.warn(warning);
+            }
+        }
+    }
+
+ /** Returns the name of the datacenter given by the 'primary' option. */
+ public String getPrimaryDC()
+ {
+ return primaryDC;
+ }
+
+ /** Returns the names of the full datacenters only; satellite datacenters are not included. */
+ public Set<String> getDatacenters()
+ {
+ return fullDCs.keySet();
+ }
+
+    /**
+     * Returns the replication factor configured for the given full datacenter, or
+     * {@link ReplicationFactor#ZERO} when the name is not a configured full datacenter
+     * (satellite names are not looked up here).
+     */
+    public ReplicationFactor getReplicationFactor(String dc)
+    {
+        ReplicationFactor configured = fullDCs.get(dc);
+        if (configured == null)
+            return ReplicationFactor.ZERO;
+        return configured;
+    }
+
+    /**
+     * Returns a fresh map from satellite name to that satellite's total replica count.
+     */
+    public Map<String, Integer> getSatellites()
+    {
+        Map<String, Integer> replicaCounts = Maps.newHashMapWithExpectedSize(satellites.size());
+        for (Map.Entry<String, SatelliteInfo> satellite : satellites.entrySet())
+            replicaCounts.put(satellite.getKey(), satellite.getValue().rf.allReplicas);
+        return replicaCounts;
+    }
+
+ /** Returns the unmodifiable set of datacenter names whose '<DC>.disabled' option was 'true'. */
+ public Set<String> getDisabledDatacenters()
+ {
+ return disabledDCs;
+ }
+
+ /** Returns true if the given datacenter name is in the disabled set. */
+ public boolean isDisabled(String dc)
+ {
+ return disabledDCs.contains(dc);
+ }
+
+    /**
+     * Returns true when {@code other} is a satellite replication strategy with the same full
+     * datacenters, satellites, primary datacenter and disabled datacenters.
+     *
+     * Satellites are compared field by field rather than through {@code Map.equals}, which
+     * delegates to the values' {@code equals} and would otherwise fall back to object identity.
+     */
+    @Override
+    public boolean hasSameSettings(AbstractReplicationStrategy other)
+    {
+        if (!super.hasSameSettings(other))
+            return false;
+
+        if (!(other instanceof SatelliteReplicationStrategy))
+            return false;
+
+        SatelliteReplicationStrategy that = (SatelliteReplicationStrategy) other;
+
+        return fullDCs.equals(that.fullDCs) &&
+               sameSatellites(satellites, that.satellites) &&
+               primaryDC.equals(that.primaryDC) &&
+               disabledDCs.equals(that.disabledDCs);
+    }
+
+    /** Compares two satellite maps by key and by each satellite's parent, name and replication factor. */
+    private static boolean sameSatellites(Map<String, SatelliteInfo> left, Map<String, SatelliteInfo> right)
+    {
+        if (left.size() != right.size())
+            return false;
+
+        for (Map.Entry<String, SatelliteInfo> entry : left.entrySet())
+        {
+            SatelliteInfo mine = entry.getValue();
+            SatelliteInfo theirs = right.get(entry.getKey());
+            if (theirs == null)
+                return false;
+
+            boolean same = mine.parentDC.equals(theirs.parentDC)
+                           && mine.name.equals(theirs.name)
+                           && mine.rf.equals(theirs.rf);
+            if (!same)
+                return false;
+        }
+        return true;
+    }
+}
diff --git
a/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyEquivalenceTest.java
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyEquivalenceTest.java
new file mode 100644
index 0000000000..f6586eb0e7
--- /dev/null
+++
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyEquivalenceTest.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+
+import org.apache.cassandra.CassandraTestBase;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+import static org.apache.cassandra.CassandraTestBase.DisableMBeanRegistration;
+import static org.apache.cassandra.CassandraTestBase.PrepareServerNoRegister;
+import static org.apache.cassandra.CassandraTestBase.UseMurmur3Partitioner;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.arbitrary;
+
+/**
+ * We need to be able to convert network topology strategy keyspaces to satellite replication strategy
+ * keyspaces without any differences relating to replica placement. A replica placement bug will cause data loss.
+ *
+ * This test checks the replica placement of both strategies against each other and a test model under
+ * a variety of configurations to confirm there are no differences between the two.
+ */
+@PrepareServerNoRegister
+@DisableMBeanRegistration
+@UseMurmur3Partitioner
+public class SatelliteReplicationStrategyEquivalenceTest extends
CassandraTestBase
+{
+ private static final String KEYSPACE = "test";
+
+ // Runs after every test and calls ServerTestUtils.resetCMS().
+ // NOTE(review): presumably this discards the cluster metadata built by setupCluster - confirm.
+ @After
+ public void teardown()
+ {
+ ServerTestUtils.resetCMS();
+ }
+
+ static class TestCase
+ {
+ final List<String> fullDcs;
+ final List<String> satelliteDcs;
+ final Map<String, Integer> fullDcRf;
+ final Map<String, Integer> satelliteRf;
+ final String primaryDc;
+ final Set<String> disabledDCs;
+ final List<Token> nodeTokens;
+ final List<Token> queryTokens;
+
+ final List<Integer> nodeCounts;
+ final List<Integer> rackCounts;
+ final List<Integer> satNodeCounts;
+ final List<Integer> satRackCounts;
+ final Map<String, String> satelliteToParent;
+
+ TestCase(List<String> fullDcs, List<String> satelliteDcs,
+ Map<String, Integer> fullDcRf, Map<String, Integer>
satelliteRf,
+ String primaryDc, Set<String> disabledDCs,
+ List<Token> nodeTokens, List<Token> queryTokens,
+ List<Integer> nodeCounts, List<Integer> rackCounts,
+ List<Integer> satNodeCounts, List<Integer> satRackCounts,
+ Map<String, String> satelliteToParent)
+ {
+ this.fullDcs = fullDcs;
+ this.satelliteDcs = satelliteDcs;
+ this.fullDcRf = fullDcRf;
+ this.satelliteRf = satelliteRf;
+ this.primaryDc = primaryDc;
+ this.disabledDCs = disabledDCs;
+ this.nodeTokens = nodeTokens;
+ this.queryTokens = queryTokens;
+ this.nodeCounts = nodeCounts;
+ this.rackCounts = rackCounts;
+ this.satNodeCounts = satNodeCounts;
+ this.satRackCounts = satRackCounts;
+ this.satelliteToParent = satelliteToParent;
+ }
+
+
+ static Gen<TestCase> gen()
+ {
+ return rnd -> {
+ int numFullDcs = arbitrary().pick(1, 2, 3).generate(rnd);
+ List<String> fullDcs = new ArrayList<>();
+ List<Integer> nodeCounts = new ArrayList<>();
+ List<Integer> rackCounts = new ArrayList<>();
+ Map<String, Integer> fullDcRf = new HashMap<>();
+
+ for (int i = 1; i <= numFullDcs; i++)
+ {
+ String dcName = "dc" + i;
+ fullDcs.add(dcName);
+
+ int nodes = SourceDSL.integers().between(3,
10).generate(rnd);
+ nodeCounts.add(nodes);
+
+ int racks = SourceDSL.integers().between(1,
3).generate(rnd);
+ rackCounts.add(racks);
+
+ int rf = SourceDSL.integers().between(1, Math.min(7,
nodes)).generate(rnd);
+ fullDcRf.put(dcName, rf);
+ }
+
+ int numSatellites = arbitrary().pick(0, 1, 2, 3).generate(rnd);
+ numSatellites = Math.min(numSatellites, numFullDcs);
+
+ List<String> satelliteDcs = new ArrayList<>();
+ List<Integer> satNodeCounts = new ArrayList<>();
+ List<Integer> satRackCounts = new ArrayList<>();
+ Map<String, Integer> satelliteRf = new HashMap<>();
+ Map<String, String> satelliteToParent = new HashMap<>();
+
+ for (int i = 0; i < numSatellites; i++)
+ {
+ String satName = "sat" + (i + 1);
+ satelliteDcs.add(satName);
+
+ int satNodes = SourceDSL.integers().between(2,
5).generate(rnd);
+ satNodeCounts.add(satNodes);
+
+ int satRacks = SourceDSL.integers().between(1, Math.min(3,
satNodes)).generate(rnd);
+ satRackCounts.add(satRacks);
+
+ int satRf = SourceDSL.integers().between(1, Math.min(3,
satNodes)).generate(rnd);
+ satelliteRf.put(satName, satRf);
+
+ satelliteToParent.put(satName, fullDcs.get(i));
+ }
+
+ String primaryDc = arbitrary().pick(fullDcs).generate(rnd);
+
+ // Randomly disable some non-primary DCs (~25% chance per DC)
+ Set<String> disabledDCs = new HashSet<>();
+ for (String dc : fullDcs)
+ {
+ if (!dc.equals(primaryDc) && arbitrary().pick(true, false,
false, false).generate(rnd))
+ disabledDCs.add(dc);
+ }
+
+ int totalNodes =
nodeCounts.stream().mapToInt(Integer::intValue).sum() +
+
satNodeCounts.stream().mapToInt(Integer::intValue).sum();
+
+ Set<Long> uniqueTokenValues = new HashSet<>();
+ while (uniqueTokenValues.size() < totalNodes)
+ {
+ long tokenValue =
SourceDSL.longs().between(Long.MIN_VALUE, Long.MAX_VALUE).generate(rnd);
+ uniqueTokenValues.add(tokenValue);
+ }
+
+ List<Token> nodeTokens = new ArrayList<>();
+ for (long tokenValue : uniqueTokenValues)
+ {
+ nodeTokens.add(new LongToken(tokenValue));
+ }
+ nodeTokens.sort(null);
+
+ List<Token> queryTokens = new ArrayList<>();
+
+ // boundary tokens
+ for (Token nodeToken : nodeTokens)
+ {
+ long tokenValue = nodeToken.getLongValue();
+ queryTokens.add(nodeToken);
+ queryTokens.add(new LongToken(tokenValue - 1));
+ queryTokens.add(new LongToken(tokenValue + 1));
+ }
+
+ // random tokens
+ int remainingTokens = Math.max(0, 2000 - queryTokens.size());
+ for (int i = 0; i < remainingTokens; i++)
+ {
+ long tokenValue =
SourceDSL.longs().between(Long.MIN_VALUE, Long.MAX_VALUE).generate(rnd);
+ queryTokens.add(new LongToken(tokenValue));
+ }
+
+ return new TestCase(fullDcs, satelliteDcs, fullDcRf,
satelliteRf, primaryDc, disabledDCs, nodeTokens, queryTokens,
+ nodeCounts, rackCounts, satNodeCounts,
satRackCounts, satelliteToParent);
+ };
+ }
+ }
+
+ /**
+ * reference model for replica placement verification.
+ *
+ * implements basic rack-aware replica selection by walking the token ring.
+ */
+ static class ReplicaPlacementModel
+ {
+ private final Map<String, Integer> fullDcRf;
+ private final Map<String, Integer> satelliteRf;
+
+ ReplicaPlacementModel(Map<String, Integer> fullDcRf, Map<String,
Integer> satelliteRf)
+ {
+ this.fullDcRf = fullDcRf;
+ this.satelliteRf = satelliteRf;
+ }
+
+ EndpointsForRange calculateNaturalReplicas(Token searchToken,
ClusterMetadata metadata)
+ {
+ Range<Token> range =
TokenRingUtils.getRange(metadata.tokenMap.tokens(), searchToken);
+ List<Replica> allReplicas = new ArrayList<>();
+
+ for (Map.Entry<String, Integer> entry : fullDcRf.entrySet())
+ {
+ allReplicas.addAll(calculateReplicas(searchToken, range,
entry.getKey(), entry.getValue(), metadata, false));
+ }
+
+ for (Map.Entry<String, Integer> entry : satelliteRf.entrySet())
+ {
+ allReplicas.addAll(calculateReplicas(searchToken, range,
entry.getKey(), entry.getValue(), metadata, true));
+ }
+
+ return EndpointsForRange.copyOf(allReplicas);
+ }
+
+ /**
+ * Calculate replicas for a datacenter using simple ring walk with
rack awareness.
+ */
+ private static List<Replica> calculateReplicas(Token searchToken,
+ Range<Token> range,
+ String datacenter,
+ int rf,
+ ClusterMetadata
metadata,
+ boolean isTransient)
+ {
+ List<Replica> replicas = new ArrayList<>();
+ Set<String> seenRacks = new HashSet<>();
+ Set<InetAddressAndPort> seenEndpoints = new HashSet<>();
+
+ Iterator<Token> ringIter = TokenRingUtils.ringIterator(
+ metadata.tokenMap.tokens(), searchToken, false);
+
+ while (replicas.size() < rf && ringIter.hasNext())
+ {
+ Token token = ringIter.next();
+ NodeId owner = metadata.tokenMap.owner(token);
+ InetAddressAndPort endpoint =
metadata.directory.endpoint(owner);
+ Location location = metadata.directory.location(owner);
+
+ if (!location.datacenter.equals(datacenter))
+ continue;
+ if (seenEndpoints.contains(endpoint))
+ continue;
+
+ // rack awareness: prefer new racks when available
+ boolean newRack = seenRacks.add(location.rack);
+ int remainingSlots = rf - replicas.size();
+
+ // count unique racks in this DC
+ long totalRacks =
metadata.directory.datacenterEndpoints(datacenter).stream()
+ .map(ep ->
metadata.directory.location(metadata.directory.peerId(ep)).rack)
+ .distinct()
+ .count();
+ int remainingRacks = (int)(totalRacks - seenRacks.size());
+
+ if (newRack || remainingSlots > remainingRacks)
+ {
+ Replica replica = isTransient ?
Replica.transientReplica(endpoint, range)
+ :
Replica.fullReplica(endpoint, range);
+ replicas.add(replica);
+ seenEndpoints.add(endpoint);
+ }
+ }
+
+ return replicas;
+ }
+ }
+
+    /**
+     * Resets the cluster metadata and registers every node of the test case, full datacenters
+     * first and satellites second, handing out {@code testCase.nodeTokens} in list order.
+     * Nodes are spread over racks round-robin. Addresses are 10.&lt;dc index&gt;.x.y for full
+     * datacenters and 10.&lt;100 + satellite index&gt;.x.y for satellites.
+     */
+    private void setupCluster(TestCase testCase) throws UnknownHostException
+    {
+        ServerTestUtils.resetCMS();
+
+        int nextToken = 0;
+
+        // create full DCs
+        int fullDcTotal = testCase.fullDcs.size();
+        for (int dc = 0; dc < fullDcTotal; dc++)
+        {
+            String name = testCase.fullDcs.get(dc);
+            int nodes = testCase.nodeCounts.get(dc);
+            int racks = testCase.rackCounts.get(dc);
+
+            for (int node = 0; node < nodes; node++)
+            {
+                Location location = new Location(name, "rack" + (node % racks + 1));
+                byte[] address = new byte[]{ 10, (byte) dc, (byte) (node / 256), (byte) (node % 256) };
+                Token token = testCase.nodeTokens.get(nextToken);
+                nextToken++;
+
+                addEndpoint(token, address, location);
+            }
+        }
+
+        // create satellite DCs
+        int satelliteTotal = testCase.satelliteDcs.size();
+        for (int sat = 0; sat < satelliteTotal; sat++)
+        {
+            String name = testCase.satelliteDcs.get(sat);
+            int nodes = testCase.satNodeCounts.get(sat);
+            int racks = testCase.satRackCounts.get(sat);
+
+            for (int node = 0; node < nodes; node++)
+            {
+                Location location = new Location(name, "rack" + (node % racks + 1));
+                byte[] address = new byte[]{ 10, (byte) (100 + sat), (byte) (node / 256), (byte) (node % 256) };
+                Token token = testCase.nodeTokens.get(nextToken);
+                nextToken++;
+
+                addEndpoint(token, address, location);
+            }
+        }
+    }
+
+    /** Registers a node with the given raw address, token and location through the test helper. */
+    private void addEndpoint(Token token, byte[] address, Location location) throws UnknownHostException
+    {
+        ClusterMetadataTestHelper.addEndpoint(InetAddressAndPort.getByAddress(address), token, location);
+    }
+
+    /**
+     * Builds the SatelliteReplicationStrategy option map for a test case: one entry per full
+     * datacenter, one '&lt;parent&gt;.satellite.&lt;sat&gt;' entry with value 'rf/rf' per satellite, one
+     * '&lt;dc&gt;.disabled' entry per disabled datacenter, and the 'primary' entry.
+     */
+    private Map<String, String> buildSrsOptions(TestCase testCase)
+    {
+        Map<String, String> srsOptions = new HashMap<>();
+
+        testCase.fullDcRf.forEach((dc, rf) -> srsOptions.put(dc, String.valueOf(rf)));
+
+        testCase.satelliteRf.forEach((satellite, boxedRf) -> {
+            int rf = boxedRf;
+            String parent = testCase.satelliteToParent.get(satellite);
+            srsOptions.put(parent + ".satellite." + satellite, rf + "/" + rf);
+        });
+
+        testCase.disabledDCs.forEach(dc -> srsOptions.put(dc + ".disabled", "true"));
+
+        srsOptions.put("primary", testCase.primaryDc);
+        return srsOptions;
+    }
+
+ /**
+  * Builds NetworkTopologyStrategy options containing only the full datacenters of the test case
+  * (NTS can't handle all-witness satellites, so they are left out).
+  */
+ private Map<String, String> buildNtsOptions(TestCase testCase)
+ {
+     Map<String, String> ntsOptions = new HashMap<>();
+     testCase.fullDcRf.forEach((dc, rf) -> ntsOptions.put(dc, String.valueOf(rf)));
+     return ntsOptions;
+ }
+
+ /**
+  * Builds NetworkTopologyStrategy options in which satellites are configured like ordinary
+  * datacenters: every full datacenter and every satellite maps to its plain replication factor.
+  */
+ private Map<String, String> buildNtsOptionsWithSatellites(TestCase testCase)
+ {
+     Map<String, String> ntsOptions = new HashMap<>();
+
+     // full datacenters first, then satellites as normal full replicas
+     testCase.fullDcRf.forEach((dc, rf) -> ntsOptions.put(dc, String.valueOf(rf)));
+     testCase.satelliteRf.forEach((satellite, rf) -> ntsOptions.put(satellite, String.valueOf(rf)));
+
+     return ntsOptions;
+ }
+
+ /**
+  * Returns the subset of {@code replicas} whose endpoint is located in one of the given
+  * datacenters, as looked up in the current cluster metadata directory.
+  *
+  * @param replicas    replicas for a single range
+  * @param datacenters names of the datacenters to keep
+  * @return the matching replicas, or an empty EndpointsForRange for the same range if none match
+  */
+ private EndpointsForRange filterByDatacenters(EndpointsForRange replicas, List<String> datacenters)
+ {
+     // Read the metadata once so every lookup uses the same snapshot, rather than
+     // calling ClusterMetadata.current() twice per replica.
+     ClusterMetadata metadata = ClusterMetadata.current();
+     List<Replica> filtered = new ArrayList<>();
+
+     for (Replica replica : replicas)
+     {
+         Location location = metadata.directory.location(metadata.directory.peerId(replica.endpoint()));
+         if (datacenters.contains(location.datacenter))
+             filtered.add(replica);
+     }
+
+     if (filtered.isEmpty())
+         return EndpointsForRange.empty(replicas.range());
+
+     return EndpointsForRange.copyOf(filtered);
+ }
+
+ /**
+  * Asserts that two replica collections hold the same replicas, ignoring order.
+  * Sizes are compared first; then both collections are copied into HashSets and compared.
+  * On mismatch an AssertionError prefixed with {@code context} is thrown, listing either both
+  * collections (size mismatch) or the missing and extra replicas (content mismatch).
+  */
+ private static void assertReplicaSetsEqual(ReplicaCollection<?> actual,
ReplicaCollection<?> expected, String context)
+ {
+ if (actual.size() != expected.size())
+ {
+ throw new AssertionError(String.format("%s: Replica count mismatch
- expected %d, got %d\nExpected: %s\nActual: %s",
+ context, expected.size(), actual.size(), expected, actual));
+ }
+
+ // compare as sets so iteration order does not matter
+ Set<Replica> actualSet = new HashSet<>();
+ Set<Replica> expectedSet = new HashSet<>();
+ actual.forEach(actualSet::add);
+ expected.forEach(expectedSet::add);
+
+ if (!actualSet.equals(expectedSet))
+ {
+ // missing = expected but not present; extra = present but not expected
+ Set<Replica> missing = new HashSet<>(expectedSet);
+ missing.removeAll(actualSet);
+ Set<Replica> extra = new HashSet<>(actualSet);
+ extra.removeAll(expectedSet);
+
+ throw new AssertionError(String.format("%s: Replica sets don't
match\nMissing: %s\nExtra: %s",
+ context, missing, extra));
+ }
+ }
+
+ /**
+  * Property test (100 generated examples, shrink cycles set to 0). For each generated test case
+  * it sets up the cluster, then for every query token checks that:
+  * the SRS replicas in full datacenters equal the replicas of an NTS configured with only the
+  * full datacenters; the ReplicaPlacementModel agrees with both NTS (full datacenters) and SRS
+  * (all replicas); the SRS endpoints in satellite datacenters equal those chosen by an NTS that
+  * treats satellites as ordinary datacenters; and every SRS replica in a satellite datacenter is
+  * transient.
+  */
+ @Test
+ public void testSrsEquivalenceToNts() throws Exception
+ {
+ qt().withShrinkCycles(0)
+ .withExamples(100)
+ .forAll(TestCase.gen())
+ .checkAssert(testCase -> {
+ try
+ {
+ setupCluster(testCase);
+
+ // create strategies
+ Map<String, String> srsOptions = buildSrsOptions(testCase);
+ SatelliteReplicationStrategy srs = new
SatelliteReplicationStrategy(KEYSPACE,
+
srsOptions,
+
ReplicationType.tracked);
+
+ // NTS without satellite options to confirm that SRS full
datacenter replica selection (even with satellites) matches
+ // the NTS replica selection without satellites. This is
the migration use case where the satellites are added to the
+ // replication strategy
+ Map<String, String> ntsOptions = buildNtsOptions(testCase);
+ NetworkTopologyStrategy nts = new
NetworkTopologyStrategy(KEYSPACE,
+
ntsOptions,
+
ReplicationType.untracked);
+
+ // NOTE: the satellite-inclusive NTS built below is only compared against SRS, and only on
+ // endpoint membership within the satellite datacenters (see the satellite check further down)
+ // Create NTS with satellites as full replicas to verify
that the replica selection is the same as NTS
+ Map<String, String> ntsWithSatellitesOptions =
buildNtsOptionsWithSatellites(testCase);
+ NetworkTopologyStrategy ntsWithSatellites = new
NetworkTopologyStrategy(KEYSPACE,
+
ntsWithSatellitesOptions,
+
ReplicationType.untracked);
+
+ ReplicaPlacementModel model = new
ReplicaPlacementModel(testCase.fullDcRf, testCase.satelliteRf);
+
+ for (Token queryToken : testCase.queryTokens)
+ {
+ EndpointsForRange srsReplicas =
srs.calculateNaturalReplicas(queryToken, ClusterMetadata.current());
+ EndpointsForRange ntsReplicas =
nts.calculateNaturalReplicas(queryToken, ClusterMetadata.current());
+ EndpointsForRange ntsWithSatellitesReplicas =
ntsWithSatellites.calculateNaturalReplicas(queryToken,
ClusterMetadata.current());
+ EndpointsForRange modelReplicas =
model.calculateNaturalReplicas(queryToken, ClusterMetadata.current());
+
+ // SRS full DC replicas == NTS replicas
+ EndpointsForRange srsFullDcReplicas =
filterByDatacenters(srsReplicas, testCase.fullDcs);
+ assertReplicaSetsEqual(srsFullDcReplicas, ntsReplicas,
"SRS full DC vs NTS");
+
+ // Model full DC replicas == NTS replicas
+ EndpointsForRange modelFullDcReplicas =
filterByDatacenters(modelReplicas, testCase.fullDcs);
+ assertReplicaSetsEqual(modelFullDcReplicas,
ntsReplicas, "Model full DC vs NTS");
+
+ // SRS replicas == Model replicas
+ assertReplicaSetsEqual(srsReplicas, modelReplicas,
"SRS vs Model");
+
+ // SRS satellite node selection == NTS satellite node
selection
+ if (!testCase.satelliteDcs.isEmpty())
+ {
+ EndpointsForRange srsSatelliteReplicas =
filterByDatacenters(srsReplicas, testCase.satelliteDcs);
+ EndpointsForRange ntsSatelliteReplicas =
filterByDatacenters(ntsWithSatellitesReplicas, testCase.satelliteDcs);
+
+ // endpoints only - isTransient will be different
+ Set<InetAddressAndPort> srsEndpoints = new
HashSet<>();
+ Set<InetAddressAndPort> ntsEndpoints = new
HashSet<>();
+ srsSatelliteReplicas.forEach(r ->
srsEndpoints.add(r.endpoint()));
+ ntsSatelliteReplicas.forEach(r ->
ntsEndpoints.add(r.endpoint()));
+
+ if (!srsEndpoints.equals(ntsEndpoints))
+ {
+ throw new AssertionError(String.format(
+ "Satellite node selection differs between
SRS and NTS\nSRS endpoints: %s\nNTS endpoints: %s",
+ srsEndpoints, ntsEndpoints));
+ }
+ }
+
+ // check all satellite replicas are transient
+ for (Replica replica : srsReplicas)
+ {
+ Location location = ClusterMetadata.current()
+ .directory
+
.location(ClusterMetadata.current().directory.peerId(replica.endpoint()));
+
+ if
(testCase.satelliteDcs.contains(location.datacenter))
+ {
+ if (!replica.isTransient())
+ {
+ throw new AssertionError("Satellite
replica must be transient: " + replica);
+ }
+ }
+ }
+ }
+ }
+ // AssertionError is an Error, not an Exception, so the assertion failures thrown above
+ // are not caught here; only exceptions (e.g. from setupCluster) are wrapped
+ catch (Exception e)
+ {
+ throw new RuntimeException("Test failed", e);
+ }
+ });
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyTest.java
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyTest.java
new file mode 100644
index 0000000000..fc2a81e2e1
--- /dev/null
+++
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.CassandraTestBase;
+import org.apache.cassandra.CassandraTestBase.UseMurmur3Partitioner;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Location;
+
+import static org.apache.cassandra.CassandraTestBase.DisableMBeanRegistration;
+import static org.apache.cassandra.CassandraTestBase.PrepareServerNoRegister;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@PrepareServerNoRegister
+@DisableMBeanRegistration
+@UseMurmur3Partitioner
+public class SatelliteReplicationStrategyTest extends CassandraTestBase
+{
+ private static final String KEYSPACE = "test";
+
+ /** Runs after each test and resets the CMS via ServerTestUtils. */
+ @After
+ public void teardown()
+ {
+ ServerTestUtils.resetCMS();
+ }
+
+ /**
+  * Registers an endpoint with the given host address, owning {@code token} (as a LongToken),
+  * at the given location.
+  */
+ private void addToken(long token, String address, Location location) throws UnknownHostException
+ {
+     ClusterMetadataTestHelper.addEndpoint(InetAddressAndPort.getByName(address), new LongToken(token), location);
+ }
+
+ /**
+  * Registers the ten-node fixture used by these tests: three nodes each in dc1 and dc2 and two
+  * nodes each in sat1 and sat2, all on "rack1", with tokens 100 through 1000 in steps of 100.
+  */
+ private void setupDCs() throws UnknownHostException
+ {
+     // dc1: tokens 100-300
+     Location inDc1 = new Location("dc1", "rack1");
+     addToken(100, "10.0.0.10", inDc1);
+     addToken(200, "10.0.0.11", inDc1);
+     addToken(300, "10.0.0.12", inDc1);
+
+     // dc2: tokens 400-600
+     Location inDc2 = new Location("dc2", "rack1");
+     addToken(400, "10.1.0.10", inDc2);
+     addToken(500, "10.1.0.11", inDc2);
+     addToken(600, "10.1.0.12", inDc2);
+
+     // sat1: tokens 700-800
+     Location inSat1 = new Location("sat1", "rack1");
+     addToken(700, "10.2.0.10", inSat1);
+     addToken(800, "10.2.0.11", inSat1);
+
+     // sat2: tokens 900-1000
+     Location inSat2 = new Location("sat2", "rack1");
+     addToken(900, "10.3.0.10", inSat2);
+     addToken(1000, "10.3.0.11", inSat2);
+ }
+
+ /**
+  * Looks up the keyspace in the current cluster metadata schema and returns its replication
+  * strategy cast to SatelliteReplicationStrategy.
+  *
+  * @param keyspace name of the keyspace to look up
+  * @return the keyspace's replication strategy
+  * @throws AssertionError if the keyspace is not present in the schema
+  */
+ private static SatelliteReplicationStrategy getSRS(String keyspace)
+ {
+     KeyspaceMetadata ksm = ClusterMetadata.current().schema.getKeyspaces().getNullable(keyspace);
+     // getNullable may return null; fail with a readable message instead of an NPE below
+     if (ksm == null)
+         throw new AssertionError("Keyspace '" + keyspace + "' not found in cluster metadata schema");
+     return (SatelliteReplicationStrategy) ksm.replicationStrategy;
+ }
+
+ /**
+  * Creates a keyspace with one full datacenter (dc1, the primary) and one satellite (sat1) and
+  * checks the primary, datacenter set and satellite map exposed by the resulting strategy.
+  */
+ @Test
+ public void testValidSingleDCWithSatellite() throws Exception
+ {
+     setupDCs();
+
+     String createKeyspace = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                             "'class': 'SatelliteReplicationStrategy', " +
+                             "'dc1': '3', " +
+                             "'dc1.satellite.sat1': '2/2', " +
+                             "'primary': 'dc1'" +
+                             "} AND replication_type = 'tracked'";
+     ClusterMetadataTestHelper.createKeyspace(createKeyspace);
+
+     SatelliteReplicationStrategy srs = getSRS(KEYSPACE);
+
+     assertEquals("dc1", srs.getPrimaryDC());
+
+     // exactly one full datacenter: dc1
+     assertEquals(1, srs.getDatacenters().size());
+     assertTrue(srs.getDatacenters().contains("dc1"));
+
+     // exactly one satellite: sat1
+     assertEquals(1, srs.getSatellites().size());
+     assertTrue(srs.getSatellites().containsKey("sat1"));
+ }
+
+ /**
+  * Creates a keyspace with two full datacenters, each with its own satellite, and checks the
+  * primary plus the number of datacenters and satellites reported by the strategy.
+  */
+ @Test
+ public void testValidMultipleDCsWithSatellites() throws Exception
+ {
+     setupDCs();
+
+     String createKeyspace = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                             "'class': 'SatelliteReplicationStrategy', " +
+                             "'dc1': '3', " +
+                             "'dc1.satellite.sat1': '2/2', " +
+                             "'dc2': '3', " +
+                             "'dc2.satellite.sat2': '2/2', " +
+                             "'primary': 'dc1'" +
+                             "} AND replication_type = 'tracked'";
+     ClusterMetadataTestHelper.createKeyspace(createKeyspace);
+
+     SatelliteReplicationStrategy srs = getSRS(KEYSPACE);
+
+     assertEquals("dc1", srs.getPrimaryDC());
+     assertEquals(2, srs.getDatacenters().size());
+     assertEquals(2, srs.getSatellites().size());
+ }
+
+ /**
+  * Sets up the standard datacenters, then asserts that constructing a tracked
+  * SatelliteReplicationStrategy from {@code options} throws a ConfigurationException whose
+  * message contains {@code messageContains}.
+  *
+  * @param options         replication options passed to the strategy constructor
+  * @param messageContains substring expected in the exception message
+  */
+ private void testConfigurationException(Map<String, String> options, String messageContains) throws UnknownHostException
+ {
+     setupDCs();
+
+     try
+     {
+         new SatelliteReplicationStrategy(KEYSPACE, options, ReplicationType.tracked);
+     }
+     catch (ConfigurationException e)
+     {
+         String message = e.getMessage();
+         // report the actual message on mismatch (and tolerate a null message) so that a
+         // failure is diagnosable from the test output alone
+         assertTrue("Expected exception message containing \"" + messageContains + "\" but was: " + message,
+                    message != null && message.contains(messageContains));
+         return;
+     }
+
+     fail("ConfigurationException expected");
+ }
+
+ /** Options without a 'primary' entry must be rejected with a ConfigurationException. */
+ @Test
+ public void testMissingPrimaryFails() throws Exception
+ {
+     Map<String, String> withoutPrimary = new HashMap<>();
+     withoutPrimary.put("dc1", "3");
+
+     testConfigurationException(withoutPrimary, "'primary' option is required");
+ }
+
+ /** Naming a primary ("dc2") that has no replication factor entry of its own must be rejected. */
+ @Test
+ public void testPrimaryNotInFullDCsFails() throws Exception
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put("dc1", "3");
+ options.put("primary", "dc2");
+
+ testConfigurationException(options, "Primary datacenter 'dc2' must be
defined");
+ }
+
+ /**
+  * Constructing the strategy with ReplicationType.untracked succeeds here; the rejection is
+  * expected from validateExpectedOptions, which must throw a ConfigurationException.
+  */
+ @Test
+ public void testUntrackedReplicationFails() throws Exception
+ {
+ setupDCs();
+
+ Map<String, String> options = new HashMap<>();
+ options.put("dc1", "3");
+ options.put("primary", "dc1");
+
+ SatelliteReplicationStrategy strategy = new
SatelliteReplicationStrategy(
+ KEYSPACE, options, ReplicationType.untracked);
+
+ try
+ {
+ strategy.validateExpectedOptions(ClusterMetadata.current());
+ fail("ConfigurationException expected");
+ }
+ catch (ConfigurationException e)
+ {
+ assertTrue(e.getMessage().contains("requires tracked
replication"));
+ }
+ }
+
+ /** A datacenter name containing a dot ("dc.1") must be rejected. */
+ @Test
+ public void testDotsInDCNamesFails() throws Exception
+ {
+     Map<String, String> dottedName = new HashMap<>();
+     dottedName.put("dc.1", "3");
+     dottedName.put("primary", "dc.1");
+
+     testConfigurationException(dottedName, "cannot contain dots");
+ }
+
+ /** A satellite entry whose parent ("dc2") has no replication factor entry must be rejected. */
+ @Test
+ public void testOrphanedSatelliteFails() throws Exception
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put("dc1", "3");
+ options.put("dc2.satellite.sat1", "2/2");
+ options.put("primary", "dc1");
+
+ testConfigurationException(options, "references non-existent full
datacenter 'dc2'");
+ }
+
+ /** A satellite replication factor of "3/1" (not all replicas are witnesses) must be rejected. */
+ @Test
+ public void testSatellitePartialWitnessFails() throws Exception
+ {
+     Map<String, String> partialWitness = new HashMap<>();
+     partialWitness.put("dc1", "3");
+     partialWitness.put("dc1.satellite.sat1", "3/1");
+     partialWitness.put("primary", "dc1");
+
+     testConfigurationException(partialWitness, "must all be witnesses");
+ }
+
+ /** A satellite replication factor given as a plain number ("3") must be rejected. */
+ @Test
+ public void testSatelliteRequiresWitnessFormat() throws Exception
+ {
+     Map<String, String> plainSatelliteRf = new HashMap<>();
+     plainSatelliteRf.put("dc1", "3");
+     // satellite value lacks the witness format
+     plainSatelliteRf.put("dc1.satellite.sat1", "3");
+     plainSatelliteRf.put("primary", "dc1");
+
+     testConfigurationException(plainSatelliteRf, "witness replicas using format");
+ }
+
+ /**
+  * With dc1 at RF 3 and satellite sat1 at 2/2, the natural replicas for token 150 must be
+  * five in total: three full replicas and two non-full (witness) replicas.
+  */
+ @Test
+ public void testReplicaCalculationWithSatellites() throws Exception
+ {
+     setupDCs();
+
+     String createKeyspace = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                             "'class': 'SatelliteReplicationStrategy', " +
+                             "'dc1': '3', " +
+                             "'dc1.satellite.sat1': '2/2', " +
+                             "'primary': 'dc1'" +
+                             "} AND replication_type = 'tracked'";
+     ClusterMetadataTestHelper.createKeyspace(createKeyspace);
+
+     SatelliteReplicationStrategy srs = getSRS(KEYSPACE);
+     EndpointsForRange natural = srs.calculateNaturalReplicas(new LongToken(150), ClusterMetadata.current());
+
+     // 3 full replicas from dc1 plus 2 satellite replicas from sat1
+     assertEquals(5, natural.size());
+
+     int full = 0;
+     int witnesses = 0;
+     for (Replica r : natural)
+     {
+         if (!r.isFull())
+             witnesses++;
+         else
+             full++;
+     }
+
+     assertEquals(3, full);
+     assertEquals(2, witnesses);
+ }
+
+ /**
+  * Disabling a non-primary datacenter is accepted: dc2 is reported as disabled, but it still
+  * appears in getDatacenters() and still counts towards the per-DC and aggregate replication factor.
+  */
+ @Test
+ public void testDisableNonPrimaryDC() throws Exception
+ {
+     setupDCs();
+
+     Map<String, String> dc2Disabled = new HashMap<>();
+     dc2Disabled.put("dc1", "3");
+     dc2Disabled.put("dc2", "3");
+     dc2Disabled.put("dc2.disabled", "true");
+     dc2Disabled.put("primary", "dc1");
+
+     SatelliteReplicationStrategy srs = new SatelliteReplicationStrategy(KEYSPACE, dc2Disabled, ReplicationType.tracked);
+
+     // only dc2 is disabled
+     assertTrue(srs.isDisabled("dc2"));
+     assertFalse(srs.isDisabled("dc1"));
+     assertEquals(1, srs.getDisabledDatacenters().size());
+     assertTrue(srs.getDisabledDatacenters().contains("dc2"));
+
+     // the configured datacenter view still lists the disabled DC
+     assertEquals(2, srs.getDatacenters().size());
+     assertTrue(srs.getDatacenters().contains("dc2"));
+
+     // the per-DC replication factor is unaffected by the disabled flag
+     assertEquals(3, srs.getReplicationFactor("dc2").allReplicas);
+
+     // the aggregate replication factor covers every DC, disabled or not
+     assertEquals(6, srs.getReplicationFactor().allReplicas);
+ }
+
+ /** Marking the primary datacenter (dc1) as disabled must be rejected. */
+ @Test
+ public void testDisablePrimaryDCFails() throws Exception
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put("dc1", "3");
+ options.put("dc2", "3");
+ options.put("dc1.disabled", "true");
+ options.put("primary", "dc1");
+
+ testConfigurationException(options, "Primary datacenter 'dc1' cannot
be disabled");
+ }
+
+ /** A ".disabled" entry for a datacenter (dc99) with no replication factor entry must be rejected. */
+ @Test
+ public void testDisableNonExistentDCFails() throws Exception
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put("dc1", "3");
+ options.put("dc99.disabled", "true");
+ options.put("primary", "dc1");
+
+ testConfigurationException(options, "not defined as a full
datacenter");
+ }
+
+ /**
+  * With dc2 disabled, replica placement for token 150 must still include every configured
+  * datacenter and satellite: ten replicas in total.
+  */
+ @Test
+ public void testDisabledDCSatelliteStillGetsReplicas() throws Exception
+ {
+     setupDCs();
+
+     String createKeyspace = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                             "'class': 'SatelliteReplicationStrategy', " +
+                             "'dc1': '3', " +
+                             "'dc1.satellite.sat1': '2/2', " +
+                             "'dc2': '3', " +
+                             "'dc2.satellite.sat2': '2/2', " +
+                             "'dc2.disabled': 'true', " +
+                             "'primary': 'dc1'" +
+                             "} AND replication_type = 'tracked'";
+     ClusterMetadataTestHelper.createKeyspace(createKeyspace);
+
+     SatelliteReplicationStrategy srs = getSRS(KEYSPACE);
+     EndpointsForRange natural = srs.calculateNaturalReplicas(new LongToken(150), ClusterMetadata.current());
+
+     // the disabled flag does not change placement:
+     // 3 full (dc1) + 3 full (dc2) + 2 witness (sat1) + 2 witness (sat2)
+     assertEquals(10, natural.size());
+ }
+
+ /** A ".disabled" value other than 'true' or 'false' ("maybe") must be rejected. */
+ @Test
+ public void testDisabledInvalidValueFails() throws Exception
+ {
+     Map<String, String> badDisabledValue = new HashMap<>();
+     badDisabledValue.put("dc1", "3");
+     badDisabledValue.put("dc2", "3");
+     badDisabledValue.put("dc2.disabled", "maybe");
+     badDisabledValue.put("primary", "dc1");
+
+     testConfigurationException(badDisabledValue, "expected 'true' or 'false'");
+ }
+
+ /**
+  * hasSameSettings must take the disabled flag into account: two strategies with identical
+  * options (dc2 disabled) compare equal, while one without the disabled entry does not.
+  */
+ @Test
+ public void testHasSameSettingsWithDisabled() throws Exception
+ {
+     setupDCs();
+
+     // dc2 disabled
+     Map<String, String> disabledFirst = new HashMap<>();
+     disabledFirst.put("dc1", "3");
+     disabledFirst.put("dc2", "3");
+     disabledFirst.put("dc2.disabled", "true");
+     disabledFirst.put("primary", "dc1");
+
+     // same content as above, built independently
+     Map<String, String> disabledSecond = new HashMap<>();
+     disabledSecond.put("dc1", "3");
+     disabledSecond.put("dc2", "3");
+     disabledSecond.put("dc2.disabled", "true");
+     disabledSecond.put("primary", "dc1");
+
+     // no disabled entry
+     Map<String, String> nothingDisabled = new HashMap<>();
+     nothingDisabled.put("dc1", "3");
+     nothingDisabled.put("dc2", "3");
+     nothingDisabled.put("primary", "dc1");
+
+     SatelliteReplicationStrategy first = new SatelliteReplicationStrategy(KEYSPACE, disabledFirst, ReplicationType.tracked);
+     SatelliteReplicationStrategy second = new SatelliteReplicationStrategy(KEYSPACE, disabledSecond, ReplicationType.tracked);
+     SatelliteReplicationStrategy withoutDisabled = new SatelliteReplicationStrategy(KEYSPACE, nothingDisabled, ReplicationType.tracked);
+
+     assertTrue(first.hasSameSettings(second));
+     assertFalse(first.hasSameSettings(withoutDisabled));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]