kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r2006079891


##########
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##########
@@ -1127,14 +1477,83 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                * For clusters running in KIP-853 mode, we check that a majority of at least one of the following
+                * voter sets has reached the high watermark:
+                * 1. the leader's voter set at the HWM (i.e. the last committed voter set)
+                * 2. the leader's lastVoterSet() (which may or may not be committed)
+                * Note that 1 and 2 can be the same set, but when they are not, lastVoterSet() is uncommitted,
+                * which follows from the AtMostOneUncommittedVoterSet invariant.
+                *
+                * A more elaborate check is necessary for this invariant because this method can get called after the
+                * leader has updated its lastVoterSet() with a new uncommitted voter set, but before the leader has
+                * updated its high watermark using the new voter set. In this case, we need to check that the majority
+                * of the last committed voter set has reached the current high watermark.
+                * */
+                cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                    leaderNode.client.highWatermark().ifPresent(highWatermark -> {
+                        VoterSet voterSet = leaderNode.client.partitionState().lastVoterSet();
+                        long numReachedHighWatermark = numReachedHighWatermark(highWatermark, voterSet.voterIds());
+                        if (numReachedHighWatermark < cluster.majoritySize(voterSet.size())) {
+                            leaderNode.client.partitionState().voterSetAtOffset(highWatermark - 1).ifPresent(otherVoterSet -> {
+                                long nodesReachedHighWatermark = numReachedHighWatermark(highWatermark, otherVoterSet.voterIds());
+                                assertTrue(
+                                    nodesReachedHighWatermark >= cluster.majoritySize(otherVoterSet.size()),
+                                    "Insufficient nodes have reached current high watermark. Expected at least " +
+                                        cluster.majoritySize(otherVoterSet.size()) + " but got " + nodesReachedHighWatermark);
+                            });
+                            return;
+                        }
+                        assertTrue(
+                            numReachedHighWatermark >= cluster.majoritySize(voterSet.size()),
+                            "Insufficient nodes have reached current high watermark. Expected at least " +
+                                cluster.majoritySize(voterSet.size()) + " but got " + numReachedHighWatermark);
+                    });
+                });
+            } else {
+                cluster.leaderHighWatermark().ifPresent(highWatermark -> {
+                    long numReachedHighWatermark = numReachedHighWatermark(highWatermark, cluster.initialVoters.keySet());
+                    assertTrue(
+                        numReachedHighWatermark >= cluster.majoritySize(cluster.initialVoters.size()),
+                        "Insufficient nodes have reached current high watermark");
+                });
+            }
+        }
+
+        private long numReachedHighWatermark(long highWatermark, Set<Integer> voterIds) {
+            return cluster.persistentStates.entrySet().stream()
+                .filter(entry -> voterIds.contains(entry.getKey()))
+                .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
+                .count();
+        }
+    }
+
+    private static class AtMostOneUncommittedVotersRecord implements Invariant {
+        final Cluster cluster;
+
+        private AtMostOneUncommittedVotersRecord(Cluster cluster) {
+            this.cluster = cluster;
+        }
+
+        @Override
+        public void verify() {
+            cluster.leaderWithMaxEpoch().ifPresent(leaderNode -> {
+                leaderNode.log.readBatches(leaderNode.highWatermark(), OptionalLong.of(leaderNode.logEndOffset())).forEach(batch -> {
+                    boolean seenUncommittedVotersRecord = false;
+                    if (batch.isControlBatch) {
+                        for (LogEntry entry : batch.entries) {
+                            short typeId = ControlRecordType.parseTypeId(entry.record.key());
+                            ControlRecordType type = ControlRecordType.fromTypeId(typeId);
+                            if (type == ControlRecordType.KRAFT_VOTERS) {
+                                if (seenUncommittedVotersRecord) {
+                                    fail("More than one uncommitted voters record found in the log");
+                                }
+                                seenUncommittedVotersRecord = true;
+                            }
+                        }
+                    }
+                });

Review Comment:
   What do you mean by implementation-specific? Specific to KIP-853?
   
   When we think of an invariant that encapsulates a "consistent log", isn't that really a combination of the other pre-defined invariants (like monotonic HWM/epoch)? From what I understand, this adds an additional criterion for the log to be "consistent" while the RaftNodes are running.
   
   We perform validation at the end of the test with the `ConsistentCommittedData` validation, but that only applies to committed data and does not run while the RaftNodes are polling.
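   
   For concreteness, here is a minimal sketch of the distinction I mean. The `Invariant` shape is visible in the diff above; the `Validation` name and signature are my assumption about how the end-of-test hook in `RaftEventSimulationTest` is shaped, not something taken from this PR:
   
   ```java
   // Sketch only; the real definitions live inside RaftEventSimulationTest.
   interface Invariant {
       // Called by the event scheduler after every simulated step, while the
       // RaftNodes are still polling. Must fail fast on a violation.
       void verify();
   }
   
   interface Validation {
       // Called once at the end of the run, e.g. ConsistentCommittedData
       // checking that all nodes agree on the committed data.
       void validate();
   }
   ```
   
   So a "consistent log" invariant would strengthen what we check during the run, rather than replace the end-of-test validation.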


