This is an automated email from the ASF dual-hosted git repository.

payang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9d1c93b832e MINOR: Various cleanups in raft module (#21883)
9d1c93b832e is described below

commit 9d1c93b832e35a4a10a02417b65a8a03f2ffc29e
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Mar 27 14:14:10 2026 +0100

    MINOR: Various cleanups in raft module (#21883)
    
    Small Java cleanups and typo fixes
    
    Reviewers: PoAn Yang <[email protected]>, Ken Huang <[email protected]>
---
 .../java/org/apache/kafka/raft/LeaderState.java    |  2 +-
 .../kafka/raft/internals/BatchAccumulator.java     |  8 ++---
 .../kafka/raft/internals/KRaftVersionUpgrade.java  | 14 ++++-----
 .../kafka/raft/internals/RequestSendResult.java    |  2 +-
 .../kafka/raft/KafkaRaftClientPreVoteTest.java     |  4 +--
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 10 +++----
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 34 ++++++++++------------
 .../org/apache/kafka/raft/LeaderStateTest.java     |  2 +-
 .../java/org/apache/kafka/raft/MockLogTest.java    |  5 ++--
 .../apache/kafka/raft/RaftClientTestContext.java   | 10 ++-----
 .../org/apache/kafka/raft/ReplicatedCounter.java   | 10 ++-----
 .../java/org/apache/kafka/raft/VoterSetTest.java   |  2 +-
 .../raft/internals/RecordsBatchReaderTest.java     |  4 +--
 13 files changed, 46 insertions(+), 61 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 39dcdb2487c..d6d3d5a6180 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -563,7 +563,7 @@ public class LeaderState<T> implements EpochState {
             );
         }
 
-        // Upgrade to kraft.verion 1 is only supported; this needs to change 
when kraft.version 2 is added
+        // Upgrade to kraft.version 1 is only supported; this needs to change 
when kraft.version 2 is added
         var inMemoryVoters = 
kraftVersionUpgradeState.get().toVoters().orElseThrow(() ->
             new InvalidUpdateVersionException(
                 String.format(
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 4bc26db7537..4040a4d95e4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -140,7 +140,7 @@ public class BatchAccumulator<T> implements Closeable {
             long lastOffset = nextOffset + records.size() - 1;
             maybeCompleteDrain();
 
-            BatchBuilder<T> batch = null;
+            BatchBuilder<T> batch;
             batch = maybeAllocateBatch(records, serializationCache);
             if (batch == null) {
                 throw new BufferAllocationException("Append failed because we 
failed to allocate memory to write the batch");
@@ -576,7 +576,7 @@ public class BatchAccumulator<T> implements Closeable {
             this.pool = pool;
             this.initialBuffer = initialBuffer;
 
-            validateContruction();
+            validateConstruction();
         }
 
         private CompletedBatch(
@@ -593,10 +593,10 @@ public class BatchAccumulator<T> implements Closeable {
             this.pool = pool;
             this.initialBuffer = initialBuffer;
 
-            validateContruction();
+            validateConstruction();
         }
 
-        private void validateContruction() {
+        private void validateConstruction() {
             Objects.requireNonNull(data.firstBatch(), "Expected memory records 
to contain one batch");
 
             if (numRecords <= 0) {
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
index 22d0e8f8946..3e2a17b8f3e 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
@@ -31,16 +31,16 @@ import java.util.Optional;
  * avoid blocking locks.
  */
 public sealed interface KRaftVersionUpgrade {
-    public record Empty() implements KRaftVersionUpgrade {
+    record Empty() implements KRaftVersionUpgrade {
     }
 
-    public record Version(KRaftVersion kraftVersion) implements 
KRaftVersionUpgrade {
+    record Version(KRaftVersion kraftVersion) implements KRaftVersionUpgrade {
     }
 
-    public record Voters(VoterSet voters) implements KRaftVersionUpgrade {
+    record Voters(VoterSet voters) implements KRaftVersionUpgrade {
     }
 
-    public default Optional<Voters> toVoters() {
+    default Optional<Voters> toVoters() {
         if (this instanceof Voters) {
             return Optional.of(((Voters) this));
         } else {
@@ -48,7 +48,7 @@ public sealed interface KRaftVersionUpgrade {
         }
     }
 
-    public default Optional<Version> toVersion() {
+    default Optional<Version> toVersion() {
         if (this instanceof Version) {
             return Optional.of(((Version) this));
         } else {
@@ -56,9 +56,9 @@ public sealed interface KRaftVersionUpgrade {
         }
     }
 
-    static final KRaftVersionUpgrade EMPTY = new Empty();
+    KRaftVersionUpgrade EMPTY = new Empty();
 
-    public static KRaftVersionUpgrade empty() {
+    static KRaftVersionUpgrade empty() {
         return EMPTY;
     }
 
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
index 891d8e32509..70a75dca4d4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.raft.internals;
 
 /**
- * Type to capture the atempt to send a request.
+ * Type to capture the attempt to send a request.
  *
  * @param requestSent true if the request was sent
  * @param timeToWaitMs the amount of time to wait in milliseconds before 
attempting to resend the
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
index f115d59798e..1a4ca403db9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
@@ -478,7 +478,7 @@ public class KafkaRaftClientPreVoteTest {
         // invalid voter id is rejected
         context.deliverRequest(
             context.voteRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 epoch,
                 otherNodeKey,
                 ReplicaKey.of(10, Uuid.randomUuid()),
@@ -493,7 +493,7 @@ public class KafkaRaftClientPreVoteTest {
         // invalid voter directory id is rejected
         context.deliverRequest(
             context.voteRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 epoch,
                 otherNodeKey,
                 ReplicaKey.of(0, Uuid.randomUuid()),
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index bfdbd14c18a..85278779b79 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -841,7 +841,7 @@ public final class KafkaRaftClientSnapshotTest {
         int epoch = context.currentEpoch();
 
         FetchSnapshotRequestData voter1FetchSnapshotRequest = 
fetchSnapshotRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 voter1,
                 context.metadataPartition,
                 epoch,
@@ -851,7 +851,7 @@ public final class KafkaRaftClientSnapshotTest {
         );
 
         FetchSnapshotRequestData voter2FetchSnapshotRequest = 
fetchSnapshotRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 voter2,
                 context.metadataPartition,
                 epoch,
@@ -861,7 +861,7 @@ public final class KafkaRaftClientSnapshotTest {
         );
 
         FetchSnapshotRequestData observerFetchSnapshotRequest = 
fetchSnapshotRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 observer3,
                 context.metadataPartition,
                 epoch,
@@ -1955,7 +1955,7 @@ public final class KafkaRaftClientSnapshotTest {
         // valid cluster id is accepted
         context.deliverRequest(
             fetchSnapshotRequest(
-                context.clusterId.toString(),
+                context.clusterId,
                 otherNode,
                 context.metadataPartition,
                 epoch,
@@ -2264,7 +2264,7 @@ public final class KafkaRaftClientSnapshotTest {
     private static void assertBootstrapSnapshot(
         RaftClientTestContext context,
         List<String> expectedRecords
-    ) throws Exception {
+    ) {
         try (SnapshotReader<String> bootstrapSnapshot = 
context.listener.drainHandledBootstrapSnapshot().get()) {
             assertEquals(Snapshots.BOOTSTRAP_SNAPSHOT_ID, 
bootstrapSnapshot.snapshotId());
             
SnapshotWriterReaderTest.assertDataSnapshot(List.of(expectedRecords), 
bootstrapSnapshot);
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index f44f09581cc..e4d879c64ed 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2037,7 +2037,7 @@ class KafkaRaftClientTest {
         context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
         assertTrue(context.client.quorum().isUnattached());
 
-        context.time.sleep(context.electionTimeoutMs() * 2);
+        context.time.sleep(context.electionTimeoutMs() * 2L);
         context.pollUntilRequest();
         assertTrue(context.client.quorum().isUnattached());
         context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
@@ -2049,7 +2049,7 @@ class KafkaRaftClientTest {
         // observer can vote
         context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
 
-        context.time.sleep(context.electionTimeoutMs() * 2);
+        context.time.sleep(context.electionTimeoutMs() * 2L);
         context.pollUntilRequest();
         // observer cannot transition to prospective though
         assertTrue(context.client.quorum().isUnattached());
@@ -3467,8 +3467,7 @@ class KafkaRaftClientTest {
         long followerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(1, follower, fetchOffset, 
epoch, 0));
         context.pollUntilResponse();
-        long expectedHW = fetchOffset;
-        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.assertSentFetchPartitionResponse(fetchOffset, epoch);
 
         // Create observer
         ReplicaKey observer = replicaKey(localId + 2, withKip853Rpc);
@@ -3477,7 +3476,7 @@ class KafkaRaftClientTest {
         long observerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(epoch, observer, 0L, 0, 
0));
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.assertSentFetchPartitionResponse(fetchOffset, epoch);
 
         context.time.sleep(100);
         context.deliverRequest(context.describeQuorumRequest());
@@ -3505,14 +3504,14 @@ class KafkaRaftClientTest {
                 .setLogEndOffset(0L)
                 .setLastFetchTimestamp(observerFetchTime)
                 .setLastCaughtUpTimestamp(-1L));
-        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, expectedObserverStates);
+        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, 
expectedVoterStates, expectedObserverStates);
 
         // Update observer fetch state
         context.time.sleep(100);
         observerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(epoch, observer, 
fetchOffset, epoch, 0));
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.assertSentFetchPartitionResponse(fetchOffset, epoch);
 
         context.time.sleep(100);
         context.deliverRequest(context.describeQuorumRequest());
@@ -3525,7 +3524,7 @@ class KafkaRaftClientTest {
             .setLogEndOffset(fetchOffset)
             .setLastFetchTimestamp(observerFetchTime)
             .setLastCaughtUpTimestamp(observerFetchTime);
-        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, expectedObserverStates);
+        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, 
expectedVoterStates, expectedObserverStates);
 
         // Observer falls behind
         context.time.sleep(100);
@@ -3541,7 +3540,7 @@ class KafkaRaftClientTest {
             .setLogEndOffset(fetchOffset + records.size())
             .setLastFetchTimestamp(context.time.milliseconds())
             .setLastCaughtUpTimestamp(context.time.milliseconds());
-        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, expectedObserverStates);
+        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, 
expectedVoterStates, expectedObserverStates);
 
         // Observer is removed due to inactivity
         long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS;
@@ -3550,7 +3549,7 @@ class KafkaRaftClientTest {
             followerFetchTime = context.time.milliseconds();
             context.deliverRequest(context.fetchRequest(epoch, follower, 
fetchOffset, epoch, 0));
             context.pollUntilResponse();
-            context.assertSentFetchPartitionResponse(expectedHW, epoch);
+            context.assertSentFetchPartitionResponse(fetchOffset, epoch);
 
             context.time.sleep(context.checkQuorumTimeoutMs - 1);
             timeToSleep = timeToSleep - (context.checkQuorumTimeoutMs - 1);
@@ -3563,19 +3562,19 @@ class KafkaRaftClientTest {
             .setLastCaughtUpTimestamp(context.time.milliseconds());
         expectedVoterStates.get(1)
             .setLastFetchTimestamp(followerFetchTime);
-        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, List.of());
+        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, 
expectedVoterStates, List.of());
 
         // No-op for negative node id
         context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1, 
ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0));
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.assertSentFetchPartitionResponse(fetchOffset, epoch);
         context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
 
         expectedVoterStates.get(0)
             .setLastFetchTimestamp(context.time.milliseconds())
             .setLastCaughtUpTimestamp(context.time.milliseconds());
-        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, List.of());
+        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, 
expectedVoterStates, List.of());
     }
 
     @ParameterizedTest
@@ -3613,8 +3612,7 @@ class KafkaRaftClientTest {
         long followerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(epoch, follower, 
fetchOffset, epoch, 0));
         context.pollUntilResponse();
-        long expectedHW = fetchOffset;
-        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.assertSentFetchPartitionResponse(fetchOffset, epoch);
 
         context.time.sleep(100);
         context.deliverRequest(context.describeQuorumRequest());
@@ -3632,7 +3630,7 @@ class KafkaRaftClientTest {
                 .setLogEndOffset(fetchOffset)
                 .setLastFetchTimestamp(followerFetchTime)
                 .setLastCaughtUpTimestamp(followerFetchTime));
-        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, List.of());
+        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, 
expectedVoterStates, List.of());
 
         // Follower crashes and disk is lost. It fetches an earlier offset to 
rebuild state.
         // The leader will report an error in the logs, but will not let the 
high watermark rewind
@@ -3640,7 +3638,7 @@ class KafkaRaftClientTest {
         followerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(epoch, follower, 
fetchOffset - 1, epoch, 0));
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.assertSentFetchPartitionResponse(fetchOffset, epoch);
         context.time.sleep(100);
         context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
@@ -3651,7 +3649,7 @@ class KafkaRaftClientTest {
         expectedVoterStates.get(1)
             .setLogEndOffset(fetchOffset - batch.size())
             .setLastFetchTimestamp(followerFetchTime);
-        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, List.of());
+        context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset, 
expectedVoterStates, List.of());
     }
 
     @ParameterizedTest
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index d66fb31399c..5b4ed99fa97 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -732,7 +732,7 @@ public class LeaderStateTest {
                 .get()
         );
 
-        // Upate in-memory voter and check state
+        // Update in-memory voter and check state
         assertTrue(
             state.compareAndSetVolatileVoters(votersWithLeaderUpdated, 
updatedVoters)
         );
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 8fc6f0c769b..c3d6ff29186 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -48,7 +48,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -1134,7 +1133,7 @@ public class MockLogTest {
 
     @ParameterizedTest
     @ValueSource(ints = {1, 2})
-    public void testReadRespectsMaxSizeInBytes(int expectedBatches) throws 
IOException {
+    public void testReadRespectsMaxSizeInBytes(int expectedBatches) {
         int recordsPerBatch = 5;
         appendBatch(recordsPerBatch, 1);
         appendBatch(recordsPerBatch, 1);
@@ -1211,7 +1210,7 @@ public class MockLogTest {
                 log.endOffset().offset(),
                 Compression.NONE,
                 epoch,
-                records.toArray(new SimpleRecord[records.size()])
+                records.toArray(new SimpleRecord[0])
             ),
             epoch
         );
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index dfc4ec9c309..2b01c153254 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1354,7 +1354,7 @@ public final class RaftClientTestContext {
         assertEquals(replicaKey.id(), addRaftVoterRequestData.voterId());
         assertEquals(replicaKey.directoryId().get(), 
addRaftVoterRequestData.voterDirectoryId());
         assertEquals(endpoints, 
Endpoints.fromAddVoterRequest(addRaftVoterRequestData.listeners()));
-        assertEquals(false, addRaftVoterRequestData.ackWhenCommitted());
+        assertFalse(addRaftVoterRequestData.ackWhenCommitted());
 
         return request;
     }
@@ -2337,7 +2337,7 @@ public final class RaftClientTestContext {
         }
 
         void readBatch(BatchReader<String> reader) {
-            try {
+            try (reader) {
                 while (reader.hasNext()) {
                     long nextOffset = lastCommitOffset().isPresent() ?
                         lastCommitOffset().getAsLong() + 1 : 0L;
@@ -2349,8 +2349,6 @@ public final class RaftClientTestContext {
                             ". We expected an offset at least as large as " + 
nextOffset);
                     commits.add(batch);
                 }
-            } finally {
-                reader.close();
             }
         }
 
@@ -2425,10 +2423,6 @@ public final class RaftClientTestContext {
         // autoJoin support
         KIP_1186_PROTOCOL;
 
-        boolean isKRaftSupported() {
-            return isAtLeast(KIP_595_PROTOCOL);
-        }
-
         boolean isReconfigSupported() {
             return isAtLeast(KIP_853_PROTOCOL);
         }
diff --git a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java 
b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
index 6e0de670b08..bcc29eb189f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -77,7 +77,7 @@ public class ReplicatedCounter implements 
RaftClient.Listener<Integer> {
 
     @Override
     public synchronized void handleCommit(BatchReader<Integer> reader) {
-        try {
+        try (reader) {
             int initialCommitted = committed;
             long lastCommittedOffset = -1;
             int lastCommittedEpoch = 0;
@@ -90,7 +90,7 @@ public class ReplicatedCounter implements 
RaftClient.Listener<Integer> {
                     batch.records(),
                     batch.baseOffset()
                 );
-                for (Integer nextCommitted: batch.records()) {
+                for (Integer nextCommitted : batch.records()) {
                     if (nextCommitted != committed + 1) {
                         throw new AssertionError(
                             String.format(
@@ -132,14 +132,12 @@ public class ReplicatedCounter implements 
RaftClient.Listener<Integer> {
                     lastOffsetSnapshotted = lastCommittedOffset;
                 }
             }
-        } finally {
-            reader.close();
         }
     }
 
     @Override
     public synchronized void handleLoadSnapshot(SnapshotReader<Integer> 
reader) {
-        try {
+        try (reader) {
             log.debug("Loading snapshot {}", reader.snapshotId());
             // Since the state machine is only one value, expect only one data 
record
             boolean foundDataRecord = false;
@@ -176,8 +174,6 @@ public class ReplicatedCounter implements 
RaftClient.Listener<Integer> {
             lastOffsetSnapshotted = reader.lastContainedLogOffset();
             handleLoadSnapshotCalls += 1;
             log.debug("Finished loading snapshot. Set value: {}", committed);
-        } finally {
-            reader.close();
         }
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java 
b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
index 307b6aa59a1..bd303933103 100644
--- a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
@@ -148,7 +148,7 @@ public final class VoterSetTest {
         Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1, 
2, 3), false);
         VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
 
-        // Cannot override node id not contianed in the voter set
+        // Cannot override node id not contained in the voter set
         assertEquals(Optional.empty(), 
voterSet.updateVoterIgnoringDirectoryId(voterNode(4, true)));
 
         // Test that it can override voter set with different directory ids
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
 
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index 1606681600f..e01db38dc50 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -133,7 +133,7 @@ class RecordsBatchReaderTest {
             true,
             new LogContext()
         );
-        try {
+        try (reader) {
             for (TestBatch<String> batch : expectedBatches) {
                 assertTrue(reader.hasNext());
                 assertEquals(batch, TestBatch.from(reader.next()));
@@ -141,8 +141,6 @@ class RecordsBatchReaderTest {
 
             assertFalse(reader.hasNext());
             assertThrows(NoSuchElementException.class, reader::next);
-        } finally {
-            reader.close();
         }
 
         Mockito.verify(closeListener).onClose(reader);

Reply via email to