This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c484fc511a Add tooling to repair system peers tables if inconsistent 
with cluster metadata
c484fc511a is described below

commit c484fc511accda38bf3fcf6906cb58b39ab4503f
Author: Minal Kyada <[email protected]>
AuthorDate: Mon Feb 23 16:02:57 2026 -0800

    Add tooling to repair system peers tables if inconsistent with cluster 
metadata
    
    Patch by Minal Kyada; reviewed by Alex Petrov and Sam Tunnicliffe for
    CASSANDRA-21187
---
 CHANGES.txt                                        |   1 +
 .../config/CassandraRelevantProperties.java        |   1 +
 .../apache/cassandra/db/SystemPeersValidator.java  | 185 ++++++++++++
 .../apache/cassandra/service/StorageService.java   |  10 +
 .../cassandra/service/StorageServiceMBean.java     |   9 +
 .../cassandra/db/SystemPeersValidatorTest.java     | 322 +++++++++++++++++++++
 6 files changed, 528 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 83b3bb7d67..d64329acbc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 6.0-alpha2
+ * Add tooling to repair system peers and peers_v2 if inconsistent with 
cluster metadata (CASSANDRA-21187) 
  * Fix a removed TTLed row re-appearance in a materialized view after a cursor 
compaction (CASSANDRA-21152)
  * Rework ZSTD dictionary compression logic to create a trainer per training 
(CASSANDRA-21209)
 Merged from 5.0:
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 9df5e2f201..8a60d44797 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -587,6 +587,7 @@ public enum CassandraRelevantProperties
     SUN_STDERR_ENCODING("sun.stderr.encoding"),
     SUN_STDOUT_ENCODING("sun.stdout.encoding"),
     SUPERUSER_SETUP_DELAY_MS("cassandra.superuser_setup_delay_ms", "10000"),
+    
SYNC_SYSTEM_PEERS_TABLES_AT_STARTUP("cassandra.sync_system_peers_tables_at_startup",
 "true"),
     SYSTEM_AUTH_DEFAULT_RF("cassandra.system_auth.default_rf", "1"),
     SYSTEM_DISTRIBUTED_DEFAULT_RF("cassandra.system_distributed.default_rf", 
"3"),
     SYSTEM_TRACES_DEFAULT_RF("cassandra.system_traces.default_rf", "2"),
diff --git a/src/java/org/apache/cassandra/db/SystemPeersValidator.java 
b/src/java/org/apache/cassandra/db/SystemPeersValidator.java
new file mode 100644
index 0000000000..2f490a4bb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SystemPeersValidator.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.virtual.PeersTable;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.db.SystemKeyspace.LEGACY_PEERS;
+import static org.apache.cassandra.db.SystemKeyspace.PEERS_V2;
+
+/**
+ * Validator to ensure system.peers and system.peers_v2 tables match
+ * ClusterMetadata. These tables are maintained for existing clients and
+ * tools which read from them to determine topology and schema information,
+ * while Cassandra itself uses ClusterMetadata as the source of truth.
+ * <p>
+ * This validator detects inconsistencies and automatically repairs them by
+ * synchronizing the peers tables with the current ClusterMetadata.
+ * <p>
+ * The system_views.peers virtual table provides a live view on the current
+ * ClusterMetadata and includes all members of the cluster, unlike the legacy
+ * tables which exclude the local node.
+ */
+public final class SystemPeersValidator
+{
+    private static final Logger logger =
+        LoggerFactory.getLogger(SystemPeersValidator.class);
+
+    // Full-table reads used to snapshot what each table currently holds
+    private static final String SELECT_PEERS_V2_QUERY =
+        String.format("SELECT * FROM %s.%s",
+                      SchemaConstants.SYSTEM_KEYSPACE_NAME, PEERS_V2);
+    private static final String SELECT_PEERS_QUERY =
+        String.format("SELECT * FROM %s.%s",
+                      SchemaConstants.SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
+
+    // Row deletes: peers_v2 is addressed by (peer, peer_port), peers by peer
+    private static final String DELETE_PEERS_V2_QUERY =
+        String.format("DELETE FROM %s.%s WHERE peer = ? AND peer_port = ?",
+                      SchemaConstants.SYSTEM_KEYSPACE_NAME, PEERS_V2);
+    private static final String DELETE_PEERS_QUERY =
+        String.format("DELETE FROM %s.%s WHERE peer = ?",
+                      SchemaConstants.SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
+
+    /**
+     * Reconciles system.peers and system.peers_v2 with the given metadata.
+     * <p>
+     * The expected peers are all joined endpoints in the directory except
+     * the local broadcast address. Rows whose key is not among them are
+     * deleted; expected peers whose row is absent from, or not equivalent
+     * in, either table are rewritten through
+     * {@link PeersTable#updateLegacyPeerTable}.
+     *
+     * @param metadata the cluster metadata the tables are compared against
+     */
+    public static void validateAndRepair(ClusterMetadata metadata)
+    {
+        Map<InetAddressAndPort, UntypedResultSet.Row> storedV2 =
+            getPeersV2Rows();
+        Map<InetAddress, UntypedResultSet.Row> storedLegacy = getPeersRows();
+
+        // Expected peers: every joined endpoint except this node itself
+        Map<InetAddressAndPort, NodeId> expected = new HashMap<>();
+        Set<InetAddress> expectedAddresses = new HashSet<>();
+        InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+        for (InetAddressAndPort joined :
+             metadata.directory.allJoinedEndpoints())
+        {
+            if (!joined.equals(self))
+            {
+                expected.put(joined, metadata.directory.peerId(joined));
+                expectedAddresses.add(joined.getAddress());
+            }
+        }
+
+        // peers_v2 rows are matched on address and port
+        for (InetAddressAndPort stored : storedV2.keySet())
+        {
+            if (expected.containsKey(stored))
+                continue;
+            logger.info("Removing stale peer {} from system.{}",
+                        stored, PEERS_V2);
+            executeInternal(DELETE_PEERS_V2_QUERY,
+                            stored.getAddress(), stored.getPort());
+        }
+
+        // legacy peers rows are matched on address only
+        for (InetAddress stored : storedLegacy.keySet())
+        {
+            if (expectedAddresses.contains(stored))
+                continue;
+            logger.info("Removing stale peer {} from system.{}",
+                        stored, LEGACY_PEERS);
+            executeInternal(DELETE_PEERS_QUERY, stored);
+        }
+
+        for (Map.Entry<InetAddressAndPort, NodeId> expectedPeer :
+             expected.entrySet())
+        {
+            InetAddressAndPort endpoint = expectedPeer.getKey();
+            NodeId nodeId = expectedPeer.getValue();
+
+            // Both tables are always evaluated so that needsRepair logs
+            // every discrepancy, not just the first one found
+            boolean v2Differs =
+                needsRepair(storedV2.get(endpoint), endpoint,
+                            PEERS_V2, nodeId, metadata);
+            boolean legacyDiffers =
+                needsRepair(storedLegacy.get(endpoint.getAddress()), endpoint,
+                            LEGACY_PEERS, nodeId, metadata);
+            if (!v2Differs && !legacyDiffers)
+                continue;
+
+            PeersTable.updateLegacyPeerTable(nodeId, metadata, metadata);
+        }
+    }
+
+    /**
+     * Decides whether the stored row for a peer has to be rewritten, logging
+     * the reason when it does.
+     *
+     * @param row      row read from the table, or null if none was found
+     * @param endpoint peer the row belongs to; only used in log messages
+     * @param table    name of the table the row was read from
+     * @param nodeId   id of the peer in the metadata directory
+     * @param metadata metadata the row is compared against
+     * @return true if the row is missing or not equivalent to the metadata
+     */
+    private static boolean needsRepair(UntypedResultSet.Row row,
+                                       InetAddressAndPort endpoint,
+                                       String table, NodeId nodeId,
+                                       ClusterMetadata metadata)
+    {
+        if (row == null)
+        {
+            logger.info("Adding missing peer {} to system.{}",
+                        endpoint, table);
+            return true;
+        }
+
+        boolean matches;
+        if (PEERS_V2.equals(table))
+            matches = peersV2RowIsEquivalent(row, nodeId, metadata);
+        else
+            matches = peersRowIsEquivalent(row, nodeId, metadata);
+
+        if (matches)
+            return false;
+
+        logger.info("Repairing mismatched peer {} in system.{}: {}",
+                    endpoint, table, row);
+        return true;
+    }
+
+    /**
+     * Checks a peers_v2 row: the shared columns plus the two port columns
+     * that only exist in peers_v2.
+     *
+     * @return true if every checked column is present and matches
+     */
+    private static boolean peersV2RowIsEquivalent(UntypedResultSet.Row row,
+                                                  NodeId nodeId,
+                                                  ClusterMetadata metadata)
+    {
+        NodeAddresses expected = metadata.directory.getNodeAddresses(nodeId);
+        if (!commonColumnsAreEquivalent(row, nodeId, expected, metadata))
+            return false;
+
+        if (!row.has("preferred_port")
+            || row.getInt("preferred_port")
+               != expected.broadcastAddress.getPort())
+            return false;
+
+        return row.has("native_port")
+               && row.getInt("native_port") == expected.nativeAddress.getPort();
+    }
+
+    /**
+     * Checks a legacy peers row; only the shared columns are compared.
+     *
+     * @return true if every shared column is present and matches
+     */
+    private static boolean peersRowIsEquivalent(UntypedResultSet.Row row,
+                                                NodeId nodeId,
+                                                ClusterMetadata metadata)
+    {
+        return commonColumnsAreEquivalent(
+            row, nodeId, metadata.directory.getNodeAddresses(nodeId), metadata);
+    }
+
+    /**
+     * Compares the columns that peers and peers_v2 have in common with the
+     * values derived from the metadata.
+     *
+     * @param row       stored row to check
+     * @param nodeId    id of the peer in the metadata directory
+     * @param addresses addresses recorded for the peer in the directory
+     * @param metadata  metadata supplying location, version, schema, tokens
+     * @return true only if every shared column is present and equal
+     */
+    private static boolean commonColumnsAreEquivalent(UntypedResultSet.Row row,
+                                                      NodeId nodeId,
+                                                      NodeAddresses addresses,
+                                                      ClusterMetadata metadata)
+    {
+        Location expectedLocation = metadata.directory.location(nodeId);
+        // This column is differently named in the peers and peers_v2 tables
+        String nativeColumn = "rpc_address";
+        if (row.has("native_address"))
+            nativeColumn = "native_address";
+
+        // Presence is checked before each read because row.getXXX can NPE
+        // if the column is not present
+        if (!row.has("preferred_ip")
+            || !Objects.equals(row.getInetAddress("preferred_ip"),
+                               addresses.broadcastAddress.getAddress()))
+            return false;
+
+        if (!row.has(nativeColumn)
+            || !Objects.equals(row.getInetAddress(nativeColumn),
+                               addresses.nativeAddress.getAddress()))
+            return false;
+
+        if (!row.has("data_center")
+            || !Objects.equals(row.getString("data_center"),
+                               expectedLocation.datacenter))
+            return false;
+
+        if (!row.has("rack")
+            || !Objects.equals(row.getString("rack"), expectedLocation.rack))
+            return false;
+
+        if (!row.has("host_id")
+            || !Objects.equals(row.getUUID("host_id"), nodeId.toUUID()))
+            return false;
+
+        if (!row.has("release_version")
+            || !Objects.equals(
+                   row.getString("release_version"),
+                   metadata.directory.version(nodeId)
+                           .cassandraVersion.toString()))
+            return false;
+
+        if (!row.has("schema_version")
+            || !Objects.equals(row.getUUID("schema_version"),
+                               metadata.schema.getVersion()))
+            return false;
+
+        return row.has("tokens")
+               && Objects.equals(
+                      row.getSet("tokens", UTF8Type.instance),
+                      SystemKeyspace.tokensAsSet(
+                          metadata.tokenMap.tokens(nodeId)));
+    }
+
+    /**
+     * Reads every row of system.peers_v2.
+     *
+     * @return the rows keyed by the endpoint built from peer and peer_port
+     */
+    private static Map<InetAddressAndPort, UntypedResultSet.Row>
+    getPeersV2Rows()
+    {
+        Map<InetAddressAndPort, UntypedResultSet.Row> byEndpoint =
+            new HashMap<>();
+        for (UntypedResultSet.Row row : executeInternal(SELECT_PEERS_V2_QUERY))
+        {
+            InetAddressAndPort key =
+                InetAddressAndPort.getByAddressOverrideDefaults(
+                    row.getInetAddress("peer"), row.getInt("peer_port"));
+            byEndpoint.put(key, row);
+        }
+        return byEndpoint;
+    }
+
+    /**
+     * Reads every row of the legacy system.peers table.
+     *
+     * @return the rows keyed by the value of their peer column
+     */
+    private static Map<InetAddress, UntypedResultSet.Row> getPeersRows()
+    {
+        Map<InetAddress, UntypedResultSet.Row> byAddress = new HashMap<>();
+        for (UntypedResultSet.Row row : executeInternal(SELECT_PEERS_QUERY))
+        {
+            InetAddress key = row.getInetAddress("peer");
+            byAddress.put(key, row);
+        }
+        return byAddress;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 424df75b97..b7460b1be5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -103,6 +103,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SizeEstimatesRecorder;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemPeersValidator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -859,6 +860,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         RegistrationStatus.instance.onRegistration();
         Startup.maybeExecuteStartupTransformation(self);
 
+        if 
(CassandraRelevantProperties.SYNC_SYSTEM_PEERS_TABLES_AT_STARTUP.getBoolean())
+            SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
         try
         {
             if (joinRing)
@@ -5802,6 +5806,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return 
Keyspace.open(keyspace).getColumnFamilyStores().stream().map(cfs -> 
cfs.name).collect(Collectors.toList());
     }
 
+    @Override
+    public void validateAndRepairPeersMetadata()
+    {
+        // Compare against whatever ClusterMetadata.current() returns now
+        ClusterMetadata current = ClusterMetadata.current();
+        SystemPeersValidator.validateAndRepair(current);
+    }
+
     @Override
     public List<String> mutateSSTableRepairedState(boolean repaired, boolean 
preview, String keyspace, List<String> tableNames)
     {
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 058bc8955f..b9f47b3b2d 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1410,6 +1410,15 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     /** Gets the names of all tables for the given keyspace */
     public List<String> getTablesForKeyspace(String keyspace);
 
+    /**
+     * Validates that system.peers and system.peers_v2 are consistent with
+     * ClusterMetadata, inserting missing peer entries, rewriting entries
+     * whose columns do not match and removing stale ones. This runs
+     * automatically on startup unless the
+     * cassandra.sync_system_peers_tables_at_startup property is set to false,
+     * and can be triggered manually if a discrepancy is suspected.
+     * <p>
+     * Note: mutates data in system.peers and system.peers_v2.
+     */
+    public void validateAndRepairPeersMetadata();
+
     /** Mutates the repaired state of all SSTables for the given SSTables */
     public List<String> mutateSSTableRepairedState(boolean repaired, boolean 
preview, String keyspace, List<String> tables);
 
diff --git a/test/unit/org/apache/cassandra/db/SystemPeersValidatorTest.java 
b/test/unit/org/apache/cassandra/db/SystemPeersValidatorTest.java
new file mode 100644
index 0000000000..796de62642
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/SystemPeersValidatorTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.harry.model.TokenPlacementModel;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.db.SystemKeyspace.LEGACY_PEERS;
+import static org.apache.cassandra.db.SystemKeyspace.PEERS_V2;
+import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SystemPeersValidatorTest
+{
+    // Created afresh in before() and closed in after()
+    private CMSTestBase.CMSSut sut;
+    // Endpoint of node 2, which before() registers and joins
+    private InetAddressAndPort peerEndpoint;
+
+    /** Runs ServerTestUtils.prepareServerNoRegister() once for the class. */
+    @BeforeClass
+    public static void beforeClass()
+    {
+        ServerTestUtils.prepareServerNoRegister();
+    }
+
+    /**
+     * Builds a fresh CMS fixture, registers and joins node 2, records its
+     * endpoint and empties both peers tables.
+     */
+    @Before
+    public void before() throws Exception
+    {
+        ClusterMetadataService.unsetInstance();
+        TokenPlacementModel.SimpleReplicationFactor rf =
+            new TokenPlacementModel.SimpleReplicationFactor(3);
+        sut = new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, rf);
+
+        ClusterMetadataTestHelper.register(2);
+        ClusterMetadataTestHelper.join(2, 2);
+
+        NodeId peerId = ClusterMetadataTestHelper.nodeId(2);
+        peerEndpoint = ClusterMetadata.current().directory.endpoint(peerId);
+        cleanupPeersTables();
+    }
+
+    /** Closes the fixture if before() got far enough to create it. */
+    @After
+    public void after()
+    {
+        if (sut == null)
+            return;
+        sut.close();
+    }
+
+    @Test
+    public void testNoRepairWhenTablesAreConsistent()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertEquals("peers_v2 should have 1 peer", 1, countEntries(PEERS_V2));
+        assertEquals("peers should have 1 peer", 1, 
countEntries(LEGACY_PEERS));
+
+        // Second call should be a no-op
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertEquals("peers_v2 count should not change on second call", 1, 
countEntries(PEERS_V2));
+        assertEquals("peers count should not change on second call", 1, 
countEntries(LEGACY_PEERS));
+    }
+
+    /**
+     * A peers_v2 row for an endpoint unknown to the metadata is deleted.
+     */
+    @Test
+    public void testStaleEntryOnlyInPeersV2IsRemoved()
+    throws UnknownHostException
+    {
+        InetAddressAndPort staleEndpoint =
+            InetAddressAndPort.getByName("127.0.0.99");
+        insertStalePeerEntry(staleEndpoint, PEERS_V2);
+
+        // Guard against a vacuous pass: the stale row has to exist first
+        assertTrue("Stale entry should be in peers_v2 before repair",
+                   entryExistsInPeersV2(staleEndpoint));
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertFalse("Stale entry should be removed from peers_v2",
+                    entryExistsInPeersV2(staleEndpoint));
+    }
+
+    /**
+     * A legacy peers row for an address unknown to the metadata is deleted.
+     */
+    @Test
+    public void testStaleEntryOnlyInPeersIsRemoved()
+    throws UnknownHostException
+    {
+        InetAddressAndPort staleEndpoint =
+            InetAddressAndPort.getByName("127.0.0.99");
+        insertStalePeerEntry(staleEndpoint, LEGACY_PEERS);
+
+        // Guard against a vacuous pass: the stale row has to exist first
+        assertTrue("Stale entry should be in peers before repair",
+                   entryExistsInPeers(staleEndpoint.getAddress()));
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertFalse("Stale entry should be removed from peers",
+                    entryExistsInPeers(staleEndpoint.getAddress()));
+    }
+
+    @Test
+    public void testMissingPeerIsAddedToBothTables()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+        removePeerEntry(peerEndpoint);
+
+        assertFalse(entryExistsInPeersV2(peerEndpoint));
+        assertFalse(entryExistsInPeers(peerEndpoint.getAddress()));
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertPeersV2RowMatchesMetadata(peerEndpoint);
+        assertPeersRowMatchesMetadata(peerEndpoint);
+    }
+
+    @Test
+    public void testStaleFieldInPeersV2IsRepaired()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        // Corrupt a single column of the otherwise correct row
+        String corruptDatacenter =
+            String.format("UPDATE %s.%s SET data_center = 'stale-dc' " +
+                          "WHERE peer = ? AND peer_port = ?",
+                          SYSTEM_KEYSPACE_NAME, PEERS_V2);
+        executeInternal(corruptDatacenter,
+                        peerEndpoint.getAddress(), peerEndpoint.getPort());
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertPeersV2RowMatchesMetadata(peerEndpoint);
+    }
+
+    @Test
+    public void testStaleFieldInPeersIsRepaired()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        // Corrupt a single column of the otherwise correct row
+        String corruptDatacenter =
+            String.format("UPDATE %s.%s SET data_center = 'stale-dc' " +
+                          "WHERE peer = ?",
+                          SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
+        executeInternal(corruptDatacenter, peerEndpoint.getAddress());
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertPeersRowMatchesMetadata(peerEndpoint);
+    }
+
+    @Test
+    public void testNullColumnInPeersV2IsRepaired()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        String[] nullableColumns = {
+            "data_center", "host_id", "preferred_ip", "preferred_port",
+            "rack", "release_version", "native_address", "native_port",
+            "schema_version", "tokens"
+        };
+
+        // Null out one column at a time and expect each to be restored
+        for (int i = 0; i < nullableColumns.length; i++)
+        {
+            String deleteColumn =
+                String.format("DELETE %s FROM %s.%s " +
+                              "WHERE peer = ? AND peer_port = ?",
+                              nullableColumns[i], SYSTEM_KEYSPACE_NAME,
+                              PEERS_V2);
+            executeInternal(deleteColumn,
+                            peerEndpoint.getAddress(), peerEndpoint.getPort());
+
+            SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+            assertPeersV2RowMatchesMetadata(peerEndpoint);
+        }
+    }
+
+    @Test
+    public void testNullColumnInPeersIsRepaired()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        String[] nullableColumns = {
+            "data_center", "host_id", "preferred_ip",
+            "rack", "release_version", "rpc_address",
+            "schema_version", "tokens"
+        };
+
+        // Null out one column at a time and expect each to be restored
+        for (int i = 0; i < nullableColumns.length; i++)
+        {
+            String deleteColumn =
+                String.format("DELETE %s FROM %s.%s WHERE peer = ?",
+                              nullableColumns[i], SYSTEM_KEYSPACE_NAME,
+                              LEGACY_PEERS);
+            executeInternal(deleteColumn, peerEndpoint.getAddress());
+
+            SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+            assertPeersRowMatchesMetadata(peerEndpoint);
+        }
+    }
+
+    /**
+     * The StorageService entry point restores a peer missing from both
+     * tables.
+     */
+    @Test
+    public void testJmxEntryPointRepairsMissingPeer()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+        removePeerEntry(peerEndpoint);
+
+        // Same precondition as testMissingPeerIsAddedToBothTables: the rows
+        // must really be gone, otherwise the assertions below prove nothing
+        assertFalse(entryExistsInPeersV2(peerEndpoint));
+        assertFalse(entryExistsInPeers(peerEndpoint.getAddress()));
+
+        StorageService.instance.validateAndRepairPeersMetadata();
+
+        assertTrue("JMX repair should restore peer in peers_v2",
+                   entryExistsInPeersV2(peerEndpoint));
+        assertTrue("JMX repair should restore peer in peers",
+                   entryExistsInPeers(peerEndpoint.getAddress()));
+    }
+
+    /** Truncates system.peers_v2 and then system.peers. */
+    private void cleanupPeersTables()
+    {
+        for (String table : new String[]{ PEERS_V2, LEGACY_PEERS })
+        {
+            String truncate =
+                String.format("TRUNCATE %s.%s", SYSTEM_KEYSPACE_NAME, table);
+            executeInternal(truncate);
+        }
+    }
+
+    /**
+     * Counts the rows of a system keyspace table.
+     *
+     * @param table table name within the system keyspace
+     * @return the COUNT(*) result narrowed to an int
+     */
+    private int countEntries(String table)
+    {
+        String countQuery = String.format("SELECT COUNT(*) FROM %s.%s",
+                                          SYSTEM_KEYSPACE_NAME, table);
+        long count = executeInternal(countQuery).one().getLong("count");
+        return (int) count;
+    }
+
+    /**
+     * @return true if peers_v2 holds a row for the endpoint's address and
+     *         port
+     */
+    private boolean entryExistsInPeersV2(InetAddressAndPort endpoint)
+    {
+        String lookup =
+            String.format("SELECT peer FROM %s.%s " +
+                          "WHERE peer = ? AND peer_port = ?",
+                          SYSTEM_KEYSPACE_NAME, PEERS_V2);
+        UntypedResultSet found =
+            executeInternal(lookup, endpoint.getAddress(), endpoint.getPort());
+        return !found.isEmpty();
+    }
+
+    /** @return true if the legacy peers table holds a row for the address */
+    private boolean entryExistsInPeers(InetAddress address)
+    {
+        String lookup = String.format("SELECT peer FROM %s.%s WHERE peer = ?",
+                                      SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
+        UntypedResultSet found = executeInternal(lookup, address);
+        return !found.isEmpty();
+    }
+
+    /**
+     * Asserts that the peers_v2 row for the endpoint carries the values the
+     * current ClusterMetadata holds for that endpoint's node.
+     *
+     * @param endpoint peer whose row is checked; its node id is looked up
+     *                 in the directory rather than assumed
+     */
+    private void assertPeersV2RowMatchesMetadata(InetAddressAndPort endpoint)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        // Resolve the node from the argument so the helper checks the node
+        // it was asked about, not a fixed one
+        NodeId nodeId = metadata.directory.peerId(endpoint);
+        assertTrue("No node id in directory for " + endpoint, nodeId != null);
+        NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
+        Location location = metadata.directory.location(nodeId);
+
+        UntypedResultSet result = executeInternal(
+            String.format("SELECT * FROM %s.%s " +
+                          "WHERE peer = ? AND peer_port = ?",
+                          SYSTEM_KEYSPACE_NAME, PEERS_V2),
+            endpoint.getAddress(), endpoint.getPort());
+        UntypedResultSet.Row row = result.one();
+
+        assertEquals(addresses.broadcastAddress.getAddress(),
+                     row.getInetAddress("preferred_ip"));
+        assertEquals(addresses.broadcastAddress.getPort(),
+                     row.getInt("preferred_port"));
+        assertEquals(addresses.nativeAddress.getAddress(),
+                     row.getInetAddress("native_address"));
+        assertEquals(addresses.nativeAddress.getPort(),
+                     row.getInt("native_port"));
+        assertEquals(location.datacenter, row.getString("data_center"));
+        assertEquals(location.rack, row.getString("rack"));
+        assertEquals(nodeId.toUUID(), row.getUUID("host_id"));
+        assertEquals(
+            metadata.directory.version(nodeId).cassandraVersion.toString(),
+            row.getString("release_version"));
+        assertEquals(metadata.schema.getVersion(),
+                     row.getUUID("schema_version"));
+        assertEquals(
+            SystemKeyspace.tokensAsSet(metadata.tokenMap.tokens(nodeId)),
+            row.getSet("tokens", UTF8Type.instance));
+    }
+
+    /**
+     * Asserts that the legacy peers row for the endpoint's address carries
+     * the values the current ClusterMetadata holds for that endpoint's node.
+     *
+     * @param endpoint peer whose row is checked; its node id is looked up
+     *                 in the directory rather than assumed
+     */
+    private void assertPeersRowMatchesMetadata(InetAddressAndPort endpoint)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        // Resolve the node from the argument so the helper checks the node
+        // it was asked about, not a fixed one
+        NodeId nodeId = metadata.directory.peerId(endpoint);
+        assertTrue("No node id in directory for " + endpoint, nodeId != null);
+        NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
+        Location location = metadata.directory.location(nodeId);
+
+        UntypedResultSet result = executeInternal(
+            String.format("SELECT * FROM %s.%s WHERE peer = ?",
+                          SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
+            endpoint.getAddress());
+        UntypedResultSet.Row row = result.one();
+
+        assertEquals(addresses.broadcastAddress.getAddress(),
+                     row.getInetAddress("preferred_ip"));
+        assertEquals(addresses.nativeAddress.getAddress(),
+                     row.getInetAddress("rpc_address"));
+        assertEquals(location.datacenter, row.getString("data_center"));
+        assertEquals(location.rack, row.getString("rack"));
+        assertEquals(nodeId.toUUID(), row.getUUID("host_id"));
+        assertEquals(
+            metadata.directory.version(nodeId).cassandraVersion.toString(),
+            row.getString("release_version"));
+        assertEquals(metadata.schema.getVersion(),
+                     row.getUUID("schema_version"));
+        assertEquals(
+            SystemKeyspace.tokensAsSet(metadata.tokenMap.tokens(nodeId)),
+            row.getSet("tokens", UTF8Type.instance));
+    }
+
+    /**
+     * Inserts a row with fixed placeholder values and a random host id for
+     * the endpoint.
+     *
+     * @param endpoint endpoint the row is written for
+     * @param table    PEERS_V2 to write to peers_v2; any other value writes
+     *                 the legacy layout, which has no peer_port column
+     */
+    private void insertStalePeerEntry(InetAddressAndPort endpoint,
+                                      String table)
+    {
+        if (!table.equals(PEERS_V2))
+        {
+            String legacyInsert =
+                String.format("INSERT INTO %s.%s (peer, data_center, " +
+                              "host_id, rack, release_version, tokens) " +
+                              "VALUES (?, ?, ?, ?, ?, ?)",
+                              SYSTEM_KEYSPACE_NAME, table);
+            executeInternal(legacyInsert,
+                            endpoint.getAddress(), "dc1", UUID.randomUUID(),
+                            "rack1", "5.0.0", new HashSet<String>());
+            return;
+        }
+
+        String v2Insert =
+            String.format("INSERT INTO %s.%s (peer, peer_port, data_center, " +
+                          "host_id, rack, release_version, tokens) " +
+                          "VALUES (?, ?, ?, ?, ?, ?, ?)",
+                          SYSTEM_KEYSPACE_NAME, table);
+        executeInternal(v2Insert,
+                        endpoint.getAddress(), endpoint.getPort(), "dc1",
+                        UUID.randomUUID(), "rack1", "5.0.0",
+                        new HashSet<String>());
+    }
+
+    /** Deletes the endpoint's row from peers_v2 and then from peers. */
+    private void removePeerEntry(InetAddressAndPort endpoint)
+    {
+        String deleteV2 =
+            String.format("DELETE FROM %s.%s WHERE peer = ? AND peer_port = ?",
+                          SYSTEM_KEYSPACE_NAME, PEERS_V2);
+        executeInternal(deleteV2, endpoint.getAddress(), endpoint.getPort());
+
+        String deleteLegacy =
+            String.format("DELETE FROM %s.%s WHERE peer = ?",
+                          SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
+        executeInternal(deleteLegacy, endpoint.getAddress());
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to