This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-58-satellite-datacenters
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-58-satellite-datacenters 
by this push:
     new 9ef1e8e192 CEP-58: Add satellite replication strategy stub
9ef1e8e192 is described below

commit 9ef1e8e1924101d87e4c1dd0ffdcb2dc257d5356
Author: Blake Eggleston <[email protected]>
AuthorDate: Wed Feb 4 10:23:16 2026 -0800

    CEP-58: Add satellite replication strategy stub
    
    Patch by Blake Eggleston; Reviewed by Caleb Rackliffe for CASSANDRA-21105
---
 .../locator/AbstractReplicationStrategy.java       |   4 +-
 .../cassandra/locator/ReplicationFactor.java       |   4 +-
 .../locator/SatelliteReplicationStrategy.java      | 516 +++++++++++++++++++
 ...atelliteReplicationStrategyEquivalenceTest.java | 554 +++++++++++++++++++++
 .../locator/SatelliteReplicationStrategyTest.java  | 402 +++++++++++++++
 5 files changed, 1477 insertions(+), 3 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 93b0d26fa6..0860692901 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -369,9 +369,11 @@ public abstract class AbstractReplicationStrategy
         try
         {
             ReplicationFactor rf = ReplicationFactor.fromString(s);
-            
+
             if (rf.hasTransientReplicas())
             {
+                if (rf.fullReplicas == 0)
+                    throw new ConfigurationException("Replication factor must 
have at least one full replica, got " + s);
                 if (DatabaseDescriptor.getNumTokens() > 1)
                     throw new ConfigurationException("Transient replication is 
not supported with vnodes yet");
                 if (!replicationType.isTracked())
diff --git a/src/java/org/apache/cassandra/locator/ReplicationFactor.java 
b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
index ee971d900d..254f8d58af 100644
--- a/src/java/org/apache/cassandra/locator/ReplicationFactor.java
+++ b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
@@ -65,8 +65,8 @@ public class ReplicationFactor
                                     "Transient replication is not enabled on 
this node");
         Preconditions.checkArgument(totalRF >= 0,
                                     "Replication factor must be non-negative, 
found %s", totalRF);
-        Preconditions.checkArgument(transientRF == 0 || transientRF < totalRF,
-                                    "Transient replicas must be zero, or less 
than total replication factor. For %s/%s", totalRF, transientRF);
+        Preconditions.checkArgument(transientRF <= totalRF,
+                                    "Transient replicas must be less than or 
equal to total replication factor. For %s/%s", totalRF, transientRF);
         if (transientRF > 0)
         {
             Preconditions.checkArgument(DatabaseDescriptor.getNumTokens() == 1,
diff --git 
a/src/java/org/apache/cassandra/locator/SatelliteReplicationStrategy.java 
b/src/java/org/apache/cassandra/locator/SatelliteReplicationStrategy.java
new file mode 100644
index 0000000000..ce351a1236
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/SatelliteReplicationStrategy.java
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import 
org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
+import 
org.apache.cassandra.service.replication.migration.MutationTrackingMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.ReplicaGroups;
+import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+
+import static java.lang.String.format;
+
+/**
+ * Replication strategy for  CEP-58: Satellite Datacenters
+ * allows pairing full datacenters with smaller satellite DCs * that use 
witness replicas for mutation tracking and
+ * enable transactionally consistent failover without requiring 3 full 
datacenters.
+ *
+ * Configuration syntax uses dot notation:
+ *
+ * CREATE KEYSPACE ks WITH replication = {
+ *   'class': 'SatelliteReplicationStrategy',
+ *   'DC1': '3',                    // Full datacenter with RF=3
+ *   'DC1.satellite.ST1': '3/3',    // Satellite ST1 for DC1 with 3 witness 
replicas
+ *   'DC2': '3',                    // Second full DC
+ *   'DC2.satellite.ST2': '3/3',    // Satellite ST2 for DC2
+ *   'primary': 'DC1'               // Primary datacenter designation
+ *   'DC2.disabled': 'true',        // Replication to DC1 disabled (disabled 
DC can't be primary, use for down DCs)
+ * } AND replication_type = tracked;
+ *
+ * Requirements:
+ * - Keyspace must use tracked replication (replication_type = tracked)
+ * - Cluster must have transient replication enabled 
(transient_replication_enabled: true)
+ * - Satellites must use witness replica format 'N/N' where all replicas are 
transient
+ *
+ * Uses NetworkTopologyStrategy replica selection algorithm for compatibility 
with existing NTS keyspaces
+ */
+public class SatelliteReplicationStrategy extends AbstractReplicationStrategy
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(SatelliteReplicationStrategy.class);
+
+    private static final String PRIMARY_DC_KEY = "primary";
+    private static final String SATELLITE_KEY_PATTERN = ".satellite.";
+    private static final String DISABLED_KEY_SUFFIX = ".disabled";
+
+    private final Map<String, ReplicationFactor> fullDCs;
+
+    private final Map<String, SatelliteInfo> satellites;
+
+    private final Map<String, ReplicationFactor> allDCs;
+
+    private final String primaryDC;
+
+    private final Set<String> disabledDCs;
+
+    private final ReplicationFactor aggregateRf;
+
+    private static class SatelliteInfo
+    {
+        public final String parentDC;
+
+        public final String name;
+
+        public final ReplicationFactor rf;
+
+        SatelliteInfo(String parentDC, String name, ReplicationFactor rf)
+        {
+            this.parentDC = parentDC;
+            this.name = name;
+            this.rf = rf;
+        }
+    }
+
+    private static void cfgEx(String format, Object... args)
+    {
+        throw new ConfigurationException(format(format, args));
+    }
+
+    private static class StrategyOptions
+    {
+        Map<String, ReplicationFactor> fullDatacenters = new HashMap<>();
+        Map<String, SatelliteInfo> satellites = new HashMap<>();
+        Map<String, ReplicationFactor> allDatacenters = new HashMap<>();
+        Set<String> disabledDCs = new HashSet<>();
+        String primary = null;
+
+        StrategyOptions(Map<String, String> configOptions)
+        {
+
+            if (configOptions != null)
+            {
+                for (Map.Entry<String, String> entry : 
configOptions.entrySet())
+                {
+                    String key = entry.getKey();
+                    String value = entry.getValue();
+
+                    if (setPrimary(key, value))
+                        continue;
+
+                    if (addSatellite(key, value))
+                        continue;
+
+                    if (addPrimary(key, value))
+                        continue;
+
+                    if (key.contains("."))
+                        cfgEx("Datacenter names cannot contain dots: '%s'. Use 
'<DC>.satellite.<SAT>' for satellites", key);
+
+                    ReplicationFactor rf = ReplicationFactor.fromString(value);
+                    fullDatacenters.put(key, rf);
+                }
+            }
+
+            allDatacenters.putAll(fullDatacenters);
+            satellites.forEach((dc, info) -> allDatacenters.put(dc, info.rf));
+        }
+
+        boolean setPrimary(String key, String value)
+        {
+            if (!key.equalsIgnoreCase(PRIMARY_DC_KEY))
+                return false;
+
+            if (primary != null)
+                cfgEx("'primary' option specified multiple times");
+
+            primary = value;
+            return true;
+        }
+
+        boolean addSatellite(String key, String value)
+        {
+            if (!key.contains(SATELLITE_KEY_PATTERN))
+                return false;
+
+            String[] parts = key.split("\\" + SATELLITE_KEY_PATTERN);
+            if (parts.length != 2)
+                cfgEx("Invalid satellite configuration key '%s'. Expected 
format: '<DC>.satellite.<SAT>'", key);
+
+            String parentDc = parts[0];
+            String satelliteName = parts[1];
+
+            if (parentDc.contains(".") || satelliteName.contains("."))
+                cfgEx("Datacenter names cannot contain dots: '%s'", key);
+
+            // Parse RF - satellites must use witness format (e.g., '3/3')
+            if (!value.contains("/"))
+                cfgEx("Satellite replicas must be specified as witness 
replicas using format 'N/N' (e.g., '3/3'). Got '%s'", value);
+
+            String[] rfParts = value.split("/");
+            if (rfParts.length != 2)
+                cfgEx("Invalid replication factor format for satellite '%s': 
'%s'", satelliteName, value);
+
+            try
+            {
+                int total = Integer.parseInt(rfParts[0]);
+                int transientCount = Integer.parseInt(rfParts[1]);
+
+                if (transientCount != total)
+                    cfgEx("Satellite replicas must all be witnesses. Expected 
'%d/%d' but got '%s'", total, total, value);
+
+                if (total <= 0)
+                    cfgEx("Satellite replication factor must be positive. Got 
'%s'", value);
+
+                satellites.put(satelliteName, new SatelliteInfo(parentDc, 
satelliteName,
+                                                               
ReplicationFactor.withTransient(total, total)));
+            }
+            catch (NumberFormatException e)
+            {
+                cfgEx("Invalid replication factor format for satellite '%s': 
'%s'", satelliteName, value);
+            }
+            return true;
+        }
+
+        boolean addPrimary(String key, String value)
+        {
+            if (!key.endsWith(DISABLED_KEY_SUFFIX))
+                return false;
+
+            String dcName = key.substring(0, key.length() - 
DISABLED_KEY_SUFFIX.length());
+            if (dcName.isEmpty())
+                cfgEx("Invalid disabled configuration key '%s'", key);
+            if (dcName.contains("."))
+                cfgEx("Datacenter names cannot contain dots: '%s'", dcName);
+
+            if ("true".equalsIgnoreCase(value))
+                disabledDCs.add(dcName);
+            else if (!"false".equalsIgnoreCase(value))
+                cfgEx("Invalid value for '%s': expected 'true' or 'false', got 
'%s'", key, value);
+
+            return true;
+        }
+    }
+
+    public SatelliteReplicationStrategy(String keyspaceName,
+                                        Map<String, String> configOptions,
+                                        ReplicationType replicationType) 
throws ConfigurationException
+    {
+        super(keyspaceName, configOptions, replicationType);
+
+        StrategyOptions opts = new StrategyOptions(configOptions);
+
+        this.fullDCs = Collections.unmodifiableMap(opts.fullDatacenters);
+        this.satellites = Collections.unmodifiableMap(opts.satellites);
+        this.allDCs = Collections.unmodifiableMap(opts.allDatacenters);
+        this.primaryDC = opts.primary;
+        this.disabledDCs = Collections.unmodifiableSet(opts.disabledDCs);
+
+        validate();
+
+        int totalReplicas = 0;
+        int totalTransient = 0;
+        for (ReplicationFactor rf : fullDCs.values())
+        {
+            totalReplicas += rf.allReplicas;
+            totalTransient += rf.transientReplicas();
+        }
+
+        this.aggregateRf = ReplicationFactor.withTransient(totalReplicas, 
totalTransient);
+
+        if (disabledDCs.isEmpty())
+            logger.info("Configured satellite datacenter replication for 
keyspace {} with full datacenters {} (primary: {}), satellites {}",
+                        keyspaceName, fullDCs, primaryDC, satellites);
+        else
+            logger.info("Configured satellite datacenter replication for 
keyspace {} with full datacenters {} (primary: {}), satellites {}, disabled {}",
+                        keyspaceName, fullDCs, primaryDC, satellites, 
disabledDCs);
+    }
+
+    private void validate()
+    {
+        if (primaryDC == null)
+            cfgEx("'primary' option is required");
+
+        if (!fullDCs.containsKey(primaryDC))
+            cfgEx("Primary datacenter '%s' must be defined as a full 
datacenter", primaryDC);
+
+        for (SatelliteInfo satInfo : satellites.values())
+        {
+            if (!fullDCs.containsKey(satInfo.parentDC))
+                cfgEx("Satellite '%s' references non-existent full datacenter 
'%s'", satInfo.name, satInfo.parentDC);
+        }
+
+        for (String disabledDC : disabledDCs)
+        {
+            if (!fullDCs.containsKey(disabledDC))
+                cfgEx("Disabled datacenter '%s' is not defined as a full 
datacenter", disabledDC);
+        }
+
+        if (disabledDCs.contains(primaryDC))
+            cfgEx("Primary datacenter '%s' cannot be disabled", primaryDC);
+    }
+
+    @Override
+    public EndpointsForRange calculateNaturalReplicas(Token searchToken, 
ClusterMetadata metadata)
+    {
+        return NetworkTopologyStrategy.calculateNaturalReplicas(
+            searchToken,
+            TokenRingUtils.getRange(metadata.tokenMap.tokens(), searchToken),
+            metadata.directory,
+            metadata.tokenMap,
+            allDCs);
+    }
+
+    @Override
+    public DataPlacement calculateDataPlacement(Epoch epoch, 
List<Range<Token>> ranges, ClusterMetadata metadata)
+    {
+        ReplicaGroups.Builder builder = ReplicaGroups.builder();
+
+        for (Range<Token> range : ranges)
+        {
+            EndpointsForRange endpointsForRange = 
calculateNaturalReplicas(range.right, metadata);
+            builder.withReplicaGroup(VersionedEndpoints.forRange(epoch, 
endpointsForRange));
+        }
+
+        ReplicaGroups built = builder.build();
+        return new DataPlacement(built, built);
+    }
+
+    @Override
+    public ReplicationFactor getReplicationFactor()
+    {
+        return aggregateRf;
+    }
+
+    @Override
+    public Collection<String> recognizedOptions(ClusterMetadata metadata)
+    {
+        // accept anything - validation happens in constructor
+        return null;
+    }
+
+    @Override
+    public void validateExpectedOptions(ClusterMetadata metadata) throws 
ConfigurationException
+    {
+        // must use tracked replication type
+        if (!replicationType.isTracked())
+        {
+            throw new ConfigurationException(
+                getClass().getSimpleName() + " requires tracked replication. " 
+
+                "Use 'AND replication_type = tracked' when creating/updating 
the keyspace");
+        }
+
+        // must have witness replicas enabled
+        if (!DatabaseDescriptor.isTransientReplicationEnabled())
+        {
+            throw new ConfigurationException(
+                getClass().getSimpleName() + " requires transient replication 
to be enabled. " +
+                "Set 'transient_replication_enabled: true' in cassandra.yaml");
+        }
+
+        // must have at least one full DC
+        if (fullDCs.isEmpty())
+        {
+            throw new ConfigurationException(
+                "Configuration for at least one full datacenter must be 
present");
+        }
+
+        // validate datacenter names against topology
+        Set<String> knownDcs = metadata.directory.knownDatacenters();
+        for (String dc : fullDCs.keySet())
+        {
+            if (!knownDcs.contains(dc))
+            {
+                throw new ConfigurationException(format(
+                    "Unrecognized datacenter '%s'. Known DCs: %s", dc, 
knownDcs));
+            }
+        }
+
+        // validate satellite DCs exist and don't overlap with full DCs
+        for (SatelliteInfo satInfo : satellites.values())
+        {
+            if (!knownDcs.contains(satInfo.name))
+            {
+                throw new ConfigurationException(format(
+                    "Unrecognized satellite datacenter '%s'. Known DCs: %s", 
satInfo.name, knownDcs));
+            }
+
+            if (fullDCs.containsKey(satInfo.name))
+            {
+                throw new ConfigurationException(format(
+                    "Datacenter '%s' cannot be both a full datacenter and a 
satellite",
+                    satInfo.name));
+            }
+        }
+
+        // check for pending mutation tracking migrations
+        MutationTrackingMigrationState migrationState = 
metadata.mutationTrackingMigrationState;
+        KeyspaceMigrationInfo migrationInfo = 
migrationState.getKeyspaceInfo(keyspaceName);
+
+        if (migrationInfo != null && !migrationInfo.isComplete())
+        {
+            throw new ConfigurationException(format(
+                "Cannot alter replication strategy for keyspace %s while 
mutation tracking migration is in progress",
+                keyspaceName));
+        }
+    }
+
+    @Override
+    public void validateOptions() throws ConfigurationException
+    {
+        // validate full DC replication factors
+        for (Map.Entry<String, String> entry : this.configOptions.entrySet())
+        {
+            String key = entry.getKey();
+            if (key.equalsIgnoreCase(PRIMARY_DC_KEY) || 
key.contains(SATELLITE_KEY_PATTERN) || key.endsWith(DISABLED_KEY_SUFFIX))
+                continue;
+
+            validateReplicationFactor(entry.getValue());
+        }
+
+        // satellites are validated in constructor
+    }
+
+    @Override
+    public void maybeWarnOnOptions(ClientState state)
+    {
+        if (SchemaConstants.isSystemKeyspace(keyspaceName))
+            return;
+
+        Directory directory = ClusterMetadata.current().directory;
+        Multimap<String, InetAddressAndPort> dcsNodes =
+            directory.allDatacenterEndpoints();
+
+        for (Map.Entry<String, ReplicationFactor> entry : fullDCs.entrySet())
+        {
+            String dc = entry.getKey();
+            ReplicationFactor rf = entry.getValue();
+
+            if (disabledDCs.contains(dc))
+                continue;
+
+            // apply guardrails
+            Guardrails.minimumReplicationFactor.guard(rf.fullReplicas, 
keyspaceName, false, state);
+            Guardrails.maximumReplicationFactor.guard(rf.fullReplicas, 
keyspaceName, false, state);
+
+            // warn if rF > node count
+            int nodeCount = dcsNodes.containsKey(dc) ? dcsNodes.get(dc).size() 
: 0;
+            if (rf.fullReplicas > nodeCount && nodeCount != 0)
+            {
+                String msg = format(
+                    "Your replication factor %d for keyspace %s is higher than 
the number of nodes %d for datacenter %s",
+                    rf.fullReplicas, keyspaceName, nodeCount, dc);
+                ClientWarn.instance.warn(msg);
+                logger.warn(msg);
+            }
+        }
+
+        for (SatelliteInfo satInfo : satellites.values())
+        {
+            if (disabledDCs.contains(satInfo.parentDC))
+                continue;
+
+            String dc = satInfo.name;
+            int replicaCount = satInfo.rf.allReplicas;
+
+            int nodeCount = dcsNodes.containsKey(dc) ? dcsNodes.get(dc).size() 
: 0;
+            if (replicaCount > nodeCount && nodeCount != 0)
+            {
+                String msg = format(
+                    "Your replication factor %d for keyspace %s is higher than 
the number of nodes %d for satellite datacenter %s",
+                    replicaCount, keyspaceName, nodeCount, dc);
+                ClientWarn.instance.warn(msg);
+                logger.warn(msg);
+            }
+        }
+    }
+
+    public String getPrimaryDC()
+    {
+        return primaryDC;
+    }
+
+    public Set<String> getDatacenters()
+    {
+        return fullDCs.keySet();
+    }
+
+    public ReplicationFactor getReplicationFactor(String dc)
+    {
+        ReplicationFactor rf = fullDCs.get(dc);
+        return rf != null ? rf : ReplicationFactor.ZERO;
+    }
+
+    public Map<String, Integer> getSatellites()
+    {
+        Map<String, Integer> result = 
Maps.newHashMapWithExpectedSize(satellites.size());
+        satellites.forEach((k, v) -> result.put(k, v.rf.allReplicas));
+        return result;
+    }
+
+    public Set<String> getDisabledDatacenters()
+    {
+        return disabledDCs;
+    }
+
+    public boolean isDisabled(String dc)
+    {
+        return disabledDCs.contains(dc);
+    }
+
+    @Override
+    public boolean hasSameSettings(AbstractReplicationStrategy other)
+    {
+        if (!super.hasSameSettings(other))
+            return false;
+
+        if (!(other instanceof SatelliteReplicationStrategy))
+            return false;
+
+        SatelliteReplicationStrategy otherStrategy = 
(SatelliteReplicationStrategy) other;
+
+        return fullDCs.equals(otherStrategy.fullDCs) &&
+               satellites.equals(otherStrategy.satellites) &&
+               primaryDC.equals(otherStrategy.primaryDC) &&
+               disabledDCs.equals(otherStrategy.disabledDCs);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyEquivalenceTest.java
 
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyEquivalenceTest.java
new file mode 100644
index 0000000000..f6586eb0e7
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyEquivalenceTest.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+
+import org.apache.cassandra.CassandraTestBase;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+import static org.apache.cassandra.CassandraTestBase.DisableMBeanRegistration;
+import static org.apache.cassandra.CassandraTestBase.PrepareServerNoRegister;
+import static org.apache.cassandra.CassandraTestBase.UseMurmur3Partitioner;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.arbitrary;
+
+/**
+ * We need to be able to convert network topology strategy keyspace to 
satellite replication strategy
+ * keyspaces without any differences relating to replica placement. A replica 
placement bug will cause dataloss
+ *
+ * This test checks the replica placement of both strategies against each 
other and a test model under
+ * a variety of configurations to confirm there are no differences between the 
2.
+ */
+@PrepareServerNoRegister
+@DisableMBeanRegistration
+@UseMurmur3Partitioner
+public class SatelliteReplicationStrategyEquivalenceTest extends 
CassandraTestBase
+{
+    private static final String KEYSPACE = "test";
+
+    @After
+    public void teardown()
+    {
+        ServerTestUtils.resetCMS();
+    }
+
+    static class TestCase
+    {
+        final List<String> fullDcs;
+        final List<String> satelliteDcs;
+        final Map<String, Integer> fullDcRf;
+        final Map<String, Integer> satelliteRf;
+        final String primaryDc;
+        final Set<String> disabledDCs;
+        final List<Token> nodeTokens;
+        final List<Token> queryTokens;
+
+        final List<Integer> nodeCounts;
+        final List<Integer> rackCounts;
+        final List<Integer> satNodeCounts;
+        final List<Integer> satRackCounts;
+        final Map<String, String> satelliteToParent;
+
+        TestCase(List<String> fullDcs, List<String> satelliteDcs,
+                 Map<String, Integer> fullDcRf, Map<String, Integer> 
satelliteRf,
+                 String primaryDc, Set<String> disabledDCs,
+                 List<Token> nodeTokens, List<Token> queryTokens,
+                 List<Integer> nodeCounts, List<Integer> rackCounts,
+                 List<Integer> satNodeCounts, List<Integer> satRackCounts,
+                 Map<String, String> satelliteToParent)
+        {
+            this.fullDcs = fullDcs;
+            this.satelliteDcs = satelliteDcs;
+            this.fullDcRf = fullDcRf;
+            this.satelliteRf = satelliteRf;
+            this.primaryDc = primaryDc;
+            this.disabledDCs = disabledDCs;
+            this.nodeTokens = nodeTokens;
+            this.queryTokens = queryTokens;
+            this.nodeCounts = nodeCounts;
+            this.rackCounts = rackCounts;
+            this.satNodeCounts = satNodeCounts;
+            this.satRackCounts = satRackCounts;
+            this.satelliteToParent = satelliteToParent;
+        }
+
+
+        static Gen<TestCase> gen()
+        {
+            return rnd -> {
+                int numFullDcs = arbitrary().pick(1, 2, 3).generate(rnd);
+                List<String> fullDcs = new ArrayList<>();
+                List<Integer> nodeCounts = new ArrayList<>();
+                List<Integer> rackCounts = new ArrayList<>();
+                Map<String, Integer> fullDcRf = new HashMap<>();
+
+                for (int i = 1; i <= numFullDcs; i++)
+                {
+                    String dcName = "dc" + i;
+                    fullDcs.add(dcName);
+
+                    int nodes = SourceDSL.integers().between(3, 
10).generate(rnd);
+                    nodeCounts.add(nodes);
+
+                    int racks = SourceDSL.integers().between(1, 
3).generate(rnd);
+                    rackCounts.add(racks);
+
+                    int rf = SourceDSL.integers().between(1, Math.min(7, 
nodes)).generate(rnd);
+                    fullDcRf.put(dcName, rf);
+                }
+
+                int numSatellites = arbitrary().pick(0, 1, 2, 3).generate(rnd);
+                numSatellites = Math.min(numSatellites, numFullDcs);
+
+                List<String> satelliteDcs = new ArrayList<>();
+                List<Integer> satNodeCounts = new ArrayList<>();
+                List<Integer> satRackCounts = new ArrayList<>();
+                Map<String, Integer> satelliteRf = new HashMap<>();
+                Map<String, String> satelliteToParent = new HashMap<>();
+
+                for (int i = 0; i < numSatellites; i++)
+                {
+                    String satName = "sat" + (i + 1);
+                    satelliteDcs.add(satName);
+
+                    int satNodes = SourceDSL.integers().between(2, 
5).generate(rnd);
+                    satNodeCounts.add(satNodes);
+
+                    int satRacks = SourceDSL.integers().between(1, Math.min(3, 
satNodes)).generate(rnd);
+                    satRackCounts.add(satRacks);
+
+                    int satRf = SourceDSL.integers().between(1, Math.min(3, 
satNodes)).generate(rnd);
+                    satelliteRf.put(satName, satRf);
+
+                    satelliteToParent.put(satName, fullDcs.get(i));
+                }
+
+                String primaryDc = arbitrary().pick(fullDcs).generate(rnd);
+
+                // Randomly disable some non-primary DCs (~25% chance per DC)
+                Set<String> disabledDCs = new HashSet<>();
+                for (String dc : fullDcs)
+                {
+                    if (!dc.equals(primaryDc) && arbitrary().pick(true, false, 
false, false).generate(rnd))
+                        disabledDCs.add(dc);
+                }
+
+                int totalNodes = 
nodeCounts.stream().mapToInt(Integer::intValue).sum() +
+                                 
satNodeCounts.stream().mapToInt(Integer::intValue).sum();
+
+                Set<Long> uniqueTokenValues = new HashSet<>();
+                while (uniqueTokenValues.size() < totalNodes)
+                {
+                    long tokenValue = 
SourceDSL.longs().between(Long.MIN_VALUE, Long.MAX_VALUE).generate(rnd);
+                    uniqueTokenValues.add(tokenValue);
+                }
+
+                List<Token> nodeTokens = new ArrayList<>();
+                for (long tokenValue : uniqueTokenValues)
+                {
+                    nodeTokens.add(new LongToken(tokenValue));
+                }
+                nodeTokens.sort(null);
+
+                List<Token> queryTokens = new ArrayList<>();
+
+                // boundary tokens
+                for (Token nodeToken : nodeTokens)
+                {
+                    long tokenValue = nodeToken.getLongValue();
+                    queryTokens.add(nodeToken);
+                    queryTokens.add(new LongToken(tokenValue - 1));
+                    queryTokens.add(new LongToken(tokenValue + 1));
+                }
+
+                // random tokens
+                int remainingTokens = Math.max(0, 2000 - queryTokens.size());
+                for (int i = 0; i < remainingTokens; i++)
+                {
+                    long tokenValue = 
SourceDSL.longs().between(Long.MIN_VALUE, Long.MAX_VALUE).generate(rnd);
+                    queryTokens.add(new LongToken(tokenValue));
+                }
+
+                return new TestCase(fullDcs, satelliteDcs, fullDcRf, 
satelliteRf, primaryDc, disabledDCs, nodeTokens, queryTokens,
+                                    nodeCounts, rackCounts, satNodeCounts, 
satRackCounts, satelliteToParent);
+            };
+        }
+    }
+
+    /**
+     * reference model for replica placement verification.
+     *
+     * implements basic rack-aware replica selection by walking the token ring.
+     */
+    static class ReplicaPlacementModel
+    {
+        private final Map<String, Integer> fullDcRf;
+        private final Map<String, Integer> satelliteRf;
+
+        ReplicaPlacementModel(Map<String, Integer> fullDcRf, Map<String, 
Integer> satelliteRf)
+        {
+            this.fullDcRf = fullDcRf;
+            this.satelliteRf = satelliteRf;
+        }
+
+        EndpointsForRange calculateNaturalReplicas(Token searchToken, 
ClusterMetadata metadata)
+        {
+            Range<Token> range = 
TokenRingUtils.getRange(metadata.tokenMap.tokens(), searchToken);
+            List<Replica> allReplicas = new ArrayList<>();
+
+            for (Map.Entry<String, Integer> entry : fullDcRf.entrySet())
+            {
+                allReplicas.addAll(calculateReplicas(searchToken, range, 
entry.getKey(), entry.getValue(), metadata, false));
+            }
+
+            for (Map.Entry<String, Integer> entry : satelliteRf.entrySet())
+            {
+                allReplicas.addAll(calculateReplicas(searchToken, range, 
entry.getKey(), entry.getValue(), metadata, true));
+            }
+
+            return EndpointsForRange.copyOf(allReplicas);
+        }
+
+        /**
+         * Calculate replicas for a datacenter using simple ring walk with 
rack awareness.
+         */
+        private static List<Replica> calculateReplicas(Token searchToken,
+                                                       Range<Token> range,
+                                                       String datacenter,
+                                                       int rf,
+                                                       ClusterMetadata 
metadata,
+                                                       boolean isTransient)
+        {
+            List<Replica> replicas = new ArrayList<>();
+            Set<String> seenRacks = new HashSet<>();
+            Set<InetAddressAndPort> seenEndpoints = new HashSet<>();
+
+            Iterator<Token> ringIter = TokenRingUtils.ringIterator(
+                metadata.tokenMap.tokens(), searchToken, false);
+
+            while (replicas.size() < rf && ringIter.hasNext())
+            {
+                Token token = ringIter.next();
+                NodeId owner = metadata.tokenMap.owner(token);
+                InetAddressAndPort endpoint = 
metadata.directory.endpoint(owner);
+                Location location = metadata.directory.location(owner);
+
+                if (!location.datacenter.equals(datacenter))
+                    continue;
+                if (seenEndpoints.contains(endpoint))
+                    continue;
+
+                // rack awareness: prefer new racks when available
+                boolean newRack = seenRacks.add(location.rack);
+                int remainingSlots = rf - replicas.size();
+
+                // count unique racks in this DC
+                long totalRacks = 
metadata.directory.datacenterEndpoints(datacenter).stream()
+                    .map(ep -> 
metadata.directory.location(metadata.directory.peerId(ep)).rack)
+                    .distinct()
+                    .count();
+                int remainingRacks = (int)(totalRacks - seenRacks.size());
+
+                if (newRack || remainingSlots > remainingRacks)
+                {
+                    Replica replica = isTransient ? 
Replica.transientReplica(endpoint, range)
+                                                : 
Replica.fullReplica(endpoint, range);
+                    replicas.add(replica);
+                    seenEndpoints.add(endpoint);
+                }
+            }
+
+            return replicas;
+        }
+    }
+
+    private void setupCluster(TestCase testCase) throws UnknownHostException
+    {
+        ServerTestUtils.resetCMS();
+
+        int tokenIndex = 0;
+
+        // create full DCs
+        for (int dcIdx = 0; dcIdx < testCase.fullDcs.size(); dcIdx++)
+        {
+            String dcName = testCase.fullDcs.get(dcIdx);
+            int nodeCount = testCase.nodeCounts.get(dcIdx);
+            int rackCount = testCase.rackCounts.get(dcIdx);
+
+            for (int nodeIdx = 0; nodeIdx < nodeCount; nodeIdx++)
+            {
+                String rackName = "rack" + (nodeIdx % rackCount + 1);
+                Location location = new Location(dcName, rackName);
+                byte[] address = new byte[]{10, (byte) dcIdx, (byte) (nodeIdx 
/ 256), (byte) (nodeIdx % 256)};
+
+                Token token = testCase.nodeTokens.get(tokenIndex++);
+
+                addEndpoint(token, address, location);
+            }
+        }
+
+        // create satellite DCs
+        for (int satIdx = 0; satIdx < testCase.satelliteDcs.size(); satIdx++)
+        {
+            String satName = testCase.satelliteDcs.get(satIdx);
+            int satNodeCount = testCase.satNodeCounts.get(satIdx);
+            int satRackCount = testCase.satRackCounts.get(satIdx);
+
+            for (int nodeIdx = 0; nodeIdx < satNodeCount; nodeIdx++)
+            {
+                String rackName = "rack" + (nodeIdx % satRackCount + 1);
+                Location location = new Location(satName, rackName);
+                byte[] address = new byte[]{10, (byte) (100 + satIdx), (byte) 
(nodeIdx / 256), (byte) (nodeIdx % 256)};
+
+                Token token = testCase.nodeTokens.get(tokenIndex++);
+
+                addEndpoint(token, address, location);
+            }
+        }
+    }
+
+    private void addEndpoint(Token token, byte[] address, Location location) 
throws UnknownHostException
+    {
+        InetAddressAndPort addr = InetAddressAndPort.getByAddress(address);
+        ClusterMetadataTestHelper.addEndpoint(addr, token, location);
+    }
+
+    private Map<String, String> buildSrsOptions(TestCase testCase)
+    {
+        Map<String, String> options = new HashMap<>();
+
+        for (Map.Entry<String, Integer> entry : testCase.fullDcRf.entrySet())
+        {
+            options.put(entry.getKey(), String.valueOf(entry.getValue()));
+        }
+
+        for (Map.Entry<String, Integer> entry : 
testCase.satelliteRf.entrySet())
+        {
+            String satDc = entry.getKey();
+            int rf = entry.getValue();
+            String parentDc = testCase.satelliteToParent.get(satDc);
+            options.put(parentDc + ".satellite." + satDc, rf + "/" + rf);
+        }
+
+        for (String dc : testCase.disabledDCs)
+        {
+            options.put(dc + ".disabled", "true");
+        }
+
+        options.put("primary", testCase.primaryDc);
+        return options;
+    }
+
+    private Map<String, String> buildNtsOptions(TestCase testCase)
+    {
+        Map<String, String> options = new HashMap<>();
+
+        // full dcs only - NTS can't handle all-witness satellites
+        for (Map.Entry<String, Integer> entry : testCase.fullDcRf.entrySet())
+        {
+            options.put(entry.getKey(), String.valueOf(entry.getValue()));
+        }
+
+        return options;
+    }
+
+    private Map<String, String> buildNtsOptionsWithSatellites(TestCase 
testCase)
+    {
+        Map<String, String> options = new HashMap<>();
+
+        // Full DCs
+        for (Map.Entry<String, Integer> entry : testCase.fullDcRf.entrySet())
+        {
+            options.put(entry.getKey(), String.valueOf(entry.getValue()));
+        }
+
+        // Satellites as normal full replicas
+        for (Map.Entry<String, Integer> entry : 
testCase.satelliteRf.entrySet())
+        {
+            options.put(entry.getKey(), String.valueOf(entry.getValue()));
+        }
+
+        return options;
+    }
+
+    private EndpointsForRange filterByDatacenters(EndpointsForRange replicas, 
List<String> datacenters)
+    {
+        Range<Token> range = replicas.range();
+        List<Replica> filtered = new ArrayList<>();
+
+        for (Replica replica : replicas)
+        {
+            Location location = ClusterMetadata.current()
+                .directory
+                
.location(ClusterMetadata.current().directory.peerId(replica.endpoint()));
+
+            if (datacenters.contains(location.datacenter))
+            {
+                filtered.add(replica);
+            }
+        }
+
+        if (filtered.isEmpty())
+            return EndpointsForRange.empty(range);
+
+        return EndpointsForRange.copyOf(filtered);
+    }
+
+    private static void assertReplicaSetsEqual(ReplicaCollection<?> actual, 
ReplicaCollection<?> expected, String context)
+    {
+        if (actual.size() != expected.size())
+        {
+            throw new AssertionError(String.format("%s: Replica count mismatch 
- expected %d, got %d\nExpected: %s\nActual: %s",
+                context, expected.size(), actual.size(), expected, actual));
+        }
+
+        Set<Replica> actualSet = new HashSet<>();
+        Set<Replica> expectedSet = new HashSet<>();
+        actual.forEach(actualSet::add);
+        expected.forEach(expectedSet::add);
+
+        if (!actualSet.equals(expectedSet))
+        {
+            Set<Replica> missing = new HashSet<>(expectedSet);
+            missing.removeAll(actualSet);
+            Set<Replica> extra = new HashSet<>(actualSet);
+            extra.removeAll(expectedSet);
+
+            throw new AssertionError(String.format("%s: Replica sets don't 
match\nMissing: %s\nExtra: %s",
+                context, missing, extra));
+        }
+    }
+
+    @Test
+    public void testSrsEquivalenceToNts() throws Exception
+    {
+        qt().withShrinkCycles(0)
+            .withExamples(100)
+            .forAll(TestCase.gen())
+            .checkAssert(testCase -> {
+                try
+                {
+                    setupCluster(testCase);
+
+                    // create strategies
+                    Map<String, String> srsOptions = buildSrsOptions(testCase);
+                    SatelliteReplicationStrategy srs = new 
SatelliteReplicationStrategy(KEYSPACE,
+                                                                               
         srsOptions,
+                                                                               
         ReplicationType.tracked);
+
+                    // NTS without satellite options to confirm that SRS full 
datacenter replica selection (even with satellites) matches
+                    // the NTS replica selection without satellites. This is 
the migration use case where the satellites are added to the
+                    // replication strategy
+                    Map<String, String> ntsOptions = buildNtsOptions(testCase);
+                    NetworkTopologyStrategy nts = new 
NetworkTopologyStrategy(KEYSPACE,
+                                                                              
ntsOptions,
+                                                                              
ReplicationType.untracked);
+
+                    // Create NTS with satellites as full replicas to verify 
that the replica selection is the same as NTS
+                    Map<String, String> ntsWithSatellitesOptions = 
buildNtsOptionsWithSatellites(testCase);
+                    NetworkTopologyStrategy ntsWithSatellites = new 
NetworkTopologyStrategy(KEYSPACE,
+                                                                               
             ntsWithSatellitesOptions,
+                                                                               
             ReplicationType.untracked);
+
+                    ReplicaPlacementModel model = new 
ReplicaPlacementModel(testCase.fullDcRf, testCase.satelliteRf);
+
+                    for (Token queryToken : testCase.queryTokens)
+                    {
+                        EndpointsForRange srsReplicas = 
srs.calculateNaturalReplicas(queryToken, ClusterMetadata.current());
+                        EndpointsForRange ntsReplicas = 
nts.calculateNaturalReplicas(queryToken, ClusterMetadata.current());
+                        EndpointsForRange ntsWithSatellitesReplicas = 
ntsWithSatellites.calculateNaturalReplicas(queryToken, 
ClusterMetadata.current());
+                        EndpointsForRange modelReplicas = 
model.calculateNaturalReplicas(queryToken, ClusterMetadata.current());
+
+                        // SRS full DC replicas == NTS replicas
+                        EndpointsForRange srsFullDcReplicas = 
filterByDatacenters(srsReplicas, testCase.fullDcs);
+                        assertReplicaSetsEqual(srsFullDcReplicas, ntsReplicas, 
"SRS full DC vs NTS");
+
+                        // Model full DC replicas == NTS replicas
+                        EndpointsForRange modelFullDcReplicas = 
filterByDatacenters(modelReplicas, testCase.fullDcs);
+                        assertReplicaSetsEqual(modelFullDcReplicas, 
ntsReplicas, "Model full DC vs NTS");
+
+                        // SRS replicas == Model replicas
+                        assertReplicaSetsEqual(srsReplicas, modelReplicas, 
"SRS vs Model");
+
+                        // SRS satellite node selection == NTS satellite node 
selection
+                        if (!testCase.satelliteDcs.isEmpty())
+                        {
+                            EndpointsForRange srsSatelliteReplicas = 
filterByDatacenters(srsReplicas, testCase.satelliteDcs);
+                            EndpointsForRange ntsSatelliteReplicas = 
filterByDatacenters(ntsWithSatellitesReplicas, testCase.satelliteDcs);
+
+                            // endpoints only - isTransient will be different
+                            Set<InetAddressAndPort> srsEndpoints = new 
HashSet<>();
+                            Set<InetAddressAndPort> ntsEndpoints = new 
HashSet<>();
+                            srsSatelliteReplicas.forEach(r -> 
srsEndpoints.add(r.endpoint()));
+                            ntsSatelliteReplicas.forEach(r -> 
ntsEndpoints.add(r.endpoint()));
+
+                            if (!srsEndpoints.equals(ntsEndpoints))
+                            {
+                                throw new AssertionError(String.format(
+                                    "Satellite node selection differs between 
SRS and NTS\nSRS endpoints: %s\nNTS endpoints: %s",
+                                    srsEndpoints, ntsEndpoints));
+                            }
+                        }
+
+                        // check all satellite replicas are transient
+                        for (Replica replica : srsReplicas)
+                        {
+                            Location location = ClusterMetadata.current()
+                                .directory
+                                
.location(ClusterMetadata.current().directory.peerId(replica.endpoint()));
+
+                            if 
(testCase.satelliteDcs.contains(location.datacenter))
+                            {
+                                if (!replica.isTransient())
+                                {
+                                    throw new AssertionError("Satellite 
replica must be transient: " + replica);
+                                }
+                            }
+                        }
+                    }
+                }
+                catch (Exception e)
+                {
+                    throw new RuntimeException("Test failed", e);
+                }
+            });
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyTest.java
new file mode 100644
index 0000000000..fc2a81e2e1
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/locator/SatelliteReplicationStrategyTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.locator;
+
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.cassandra.CassandraTestBase;
+import org.apache.cassandra.CassandraTestBase.UseMurmur3Partitioner;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Location;
+
+import static org.apache.cassandra.CassandraTestBase.DisableMBeanRegistration;
+import static org.apache.cassandra.CassandraTestBase.PrepareServerNoRegister;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@PrepareServerNoRegister
+@DisableMBeanRegistration
+@UseMurmur3Partitioner
+public class SatelliteReplicationStrategyTest extends CassandraTestBase
+{
+    private static final String KEYSPACE = "test";
+
+    @After
+    public void teardown()
+    {
+        ServerTestUtils.resetCMS();
+    }
+
+    private void addToken(long token, String address, Location location) 
throws UnknownHostException
+    {
+        InetAddressAndPort addr = InetAddressAndPort.getByName(address);
+        ClusterMetadataTestHelper.addEndpoint(addr, new LongToken(token), 
location);
+    }
+
+    private void setupDCs() throws UnknownHostException
+    {
+        Location dc1 = new Location("dc1", "rack1");
+        Location dc2 = new Location("dc2", "rack1");
+        Location sat1 = new Location("sat1", "rack1");
+        Location sat2 = new Location("sat2", "rack1");
+
+        // DC1
+        addToken(100, "10.0.0.10", dc1);
+        addToken(200, "10.0.0.11", dc1);
+        addToken(300, "10.0.0.12", dc1);
+
+        // DC2
+        addToken(400, "10.1.0.10", dc2);
+        addToken(500, "10.1.0.11", dc2);
+        addToken(600, "10.1.0.12", dc2);
+
+        // SAT1
+        addToken(700, "10.2.0.10", sat1);
+        addToken(800, "10.2.0.11", sat1);
+
+        // SAT2
+        addToken(900, "10.3.0.10", sat2);
+        addToken(1000, "10.3.0.11", sat2);
+    }
+
+    private static SatelliteReplicationStrategy getSRS(String keyspace)
+    {
+        KeyspaceMetadata ksm = 
ClusterMetadata.current().schema.getKeyspaces().getNullable(keyspace);
+        return (SatelliteReplicationStrategy) ksm.replicationStrategy;
+    }
+
+    @Test
+    public void testValidSingleDCWithSatellite() throws Exception
+    {
+        setupDCs();
+
+        String cql = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                     "'class': 'SatelliteReplicationStrategy', " +
+                     "'dc1': '3', " +
+                     "'dc1.satellite.sat1': '2/2', " +
+                     "'primary': 'dc1'" +
+                     "} AND replication_type = 'tracked'";
+
+        ClusterMetadataTestHelper.createKeyspace(cql);
+
+        SatelliteReplicationStrategy strategy = getSRS(KEYSPACE);
+
+        assertEquals("dc1", strategy.getPrimaryDC());
+        assertEquals(1, strategy.getDatacenters().size());
+        assertTrue(strategy.getDatacenters().contains("dc1"));
+        assertEquals(1, strategy.getSatellites().size());
+        assertTrue(strategy.getSatellites().containsKey("sat1"));
+    }
+
+    @Test
+    public void testValidMultipleDCsWithSatellites() throws Exception
+    {
+        setupDCs();
+
+        String cql = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                     "'class': 'SatelliteReplicationStrategy', " +
+                     "'dc1': '3', " +
+                     "'dc1.satellite.sat1': '2/2', " +
+                     "'dc2': '3', " +
+                     "'dc2.satellite.sat2': '2/2', " +
+                     "'primary': 'dc1'" +
+                     "} AND replication_type = 'tracked'";
+
+        ClusterMetadataTestHelper.createKeyspace(cql);
+
+        SatelliteReplicationStrategy strategy = getSRS(KEYSPACE);
+
+        assertEquals("dc1", strategy.getPrimaryDC());
+        assertEquals(2, strategy.getDatacenters().size());
+        assertEquals(2, strategy.getSatellites().size());
+    }
+
+    private void testConfigurationException(Map<String, String> options, 
String messageContains) throws UnknownHostException
+    {
+        setupDCs();
+
+        try
+        {
+            new SatelliteReplicationStrategy(KEYSPACE, options, 
ReplicationType.tracked);
+            fail("ConfigurationException expected");
+        }
+        catch (ConfigurationException e)
+        {
+            assertTrue(e.getMessage().contains(messageContains));
+        }
+    }
+
+    @Test
+    public void testMissingPrimaryFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+
+        testConfigurationException(options, "'primary' option is required");
+    }
+
+    @Test
+    public void testPrimaryNotInFullDCsFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("primary", "dc2");
+
+        testConfigurationException(options, "Primary datacenter 'dc2' must be 
defined");
+    }
+
+    @Test
+    public void testUntrackedReplicationFails() throws Exception
+    {
+        setupDCs();
+
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("primary", "dc1");
+
+        SatelliteReplicationStrategy strategy = new 
SatelliteReplicationStrategy(
+            KEYSPACE, options, ReplicationType.untracked);
+
+        try
+        {
+            strategy.validateExpectedOptions(ClusterMetadata.current());
+            fail("ConfigurationException expected");
+        }
+        catch (ConfigurationException e)
+        {
+            assertTrue(e.getMessage().contains("requires tracked 
replication"));
+        }
+    }
+
+    @Test
+    public void testDotsInDCNamesFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc.1", "3");
+        options.put("primary", "dc.1");
+
+        testConfigurationException(options, "cannot contain dots");
+    }
+
+    @Test
+    public void testOrphanedSatelliteFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("dc2.satellite.sat1", "2/2");
+        options.put("primary", "dc1");
+
+        testConfigurationException(options, "references non-existent full 
datacenter 'dc2'");
+    }
+
+    @Test
+    public void testSatellitePartialWitnessFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("dc1.satellite.sat1", "3/1");
+        options.put("primary", "dc1");
+
+        testConfigurationException(options, "must all be witnesses");
+    }
+
+    @Test
+    public void testSatelliteRequiresWitnessFormat() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("dc1.satellite.sat1", "3"); // Missing witness format
+        options.put("primary", "dc1");
+
+        testConfigurationException(options, "witness replicas using format");
+    }
+
+    @Test
+    public void testReplicaCalculationWithSatellites() throws Exception
+    {
+        setupDCs();
+
+        String cql = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                     "'class': 'SatelliteReplicationStrategy', " +
+                     "'dc1': '3', " +
+                     "'dc1.satellite.sat1': '2/2', " +
+                     "'primary': 'dc1'" +
+                     "} AND replication_type = 'tracked'";
+
+        ClusterMetadataTestHelper.createKeyspace(cql);
+
+        SatelliteReplicationStrategy strategy = getSRS(KEYSPACE);
+
+        EndpointsForRange replicas = strategy.calculateNaturalReplicas(
+            new LongToken(150), ClusterMetadata.current());
+
+        // Should have 3 full replicas from dc1 + 2 satellite replicas from 
sat1
+        assertEquals(5, replicas.size());
+
+        int fullCount = 0;
+        int witnessCount = 0;
+        for (Replica replica : replicas)
+        {
+            if (replica.isFull())
+                fullCount++;
+            else
+                witnessCount++;
+        }
+
+        assertEquals(3, fullCount);
+        assertEquals(2, witnessCount);
+    }
+
+    @Test
+    public void testDisableNonPrimaryDC() throws Exception
+    {
+        setupDCs();
+
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("dc2", "3");
+        options.put("dc2.disabled", "true");
+        options.put("primary", "dc1");
+
+        SatelliteReplicationStrategy strategy =
+            new SatelliteReplicationStrategy(KEYSPACE, options, 
ReplicationType.tracked);
+
+        assertTrue(strategy.isDisabled("dc2"));
+        assertFalse(strategy.isDisabled("dc1"));
+        assertEquals(1, strategy.getDisabledDatacenters().size());
+        assertTrue(strategy.getDisabledDatacenters().contains("dc2"));
+
+        // getDatacenters returns all DCs including disabled (config view)
+        assertEquals(2, strategy.getDatacenters().size());
+        assertTrue(strategy.getDatacenters().contains("dc2"));
+
+        // getReplicationFactor(dc) returns configured RF even for disabled DCs
+        assertEquals(3, strategy.getReplicationFactor("dc2").allReplicas);
+
+        // aggregate RF includes all DCs (disabled does not affect placement)
+        assertEquals(6, strategy.getReplicationFactor().allReplicas);
+    }
+
+    @Test
+    public void testDisablePrimaryDCFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("dc2", "3");
+        options.put("dc1.disabled", "true");
+        options.put("primary", "dc1");
+
+        testConfigurationException(options, "Primary datacenter 'dc1' cannot 
be disabled");
+    }
+
+    @Test
+    public void testDisableNonExistentDCFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("dc99.disabled", "true");
+        options.put("primary", "dc1");
+
+        testConfigurationException(options, "not defined as a full 
datacenter");
+    }
+
+    @Test
+    public void testDisabledDCSatelliteStillGetsReplicas() throws Exception
+    {
+        setupDCs();
+
+        String cql = "CREATE KEYSPACE " + KEYSPACE + " WITH replication = {" +
+                     "'class': 'SatelliteReplicationStrategy', " +
+                     "'dc1': '3', " +
+                     "'dc1.satellite.sat1': '2/2', " +
+                     "'dc2': '3', " +
+                     "'dc2.satellite.sat2': '2/2', " +
+                     "'dc2.disabled': 'true', " +
+                     "'primary': 'dc1'" +
+                     "} AND replication_type = 'tracked'";
+
+        ClusterMetadataTestHelper.createKeyspace(cql);
+
+        SatelliteReplicationStrategy strategy = getSRS(KEYSPACE);
+
+        EndpointsForRange replicas = strategy.calculateNaturalReplicas(
+            new LongToken(150), ClusterMetadata.current());
+
+        // Disabled does not affect placement — all DCs and satellites still 
get replicas
+        // 3 full from dc1 + 3 full from dc2 + 2 witness from sat1 + 2 witness 
from sat2
+        assertEquals(10, replicas.size());
+    }
+
+    @Test
+    public void testDisabledInvalidValueFails() throws Exception
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put("dc1", "3");
+        options.put("dc2", "3");
+        options.put("dc2.disabled", "maybe");
+        options.put("primary", "dc1");
+
+        testConfigurationException(options, "expected 'true' or 'false'");
+    }
+
+    @Test
+    public void testHasSameSettingsWithDisabled() throws Exception
+    {
+        setupDCs();
+
+        Map<String, String> optionsA = new HashMap<>();
+        optionsA.put("dc1", "3");
+        optionsA.put("dc2", "3");
+        optionsA.put("dc2.disabled", "true");
+        optionsA.put("primary", "dc1");
+
+        Map<String, String> optionsB = new HashMap<>();
+        optionsB.put("dc1", "3");
+        optionsB.put("dc2", "3");
+        optionsB.put("dc2.disabled", "true");
+        optionsB.put("primary", "dc1");
+
+        Map<String, String> optionsC = new HashMap<>();
+        optionsC.put("dc1", "3");
+        optionsC.put("dc2", "3");
+        optionsC.put("primary", "dc1");
+
+        SatelliteReplicationStrategy strategyA =
+            new SatelliteReplicationStrategy(KEYSPACE, optionsA, 
ReplicationType.tracked);
+        SatelliteReplicationStrategy strategyB =
+            new SatelliteReplicationStrategy(KEYSPACE, optionsB, 
ReplicationType.tracked);
+        SatelliteReplicationStrategy strategyC =
+            new SatelliteReplicationStrategy(KEYSPACE, optionsC, 
ReplicationType.tracked);
+
+        assertTrue(strategyA.hasSameSettings(strategyB));
+        assertFalse(strategyA.hasSameSettings(strategyC));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to