This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c484fc511a Add tooling to repair system peers tables if inconsistent
with cluster metadata
c484fc511a is described below
commit c484fc511accda38bf3fcf6906cb58b39ab4503f
Author: Minal Kyada <[email protected]>
AuthorDate: Mon Feb 23 16:02:57 2026 -0800
Add tooling to repair system peers tables if inconsistent with cluster
metadata
Patch by Minal Kyada; reviewed by Alex Petrov and Sam Tunnicliffe for
CASSANDRA-21187
---
CHANGES.txt | 1 +
.../config/CassandraRelevantProperties.java | 1 +
.../apache/cassandra/db/SystemPeersValidator.java | 185 ++++++++++++
.../apache/cassandra/service/StorageService.java | 10 +
.../cassandra/service/StorageServiceMBean.java | 9 +
.../cassandra/db/SystemPeersValidatorTest.java | 322 +++++++++++++++++++++
6 files changed, 528 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index 83b3bb7d67..d64329acbc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
6.0-alpha2
+ * Add tooling to repair system peers and peers_v2 if inconsistent with cluster metadata (CASSANDRA-21187)
* Fix a removed TTLed row re-appearance in a materialized view after a cursor
compaction (CASSANDRA-21152)
* Rework ZSTD dictionary compression logic to create a trainer per training
(CASSANDRA-21209)
Merged from 5.0:
diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 9df5e2f201..8a60d44797 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -587,6 +587,7 @@ public enum CassandraRelevantProperties
SUN_STDERR_ENCODING("sun.stderr.encoding"),
SUN_STDOUT_ENCODING("sun.stdout.encoding"),
SUPERUSER_SETUP_DELAY_MS("cassandra.superuser_setup_delay_ms", "10000"),
+
SYNC_SYSTEM_PEERS_TABLES_AT_STARTUP("cassandra.sync_system_peers_tables_at_startup",
"true"),
SYSTEM_AUTH_DEFAULT_RF("cassandra.system_auth.default_rf", "1"),
SYSTEM_DISTRIBUTED_DEFAULT_RF("cassandra.system_distributed.default_rf",
"3"),
SYSTEM_TRACES_DEFAULT_RF("cassandra.system_traces.default_rf", "2"),
diff --git a/src/java/org/apache/cassandra/db/SystemPeersValidator.java b/src/java/org/apache/cassandra/db/SystemPeersValidator.java
new file mode 100644
index 0000000000..2f490a4bb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SystemPeersValidator.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.virtual.PeersTable;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.db.SystemKeyspace.LEGACY_PEERS;
+import static org.apache.cassandra.db.SystemKeyspace.PEERS_V2;
+
+/**
+ * Validator to ensure system.peers and system.peers_v2 tables match ClusterMetadata.
+ * These tables are maintained for existing clients and tools which read from them to determine
+ * topology and schema information, while Cassandra itself uses ClusterMetadata as the source of truth.
+ * <p>
+ * This validator detects inconsistencies and automatically repairs them by synchronizing
+ * the peers tables with the current ClusterMetadata.
+ * <p>
+ * The system_views.peers virtual table provides a live view on the current ClusterMetadata
+ * and includes all members of the cluster, unlike the legacy tables which exclude the local node.
+ * <p>
+ * All members are static; the class is a pure utility holder and cannot be instantiated.
+ */
+public final class SystemPeersValidator
+{
+    private static final Logger logger = LoggerFactory.getLogger(SystemPeersValidator.class);
+    private static final String SELECT_PEERS_V2_QUERY = String.format("SELECT * FROM %s.%s",
+                                                                      SchemaConstants.SYSTEM_KEYSPACE_NAME, PEERS_V2);
+    private static final String SELECT_PEERS_QUERY = String.format("SELECT * FROM %s.%s",
+                                                                   SchemaConstants.SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
+    private static final String DELETE_PEERS_V2_QUERY = String.format("DELETE FROM %s.%s WHERE peer = ? AND peer_port = ?",
+                                                                      SchemaConstants.SYSTEM_KEYSPACE_NAME, PEERS_V2);
+    private static final String DELETE_PEERS_QUERY = String.format("DELETE FROM %s.%s WHERE peer = ?",
+                                                                   SchemaConstants.SYSTEM_KEYSPACE_NAME, LEGACY_PEERS);
+
+    // Utility class: only static entry points, never instantiated.
+    private SystemPeersValidator()
+    {
+    }
+
+    /**
+     * Compares the contents of system.peers and system.peers_v2 with the supplied metadata and
+     * fixes any differences, in three passes:
+     * <ol>
+     *   <li>rows in peers_v2 whose (address, port) is not a joined, non-local endpoint are deleted;</li>
+     *   <li>rows in peers whose address does not belong to any joined, non-local endpoint are deleted;</li>
+     *   <li>for every joined, non-local endpoint, if the row is missing or differs from the metadata in
+     *       either table, {@code PeersTable.updateLegacyPeerTable} is invoked once for that node.</li>
+     * </ol>
+     *
+     * @param metadata the cluster metadata to treat as the source of truth
+     */
+    public static void validateAndRepair(ClusterMetadata metadata)
+    {
+        Map<InetAddressAndPort, UntypedResultSet.Row> peersV2Rows = getPeersV2Rows();
+        Map<InetAddress, UntypedResultSet.Row> peersRows = getPeersRows();
+
+        Map<InetAddressAndPort, NodeId> knownEndpoints = new HashMap<>();
+        Set<InetAddress> knownAddresses = new HashSet<>();
+        InetAddressAndPort localEndpoint = FBUtilities.getBroadcastAddressAndPort();
+        for (InetAddressAndPort endpoint : metadata.directory.allJoinedEndpoints())
+        {
+            // The persisted peers tables never contain the local node
+            if (endpoint.equals(localEndpoint))
+                continue;
+            knownEndpoints.put(endpoint, metadata.directory.peerId(endpoint));
+            knownAddresses.add(endpoint.getAddress());
+        }
+
+        for (InetAddressAndPort endpoint : peersV2Rows.keySet())
+        {
+            if (!knownEndpoints.containsKey(endpoint))
+            {
+                logger.info("Removing stale peer {} from system.{}", endpoint, PEERS_V2);
+                executeInternal(DELETE_PEERS_V2_QUERY, endpoint.getAddress(), endpoint.getPort());
+            }
+        }
+
+        // The legacy table is keyed by address only, so staleness is decided without the port
+        for (InetAddress address : peersRows.keySet())
+        {
+            if (!knownAddresses.contains(address))
+            {
+                logger.info("Removing stale peer {} from system.{}", address, LEGACY_PEERS);
+                executeInternal(DELETE_PEERS_QUERY, address);
+            }
+        }
+
+        for (Map.Entry<InetAddressAndPort, NodeId> entry : knownEndpoints.entrySet())
+        {
+            NodeId nodeId = entry.getValue();
+            InetAddressAndPort endpoint = entry.getKey();
+            UntypedResultSet.Row peersV2Row = peersV2Rows.get(endpoint);
+            UntypedResultSet.Row peersRow = peersRows.get(endpoint.getAddress());
+
+            // Both checks are always evaluated (no short-circuit) so that a problem in either
+            // table is logged, even though a single update call follows.
+            boolean peersV2NeedsRepair = needsRepair(peersV2Row, endpoint, PEERS_V2, nodeId, metadata);
+            boolean peersNeedsRepair = needsRepair(peersRow, endpoint, LEGACY_PEERS, nodeId, metadata);
+
+            // NOTE(review): the same metadata is passed for both metadata arguments; presumably
+            // this makes updateLegacyPeerTable rewrite the node's rows in both tables - confirm against PeersTable.
+            if (peersV2NeedsRepair || peersNeedsRepair)
+                PeersTable.updateLegacyPeerTable(nodeId, metadata, metadata);
+        }
+    }
+
+    /**
+     * Decides whether the persisted row for a peer has to be rewritten, logging the reason at INFO
+     * level when it does.
+     *
+     * @param row      the row read from {@code table}, or {@code null} if the peer has no row there
+     * @param endpoint the peer the row belongs to (used for logging only)
+     * @param table    either {@code PEERS_V2} or {@code LEGACY_PEERS}; selects which comparison is applied
+     * @param nodeId   the peer's node id in {@code metadata}
+     * @param metadata the cluster metadata to compare against
+     * @return {@code true} if the row is missing or not equivalent to the metadata
+     */
+    private static boolean needsRepair(UntypedResultSet.Row row, InetAddressAndPort endpoint,
+                                       String table, NodeId nodeId, ClusterMetadata metadata)
+    {
+        if (row == null)
+        {
+            logger.info("Adding missing peer {} to system.{}", endpoint, table);
+            return true;
+        }
+        boolean isEquivalent = PEERS_V2.equals(table)
+                               ? peersV2RowIsEquivalent(row, nodeId, metadata)
+                               : peersRowIsEquivalent(row, nodeId, metadata);
+        if (!isEquivalent)
+        {
+            logger.info("Repairing mismatched peer {} in system.{}: {}", endpoint, table, row);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Checks a system.peers_v2 row: the columns shared with system.peers plus the two port
+     * columns which only exist in peers_v2. A missing (null) column counts as a mismatch.
+     */
+    private static boolean peersV2RowIsEquivalent(UntypedResultSet.Row row, NodeId nodeId, ClusterMetadata metadata)
+    {
+        NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
+        return commonColumnsAreEquivalent(row, nodeId, addresses, metadata)
+               && row.has("preferred_port") && row.getInt("preferred_port") == addresses.broadcastAddress.getPort()
+               && row.has("native_port") && row.getInt("native_port") == addresses.nativeAddress.getPort();
+    }
+
+    /**
+     * Checks a legacy system.peers row; only the columns shared with peers_v2 are compared.
+     */
+    private static boolean peersRowIsEquivalent(UntypedResultSet.Row row, NodeId nodeId, ClusterMetadata metadata)
+    {
+        NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
+        return commonColumnsAreEquivalent(row, nodeId, addresses, metadata);
+    }
+
+    /**
+     * Compares the columns present in both peers tables (preferred ip, native/rpc address,
+     * datacenter, rack, host id, release version, schema version and tokens) with the metadata.
+     *
+     * @return {@code true} only if every one of those columns is present and equal to the
+     *         value derived from {@code metadata}
+     */
+    private static boolean commonColumnsAreEquivalent(UntypedResultSet.Row row, NodeId nodeId,
+                                                      NodeAddresses addresses, ClusterMetadata metadata)
+    {
+        Location location = metadata.directory.location(nodeId);
+        // This column is differently named in the peers and peers_v2 tables. If a peers_v2 row has
+        // no native_address value we fall through to rpc_address, which is absent as well, so the
+        // presence check below reports the row as not equivalent.
+        String nativeAddressColumn = row.has("native_address") ? "native_address" : "rpc_address";
+
+        // Check existence first because row.getXXX can NPE if the column is not present
+        return row.has("preferred_ip") && Objects.equals(row.getInetAddress("preferred_ip"), addresses.broadcastAddress.getAddress())
+               && row.has(nativeAddressColumn) && Objects.equals(row.getInetAddress(nativeAddressColumn), addresses.nativeAddress.getAddress())
+               && row.has("data_center") && Objects.equals(row.getString("data_center"), location.datacenter)
+               && row.has("rack") && Objects.equals(row.getString("rack"), location.rack)
+               && row.has("host_id") && Objects.equals(row.getUUID("host_id"), nodeId.toUUID())
+               && row.has("release_version") && Objects.equals(row.getString("release_version"), metadata.directory.version(nodeId).cassandraVersion.toString())
+               && row.has("schema_version") && Objects.equals(row.getUUID("schema_version"), metadata.schema.getVersion())
+               && row.has("tokens") && Objects.equals(row.getSet("tokens", UTF8Type.instance), SystemKeyspace.tokensAsSet(metadata.tokenMap.tokens(nodeId)));
+    }
+
+    /**
+     * Reads every row of system.peers_v2, keyed by the (peer, peer_port) endpoint it describes.
+     */
+    private static Map<InetAddressAndPort, UntypedResultSet.Row> getPeersV2Rows()
+    {
+        Map<InetAddressAndPort, UntypedResultSet.Row> rows = new HashMap<>();
+        for (UntypedResultSet.Row row : executeInternal(SELECT_PEERS_V2_QUERY))
+            rows.put(InetAddressAndPort.getByAddressOverrideDefaults(row.getInetAddress("peer"),
+                                                                     row.getInt("peer_port")), row);
+        return rows;
+    }
+
+    /**
+     * Reads every row of the legacy system.peers table, keyed by the peer address.
+     */
+    private static Map<InetAddress, UntypedResultSet.Row> getPeersRows()
+    {
+        Map<InetAddress, UntypedResultSet.Row> rows = new HashMap<>();
+        for (UntypedResultSet.Row row : executeInternal(SELECT_PEERS_QUERY))
+            rows.put(row.getInetAddress("peer"), row);
+        return rows;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 424df75b97..b7460b1be5 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -103,6 +103,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SizeEstimatesRecorder;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemPeersValidator;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
@@ -859,6 +860,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
RegistrationStatus.instance.onRegistration();
Startup.maybeExecuteStartupTransformation(self);
+ if
(CassandraRelevantProperties.SYNC_SYSTEM_PEERS_TABLES_AT_STARTUP.getBoolean())
+ SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
try
{
if (joinRing)
@@ -5802,6 +5806,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return
Keyspace.open(keyspace).getColumnFamilyStores().stream().map(cfs ->
cfs.name).collect(Collectors.toList());
}
+    /**
+     * Management entry point which checks system.peers and system.peers_v2 against the
+     * current ClusterMetadata and repairs them via {@link SystemPeersValidator}.
+     */
+    @Override
+    public void validateAndRepairPeersMetadata()
+    {
+        ClusterMetadata current = ClusterMetadata.current();
+        SystemPeersValidator.validateAndRepair(current);
+    }
+
@Override
public List<String> mutateSSTableRepairedState(boolean repaired, boolean
preview, String keyspace, List<String> tableNames)
{
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 058bc8955f..b9f47b3b2d 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1410,6 +1410,15 @@ public interface StorageServiceMBean extends NotificationEmitter
/** Gets the names of all tables for the given keyspace */
public List<String> getTablesForKeyspace(String keyspace);
+ /**
+ * Validates that system.peers and system.peers_v2 are consistent with ClusterMetadata,
+ * inserting missing peer entries, refreshing entries whose columns no longer match and
+ * removing stale ones. This runs automatically on startup (unless the
+ * cassandra.sync_system_peers_tables_at_startup system property is set to false) but can
+ * be triggered manually if a discrepancy is suspected.
+ * <p>
+ * Note: mutates data in system.peers and system.peers_v2.
+ */
+ public void validateAndRepairPeersMetadata();
+
/** Mutates the repaired state of all SSTables for the given SSTables */
public List<String> mutateSSTableRepairedState(boolean repaired, boolean
preview, String keyspace, List<String> tables);
diff --git a/test/unit/org/apache/cassandra/db/SystemPeersValidatorTest.java b/test/unit/org/apache/cassandra/db/SystemPeersValidatorTest.java
new file mode 100644
index 0000000000..796de62642
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/SystemPeersValidatorTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashSet;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.harry.model.TokenPlacementModel;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.apache.cassandra.db.SystemKeyspace.LEGACY_PEERS;
+import static org.apache.cassandra.db.SystemKeyspace.PEERS_V2;
+import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link SystemPeersValidator}. Each test starts from a single registered and
+ * joined peer (node 2) in ClusterMetadata and empty system.peers / system.peers_v2 tables.
+ */
+public class SystemPeersValidatorTest
+{
+    private CMSTestBase.CMSSut sut;
+    private InetAddressAndPort peerEndpoint;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        ServerTestUtils.prepareServerNoRegister();
+    }
+
+    @Before
+    public void before() throws Exception
+    {
+        ClusterMetadataService.unsetInstance();
+        sut = new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false,
+                                     new TokenPlacementModel.SimpleReplicationFactor(3));
+        ClusterMetadataTestHelper.register(2);
+        ClusterMetadataTestHelper.join(2, 2);
+        peerEndpoint = ClusterMetadata.current().directory.endpoint(ClusterMetadataTestHelper.nodeId(2));
+        // Start every test with empty peers tables so the validator is what populates them
+        cleanupPeersTables();
+    }
+
+    @After
+    public void after()
+    {
+        if (sut != null)
+            sut.close();
+    }
+
+    @Test
+    public void testNoRepairWhenTablesAreConsistent()
+    {
+        // First call populates the (truncated) tables from ClusterMetadata
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertEquals("peers_v2 should have 1 peer", 1, countEntries(PEERS_V2));
+        assertEquals("peers should have 1 peer", 1, countEntries(LEGACY_PEERS));
+
+        // Second call should be a no-op
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertEquals("peers_v2 count should not change on second call", 1, countEntries(PEERS_V2));
+        assertEquals("peers count should not change on second call", 1, countEntries(LEGACY_PEERS));
+    }
+
+    @Test
+    public void testStaleEntryOnlyInPeersV2IsRemoved() throws UnknownHostException
+    {
+        InetAddressAndPort staleEndpoint = InetAddressAndPort.getByName("127.0.0.99");
+        insertStalePeerEntry(staleEndpoint, PEERS_V2);
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertFalse("Stale entry should be removed from peers_v2", entryExistsInPeersV2(staleEndpoint));
+    }
+
+    @Test
+    public void testStaleEntryOnlyInPeersIsRemoved() throws UnknownHostException
+    {
+        InetAddressAndPort staleEndpoint = InetAddressAndPort.getByName("127.0.0.99");
+        insertStalePeerEntry(staleEndpoint, LEGACY_PEERS);
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertFalse("Stale entry should be removed from peers", entryExistsInPeers(staleEndpoint.getAddress()));
+    }
+
+    @Test
+    public void testMissingPeerIsAddedToBothTables()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+        removePeerEntry(peerEndpoint);
+
+        assertFalse(entryExistsInPeersV2(peerEndpoint));
+        assertFalse(entryExistsInPeers(peerEndpoint.getAddress()));
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertPeersV2RowMatchesMetadata(peerEndpoint);
+        assertPeersRowMatchesMetadata(peerEndpoint);
+    }
+
+    @Test
+    public void testStaleFieldInPeersV2IsRepaired()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        executeInternal(String.format("UPDATE %s.%s SET data_center = 'stale-dc' WHERE peer = ? AND peer_port = ?",
+                                      SYSTEM_KEYSPACE_NAME, PEERS_V2),
+                        peerEndpoint.getAddress(), peerEndpoint.getPort());
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertPeersV2RowMatchesMetadata(peerEndpoint);
+    }
+
+    @Test
+    public void testStaleFieldInPeersIsRepaired()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        executeInternal(String.format("UPDATE %s.%s SET data_center = 'stale-dc' WHERE peer = ?",
+                                      SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
+                        peerEndpoint.getAddress());
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        assertPeersRowMatchesMetadata(peerEndpoint);
+    }
+
+    @Test
+    public void testNullColumnInPeersV2IsRepaired()
+    {
+        String[] columns = { "data_center", "host_id", "preferred_ip", "preferred_port",
+                             "rack", "release_version", "native_address", "native_port",
+                             "schema_version", "tokens" };
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        // Null out one column at a time and check that each individual gap is detected and refilled
+        for (String column : columns)
+        {
+            executeInternal(String.format("DELETE %s FROM %s.%s WHERE peer = ? AND peer_port = ?",
+                                          column, SYSTEM_KEYSPACE_NAME, PEERS_V2),
+                            peerEndpoint.getAddress(), peerEndpoint.getPort());
+
+            SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+            assertPeersV2RowMatchesMetadata(peerEndpoint);
+        }
+    }
+
+    @Test
+    public void testNullColumnInPeersIsRepaired()
+    {
+        String[] columns = { "data_center", "host_id", "preferred_ip",
+                             "rack", "release_version", "rpc_address",
+                             "schema_version", "tokens" };
+
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+        // Null out one column at a time and check that each individual gap is detected and refilled
+        for (String column : columns)
+        {
+            executeInternal(String.format("DELETE %s FROM %s.%s WHERE peer = ?",
+                                          column, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
+                            peerEndpoint.getAddress());
+
+            SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+
+            assertPeersRowMatchesMetadata(peerEndpoint);
+        }
+    }
+
+    @Test
+    public void testJmxEntryPointRepairsMissingPeer()
+    {
+        SystemPeersValidator.validateAndRepair(ClusterMetadata.current());
+        removePeerEntry(peerEndpoint);
+
+        StorageService.instance.validateAndRepairPeersMetadata();
+
+        assertTrue("JMX repair should restore peer in peers_v2", entryExistsInPeersV2(peerEndpoint));
+        assertTrue("JMX repair should restore peer in peers", entryExistsInPeers(peerEndpoint.getAddress()));
+    }
+
+    /** Empties both persisted peers tables. */
+    private void cleanupPeersTables()
+    {
+        executeInternal(String.format("TRUNCATE %s.%s", SYSTEM_KEYSPACE_NAME, PEERS_V2));
+        executeInternal(String.format("TRUNCATE %s.%s", SYSTEM_KEYSPACE_NAME, LEGACY_PEERS));
+    }
+
+    /** Returns the number of rows currently stored in the given system table. */
+    private int countEntries(String table)
+    {
+        UntypedResultSet result = executeInternal(String.format("SELECT COUNT(*) FROM %s.%s",
+                                                                SYSTEM_KEYSPACE_NAME, table));
+        return (int) result.one().getLong("count");
+    }
+
+    private boolean entryExistsInPeersV2(InetAddressAndPort endpoint)
+    {
+        UntypedResultSet result = executeInternal(
+            String.format("SELECT peer FROM %s.%s WHERE peer = ? AND peer_port = ?",
+                          SYSTEM_KEYSPACE_NAME, PEERS_V2),
+            endpoint.getAddress(), endpoint.getPort());
+        return !result.isEmpty();
+    }
+
+    private boolean entryExistsInPeers(InetAddress address)
+    {
+        UntypedResultSet result = executeInternal(
+            String.format("SELECT peer FROM %s.%s WHERE peer = ?",
+                          SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
+            address);
+        return !result.isEmpty();
+    }
+
+    /**
+     * Asserts that the system.peers_v2 row for {@code endpoint} exists and that every column
+     * agrees with the current ClusterMetadata entry for that endpoint.
+     */
+    private void assertPeersV2RowMatchesMetadata(InetAddressAndPort endpoint)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        // Resolve the node from the endpoint under test rather than assuming a fixed node id
+        NodeId nodeId = metadata.directory.peerId(endpoint);
+        NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
+
+        UntypedResultSet result = executeInternal(
+            String.format("SELECT * FROM %s.%s WHERE peer = ? AND peer_port = ?",
+                          SYSTEM_KEYSPACE_NAME, PEERS_V2),
+            endpoint.getAddress(), endpoint.getPort());
+        assertFalse("Expected a row for " + endpoint + " in " + PEERS_V2, result.isEmpty());
+        UntypedResultSet.Row row = result.one();
+
+        assertCommonColumnsMatchMetadata(row, "native_address", nodeId, metadata);
+        // The port columns only exist in peers_v2
+        assertEquals(addresses.broadcastAddress.getPort(), row.getInt("preferred_port"));
+        assertEquals(addresses.nativeAddress.getPort(), row.getInt("native_port"));
+    }
+
+    /**
+     * Asserts that the legacy system.peers row for {@code endpoint} exists and that every column
+     * agrees with the current ClusterMetadata entry for that endpoint.
+     */
+    private void assertPeersRowMatchesMetadata(InetAddressAndPort endpoint)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        // Resolve the node from the endpoint under test rather than assuming a fixed node id
+        NodeId nodeId = metadata.directory.peerId(endpoint);
+
+        UntypedResultSet result = executeInternal(
+            String.format("SELECT * FROM %s.%s WHERE peer = ?",
+                          SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
+            endpoint.getAddress());
+        assertFalse("Expected a row for " + endpoint + " in " + LEGACY_PEERS, result.isEmpty());
+        UntypedResultSet.Row row = result.one();
+
+        assertCommonColumnsMatchMetadata(row, "rpc_address", nodeId, metadata);
+    }
+
+    /**
+     * Asserts the columns shared by both peers tables. The native address column is named
+     * differently in each table, so the caller supplies its name.
+     */
+    private void assertCommonColumnsMatchMetadata(UntypedResultSet.Row row, String nativeAddressColumn,
+                                                  NodeId nodeId, ClusterMetadata metadata)
+    {
+        NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
+        Location location = metadata.directory.location(nodeId);
+
+        assertEquals(addresses.broadcastAddress.getAddress(), row.getInetAddress("preferred_ip"));
+        assertEquals(addresses.nativeAddress.getAddress(), row.getInetAddress(nativeAddressColumn));
+        assertEquals(location.datacenter, row.getString("data_center"));
+        assertEquals(location.rack, row.getString("rack"));
+        assertEquals(nodeId.toUUID(), row.getUUID("host_id"));
+        assertEquals(metadata.directory.version(nodeId).cassandraVersion.toString(), row.getString("release_version"));
+        assertEquals(metadata.schema.getVersion(), row.getUUID("schema_version"));
+        assertEquals(SystemKeyspace.tokensAsSet(metadata.tokenMap.tokens(nodeId)), row.getSet("tokens", UTF8Type.instance));
+    }
+
+    /**
+     * Writes a row for an endpoint which is not part of ClusterMetadata into the given table.
+     */
+    private void insertStalePeerEntry(InetAddressAndPort endpoint, String table)
+    {
+        if (table.equals(PEERS_V2))
+            executeInternal(
+                String.format("INSERT INTO %s.%s (peer, peer_port, data_center, host_id, rack, release_version, tokens) " +
+                              "VALUES (?, ?, ?, ?, ?, ?, ?)",
+                              SYSTEM_KEYSPACE_NAME, table),
+                endpoint.getAddress(), endpoint.getPort(), "dc1", UUID.randomUUID(), "rack1", "5.0.0", new HashSet<String>());
+        else
+            executeInternal(
+                String.format("INSERT INTO %s.%s (peer, data_center, host_id, rack, release_version, tokens) " +
+                              "VALUES (?, ?, ?, ?, ?, ?)",
+                              SYSTEM_KEYSPACE_NAME, table),
+                endpoint.getAddress(), "dc1", UUID.randomUUID(), "rack1", "5.0.0", new HashSet<String>());
+    }
+
+    /** Deletes the endpoint's row from both peers tables. */
+    private void removePeerEntry(InetAddressAndPort endpoint)
+    {
+        executeInternal(
+            String.format("DELETE FROM %s.%s WHERE peer = ? AND peer_port = ?", SYSTEM_KEYSPACE_NAME, PEERS_V2),
+            endpoint.getAddress(), endpoint.getPort());
+
+        executeInternal(
+            String.format("DELETE FROM %s.%s WHERE peer = ?", SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
+            endpoint.getAddress());
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]