This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b31d15b9b5 Avoid failing queries when epoch changes and replica goes up/down
b31d15b9b5 is described below
commit b31d15b9b58926676436807ddc1efdd5616e13b3
Author: Marcus Eriksson <[email protected]>
AuthorDate: Wed Mar 26 15:48:13 2025 +0100
Avoid failing queries when epoch changes and replica goes up/down
Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20489
---
CHANGES.txt | 1 +
.../apache/cassandra/locator/ReplicaLayout.java | 16 ++-
.../org/apache/cassandra/locator/ReplicaPlan.java | 38 ++++---
.../org/apache/cassandra/locator/ReplicaPlans.java | 33 +++---
.../org/apache/cassandra/service/paxos/Paxos.java | 6 ++
.../distributed/test/RepairDigestTrackingTest.java | 6 +-
.../test/tcm/FailureDetectorRecomputeTest.java | 113 +++++++++++++++++++++
.../cassandra/service/reads/DataResolverTest.java | 2 +-
.../service/reads/DigestResolverTest.java | 2 +-
.../cassandra/service/reads/ReadExecutorTest.java | 2 +-
.../reads/repair/AbstractReadRepairTest.java | 1 +
11 files changed, 172 insertions(+), 48 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 45a4f16b56..eee55c65e6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Avoid failing queries when epoch changes and replica goes up/down (CASSANDRA-20489)
* Split out truncation record lock (CASSANDRA-20480)
* Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402)
* Fix Paxos repair interrupts running transactions (CASSANDRA-20469)
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index f0069f2555..30a52be73a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.FBUtilities;
@@ -354,32 +353,31 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
}
/**
- * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending
- * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+ * @return the read layout for a token - this includes natural replicas, i.e. those that are not pending.
+ * They are reverse sorted by the badness score of the configured snitch
*/
- static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
+ static ReplicaLayout.ForTokenRead forTokenReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
{
EndpointsForToken replicas = keyspace.getMetadata().params.replication.isLocal()
                             ? forLocalStrategyToken(metadata, replicationStrategy, token)
                             : forNonLocalStrategyTokenRead(metadata, keyspace.getMetadata(), token);
+
replicas = DatabaseDescriptor.getNodeProximity().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
- replicas = replicas.filter(FailureDetector.isReplicaAlive);
+
return new ReplicaLayout.ForTokenRead(replicationStrategy, replicas);
}
/**
* TODO: we should really double check that the provided range does not overlap multiple token ring regions
- * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
- * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
+ * @return the read layout for a range - these are reverse sorted by the badness score of the configured snitch
*/
- static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
+ static ReplicaLayout.ForRangeRead forRangeReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
{
EndpointsForRange replicas = keyspace.getMetadata().params.replication.isLocal()
                             ? forLocalStrategyRange(metadata, replicationStrategy, range)
                             : forNonLocalStategyRangeRead(metadata, keyspace.getMetadata(), range);
replicas = DatabaseDescriptor.getNodeProximity().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
- replicas = replicas.filter(FailureDetector.isReplicaAlive);
return new ReplicaLayout.ForRangeRead(replicationStrategy, range, replicas);
}
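[Context for the hunks above -- an editorial sketch, not part of the commit: both factories now return every natural replica, sorted by proximity but unfiltered, and the failure-detector filter moves to the call sites in ReplicaPlans.java further down. A minimal caller-side sketch using only names that appear in this diff:

    // Sketch: the layout keeps down replicas; the call site derives the live view.
    static ReplicaLayout.ForTokenRead liveView(ClusterMetadata metadata,
                                               Keyspace keyspace,
                                               AbstractReplicationStrategy strategy,
                                               Token token)
    {
        // sorted by proximity, down replicas still included
        ReplicaLayout.ForTokenRead liveAndDown =
            ReplicaLayout.forTokenReadSorted(metadata, keyspace, strategy, token);
        // the caller, not the layout, applies the failure-detector filter
        return liveAndDown.filter(FailureDetector.isReplicaAlive);
    }

Keeping both views lets contact selection use the live set while the plan remembers the placement-only set for epoch-change checks.]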
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 62db6f85f3..7d08b341b8 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.FBUtilities;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -44,6 +43,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
ConsistencyLevel consistencyLevel();
E contacts();
+ E liveAndDown();
Replica lookup(InetAddressAndPort endpoint);
P withContacts(E contacts);
@@ -82,29 +82,28 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
// - paxos, includes all live replicas (natural+pending), for this DC if SERIAL_LOCAL
// ==> live.all() (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal))
protected final E contacts;
+ protected final E liveAndDown;
protected final Function<ClusterMetadata, P> recompute;
protected List<InetAddressAndPort> contacted = new CopyOnWriteArrayList<>();
- AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts, Function<ClusterMetadata, P> recompute, Epoch epoch)
+ AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts, E liveAndDown, Function<ClusterMetadata, P> recompute, Epoch epoch)
{
assert contacts != null;
this.keyspace = keyspace;
this.replicationStrategy = replicationStrategy;
this.consistencyLevel = consistencyLevel;
this.contacts = contacts;
+ this.liveAndDown = liveAndDown;
this.recompute = recompute;
this.epoch = epoch;
}
public E contacts() { return contacts; }
+ public E liveAndDown() { return liveAndDown; }
public Keyspace keyspace() { return keyspace; }
public AbstractReplicationStrategy replicationStrategy() { return replicationStrategy; }
public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
- public boolean canDoLocalRequest()
- {
- return contacts.contains(FBUtilities.getBroadcastAddressAndPort());
- }
public Epoch epoch()
{
@@ -132,10 +131,11 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
ConsistencyLevel consistencyLevel,
E candidates,
E contacts,
+ E liveAndDown,
Function<ClusterMetadata, P> recompute,
Epoch epoch)
{
- super(keyspace, replicationStrategy, consistencyLevel, contacts, recompute, epoch);
+ super(keyspace, replicationStrategy, consistencyLevel, contacts, liveAndDown, recompute, epoch);
this.candidates = candidates;
this.readQuorum = consistencyLevel.blockFor(replicationStrategy);
}
@@ -171,13 +171,13 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
ForRead<?, ?> newPlan = recompute.apply(newMetadata);
- if (readCandidates().equals(newPlan.readCandidates()))
+ if (liveAndDown().equals(newPlan.liveAndDown()))
return true;
int readQuorum = newPlan.readQuorum();
for (InetAddressAndPort addr : contacted)
{
- if (newPlan.readCandidates().contains(addr))
+ if (newPlan.liveAndDown().contains(addr))
readQuorum--;
}
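[The hunk above is the heart of the fix -- editorial note: stillAppliesTo() now compares the placement-only liveAndDown set instead of the live-filtered read candidates, so a replica flapping up or down between epochs no longer invalidates an in-flight plan, and responses already received from nodes still in the placement keep counting toward the quorum. A self-contained sketch of that recount, with plain collections standing in for the endpoint types (illustrative names, not Cassandra API):

    import java.util.List;
    import java.util.Set;

    final class QuorumRecountSketch
    {
        // Replies already received from endpoints still present in the new
        // epoch's placement reduce the quorum still owed. The hunk ends before
        // the conclusion, but presumably the old plan is kept once the
        // remaining quorum reaches zero.
        static boolean planStillUseful(Set<String> newLiveAndDown, List<String> contacted, int readQuorum)
        {
            for (String addr : contacted)
                if (newLiveAndDown.contains(addr))
                    readQuorum--;
            return readQuorum <= 0;
        }
    }
]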
@@ -204,17 +204,18 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
ConsistencyLevel consistencyLevel,
EndpointsForToken candidates,
EndpointsForToken contacts,
+ EndpointsForToken liveAndDown,
Function<ClusterMetadata, ReplicaPlan.ForTokenRead> recompute,
Function<ReplicaPlan<?, ?>, ReplicaPlan.ForWrite> repairPlan,
Epoch epoch)
{
- super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, recompute, epoch);
+ super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, liveAndDown, recompute, epoch);
this.repairPlan = repairPlan;
}
public ForTokenRead withContacts(EndpointsForToken newContacts)
{
- ForTokenRead res = new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContacts, recompute, repairPlan, epoch);
+ ForTokenRead res = new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContacts, liveAndDown, recompute, repairPlan, epoch);
res.contacted.addAll(contacted);
return res;
}
@@ -240,12 +241,13 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
AbstractBounds<PartitionPosition> range,
EndpointsForRange candidates,
EndpointsForRange contact,
+ EndpointsForRange liveAndDown,
int vnodeCount,
Function<ClusterMetadata, ReplicaPlan.ForRangeRead> recompute,
BiFunction<ReplicaPlan<?, ?>, Token, ReplicaPlan.ForWrite> repairPlan,
Epoch epoch)
{
- super(keyspace, replicationStrategy, consistencyLevel, candidates, contact, recompute, epoch);
+ super(keyspace, replicationStrategy, consistencyLevel, candidates, contact, liveAndDown, recompute, epoch);
this.range = range;
this.vnodeCount = vnodeCount;
this.repairPlan = repairPlan;
@@ -260,7 +262,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
public ForRangeRead withContacts(EndpointsForRange newContact)
{
- ForRangeRead res = new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, vnodeCount, recompute, repairPlan, epoch);
+ ForRangeRead res = new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, liveAndDown, vnodeCount, recompute, repairPlan, epoch);
res.contacted.addAll(contacted);
return res;
}
@@ -284,6 +286,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
AbstractBounds<PartitionPosition> range,
EndpointsForRange candidates,
EndpointsForRange contact,
+ EndpointsForRange liveAndDown,
int vnodeCount,
Epoch epoch)
{
@@ -291,7 +294,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
// the epoch change during the course of query execution so no recomputation function is supplied. Likewise,
// no read repair is expected to be performed during this type of query so a null is also used in place of a
// function for calculating the repair plan.
- super(keyspace, replicationStrategy, consistencyLevel, range, candidates, contact, vnodeCount, null, null, epoch);
+ super(keyspace, replicationStrategy, consistencyLevel, range, candidates, contact, liveAndDown, vnodeCount, null, null, epoch);
}
@Override
@@ -305,7 +308,6 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
{
// TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch
final EndpointsForToken pending;
- final EndpointsForToken liveAndDown;
final EndpointsForToken live;
final int writeQuorum;
@@ -319,9 +321,8 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
Function<ClusterMetadata, ForWrite> recompute,
Epoch epoch)
{
- super(keyspace, replicationStrategy, consistencyLevel, contact, recompute, epoch);
+ super(keyspace, replicationStrategy, consistencyLevel, contact, liveAndDown, recompute, epoch);
this.pending = pending;
- this.liveAndDown = liveAndDown;
this.live = live;
this.writeQuorum = consistencyLevel.blockForWrite(replicationStrategy, pending);
}
@@ -331,9 +332,6 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P>
/** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */
public EndpointsForToken pending() { return pending; }
- /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_QUORUM (which is local DC only) */
- public EndpointsForToken liveAndDown() { return liveAndDown; }
-
/** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */
public EndpointsForToken live() { return live; }
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index b6a03b683b..53b32797c6 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -839,13 +839,9 @@ public class ReplicaPlans
private static ReplicaPlan.ForTokenRead forSingleReplicaRead(ClusterMetadata metadata, Keyspace keyspace, Token token, Replica replica)
{
- // todo; replica does not always contain token, figure out why
-// if (!metadata.placements.get(keyspace.getMetadata().params.replication).reads.forToken(token).contains(replica))
-// throw UnavailableException.create(ConsistencyLevel.ONE, 1, 1, 0, 0);
-
EndpointsForToken one = EndpointsForToken.of(token, replica);
- return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one,
+ return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one, one,
(newClusterMetadata) -> forSingleReplicaRead(newClusterMetadata, keyspace, token, replica),
(self) -> {
throw new IllegalStateException("Read repair is not supported for short read/replica filtering protection.");
@@ -866,7 +862,7 @@ public class ReplicaPlans
// TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
EndpointsForRange one = EndpointsForRange.of(replica);
- return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, vnodeCount,
+ return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, one, vnodeCount,
(newClusterMetadata) -> forSingleReplicaRead(metadata, keyspace, range, replica, vnodeCount),
(self, token) -> {
throw new IllegalStateException("Read repair is not supported for short read/replica filtering protection.");
@@ -901,17 +897,24 @@ public class ReplicaPlans
return forRead(metadata, keyspace, token, indexQueryPlan, consistencyLevel, retry, true);
}
- private static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata, Keyspace keyspace, Token token, @Nullable Index.QueryPlan indexQueryPlan, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry, boolean throwOnInsufficientLiveReplicas)
+ private static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata,
+                                                 Keyspace keyspace,
+                                                 Token token,
+                                                 @Nullable Index.QueryPlan indexQueryPlan,
+                                                 ConsistencyLevel consistencyLevel,
+                                                 SpeculativeRetryPolicy retry,
+                                                 boolean throwOnInsufficientLiveReplicas)
{
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
- ReplicaLayout.ForTokenRead forTokenRead = ReplicaLayout.forTokenReadLiveSorted(metadata, keyspace, replicationStrategy, token);
- EndpointsForToken candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forTokenRead.natural());
+ ReplicaLayout.ForTokenRead forTokenReadLiveAndDown = ReplicaLayout.forTokenReadSorted(metadata, keyspace, replicationStrategy, token);
+ ReplicaLayout.ForTokenRead forTokenReadLive = forTokenReadLiveAndDown.filter(FailureDetector.isReplicaAlive);
+ EndpointsForToken candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forTokenReadLive.all());
EndpointsForToken contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates);
if (throwOnInsufficientLiveReplicas)
assureSufficientLiveReplicasForRead(metadata.locator, replicationStrategy, consistencyLevel, contacts);
- return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts,
+ return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, forTokenReadLiveAndDown.all(),
(newClusterMetadata) -> forRead(newClusterMetadata, keyspace, token, indexQueryPlan, consistencyLevel, retry, false),
(self) -> forReadRepair(self, metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive),
metadata.epoch);
@@ -942,8 +945,9 @@ public class ReplicaPlans
boolean throwOnInsufficientLiveReplicas)
{
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
- ReplicaLayout.ForRangeRead forRangeRead = ReplicaLayout.forRangeReadLiveSorted(metadata, keyspace, replicationStrategy, range);
- EndpointsForRange candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forRangeRead.natural());
+ ReplicaLayout.ForRangeRead forRangeReadLiveAndDown = ReplicaLayout.forRangeReadSorted(metadata, keyspace, replicationStrategy, range);
+ ReplicaLayout.ForRangeRead forRangeReadLive = forRangeReadLiveAndDown.filter(FailureDetector.isReplicaAlive);
+ EndpointsForRange candidates = candidatesForRead(keyspace, indexQueryPlan, consistencyLevel, forRangeReadLive.natural());
EndpointsForRange contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, false, candidates);
if (throwOnInsufficientLiveReplicas)
@@ -955,6 +959,7 @@ public class ReplicaPlans
range,
candidates,
contacts,
+ forRangeReadLiveAndDown.all(),
vnodeCount,
(newClusterMetadata) -> forRangeRead(newClusterMetadata, keyspace, indexQueryPlan, consistencyLevel, range, vnodeCount, false),
(self, token) -> forReadRepair(self, metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive),
@@ -983,7 +988,7 @@ public class ReplicaPlans
EndpointsForRange contacts = builder.build();
ClusterMetadata metadata = ClusterMetadata.current();
- return new ReplicaPlan.ForFullRangeRead(keyspace, replicationStrategy, consistencyLevel, range, contacts, contacts, vnodeCount, metadata.epoch);
+ return new ReplicaPlan.ForFullRangeRead(keyspace, replicationStrategy, consistencyLevel, range, contacts, contacts, contacts, vnodeCount, metadata.epoch);
}
/**
@@ -1000,6 +1005,7 @@ public class ReplicaPlans
if (!left.epoch.equals(right.epoch))
return null;
+ EndpointsForRange mergedLiveAndDown = left.liveAndDown().keep(right.liveAndDown().endpoints());
EndpointsForRange mergedCandidates = left.readCandidates().keep(right.readCandidates().endpoints());
AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
EndpointsForRange contacts = contactForRead(metadata.locator, replicationStrategy, consistencyLevel, false, mergedCandidates);
@@ -1023,6 +1029,7 @@ public class ReplicaPlans
newRange,
mergedCandidates,
contacts,
+ mergedLiveAndDown,
newVnodeCount,
(newClusterMetadata) -> forRangeRead(newClusterMetadata,
keyspace,
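[Editorial note on the merge hunk above, not part of the commit: when two adjacent range plans are combined, their liveAndDown views are intersected with keep(), exactly as the read candidates are, so the merged plan only claims endpoints common to both sub-ranges and the epoch-change comparison stays conservative.]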
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 06f90907d5..15d2b320fb 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -415,6 +415,12 @@ public class Paxos
return electorateNatural;
}
+ @Override
+ public EndpointsForToken liveAndDown()
+ {
+ return all;
+ }
+
@Override
public boolean stillAppliesTo(ClusterMetadata newMetadata)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
index 1e5773c67a..ce3df571d8 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java
@@ -380,7 +380,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
* local reads triggered by read repair (after speculative reads) execute at roughly the same time.
*
* This test depends on whether node1 gets a data or a digest request first, we force it to be a digest request
- * in the forTokenReadLiveSorted ByteBuddy rule below.
+ * in the forTokenReadSorted ByteBuddy rule below.
*/
@Test
public void testLocalDataAndRemoteRequestConcurrency() throws Exception
@@ -440,7 +440,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
new ByteBuddy().rebase(ReplicaLayout.class)
- .method(named("forTokenReadLiveSorted").and(takesArguments(ClusterMetadata.class, Keyspace.class, AbstractReplicationStrategy.class, Token.class)))
+ .method(named("forTokenReadSorted").and(takesArguments(ClusterMetadata.class, Keyspace.class, AbstractReplicationStrategy.class, Token.class)))
.intercept(MethodDelegation.to(BBHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
@@ -475,7 +475,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl
}
@SuppressWarnings({ "unused" })
- public static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
+ public static ReplicaLayout.ForTokenRead forTokenReadSorted(ClusterMetadata metadata, Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, Token token)
{
try
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java b/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java
new file mode 100644
index 0000000000..7abfd7386c
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/tcm/FailureDetectorRecomputeTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tcm;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+
+public class FailureDetectorRecomputeTest extends TestBaseImpl
+{
+ @Test
+ public void readTest() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(3)
+ .withInstanceInitializer(BB::install)
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("create table %s.tbl (id int
primary key)"));
+ cluster.get(1).runOnInstance(() -> BB.enabled.set(true));
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where id=?"), ConsistencyLevel.QUORUM, i);
+ }
+ }
+
+ @Test
+ public void writeTest() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(3)
+ .withInstanceInitializer(BB::install)
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("create table %s.tbl (id int
primary key)"));
+ cluster.get(1).runOnInstance(() -> BB.enabled.set(true));
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute(withKeyspace("insert into
%s.tbl (id) values (?)"), ConsistencyLevel.QUORUM, i);
+ }
+ }
+
+ public static class BB
+ {
+ public static AtomicBoolean enabled = new AtomicBoolean();
+
+ public static void install(ClassLoader cl, int i)
+ {
+ new ByteBuddy().rebase(FailureDetector.class)
+ .method(named("isAlive").and(takesArguments(1)))
+ .intercept(MethodDelegation.to(FailureDetectorRecomputeTest.BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+ new ByteBuddy().rebase(ReplicaPlan.AbstractForRead.class)
+ .method(named("stillAppliesTo").and(takesArguments(1)))
+ .intercept(MethodDelegation.to(FailureDetectorRecomputeTest.BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ static int downNode = 1;
+ public static boolean isAlive(InetAddressAndPort ep)
+ {
+ if (!enabled.get())
+ return true;
+ enabled.set(false);
+ ClusterMetadataService.instance().commit(CustomTransformation.make("hello"));
+ enabled.set(true);
+ return !ep.equals(InetAddressAndPort.getByNameUnchecked("127.0.0."
+ ((downNode % 3) + 1)));
+ }
+
+ public static boolean stillAppliesTo(ClusterMetadata metadata, @SuperCall Callable<Boolean> zuper) throws Exception
+ {
+ if (!enabled.get())
+ return true;
+ downNode++;
+ return zuper.call();
+ }
+ }
+}
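[Editorial note on the new test, not part of the commit: the ByteBuddy hooks make every FailureDetector.isAlive() call first commit a CustomTransformation -- bumping the cluster-metadata epoch mid-query -- and then report a rotating node as down, while the stillAppliesTo() hook advances which node that is. The QUORUM reads and writes above exercise exactly the up/down-flap-plus-epoch-change path this patch fixes.]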
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 9c56f00a80..d281025666 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -1331,7 +1331,7 @@ public class DataResolverTest extends AbstractReadResponseTest
ks.getReplicationStrategy(),
consistencyLevel,
ReplicaUtils.FULL_BOUNDS,
- replicas, replicas,
+ replicas, replicas, replicas,
1, null,
repairPlan,
Epoch.EMPTY));
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 84a1167295..17baa4fa55 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -215,7 +215,7 @@ public class DigestResolverTest extends AbstractReadResponseTest
private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
{
- return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas, null, (self) -> null, Epoch.EMPTY));
+ return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, replicas, replicas, replicas, null, (self) -> null, Epoch.EMPTY));
}
private void waitForLatch(CountDownLatch startlatch)
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 046da259e9..e23c7078b4 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -278,6 +278,6 @@ public class ReadExecutorTest
private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected)
{
- return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, natural, selected, (cm) -> null, (self) -> null, Epoch.EMPTY);
+ return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), consistencyLevel, natural, selected, natural, (cm) -> null, (self) -> null, Epoch.EMPTY);
}
}
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index f0026d35b9..1689069cf9 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -357,6 +357,7 @@ public abstract class AbstractReadRepairTest
ReplicaUtils.FULL_BOUNDS,
replicas,
targets,
+ replicas,
1,
null,
(self, token) -> forReadRepair(self, ClusterMetadata.current(), keyspace, consistencyLevel, token, (r) -> true),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]