This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4d3c65898df KAFKA-19541; Allow configuration of kraft RPC max bytes
(#21028)
4d3c65898df is described below
commit 4d3c65898dff8bf03a1111cb9956c6340e29de81
Author: Jonah Hooper <[email protected]>
AuthorDate: Thu Mar 26 17:00:46 2026 -0400
KAFKA-19541; Allow configuration of kraft RPC max bytes (#21028)
Implements KIP-1219 by adding two new configuration properties:
controller.quorum.fetch.snapshot.max.bytes and
controller.quorum.fetch.max.bytes. The
controller.quorum.fetch.snapshot.max.bytes property controls the maximum
bytes for the FETCH_SNAPSHOT request. The
controller.quorum.fetch.max.bytes property controls the maximum bytes
for the FETCH request.
This change also removes the internal.metadata.max.fetch.size.in.bytes
internal configuration property. This internal configuration property is
not needed now that there is a publicly available configuration
property.
Reviewers: Alyssa Huang <[email protected]>, José Armando García
Sancio <[email protected]>
---
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 -
.../org/apache/kafka/raft/KafkaRaftClient.java | 16 +-
.../org/apache/kafka/raft/MetadataLogConfig.java | 10 -
.../java/org/apache/kafka/raft/QuorumConfig.java | 25 +-
.../main/java/org/apache/kafka/raft/RaftLog.java | 12 +-
.../internals/KRaftControlRecordStateMachine.java | 6 +-
.../apache/kafka/raft/internals/KafkaRaftLog.java | 16 +-
.../kafka/raft/KafkaRaftClientFetchTest.java | 291 ++++++++++++++++++++-
.../kafka/raft/KafkaRaftClientReconfigTest.java | 24 +-
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 107 +++++++-
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 11 +-
.../test/java/org/apache/kafka/raft/MockLog.java | 19 +-
.../java/org/apache/kafka/raft/MockLogTest.java | 207 ++++++++++++++-
.../org/apache/kafka/raft/QuorumConfigTest.java | 2 +
.../apache/kafka/raft/RaftClientTestContext.java | 24 +-
.../kafka/raft/internals/KafkaRaftLogTest.java | 134 ++++++++--
16 files changed, 828 insertions(+), 77 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index bb493c89811..2a035f77333 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -806,7 +806,6 @@ class KafkaConfigTest {
case MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
case MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG =>
// no op
case
MetadataLogConfig.INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG => // no op
- case
MetadataLogConfig.INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG => // no op
case MetadataLogConfig.INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG =>
// no op
case KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG => // ignore string
case MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number")
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 9df0b82a373..02ae182901f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -171,7 +171,6 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
private static final int MAX_FETCH_WAIT_MS = 500;
// visible for testing
public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
- public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
private final OptionalInt nodeId;
private final Uuid nodeDirectoryId;
@@ -434,7 +433,11 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// Re-read the expected offset in case the snapshot had to be
reloaded
listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset
-> {
if (nextExpectedOffset < highWatermark) {
- LogFetchInfo readInfo = log.read(nextExpectedOffset,
Isolation.COMMITTED);
+ LogFetchInfo readInfo = log.read(
+ nextExpectedOffset,
+ Isolation.COMMITTED,
+ Integer.MAX_VALUE
+ );
listenerContext.fireHandleCommit(nextExpectedOffset,
readInfo.records);
}
});
@@ -1517,6 +1520,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
requestMetadata.apiVersion(),
replicaKey,
fetchPartition,
+ request.maxBytes(),
currentTimeMs
);
FetchResponseData.PartitionData partitionResponse =
@@ -1587,6 +1591,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
requestMetadata.apiVersion(),
replicaKey,
fetchPartition,
+ request.maxBytes(),
completionTimeMs
);
}
@@ -1598,6 +1603,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
short apiVersion,
ReplicaKey replicaKey,
FetchRequestData.FetchPartition request,
+ int maxSizeBytes,
long currentTimeMs
) {
try {
@@ -1622,7 +1628,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
final Records records;
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
- LogFetchInfo info = log.read(fetchOffset,
Isolation.UNCOMMITTED);
+ LogFetchInfo info = log.read(fetchOffset,
Isolation.UNCOMMITTED, maxSizeBytes);
if (state.updateReplicaState(replicaKey, currentTimeMs,
info.startOffsetMetadata)) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
@@ -2989,7 +2995,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
);
return request
- .setMaxBytes(MAX_FETCH_SIZE_BYTES)
+ .setMaxBytes(quorumConfig.fetchMaxBytes())
.setMaxWaitMs(fetchMaxWaitMs)
.setClusterId(clusterId)
.setReplicaState(new
FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel()));
@@ -3013,7 +3019,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
log.topicPartition(),
quorum.epoch(),
snapshotId,
- MAX_FETCH_SIZE_BYTES,
+ quorumConfig.fetchSnapshotMaxBytes(),
snapshotSize
);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
index 93821df8db3..331c64e858f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
@@ -81,9 +81,6 @@ public class MetadataLogConfig {
public static final String
INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG =
"internal.metadata.max.batch.size.in.bytes";
public static final String INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_DOC =
"The largest record batch size allowed in the metadata log, only for testing.";
- public static final String
INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG =
"internal.metadata.max.fetch.size.in.bytes";
- public static final String INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_DOC =
"The maximum number of bytes to read when fetching from the metadata log, only
for testing.";
-
public static final String INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG =
"internal.metadata.delete.delay.millis";
public static final String INTERNAL_METADATA_DELETE_DELAY_MILLIS_DOC =
"The amount of time to wait before deleting a file from the filesystem, only
for testing.";
@@ -98,7 +95,6 @@ public class MetadataLogConfig {
.define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT,
METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW,
METADATA_MAX_IDLE_INTERVAL_MS_DOC)
.defineInternal(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, INT,
null, null, LOW, INTERNAL_METADATA_LOG_SEGMENT_BYTES_DOC)
.defineInternal(INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG,
INT, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, null, LOW,
INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_DOC)
- .defineInternal(INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG,
INT, KafkaRaftClient.MAX_FETCH_SIZE_BYTES, null, LOW,
INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_DOC)
.defineInternal(INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG,
LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, null, LOW,
INTERNAL_METADATA_DELETE_DELAY_MILLIS_DOC);
private final int logSegmentBytes;
@@ -107,7 +103,6 @@ public class MetadataLogConfig {
private final long retentionMaxBytes;
private final long retentionMillis;
private final int internalMaxBatchSizeInBytes;
- private final int internalMaxFetchSizeInBytes;
private final long internalDeleteDelayMillis;
public MetadataLogConfig(AbstractConfig config) {
@@ -117,7 +112,6 @@ public class MetadataLogConfig {
this.retentionMaxBytes =
config.getLong(METADATA_MAX_RETENTION_BYTES_CONFIG);
this.retentionMillis =
config.getLong(METADATA_MAX_RETENTION_MILLIS_CONFIG);
this.internalMaxBatchSizeInBytes =
config.getInt(INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG);
- this.internalMaxFetchSizeInBytes =
config.getInt(INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG);
this.internalDeleteDelayMillis =
config.getLong(INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG);
}
@@ -145,10 +139,6 @@ public class MetadataLogConfig {
return internalMaxBatchSizeInBytes;
}
- public int internalMaxFetchSizeInBytes() {
- return internalMaxFetchSizeInBytes;
- }
-
public long internalDeleteDelayMillis() {
return internalDeleteDelayMillis;
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 3712c1cc92d..3ecc1b2c190 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -108,6 +108,15 @@ public class QuorumConfig {
"join the cluster metadata partition for its cluster id.";
public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;
+ public static final String QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG =
QUORUM_PREFIX + "fetch.snapshot.max.bytes";
+ public static final String QUORUM_FETCH_SNAPSHOT_MAX_BYTES_DOC = "Maximum
amount of data to retrieve for each FetchSnapshot request to the controller.";
+ public static final int DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES = 1048576;
+
+ public static final String QUORUM_FETCH_MAX_BYTES_CONFIG = QUORUM_PREFIX +
"fetch.max.bytes";
+ public static final String QUORUM_FETCH_MAX_BYTES_DOC = "Maximum amount of
data to retrieve for each Fetch request. " +
+ "Always returns at least one batch even if it is greater than
controller.quorum.fetch.max.bytes.";
+ public static final int DEFAULT_QUORUM_FETCH_MAX_BYTES = 1048576;
+
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new
ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
.define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST,
DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new
ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
@@ -117,7 +126,9 @@ public class QuorumConfig {
.define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS,
atLeast(0), MEDIUM, QUORUM_LINGER_MS_DOC)
.define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT,
DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(0), MEDIUM,
QUORUM_REQUEST_TIMEOUT_MS_DOC)
.define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT,
DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC)
- .define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN,
DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC);
+ .define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN,
DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC)
+ .define(QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG, INT,
DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES, atLeast(1), LOW,
QUORUM_FETCH_SNAPSHOT_MAX_BYTES_DOC)
+ .define(QUORUM_FETCH_MAX_BYTES_CONFIG, INT,
DEFAULT_QUORUM_FETCH_MAX_BYTES, atLeast(1), LOW, QUORUM_FETCH_MAX_BYTES_DOC);
private final List<String> voters;
private final List<String> bootstrapServers;
@@ -128,6 +139,8 @@ public class QuorumConfig {
private final int fetchTimeoutMs;
private final int appendLingerMs;
private final boolean autoJoin;
+ private final int fetchSnapshotMaxBytes;
+ private final int fetchMaxBytes;
public QuorumConfig(AbstractConfig abstractConfig) {
this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
@@ -139,6 +152,8 @@ public class QuorumConfig {
this.fetchTimeoutMs =
abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
this.autoJoin =
abstractConfig.getBoolean(QUORUM_AUTO_JOIN_ENABLE_CONFIG);
+ this.fetchSnapshotMaxBytes =
abstractConfig.getInt(QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG);
+ this.fetchMaxBytes =
abstractConfig.getInt(QUORUM_FETCH_MAX_BYTES_CONFIG);
}
public List<String> voters() {
@@ -177,6 +192,14 @@ public class QuorumConfig {
return autoJoin;
}
+ public int fetchSnapshotMaxBytes() {
+ return fetchSnapshotMaxBytes;
+ }
+
+ public int fetchMaxBytes() {
+ return fetchMaxBytes;
+ }
+
private static Integer parseVoterId(String idString) {
try {
return Integer.parseInt(idString);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftLog.java
b/raft/src/main/java/org/apache/kafka/raft/RaftLog.java
index 5bb60e7a175..8a287d31efb 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftLog.java
@@ -57,9 +57,15 @@ public interface RaftLog extends AutoCloseable {
LogAppendInfo appendAsFollower(Records records, int epoch);
/**
- * Read a set of records within a range of offsets.
+ * Read a set of records starting at startOffsetInclusive. Always returns at least one record batch if one exists.
+ *
+ * @param startOffsetInclusive Records at and after this offset are returned.
+ * @param isolation The fetch isolation, which controls the maximum offset we are allowed to read.
+ * @param maxTotalBatchBytes The maximum number of bytes to read when there is more than one record batch; the first batch is returned even if it exceeds this limit.
+ * @return Records and start offset information wrapped in a LogFetchInfo
*/
- LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
+ LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int
maxTotalBatchBytes);
+
/**
* Return the latest epoch. For an empty log, the latest epoch is defined
@@ -175,7 +181,7 @@ public interface RaftLog extends AutoCloseable {
/**
* Update the high watermark and associated metadata (which is used to
avoid
- * index lookups when handling reads with {@link #read(long, Isolation)}
with
+ * index lookups when handling reads with {@link #read(long, Isolation,
int)} with
* the {@link Isolation#COMMITTED} isolation level).
*
* @param offsetMetadata The offset and optional metadata
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index 6a33fdb2c2e..724d52ce4fd 100644
---
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -233,7 +233,11 @@ public final class KRaftControlRecordStateMachine {
private void maybeLoadLog() {
while (log.endOffset().offset() > nextOffset) {
- LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
+ LogFetchInfo info = log.read(
+ nextOffset,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ );
try (RecordsIterator<?> iterator = new RecordsIterator<>(
info.records,
serde,
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
index b168dc66c46..95bc12b9643 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
@@ -117,14 +117,19 @@ public class KafkaRaftLog implements RaftLog {
}
@Override
- public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+ public LogFetchInfo read(long startOffset, Isolation readIsolation, int
maxTotalBatchBytes) {
FetchIsolation isolation = switch (readIsolation) {
case COMMITTED -> FetchIsolation.HIGH_WATERMARK;
case UNCOMMITTED -> FetchIsolation.LOG_END;
};
try {
- FetchDataInfo fetchInfo = log.read(startOffset,
config.internalMaxFetchSizeInBytes(), isolation, true);
+ FetchDataInfo fetchInfo = log.read(
+ startOffset,
+ maxTotalBatchBytes,
+ isolation,
+ true
+ );
return new LogFetchInfo(
fetchInfo.records,
new LogOffsetMetadata(
@@ -358,7 +363,12 @@ public class KafkaRaftLog implements RaftLog {
fetches from this offset, the returned batch will start at offset (X
- M), and the
follower will be unable to append it since (X - M) < (X).
*/
- long baseOffset = read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
+ long baseOffset = read(
+ snapshotId.offset(),
+ Isolation.COMMITTED,
+ 1 // maxTotalBatchBytes - ensures that we only fetch one batch.
+ ).startOffsetMetadata.offset();
+
if (snapshotId.offset() != baseOffset) {
throw new IllegalArgumentException(
"Cannot create snapshot at offset (" + snapshotId.offset()
+ ") because it is not batch aligned. " +
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
index 775a1fb1bc5..74e4afd362a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
@@ -17,12 +17,15 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.ArbitraryMemoryRecords;
import org.apache.kafka.common.record.internal.InvalidMemoryRecordsProvider;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.server.common.KRaftVersion;
import net.jqwik.api.AfterFailureMode;
@@ -42,6 +45,8 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public final class KafkaRaftClientFetchTest {
@Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
@@ -91,6 +96,289 @@ public final class KafkaRaftClientFetchTest {
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1,
true);
+ int expectedFetchMaxBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
+ KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxBytes(expectedFetchMaxBytes)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0,
OptionalLong.empty());
+ FetchRequestData data = (FetchRequestData) fetchRequest.data();
+ assertEquals(expectedFetchMaxBytes, data.maxBytes());
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // Here we are effectively saying that there is no limit to the amount
of records to return.
+ var remoteMaxSizeBytes = Integer.MAX_VALUE;
+ var localMaxSizeBytes = 1;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+ KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ var iterator = records.batchIterator();
+ int offsetCount = 0;
+ int batchCount = 0;
+ while (iterator.hasNext()) {
+ var batch = iterator.next();
+ var recordsIterator = batch.iterator();
+ while (recordsIterator.hasNext()) {
+ var record = recordsIterator.next();
+ assertEquals(offsetCount, record.offset());
+ offsetCount++;
+ }
+ batchCount++;
+ }
+ assertEquals(2, batchCount);
+ assertEquals(6, offsetCount);
+ }
+
+ @Test
+ public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws
Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ // There are two batches with 3 records each. The first batch is always larger than 1 byte.
+ // If remoteMaxSizeBytes = 1 is used then the MockLog will return exactly 1 batch.
+ // If localMaxSizeBytes is used then the MockLog will return two batches.
+ var remoteMaxSizeBytes = 1;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+ KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the
configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L,
epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords)
FetchResponse.recordsOrFail(partitionData);
+ assertTrue(
+ records.sizeInBytes() > 1,
+ String.format(
+ "Expected records.sizeInBytes() (%d) > 1 since we always
return at least one batch",
+ records.sizeInBytes()
+ )
+ );
+ var iterator = records.batchIterator();
+ var firstBatch = iterator.next();
+ assertEquals(0, firstBatch.baseOffset());
+ assertEquals(3, firstBatch.nextOffset());
+ assertFalse(
+ iterator.hasNext(),
+ String.format("Expected only a single batch to be fetched for
maxSize = %d", remoteMaxSizeBytes)
+ );
+ }
+
+ @Test
+ public void testFetchMaxBytesWithTwoBatches() throws Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ var batchSizeBytes = 115; // upper bound asserted below for the encoded size of each returned batch
+ var remoteMaxSizeBytes = batchSizeBytes * 2;
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .appendToLog(epoch, List.of("c", "c", "c"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+ KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send a fetch request with max bytes that are different from the configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData = context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(), partitionData.errorCode());
+ MemoryRecords records = (MemoryRecords) FetchResponse.recordsOrFail(partitionData);
+ // There is less data than remoteMaxSizeBytes so we expect the size of records.sizeInBytes to be less than
+ // remoteMaxSizeBytes.
+ assertTrue(
+ records.sizeInBytes() < remoteMaxSizeBytes,
+ String.format(
+ "Expected records size (%d) < remoteMaxSizeBytes (%d)",
+ records.sizeInBytes(),
+ remoteMaxSizeBytes
+ )
+ );
+ var iterator = records.batchIterator();
+ var firstBatch = iterator.next();
+ // The first batch should be smaller than batchSizeBytes.
+ assertTrue(
+ firstBatch.sizeInBytes() < batchSizeBytes,
+ String.format(
+ "Expected firstBatch.sizeInBytes() (%d) < batchSizeBytes (%d)",
+ firstBatch.sizeInBytes(),
+ batchSizeBytes
+ )
+ );
+ assertTrue(iterator.hasNext(), "Expected more than one batch to be fetched");
+ var secondBatch = iterator.next();
+ assertTrue(
+ secondBatch.sizeInBytes() < batchSizeBytes,
+ String.format(
+ "Expected secondBatch.sizeInBytes() (%d) < batchSizeBytes (%d)",
+ secondBatch.sizeInBytes(),
+ batchSizeBytes
+ )
+ );
+ assertFalse(iterator.hasNext(), "Expected exactly two batches to be fetched");
+ }
+
+ @Test
+ public void testFetchMaxBytesBatchesInvariants() throws Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ var localMaxSizeBytes = 1024;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "a", "a"))
+ .appendToLog(epoch, List.of("b", "b", "b"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+ KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // Send initial request to get all available data.
+ FetchRequestData allRecordsRequest =
+ context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+ allRecordsRequest.setMaxBytes(Integer.MAX_VALUE);
+ context.deliverRequest(allRecordsRequest);
+ context.pollUntilResponse();
+ MemoryRecords allRecords =
+ (MemoryRecords)
FetchResponse.recordsOrFail(context.assertSentFetchPartitionResponse());
+
+ // Send another request to retrieve all data by setting exactSizeBytes
+ FetchRequestData exactSizeBytesRequest =
+ context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+ exactSizeBytesRequest.setMaxBytes(allRecords.sizeInBytes());
+ context.deliverRequest(exactSizeBytesRequest);
+ context.pollUntilResponse();
+
+ // All the records should be returned
+ FetchResponseData.PartitionData exactSizeBytesResponseData =
context.assertSentFetchPartitionResponse();
+ assertEquals(Errors.NONE.code(),
exactSizeBytesResponseData.errorCode());
+ MemoryRecords exactSizeBytesRecords = (MemoryRecords)
FetchResponse.recordsOrFail(exactSizeBytesResponseData);
+
+ // We expect to return the same number of total bytes in both requests.
+ assertEquals(allRecords.sizeInBytes(),
exactSizeBytesRecords.sizeInBytes());
+
+ var allIterator = allRecords.records().iterator();
+ var exactIterator = exactSizeBytesRecords.records().iterator();
+ while (exactIterator.hasNext() && allIterator.hasNext()) {
+ var r1 = exactIterator.next();
+ var r2 = allIterator.next();
+ assertEquals(r1, r2);
+ }
+ assertFalse(exactIterator.hasNext());
+ assertFalse(allIterator.hasNext());
+
+ // Send fetch request with sizeInBytes-1. It will appear here that we
have only 1 batch
+ // since the other batch is not "complete" (it's missing one byte) and
hence not "iterable".
+ FetchRequestData oneBatchRequest =
+ context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+ oneBatchRequest.setMaxBytes(exactSizeBytesRecords.sizeInBytes() - 1);
+ context.deliverRequest(oneBatchRequest);
+ context.pollUntilResponse();
+ MemoryRecords oneBatchRecords =
+ (MemoryRecords)
FetchResponse.recordsOrFail(context.assertSentFetchPartitionResponse());
+ assertTrue(oneBatchRecords.sizeInBytes() <
exactSizeBytesRecords.sizeInBytes());
+ var oneBatchBatches = oneBatchRecords.batchIterator();
+ var firstBatch = oneBatchBatches.next();
+ assertTrue(firstBatch.sizeInBytes() < oneBatchRecords.sizeInBytes());
+ assertEquals(exactSizeBytesRecords.sizeInBytes() - 1,
oneBatchRecords.sizeInBytes());
+ assertFalse(oneBatchBatches.hasNext(), "Expected 1 batch to be
fetched");
+ }
+
@Test
void testReplicationOfHigherPartitionLeaderEpoch() throws Exception {
int epoch = 2;
@@ -103,7 +391,8 @@ public final class KafkaRaftClientFetchTest {
local.directoryId().get()
)
.withStartingVoters(
- VoterSetTest.voterSet(Stream.of(local, electedLeader)),
KRaftVersion.KRAFT_VERSION_1
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)),
+ KRaftVersion.KRAFT_VERSION_1
)
.withElectedLeader(epoch, electedLeader.id())
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL)
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index e97eb2dcc26..a63c1f40243 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -132,7 +132,11 @@ public class KafkaRaftClientReconfigTest {
context.unattachedToLeader();
// check if leader writes 3 bootstrap records to the log
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+ Records records = context.log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
Iterator<Record> recordIterator = batch.iterator();
@@ -195,7 +199,11 @@ public class KafkaRaftClientReconfigTest {
// check leader does not write bootstrap records to log
context.unattachedToLeader();
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+ Records records = context.log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
Iterator<Record> recordIterator = batch.iterator();
@@ -2451,7 +2459,11 @@ public class KafkaRaftClientReconfigTest {
// check if leader writes 2 control records to the log;
// one for the kraft version and one for the voter set
var updatedVoters = VoterSetTest.voterSet(Stream.of(local, voter1,
voter2));
- var records = context.log.read(localLogEndOffset,
Isolation.UNCOMMITTED).records;
+ var records = context.log.read(
+ localLogEndOffset,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
var batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
var recordsIterator = batch.iterator();
@@ -2559,7 +2571,11 @@ public class KafkaRaftClientReconfigTest {
context.client.poll();
// check that the leader wrote voters control record to the log;
- var records = context.log.read(localLogEndOffset,
Isolation.UNCOMMITTED).records;
+ var records = context.log.read(
+ localLogEndOffset,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
var batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
var recordsIterator = batch.iterator();
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index b159c08d2f2..bfdbd14c18a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -1026,6 +1026,87 @@ public final class KafkaRaftClientSnapshotTest {
assertEquals(leaderId, response.currentLeader().leaderId());
}
+ @Test
+ public void testFetchSnapshotRequestWithPartialData() throws Exception {
+ int localId = randomReplicaId();
+ Set<Integer> voters = Set.of(localId, localId + 1);
+ OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+ List<String> records = List.of("foo", "bar");
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .appendToLog(snapshotId.epoch(), List.of("a"))
+ .withKip853Rpc(true)
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+
+ context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+ try (SnapshotWriter<String> snapshot =
context.client.createSnapshot(snapshotId, 0).get()) {
+ assertEquals(snapshotId, snapshot.snapshotId());
+ snapshot.append(records);
+ snapshot.freeze();
+ }
+
+ // Test that we will respond with at least 2 equally sized read of the
snapshot.
+ RawSnapshotReader snapshot =
context.log.readSnapshot(snapshotId).get();
+ int snapshotSizeBytes = Math.toIntExact(snapshot.sizeInBytes());
+ int expectedNumberOfReads = 2;
+ // Find expectedNumberOfReads where we always have a remainder.
+ // This ensures that we will have expectedNumberOfRead which return
fetchSnapshotMaxBytes, followed by one
+ // request with expectedFinalRequestSize that returns the remaining
data.
+ while (snapshotSizeBytes % expectedNumberOfReads == 0)
expectedNumberOfReads++;
+ int fetchSnapshotMaxBytes = snapshotSizeBytes / expectedNumberOfReads;
+ int expectedFinalRequestSize = snapshotSizeBytes %
expectedNumberOfReads;
+ int totalBytesRead = 0;
+ int position = 0;
+ for (int i = 0; i < expectedNumberOfReads; i++) {
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ context.metadataPartition,
+ epoch,
+ snapshotId,
+ fetchSnapshotMaxBytes,
+ position
+ )
+ );
+ context.client.poll();
+
+ FetchSnapshotResponseData.PartitionSnapshot response =
+
context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+ assertEquals(epoch, response.currentLeader().leaderEpoch());
+ assertEquals(localId, response.currentLeader().leaderId());
+ int actualSizeBytes = response.unalignedRecords().sizeInBytes();
+ assertEquals(fetchSnapshotMaxBytes, actualSizeBytes);
+
+ totalBytesRead += actualSizeBytes;
+ position += fetchSnapshotMaxBytes;
+ }
+ assertEquals(fetchSnapshotMaxBytes * expectedNumberOfReads,
totalBytesRead);
+
+ // Fetch the remaining snapshot bytes.
+ assertTrue(
+ totalBytesRead < snapshotSizeBytes,
+ String.format("Expected totalBytesRead (%d) < snapshotSizeBytes
(%d)", totalBytesRead, snapshotSizeBytes)
+ );
+ context.deliverRequest(
+ fetchSnapshotRequest(
+ context.metadataPartition,
+ epoch,
+ snapshotId,
+ fetchSnapshotMaxBytes,
+ position
+ )
+ );
+ context.client.poll();
+ FetchSnapshotResponseData.PartitionSnapshot response =
+
context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+ assertEquals(epoch, response.currentLeader().leaderEpoch());
+ assertEquals(localId, response.currentLeader().leaderId());
+ assertEquals(expectedFinalRequestSize,
response.unalignedRecords().sizeInBytes());
+ }
+
@ParameterizedTest
@ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestWithInvalidPosition(boolean
withKip853Rpc) throws Exception {
@@ -1223,10 +1304,12 @@ public final class KafkaRaftClientSnapshotTest {
Set<Integer> voters = Set.of(localId, leaderId);
int epoch = 2;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+ int expectedFetchMaxSnapshotBytes = 1024;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, leaderId)
.withKip853Rpc(withKip853Rpc)
+ .withFetchSnapshotMaxBytes(expectedFetchMaxSnapshotBytes)
.build();
context.pollUntilRequest();
@@ -1245,7 +1328,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ expectedFetchMaxSnapshotBytes
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1302,10 +1385,12 @@ public final class KafkaRaftClientSnapshotTest {
Set<Integer> voters = Set.of(localId, leaderId);
int epoch = 2;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+ int expectedFetchMaxSnapshotBytes = 6;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, leaderId)
.withKip853Rpc(withKip853Rpc)
+ .withFetchSnapshotMaxBytes(expectedFetchMaxSnapshotBytes)
.build();
context.pollUntilRequest();
@@ -1324,7 +1409,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ expectedFetchMaxSnapshotBytes
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1360,7 +1445,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ expectedFetchMaxSnapshotBytes
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1435,7 +1520,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1497,7 +1582,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1558,7 +1643,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1619,7 +1704,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1653,7 +1738,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1690,7 +1775,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1739,7 +1824,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1807,7 +1892,7 @@ public final class KafkaRaftClientSnapshotTest {
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
).get();
assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index c4a6840ff84..f44f09581cc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -958,7 +958,10 @@ class KafkaRaftClientTest {
context.client.poll();
context.assertSentBeginQuorumEpochRequest(1, Set.of(otherNodeId));
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+ Records records = context.log.read(0,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
@@ -1007,7 +1010,11 @@ class KafkaRaftClientTest {
context.client.poll();
context.assertSentBeginQuorumEpochRequest(2, Set.of(firstNodeId,
secondNodeId));
- Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+ Records records = context.log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
RecordBatch batch = records.batches().iterator().next();
assertTrue(batch.isControlBatch());
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 228f92a9bd5..abadb1c8a30 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -419,7 +419,7 @@ public class MockLog implements RaftLog {
}
@Override
- public LogFetchInfo read(long startOffset, Isolation isolation) {
+ public LogFetchInfo read(long startOffset, Isolation isolation, int
maxTotalBatchBytes) {
verifyOffsetInRange(startOffset);
long maxOffset = isolation == Isolation.COMMITTED ?
highWatermark.offset() : endOffset().offset();
@@ -444,7 +444,13 @@ public class MockLog implements RaftLog {
// complete batches, so batches which end at an offset larger than
the max offset are
// filtered, which is effectively the same as having the consumer
drop an incomplete
// batch returned in a fetch response.
- if (batch.lastOffset() >= startOffset && batch.lastOffset() <
maxOffset && !batch.entries.isEmpty()) {
+ // Since we operate in batches, it is possible for byte size of
returned batches to exceed
+ // maxTotalBatchBytes. To keep to that invariant, we exit the loop
as soon as the buffer contains
+ // more bytes than maxTotalBatchBytes.
+ if (batch.lastOffset() >= startOffset
+ && batch.lastOffset() < maxOffset
+ && !batch.entries.isEmpty()
+ && buffer.position() < maxTotalBatchBytes) {
buffer = batch.writeTo(buffer);
if (batchStartOffset == null) {
@@ -462,6 +468,9 @@ public class MockLog implements RaftLog {
buffer.flip();
Records records = MemoryRecords.readableRecords(buffer);
+ if (batchCount > 1) {
+ records = records.slice(0, maxTotalBatchBytes);
+ }
if (batchStartOffset == null) {
throw new RuntimeException("Expected to find at least one entry
starting from offset " +
@@ -513,7 +522,11 @@ public class MockLog implements RaftLog {
);
}
- long baseOffset = read(snapshotId.offset(),
Isolation.COMMITTED).startOffsetMetadata.offset();
+ long baseOffset = read(
+ snapshotId.offset(),
+ Isolation.COMMITTED,
+ 1 // Only needs to read the first batch.
+ ).startOffsetMetadata.offset();
if (snapshotId.offset() != baseOffset) {
throw new IllegalArgumentException(
String.format(
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 8a5c4bf6225..8fc6f0c769b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -46,8 +46,11 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -214,7 +217,11 @@ public class MockLogTest {
assertEquals(1, log.endOffset().offset());
assertEquals(currentEpoch, log.lastFetchedEpoch());
- Records records = log.read(0, Isolation.UNCOMMITTED).records;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
for (RecordBatch batch : records.batches()) {
assertTrue(batch.isControlBatch());
}
@@ -249,7 +256,11 @@ public class MockLogTest {
assertEquals(initialOffset + 1, log.endOffset().offset());
assertEquals(3, log.lastFetchedEpoch());
- Records records = log.read(5L, Isolation.UNCOMMITTED).records;
+ Records records = log.read(
+ 5L,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
List<ByteBuffer> extractRecords = new ArrayList<>();
for (Record record : records.records()) {
extractRecords.add(record.value());
@@ -275,7 +286,11 @@ public class MockLogTest {
appendAsLeader(List.of(recordOne, recordTwo), epoch);
- Records records = log.read(0, Isolation.UNCOMMITTED).records;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
List<ByteBuffer> extractRecords = new ArrayList<>();
for (Record record : records.records()) {
@@ -297,11 +312,23 @@ public class MockLogTest {
assertEquals(Optional.of(new OffsetRange(30L, 59L)), readOffsets(30L,
Isolation.UNCOMMITTED));
assertEquals(Optional.of(new OffsetRange(30L, 59L)), readOffsets(33L,
Isolation.UNCOMMITTED));
assertEquals(Optional.empty(), readOffsets(60L,
Isolation.UNCOMMITTED));
- assertThrows(OffsetOutOfRangeException.class, () -> log.read(61L,
Isolation.UNCOMMITTED));
+ assertThrows(OffsetOutOfRangeException.class,
+ () -> log.read(
+ 61L,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ )
+ );
// Verify range after truncation
log.truncateTo(20L);
- assertThrows(OffsetOutOfRangeException.class, () -> log.read(21L,
Isolation.UNCOMMITTED));
+ assertThrows(OffsetOutOfRangeException.class,
+ () -> log.read(
+ 21L,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ )
+ );
}
@Test
@@ -336,7 +363,13 @@ public class MockLogTest {
assertEquals(Optional.of(new OffsetRange(30, 59L)), readOffsets(30L,
Isolation.COMMITTED));
assertEquals(Optional.of(new OffsetRange(30L, 59L)), readOffsets(50L,
Isolation.COMMITTED));
assertEquals(Optional.empty(), readOffsets(60L, Isolation.COMMITTED));
- assertThrows(OffsetOutOfRangeException.class, () -> log.read(61L,
Isolation.COMMITTED));
+ assertThrows(OffsetOutOfRangeException.class,
+ () -> log.read(
+ 61L,
+ Isolation.COMMITTED,
+ Integer.MAX_VALUE
+ )
+ );
}
@Test
@@ -345,7 +378,11 @@ public class MockLogTest {
appendBatch(5, 1);
appendBatch(5, 1);
- LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
+ LogFetchInfo readInfo = log.read(
+ 5,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ );
assertEquals(5L, readInfo.startOffsetMetadata.offset());
assertTrue(readInfo.startOffsetMetadata.metadata().isPresent());
@@ -359,7 +396,11 @@ public class MockLogTest {
Optional.of(new MockLog.MockOffsetMetadata(98230980L)))));
// Ensure we can update the high watermark to the end offset
- LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED);
+ LogFetchInfo readFromEndInfo = log.read(
+ 15L,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ );
assertEquals(15, readFromEndInfo.startOffsetMetadata.offset());
assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent());
log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
@@ -369,7 +410,11 @@ public class MockLogTest {
log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
// Check handling of a fetch from the middle of a batch
- LogFetchInfo readFromMiddleInfo = log.read(16L, Isolation.UNCOMMITTED);
+ LogFetchInfo readFromMiddleInfo = log.read(
+ 16L,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ );
assertEquals(readFromEndInfo.startOffsetMetadata,
readFromMiddleInfo.startOffsetMetadata);
}
@@ -484,11 +529,19 @@ public class MockLogTest {
assertThrows(
OffsetOutOfRangeException.class,
- () -> log.read(log.startOffset() - 1, Isolation.UNCOMMITTED)
+ () -> log.read(
+ log.startOffset() - 1,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ )
);
assertThrows(
OffsetOutOfRangeException.class,
- () -> log.read(log.endOffset().offset() + 1, Isolation.UNCOMMITTED)
+ () -> log.read(
+ log.endOffset().offset() + 1,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ )
);
}
@@ -991,6 +1044,126 @@ public class MockLogTest {
assertEquals(ValidOffsetAndEpoch.Kind.VALID,
resultOffsetAndEpoch.kind());
}
+ @Test
+ public void testLogLimitsReturnsLessThanMaxBytes() {
+ int numberOfRecordsPerBatch = 10;
+ appendBatch(numberOfRecordsPerBatch, 5);
+ appendBatch(numberOfRecordsPerBatch, 5);
+ appendBatch(numberOfRecordsPerBatch, 5);
+ // Set to be larger than 1 batch but smaller than 2.
+ int magicMaxTotalBytes = 200;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxTotalBytes
+ ).records;
+ // MockLog#read returns data in batches and will return an additional
batch if one of them
+ // exceeds maxTotalBytes.
+ assertEquals(magicMaxTotalBytes, records.sizeInBytes());
+ }
+
+ @Test
+ public void testLogLimitsReturnsAtLeastOne() {
+ int numberOfRecordsPerBatch = 10;
+ appendBatch(numberOfRecordsPerBatch, 5);
+ appendBatch(numberOfRecordsPerBatch, 5);
+ // magicMaxTotalBytes are smaller than 10 simple records in a batch.
+ // Meaning we will read only the first batch and not the second.
+ int magicMaxTotalBytes = 1;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxTotalBytes
+ ).records;
+ assertTrue(
+ records.sizeInBytes() > magicMaxTotalBytes,
+ String.format(
+ "Expected records size (%d) > maxTotalBytes (%d) since one
whole batch must be returned",
+ records.sizeInBytes(),
+ magicMaxTotalBytes
+ )
+ );
+ int recordCount = 0;
+ var iterator = records.records().iterator();
+ while (iterator.hasNext()) {
+ recordCount++;
+ iterator.next();
+ }
+ assertEquals(numberOfRecordsPerBatch, recordCount);
+ }
+
+ @Test
+ public void testMockLogReadExtremelyLargeMultiBatch() {
+ int recordsPerBatch = 1000;
+ appendBatch(recordsPerBatch, 5);
+ appendBatch(recordsPerBatch, 5);
+ // The MockLog is able to read a large internal batch
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
+ int recordCount = 0;
+ var iterator = records.records().iterator();
+ while (iterator.hasNext()) {
+ recordCount++;
+ iterator.next();
+ }
+ assertEquals(recordsPerBatch * 2, recordCount);
+ }
+
+ @Test
+ public void testMockLogReadExtremelyLargeSingleBatch() {
+ int recordsPerBatch = 1000;
+ appendBatch(recordsPerBatch, 5);
+ appendBatch(recordsPerBatch, 5);
+ // The MockLog is able to read a large internal batch
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ 1
+ ).records;
+ int recordCount = 0;
+ var iterator = records.records().iterator();
+ while (iterator.hasNext()) {
+ recordCount++;
+ iterator.next();
+ }
+ assertEquals(recordsPerBatch, recordCount);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2})
+ public void testReadRespectsMaxSizeInBytes(int expectedBatches) throws
IOException {
+ int recordsPerBatch = 5;
+ appendBatch(recordsPerBatch, 1);
+ appendBatch(recordsPerBatch, 1);
+ appendBatch(recordsPerBatch, 1);
+ appendBatch(recordsPerBatch, 1);
+
+ int magicMaxBatchSizeBytes = 101;
+ LogFetchInfo info = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxBatchSizeBytes * expectedBatches
+ );
+
+ assertEquals(expectedBatches * magicMaxBatchSizeBytes,
info.records.sizeInBytes());
+ var batchIterator = info.records.batchIterator();
+ while (batchIterator.hasNext()) {
+ assertEquals(magicMaxBatchSizeBytes,
batchIterator.next().sizeInBytes());
+ }
+ // Asserts that we have exactly B * R records. Further there must be B
batches of SimpleRecords each with a value of
+ // [0..R-1] converted to an utf-8 string with empty keys and headers.
+ int count = 0;
+ for (Record record : info.records.records()) {
+ byte[] expectedValue = String.valueOf(count %
recordsPerBatch).getBytes(StandardCharsets.UTF_8);
+ assertEquals(ByteBuffer.wrap(expectedValue), record.value());
+ count += 1;
+ }
+ assertEquals(recordsPerBatch * expectedBatches, count);
+ }
+
private Optional<OffsetRange> readOffsets(long startOffset, Isolation
isolation) {
// The current MockLog implementation reads at most one batch
@@ -1002,7 +1175,11 @@ public class MockLogTest {
while (foundRecord) {
foundRecord = false;
- Records records = log.read(currentStart, isolation).records;
+ Records records = log.read(
+ currentStart,
+ isolation,
+ Integer.MAX_VALUE
+ ).records;
for (Record record : records.records()) {
foundRecord = true;
@@ -1055,7 +1232,11 @@ public class MockLogTest {
int currentOffset = 0;
while (currentOffset < log.endOffset().offset()) {
- Records records = log.read(currentOffset,
Isolation.UNCOMMITTED).records;
+ Records records = log.read(
+ currentOffset,
+ Isolation.UNCOMMITTED,
+ Integer.MAX_VALUE
+ ).records;
List<? extends RecordBatch> batches =
Utils.toList(records.batches().iterator());
assertFalse(batches.isEmpty());
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
index ce7175a8b5e..c20612d0892 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
@@ -35,6 +35,8 @@ public class QuorumConfigTest {
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG,
"-1"));
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "-1"));
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, "-1"));
+
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG,
"-1"));
+ assertInvalidConfig(Map.of(QuorumConfig.QUORUM_FETCH_MAX_BYTES_CONFIG,
"-1"));
}
private void assertInvalidConfig(Map<String, Object> overrideConfig) {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index cd839d37b42..dfc4ec9c309 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -121,6 +121,7 @@ public final class RaftClientTestContext {
final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs *
CHECK_QUORUM_TIMEOUT_FACTOR);
final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
+ final int fetchMaxBytes;
private int electionTimeoutMs;
private int requestTimeoutMs;
@@ -185,6 +186,8 @@ public final class RaftClientTestContext {
private Endpoints localListeners = Endpoints.empty();
private boolean isStartingVotersStatic = false;
private boolean autoJoin = false;
+ private int fetchSnapshotMaxBytes =
QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES;
+ private int fetchMaxBytes =
QuorumConfig.DEFAULT_QUORUM_FETCH_MAX_BYTES;
public Builder(int localId, Set<Integer> staticVoters) {
this(OptionalInt.of(localId), staticVoters);
@@ -392,6 +395,16 @@ public final class RaftClientTestContext {
return this;
}
+ Builder withFetchSnapshotMaxBytes(int fetchSnapshotMaxSizeBytes) {
+ this.fetchSnapshotMaxBytes = fetchSnapshotMaxSizeBytes;
+ return this;
+ }
+
+ Builder withFetchMaxBytes(int fetchMaxBytes) {
+ this.fetchMaxBytes = fetchMaxBytes;
+ return this;
+ }
+
public RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel();
@@ -428,6 +441,8 @@ public final class RaftClientTestContext {
configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG,
FETCH_TIMEOUT_MS);
configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG,
appendLingerMs);
configMap.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG,
autoJoin);
+ configMap.put(QuorumConfig.QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG,
fetchSnapshotMaxBytes);
+ configMap.put(QuorumConfig.QUORUM_FETCH_MAX_BYTES_CONFIG,
fetchMaxBytes);
QuorumConfig quorumConfig = new QuorumConfig(new
AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
List<InetSocketAddress> computedBootstrapServers =
bootstrapServers.orElseGet(() -> {
@@ -494,7 +509,8 @@ public final class RaftClientTestContext {
canBecomeVoter,
metrics,
externalKRaftMetrics,
- listener
+ listener,
+ fetchMaxBytes
);
context.electionTimeoutMs = electionTimeoutMs;
@@ -523,7 +539,8 @@ public final class RaftClientTestContext {
boolean canBecomeVoter,
Metrics metrics,
ExternalKRaftMetrics externalKRaftMetrics,
- MockListener listener
+ MockListener listener,
+ int fetchMaxBytes
) {
this.clusterId = clusterId;
this.localId = localId;
@@ -542,6 +559,7 @@ public final class RaftClientTestContext {
this.metrics = metrics;
this.externalKRaftMetrics = externalKRaftMetrics;
this.listener = listener;
+ this.fetchMaxBytes = fetchMaxBytes;
}
int electionTimeoutMs() {
@@ -1798,7 +1816,6 @@ public final class RaftClientTestContext {
message.data(),
"unexpected request type " + message.data());
FetchRequestData request = (FetchRequestData) message.data();
- assertEquals(KafkaRaftClient.MAX_FETCH_SIZE_BYTES, request.maxBytes());
assertEquals(fetchMaxWaitMs, request.maxWaitMs());
assertEquals(1, request.topics().size());
@@ -1893,6 +1910,7 @@ public final class RaftClientTestContext {
return request
.setMaxWaitMs(maxWaitTimeMs)
.setClusterId(clusterId)
+ .setMaxBytes(fetchMaxBytes)
.setReplicaState(
new
FetchRequestData.ReplicaState().setReplicaId(replicaKey.id())
);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java
index 98456324cc1..ba3f39cdeb0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java
@@ -28,11 +28,15 @@ import org.apache.kafka.common.protocol.Writable;
import org.apache.kafka.common.record.internal.ArbitraryMemoryRecords;
import org.apache.kafka.common.record.internal.InvalidMemoryRecordsProvider;
import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.KRaftConfigs;
import org.apache.kafka.raft.KafkaRaftClient;
import org.apache.kafka.raft.LogAppendInfo;
+import org.apache.kafka.raft.LogFetchInfo;
import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
@@ -61,6 +65,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
@@ -88,8 +93,7 @@ public class KafkaRaftLogTest {
10 * 1000,
100 * 1024,
60 * 1000,
- KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ KafkaRaftClient.MAX_BATCH_SIZE_BYTES
);
private final MockTime mockTime = new MockTime();
@@ -718,8 +722,7 @@ public class KafkaRaftLogTest {
DEFAULT_METADATA_LOG_CONFIG.logSegmentMillis(),
DEFAULT_METADATA_LOG_CONFIG.retentionMaxBytes(),
DEFAULT_METADATA_LOG_CONFIG.retentionMillis(),
- maxBatchSizeInBytes,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ maxBatchSizeInBytes
);
KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
@@ -928,8 +931,7 @@ public class KafkaRaftLogTest {
10 * 1000,
256,
60 * 1000,
- 512,
- DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+ 512
);
KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
@@ -963,8 +965,7 @@ public class KafkaRaftLogTest {
10 * 1000,
1024,
60 * 1000,
- 100,
- DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+ 100
);
KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
@@ -995,8 +996,7 @@ public class KafkaRaftLogTest {
10 * 1000,
10240,
60 * 1000,
- 100,
- DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+ 100
);
KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
@@ -1043,8 +1043,7 @@ public class KafkaRaftLogTest {
10 * 1000,
10240,
60 * 1000,
- 200,
- DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+ 200
);
KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
@@ -1083,13 +1082,117 @@ public class KafkaRaftLogTest {
);
}
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 3})
+ public void testReadRespectsMaxSizeInBytes(int expectedBatches) throws
IOException {
+ // 5 records are written in batches of 101 bytes each (at time of
writing).
+ int magicMaxBatchSizeBytes = 101;
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ magicMaxBatchSizeBytes
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ int recordsPerBatch = 5;
+ append(log, recordsPerBatch, 1);
+ append(log, recordsPerBatch, 1);
+ append(log, recordsPerBatch, 1);
+ append(log, recordsPerBatch, 1);
+
+ LogFetchInfo info = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxBatchSizeBytes * expectedBatches
+ );
+ assertEquals(expectedBatches * magicMaxBatchSizeBytes,
info.records.sizeInBytes());
+ // Asserts that we have exactly B * R records. Further there must be B
batches of SimpleRecords each with a value of
+ // [0..R-1] converted to an utf-8 string with empty keys and headers.
+ int count = 0;
+ for (Record record : info.records.records()) {
+ byte[] expectedValue = String.valueOf(count %
recordsPerBatch).getBytes(StandardCharsets.UTF_8);
+ assertEquals(ByteBuffer.wrap(expectedValue), record.value());
+ count += 1;
+ }
+ assertEquals(recordsPerBatch * expectedBatches, count);
+ }
+
+ @Test
+ public void testLogLimitsReturnsLessThanMaxBytes() throws IOException {
+ // 5 records are written in batches of 141 bytes each (at time of
writing).
+ int magicMaxBatchSizeBytes = 141;
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ magicMaxBatchSizeBytes
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ int numberOfRecordsPerBatch = 10;
+ append(log, numberOfRecordsPerBatch, 5);
+ append(log, numberOfRecordsPerBatch, 5);
+ append(log, numberOfRecordsPerBatch, 5);
+ // Set to be larger than 1 batch but smaller than 2.
+ int magicMaxTotalBytes = 200;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxTotalBytes
+ ).records;
+ // MockLog#read returns data in batches and will return an additional
batch if one of them
+ // exceeds maxTotalBytes.
+ assertEquals(magicMaxTotalBytes, records.sizeInBytes());
+ }
+
+ @Test
+ public void testLogLimitsReturnsAtLeastOne() throws IOException {
+ int numberOfRecordsPerBatch = 10;
+ // 5 records are written in batches of 141 bytes each (at time of
writing).
+ int magicMaxBatchSizeBytes = 141;
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ magicMaxBatchSizeBytes
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ append(log, numberOfRecordsPerBatch, 5);
+ append(log, numberOfRecordsPerBatch, 5);
+ // magicMaxTotalBytes are smaller than 10 simple records in a batch.
+ // Meaning we will read only the first batch and not the second.
+ int magicMaxTotalBytes = 1;
+ Records records = log.read(
+ 0,
+ Isolation.UNCOMMITTED,
+ magicMaxTotalBytes
+ ).records;
+ assertTrue(
+ records.sizeInBytes() > magicMaxTotalBytes,
+ String.format(
+ "Expected records size (%d) > maxTotalBytes (%d) since one
whole batch must be returned",
+ records.sizeInBytes(),
+ magicMaxTotalBytes
+ )
+ );
+ int recordCount = 0;
+ var iterator = records.records().iterator();
+ while (iterator.hasNext()) {
+ recordCount++;
+ iterator.next();
+ }
+ assertEquals(numberOfRecordsPerBatch, recordCount);
+ }
+
private static MetadataLogConfig createMetadataLogConfig(
int internalLogSegmentBytes,
long logSegmentMillis,
long retentionMaxBytes,
long retentionMillis,
- int internalMaxBatchSizeInBytes, //: Int =
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
- int internalMaxFetchSizeInBytes //: Int =
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+ int internalMaxBatchSizeInBytes
) {
Map<String, ?> config = Map.of(
MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG,
internalLogSegmentBytes,
@@ -1097,9 +1200,8 @@ public class KafkaRaftLogTest {
MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG,
retentionMaxBytes,
MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG,
retentionMillis,
MetadataLogConfig.INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG,
internalMaxBatchSizeInBytes,
-
MetadataLogConfig.INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG,
internalMaxFetchSizeInBytes,
MetadataLogConfig.INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG,
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
- );
+ );
return new MetadataLogConfig(new
AbstractConfig(MetadataLogConfig.CONFIG_DEF, config, false));
}