This is an automated email from the ASF dual-hosted git repository.
payang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9d1c93b832e MINOR: Various cleanups in raft module (#21883)
9d1c93b832e is described below
commit 9d1c93b832e35a4a10a02417b65a8a03f2ffc29e
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Mar 27 14:14:10 2026 +0100
MINOR: Various cleanups in raft module (#21883)
Small Java cleanups and typo fixes
Reviewers: PoAn Yang <[email protected]>, Ken Huang <[email protected]>
---
.../java/org/apache/kafka/raft/LeaderState.java | 2 +-
.../kafka/raft/internals/BatchAccumulator.java | 8 ++---
.../kafka/raft/internals/KRaftVersionUpgrade.java | 14 ++++-----
.../kafka/raft/internals/RequestSendResult.java | 2 +-
.../kafka/raft/KafkaRaftClientPreVoteTest.java | 4 +--
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 10 +++----
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 34 ++++++++++------------
.../org/apache/kafka/raft/LeaderStateTest.java | 2 +-
.../java/org/apache/kafka/raft/MockLogTest.java | 5 ++--
.../apache/kafka/raft/RaftClientTestContext.java | 10 ++-----
.../org/apache/kafka/raft/ReplicatedCounter.java | 10 ++-----
.../java/org/apache/kafka/raft/VoterSetTest.java | 2 +-
.../raft/internals/RecordsBatchReaderTest.java | 4 +--
13 files changed, 46 insertions(+), 61 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 39dcdb2487c..d6d3d5a6180 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -563,7 +563,7 @@ public class LeaderState<T> implements EpochState {
);
}
- // Upgrade to kraft.verion 1 is only supported; this needs to change
when kraft.version 2 is added
+ // Upgrade to kraft.version 1 is only supported; this needs to change
when kraft.version 2 is added
var inMemoryVoters =
kraftVersionUpgradeState.get().toVoters().orElseThrow(() ->
new InvalidUpdateVersionException(
String.format(
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 4bc26db7537..4040a4d95e4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -140,7 +140,7 @@ public class BatchAccumulator<T> implements Closeable {
long lastOffset = nextOffset + records.size() - 1;
maybeCompleteDrain();
- BatchBuilder<T> batch = null;
+ BatchBuilder<T> batch;
batch = maybeAllocateBatch(records, serializationCache);
if (batch == null) {
throw new BufferAllocationException("Append failed because we
failed to allocate memory to write the batch");
@@ -576,7 +576,7 @@ public class BatchAccumulator<T> implements Closeable {
this.pool = pool;
this.initialBuffer = initialBuffer;
- validateContruction();
+ validateConstruction();
}
private CompletedBatch(
@@ -593,10 +593,10 @@ public class BatchAccumulator<T> implements Closeable {
this.pool = pool;
this.initialBuffer = initialBuffer;
- validateContruction();
+ validateConstruction();
}
- private void validateContruction() {
+ private void validateConstruction() {
Objects.requireNonNull(data.firstBatch(), "Expected memory records
to contain one batch");
if (numRecords <= 0) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
index 22d0e8f8946..3e2a17b8f3e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftVersionUpgrade.java
@@ -31,16 +31,16 @@ import java.util.Optional;
* avoid blocking locks.
*/
public sealed interface KRaftVersionUpgrade {
- public record Empty() implements KRaftVersionUpgrade {
+ record Empty() implements KRaftVersionUpgrade {
}
- public record Version(KRaftVersion kraftVersion) implements
KRaftVersionUpgrade {
+ record Version(KRaftVersion kraftVersion) implements KRaftVersionUpgrade {
}
- public record Voters(VoterSet voters) implements KRaftVersionUpgrade {
+ record Voters(VoterSet voters) implements KRaftVersionUpgrade {
}
- public default Optional<Voters> toVoters() {
+ default Optional<Voters> toVoters() {
if (this instanceof Voters) {
return Optional.of(((Voters) this));
} else {
@@ -48,7 +48,7 @@ public sealed interface KRaftVersionUpgrade {
}
}
- public default Optional<Version> toVersion() {
+ default Optional<Version> toVersion() {
if (this instanceof Version) {
return Optional.of(((Version) this));
} else {
@@ -56,9 +56,9 @@ public sealed interface KRaftVersionUpgrade {
}
}
- static final KRaftVersionUpgrade EMPTY = new Empty();
+ KRaftVersionUpgrade EMPTY = new Empty();
- public static KRaftVersionUpgrade empty() {
+ static KRaftVersionUpgrade empty() {
return EMPTY;
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
index 891d8e32509..70a75dca4d4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSendResult.java
@@ -17,7 +17,7 @@
package org.apache.kafka.raft.internals;
/**
- * Type to capture the atempt to send a request.
+ * Type to capture the attempt to send a request.
*
* @param requestSent true if the request was sent
* @param timeToWaitMs the amount of time to wait in milliseconds before
attempting to resend the
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
index f115d59798e..1a4ca403db9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
@@ -478,7 +478,7 @@ public class KafkaRaftClientPreVoteTest {
// invalid voter id is rejected
context.deliverRequest(
context.voteRequest(
- context.clusterId.toString(),
+ context.clusterId,
epoch,
otherNodeKey,
ReplicaKey.of(10, Uuid.randomUuid()),
@@ -493,7 +493,7 @@ public class KafkaRaftClientPreVoteTest {
// invalid voter directory id is rejected
context.deliverRequest(
context.voteRequest(
- context.clusterId.toString(),
+ context.clusterId,
epoch,
otherNodeKey,
ReplicaKey.of(0, Uuid.randomUuid()),
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index bfdbd14c18a..85278779b79 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -841,7 +841,7 @@ public final class KafkaRaftClientSnapshotTest {
int epoch = context.currentEpoch();
FetchSnapshotRequestData voter1FetchSnapshotRequest =
fetchSnapshotRequest(
- context.clusterId.toString(),
+ context.clusterId,
voter1,
context.metadataPartition,
epoch,
@@ -851,7 +851,7 @@ public final class KafkaRaftClientSnapshotTest {
);
FetchSnapshotRequestData voter2FetchSnapshotRequest =
fetchSnapshotRequest(
- context.clusterId.toString(),
+ context.clusterId,
voter2,
context.metadataPartition,
epoch,
@@ -861,7 +861,7 @@ public final class KafkaRaftClientSnapshotTest {
);
FetchSnapshotRequestData observerFetchSnapshotRequest =
fetchSnapshotRequest(
- context.clusterId.toString(),
+ context.clusterId,
observer3,
context.metadataPartition,
epoch,
@@ -1955,7 +1955,7 @@ public final class KafkaRaftClientSnapshotTest {
// valid cluster id is accepted
context.deliverRequest(
fetchSnapshotRequest(
- context.clusterId.toString(),
+ context.clusterId,
otherNode,
context.metadataPartition,
epoch,
@@ -2264,7 +2264,7 @@ public final class KafkaRaftClientSnapshotTest {
private static void assertBootstrapSnapshot(
RaftClientTestContext context,
List<String> expectedRecords
- ) throws Exception {
+ ) {
try (SnapshotReader<String> bootstrapSnapshot =
context.listener.drainHandledBootstrapSnapshot().get()) {
assertEquals(Snapshots.BOOTSTRAP_SNAPSHOT_ID,
bootstrapSnapshot.snapshotId());
SnapshotWriterReaderTest.assertDataSnapshot(List.of(expectedRecords),
bootstrapSnapshot);
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index f44f09581cc..e4d879c64ed 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2037,7 +2037,7 @@ class KafkaRaftClientTest {
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
assertTrue(context.client.quorum().isUnattached());
- context.time.sleep(context.electionTimeoutMs() * 2);
+ context.time.sleep(context.electionTimeoutMs() * 2L);
context.pollUntilRequest();
assertTrue(context.client.quorum().isUnattached());
context.assertSentFetchRequest(epoch, 0L, 0, OptionalLong.empty());
@@ -2049,7 +2049,7 @@ class KafkaRaftClientTest {
// observer can vote
context.assertSentVoteResponse(Errors.NONE, epoch + 1,
OptionalInt.empty(), true);
- context.time.sleep(context.electionTimeoutMs() * 2);
+ context.time.sleep(context.electionTimeoutMs() * 2L);
context.pollUntilRequest();
// observer cannot transition to prospective though
assertTrue(context.client.quorum().isUnattached());
@@ -3467,8 +3467,7 @@ class KafkaRaftClientTest {
long followerFetchTime = context.time.milliseconds();
context.deliverRequest(context.fetchRequest(1, follower, fetchOffset,
epoch, 0));
context.pollUntilResponse();
- long expectedHW = fetchOffset;
- context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.assertSentFetchPartitionResponse(fetchOffset, epoch);
// Create observer
ReplicaKey observer = replicaKey(localId + 2, withKip853Rpc);
@@ -3477,7 +3476,7 @@ class KafkaRaftClientTest {
long observerFetchTime = context.time.milliseconds();
context.deliverRequest(context.fetchRequest(epoch, observer, 0L, 0,
0));
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.assertSentFetchPartitionResponse(fetchOffset, epoch);
context.time.sleep(100);
context.deliverRequest(context.describeQuorumRequest());
@@ -3505,14 +3504,14 @@ class KafkaRaftClientTest {
.setLogEndOffset(0L)
.setLastFetchTimestamp(observerFetchTime)
.setLastCaughtUpTimestamp(-1L));
- context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, expectedObserverStates);
+ context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset,
expectedVoterStates, expectedObserverStates);
// Update observer fetch state
context.time.sleep(100);
observerFetchTime = context.time.milliseconds();
context.deliverRequest(context.fetchRequest(epoch, observer,
fetchOffset, epoch, 0));
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.assertSentFetchPartitionResponse(fetchOffset, epoch);
context.time.sleep(100);
context.deliverRequest(context.describeQuorumRequest());
@@ -3525,7 +3524,7 @@ class KafkaRaftClientTest {
.setLogEndOffset(fetchOffset)
.setLastFetchTimestamp(observerFetchTime)
.setLastCaughtUpTimestamp(observerFetchTime);
- context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, expectedObserverStates);
+ context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset,
expectedVoterStates, expectedObserverStates);
// Observer falls behind
context.time.sleep(100);
@@ -3541,7 +3540,7 @@ class KafkaRaftClientTest {
.setLogEndOffset(fetchOffset + records.size())
.setLastFetchTimestamp(context.time.milliseconds())
.setLastCaughtUpTimestamp(context.time.milliseconds());
- context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, expectedObserverStates);
+ context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset,
expectedVoterStates, expectedObserverStates);
// Observer is removed due to inactivity
long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS;
@@ -3550,7 +3549,7 @@ class KafkaRaftClientTest {
followerFetchTime = context.time.milliseconds();
context.deliverRequest(context.fetchRequest(epoch, follower,
fetchOffset, epoch, 0));
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.assertSentFetchPartitionResponse(fetchOffset, epoch);
context.time.sleep(context.checkQuorumTimeoutMs - 1);
timeToSleep = timeToSleep - (context.checkQuorumTimeoutMs - 1);
@@ -3563,19 +3562,19 @@ class KafkaRaftClientTest {
.setLastCaughtUpTimestamp(context.time.milliseconds());
expectedVoterStates.get(1)
.setLastFetchTimestamp(followerFetchTime);
- context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, List.of());
+ context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset,
expectedVoterStates, List.of());
// No-op for negative node id
context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1,
ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0));
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.assertSentFetchPartitionResponse(fetchOffset, epoch);
context.deliverRequest(context.describeQuorumRequest());
context.pollUntilResponse();
expectedVoterStates.get(0)
.setLastFetchTimestamp(context.time.milliseconds())
.setLastCaughtUpTimestamp(context.time.milliseconds());
- context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, List.of());
+ context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset,
expectedVoterStates, List.of());
}
@ParameterizedTest
@@ -3613,8 +3612,7 @@ class KafkaRaftClientTest {
long followerFetchTime = context.time.milliseconds();
context.deliverRequest(context.fetchRequest(epoch, follower,
fetchOffset, epoch, 0));
context.pollUntilResponse();
- long expectedHW = fetchOffset;
- context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.assertSentFetchPartitionResponse(fetchOffset, epoch);
context.time.sleep(100);
context.deliverRequest(context.describeQuorumRequest());
@@ -3632,7 +3630,7 @@ class KafkaRaftClientTest {
.setLogEndOffset(fetchOffset)
.setLastFetchTimestamp(followerFetchTime)
.setLastCaughtUpTimestamp(followerFetchTime));
- context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, List.of());
+ context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset,
expectedVoterStates, List.of());
// Follower crashes and disk is lost. It fetches an earlier offset to
rebuild state.
// The leader will report an error in the logs, but will not let the
high watermark rewind
@@ -3640,7 +3638,7 @@ class KafkaRaftClientTest {
followerFetchTime = context.time.milliseconds();
context.deliverRequest(context.fetchRequest(epoch, follower,
fetchOffset - 1, epoch, 0));
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.assertSentFetchPartitionResponse(fetchOffset, epoch);
context.time.sleep(100);
context.deliverRequest(context.describeQuorumRequest());
context.pollUntilResponse();
@@ -3651,7 +3649,7 @@ class KafkaRaftClientTest {
expectedVoterStates.get(1)
.setLogEndOffset(fetchOffset - batch.size())
.setLastFetchTimestamp(followerFetchTime);
- context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, List.of());
+ context.assertSentDescribeQuorumResponse(localId, epoch, fetchOffset,
expectedVoterStates, List.of());
}
@ParameterizedTest
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index d66fb31399c..5b4ed99fa97 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -732,7 +732,7 @@ public class LeaderStateTest {
.get()
);
- // Upate in-memory voter and check state
+ // Update in-memory voter and check state
assertTrue(
state.compareAndSetVolatileVoters(votersWithLeaderUpdated,
updatedVoters)
);
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 8fc6f0c769b..c3d6ff29186 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -48,7 +48,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.ValueSource;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -1134,7 +1133,7 @@ public class MockLogTest {
@ParameterizedTest
@ValueSource(ints = {1, 2})
- public void testReadRespectsMaxSizeInBytes(int expectedBatches) throws
IOException {
+ public void testReadRespectsMaxSizeInBytes(int expectedBatches) {
int recordsPerBatch = 5;
appendBatch(recordsPerBatch, 1);
appendBatch(recordsPerBatch, 1);
@@ -1211,7 +1210,7 @@ public class MockLogTest {
log.endOffset().offset(),
Compression.NONE,
epoch,
- records.toArray(new SimpleRecord[records.size()])
+ records.toArray(new SimpleRecord[0])
),
epoch
);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index dfc4ec9c309..2b01c153254 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1354,7 +1354,7 @@ public final class RaftClientTestContext {
assertEquals(replicaKey.id(), addRaftVoterRequestData.voterId());
assertEquals(replicaKey.directoryId().get(),
addRaftVoterRequestData.voterDirectoryId());
assertEquals(endpoints,
Endpoints.fromAddVoterRequest(addRaftVoterRequestData.listeners()));
- assertEquals(false, addRaftVoterRequestData.ackWhenCommitted());
+ assertFalse(addRaftVoterRequestData.ackWhenCommitted());
return request;
}
@@ -2337,7 +2337,7 @@ public final class RaftClientTestContext {
}
void readBatch(BatchReader<String> reader) {
- try {
+ try (reader) {
while (reader.hasNext()) {
long nextOffset = lastCommitOffset().isPresent() ?
lastCommitOffset().getAsLong() + 1 : 0L;
@@ -2349,8 +2349,6 @@ public final class RaftClientTestContext {
". We expected an offset at least as large as " +
nextOffset);
commits.add(batch);
}
- } finally {
- reader.close();
}
}
@@ -2425,10 +2423,6 @@ public final class RaftClientTestContext {
// autoJoin support
KIP_1186_PROTOCOL;
- boolean isKRaftSupported() {
- return isAtLeast(KIP_595_PROTOCOL);
- }
-
boolean isReconfigSupported() {
return isAtLeast(KIP_853_PROTOCOL);
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
index 6e0de670b08..bcc29eb189f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -77,7 +77,7 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
@Override
public synchronized void handleCommit(BatchReader<Integer> reader) {
- try {
+ try (reader) {
int initialCommitted = committed;
long lastCommittedOffset = -1;
int lastCommittedEpoch = 0;
@@ -90,7 +90,7 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
batch.records(),
batch.baseOffset()
);
- for (Integer nextCommitted: batch.records()) {
+ for (Integer nextCommitted : batch.records()) {
if (nextCommitted != committed + 1) {
throw new AssertionError(
String.format(
@@ -132,14 +132,12 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
lastOffsetSnapshotted = lastCommittedOffset;
}
}
- } finally {
- reader.close();
}
}
@Override
public synchronized void handleLoadSnapshot(SnapshotReader<Integer>
reader) {
- try {
+ try (reader) {
log.debug("Loading snapshot {}", reader.snapshotId());
// Since the state machine is only one value, expect only one data
record
boolean foundDataRecord = false;
@@ -176,8 +174,6 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
lastOffsetSnapshotted = reader.lastContainedLogOffset();
handleLoadSnapshotCalls += 1;
log.debug("Finished loading snapshot. Set value: {}", committed);
- } finally {
- reader.close();
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
index 307b6aa59a1..bd303933103 100644
--- a/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/VoterSetTest.java
@@ -148,7 +148,7 @@ public final class VoterSetTest {
Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1,
2, 3), false);
VoterSet voterSet = VoterSet.fromMap(new HashMap<>(aVoterMap));
- // Cannot override node id not contianed in the voter set
+ // Cannot override node id not contained in the voter set
assertEquals(Optional.empty(),
voterSet.updateVoterIgnoringDirectoryId(voterNode(4, true)));
// Test that it can override voter set with different directory ids
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index 1606681600f..e01db38dc50 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -133,7 +133,7 @@ class RecordsBatchReaderTest {
true,
new LogContext()
);
- try {
+ try (reader) {
for (TestBatch<String> batch : expectedBatches) {
assertTrue(reader.hasNext());
assertEquals(batch, TestBatch.from(reader.next()));
@@ -141,8 +141,6 @@ class RecordsBatchReaderTest {
assertFalse(reader.hasNext());
assertThrows(NoSuchElementException.class, reader::next);
- } finally {
- reader.close();
}
Mockito.verify(closeListener).onClose(reader);