This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 366d4958dfc KAFKA-18563 move RaftClientTestContext RpcVersion methods 
into RaftProtocol (#19349)
366d4958dfc is described below

commit 366d4958dfc45c92a8b07ae90ee59506388e24cc
Author: PoAn Yang <[email protected]>
AuthorDate: Wed May 6 00:37:41 2026 +0800

    KAFKA-18563 move RaftClientTestContext RpcVersion methods into RaftProtocol 
(#19349)
    
    There are lot of xyzRpcVersion which returning value by checking
    RaftProtocol. It's better to include these functions in RaftProtocol
    itself.
    
    Reviewers: TaiJuWu <[email protected]>, Ken Huang <[email protected]>,
     Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/raft/RaftClientTestContext.java   | 212 ++++++++++-----------
 1 file changed, 102 insertions(+), 110 deletions(-)

diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 02f92500711..d819102a79b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -812,7 +812,7 @@ public final class RaftClientTestContext {
         }
 
         DescribeQuorumResponseData.NodeCollection nodes = new 
DescribeQuorumResponseData.NodeCollection(0);
-        if (describeQuorumRpcVersion() >= 2) {
+        if (raftProtocol.describeQuorumRpcVersion() >= 2) {
             nodes = new 
DescribeQuorumResponseData.NodeCollection(voterStates.size());
             for (ReplicaState voterState : voterStates) {
                 nodes.add(new DescribeQuorumResponseData.Node()
@@ -1558,7 +1558,7 @@ public final class RaftClientTestContext {
     ) {
         return RaftUtil.singletonEndQuorumEpochResponse(
             channel.listenerName(),
-            endQuorumEpochRpcVersion(),
+            raftProtocol.endQuorumEpochRpcVersion(),
             Errors.NONE,
             metadataPartition,
             Errors.NONE,
@@ -1651,7 +1651,7 @@ public final class RaftClientTestContext {
     BeginQuorumEpochResponseData beginEpochResponse(int epoch, int leaderId) {
         return RaftUtil.singletonBeginQuorumEpochResponse(
             channel.listenerName(),
-            beginQuorumEpochRpcVersion(),
+            raftProtocol.beginQuorumEpochRpcVersion(),
             Errors.NONE,
             metadataPartition,
             Errors.NONE,
@@ -1738,11 +1738,11 @@ public final class RaftClientTestContext {
     }
 
     VoteResponseData voteResponse(boolean voteGranted, OptionalInt leaderId, 
int epoch) {
-        return voteResponse(Errors.NONE, voteGranted, leaderId, epoch, 
voteRpcVersion());
+        return voteResponse(Errors.NONE, voteGranted, leaderId, epoch, 
raftProtocol.voteRpcVersion());
     }
 
     VoteResponseData voteResponse(Errors error, OptionalInt leaderId, int 
epoch) {
-        return voteResponse(error, false, leaderId, epoch, voteRpcVersion());
+        return voteResponse(error, false, leaderId, epoch, 
raftProtocol.voteRpcVersion());
     }
 
     VoteResponseData voteResponse(Errors error, boolean voteGranted, 
OptionalInt leaderId, int epoch, short version) {
@@ -1925,7 +1925,7 @@ public final class RaftClientTestContext {
     ) {
         return RaftUtil.singletonFetchResponse(
             channel.listenerName(),
-            fetchRpcVersion(),
+            raftProtocol.fetchRpcVersion(),
             metadataPartition,
             metadataTopicId,
             Errors.NONE,
@@ -1953,7 +1953,7 @@ public final class RaftClientTestContext {
     ) {
         return RaftUtil.singletonFetchResponse(
             channel.listenerName(),
-            fetchRpcVersion(),
+            raftProtocol.fetchRpcVersion(),
             metadataPartition,
             metadataTopicId,
             Errors.NONE,
@@ -1983,7 +1983,7 @@ public final class RaftClientTestContext {
     ) {
         return RaftUtil.singletonFetchResponse(
             channel.listenerName(),
-            fetchRpcVersion(),
+            raftProtocol.fetchRpcVersion(),
             metadataPartition,
             metadataTopicId,
             Errors.NONE,
@@ -2011,7 +2011,7 @@ public final class RaftClientTestContext {
     ) {
         return RaftUtil.singletonFetchSnapshotResponse(
             channel.listenerName(),
-            fetchSnapshotRpcVersion(),
+            raftProtocol.fetchSnapshotRpcVersion(),
             metadataPartition,
             leaderId,
             startingVoters.listeners(leaderId),
@@ -2091,103 +2091,25 @@ public final class RaftClientTestContext {
         );
     }
 
-    private short fetchRpcVersion() {
-        if (raftProtocol.isHwmInFetchSupported()) {
-            return 18;
-        } else if (raftProtocol.isReconfigSupported()) {
-            return 17;
-        } else {
-            return 16;
-        }
-    }
-
-    private short fetchSnapshotRpcVersion() {
-        if (raftProtocol.isReconfigSupported()) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
-
-    short voteRpcVersion() {
-        if (raftProtocol.isPreVoteSupported()) {
-            return 2;
-        } else if (raftProtocol.isReconfigSupported()) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
-
-    private short beginQuorumEpochRpcVersion() {
-        if (raftProtocol.isReconfigSupported()) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
-
-    private short endQuorumEpochRpcVersion() {
-        if (raftProtocol.isReconfigSupported()) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
-
-    private short describeQuorumRpcVersion() {
-        if (raftProtocol.isReconfigSupported()) {
-            return 2;
-        } else {
-            return 1;
-        }
-    }
-
-    private short addVoterRpcVersion() {
-        if (raftProtocol.isAutoJoinSupported()) {
-            return 1;
-        } else if (raftProtocol.isReconfigSupported()) {
-            return 0;
-        } else {
-            throw new IllegalStateException("Reconfiguration must be enabled 
by calling withRaftProtocol(KIP_853_PROTOCOL)");
-        }
-    }
-
-    private short removeVoterRpcVersion() {
-        if (raftProtocol.isReconfigSupported()) {
-            return 0;
-        } else {
-            throw new IllegalStateException("Reconfiguration must be enabled 
by calling withRaftProtocol(KIP_853_PROTOCOL)");
-        }
-    }
-
-    private short updateVoterRpcVersion() {
-        if (raftProtocol.isReconfigSupported()) {
-            return 0;
-        } else {
-            throw new IllegalStateException("Reconfiguration must be enabled 
by calling withRaftProtocol(KIP_853_PROTOCOL)");
-        }
-    }
-
     private short raftRequestVersion(ApiMessage request) {
         if (request instanceof FetchRequestData) {
-            return fetchRpcVersion();
+            return raftProtocol.fetchRpcVersion();
         } else if (request instanceof FetchSnapshotRequestData) {
-            return fetchSnapshotRpcVersion();
+            return raftProtocol.fetchSnapshotRpcVersion();
         } else if (request instanceof VoteRequestData) {
-            return voteRpcVersion();
+            return raftProtocol.voteRpcVersion();
         } else if (request instanceof BeginQuorumEpochRequestData) {
-            return beginQuorumEpochRpcVersion();
+            return raftProtocol.beginQuorumEpochRpcVersion();
         } else if (request instanceof EndQuorumEpochRequestData) {
-            return endQuorumEpochRpcVersion();
+            return raftProtocol.endQuorumEpochRpcVersion();
         } else if (request instanceof DescribeQuorumRequestData) {
-            return describeQuorumRpcVersion();
+            return raftProtocol.describeQuorumRpcVersion();
         } else if (request instanceof AddRaftVoterRequestData) {
-            return addVoterRpcVersion();
+            return raftProtocol.addVoterRpcVersion();
         } else if (request instanceof RemoveRaftVoterRequestData) {
-            return removeVoterRpcVersion();
+            return raftProtocol.removeVoterRpcVersion();
         } else if (request instanceof UpdateRaftVoterRequestData) {
-            return updateVoterRpcVersion();
+            return raftProtocol.updateVoterRpcVersion();
         } else {
             throw new IllegalArgumentException(String.format("Request %s is 
not a raft request", request));
         }
@@ -2195,23 +2117,23 @@ public final class RaftClientTestContext {
 
     private short raftResponseVersion(ApiMessage response) {
         if (response instanceof FetchResponseData) {
-            return fetchRpcVersion();
+            return raftProtocol.fetchRpcVersion();
         } else if (response instanceof FetchSnapshotResponseData) {
-            return fetchSnapshotRpcVersion();
+            return raftProtocol.fetchSnapshotRpcVersion();
         } else if (response instanceof VoteResponseData) {
-            return voteRpcVersion();
+            return raftProtocol.voteRpcVersion();
         } else if (response instanceof BeginQuorumEpochResponseData) {
-            return beginQuorumEpochRpcVersion();
+            return raftProtocol.beginQuorumEpochRpcVersion();
         } else if (response instanceof EndQuorumEpochResponseData) {
-            return endQuorumEpochRpcVersion();
+            return raftProtocol.endQuorumEpochRpcVersion();
         } else if (response instanceof DescribeQuorumResponseData) {
-            return describeQuorumRpcVersion();
+            return raftProtocol.describeQuorumRpcVersion();
         } else if (response instanceof AddRaftVoterResponseData) {
-            return addVoterRpcVersion();
+            return raftProtocol.addVoterRpcVersion();
         } else if (response instanceof RemoveRaftVoterResponseData) {
-            return removeVoterRpcVersion();
+            return raftProtocol.removeVoterRpcVersion();
         } else if (response instanceof UpdateRaftVoterResponseData) {
-            return updateVoterRpcVersion();
+            return raftProtocol.updateVoterRpcVersion();
         } else if (response instanceof ApiVersionsResponseData) {
             return 4;
         } else {
@@ -2431,12 +2353,82 @@ public final class RaftClientTestContext {
             return isAtLeast(KIP_996_PROTOCOL);
         }
 
-        boolean isHwmInFetchSupported() {
-            return isAtLeast(KIP_1166_PROTOCOL);
+        short describeQuorumRpcVersion() {
+            if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 2;
+            } else {
+                return 1;
+            }
+        }
+
+        short fetchRpcVersion() {
+            if (isAtLeast(KIP_1166_PROTOCOL)) {
+                return 18;
+            } else if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 17;
+            } else {
+                return 16;
+            }
+        }
+
+        short fetchSnapshotRpcVersion() {
+            if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+
+        short voteRpcVersion() {
+            if (isAtLeast(KIP_996_PROTOCOL)) {
+                return 2;
+            } else if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+
+        short beginQuorumEpochRpcVersion() {
+            if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 1;
+            } else {
+                return 0;
+            }
         }
-      
-        boolean isAutoJoinSupported() {
-            return isAtLeast(KIP_1186_PROTOCOL);
+
+        short endQuorumEpochRpcVersion() {
+            if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 1;
+            } else {
+                return 0;
+            }
+        }
+
+        short addVoterRpcVersion() {
+            if (isAtLeast(KIP_1186_PROTOCOL)) {
+                return 1;
+            } else if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 0;
+            } else {
+                throw new IllegalStateException("Reconfiguration must be 
enabled by calling withRaftProtocol(KIP_853_PROTOCOL)");
+            }
+        }
+
+        short removeVoterRpcVersion() {
+            if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 0;
+            } else {
+                throw new IllegalStateException("Reconfiguration must be 
enabled by calling withRaftProtocol(KIP_853_PROTOCOL)");
+            }
+        }
+
+        short updateVoterRpcVersion() {
+            if (isAtLeast(KIP_853_PROTOCOL)) {
+                return 0;
+            } else {
+                throw new IllegalStateException("Reconfiguration must be 
enabled by calling withRaftProtocol(KIP_853_PROTOCOL)");
+            }
         }
 
         private boolean isAtLeast(RaftProtocol otherRpc) {

Reply via email to