josefk31 commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2706191639
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java:
##########
@@ -118,14 +118,22 @@ UnifiedLog log() {
}
@Override
- public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+ public int defaultLocalReadMaxRecordsSizeBytes() {
+ return config.internalMaxFetchSizeInBytes();
+ }
+
+ @Override
+ public LogFetchInfo read(long startOffset, Isolation readIsolation, int maxRecordSizeBytes) {
Review Comment:
Done!
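The tests further down call both `log.read(0, Isolation.UNCOMMITTED)` and `log.read(0, Isolation.UNCOMMITTED, n)`, which suggests that a read without an explicit limit falls back to `defaultLocalReadMaxRecordsSizeBytes()`. Below is a minimal sketch of one way such a fallback could look, assuming a Java `default` method on the interface. `SimpleRaftLog`, `ReadCall` and `logWithDefault` are hypothetical simplifications, not the PR's code. The real `RaftLog.read` also takes an `Isolation` and returns a `LogFetchInfo`.

```java
public class DefaultReadSketch {
    // Hypothetical result type: echoes the arguments a read was served with.
    record ReadCall(long startOffset, int maxBytes) {}

    // Hypothetical, simplified stand-in for RaftLog; the real read also takes an Isolation.
    interface SimpleRaftLog {
        // Soft limit used when the caller does not supply one (local reads).
        int defaultLocalReadMaxRecordsSizeBytes();

        // Soft limit supplied by the caller, e.g. the MaxBytes of a fetch request.
        ReadCall read(long startOffsetInclusive, int maxTotalRecordsSizeBytes);

        // Local reads fall back to the configured default.
        default ReadCall read(long startOffsetInclusive) {
            return read(startOffsetInclusive, defaultLocalReadMaxRecordsSizeBytes());
        }
    }

    // Builds a log whose reads simply report the limit they were given.
    static SimpleRaftLog logWithDefault(int defaultBytes) {
        return new SimpleRaftLog() {
            @Override
            public int defaultLocalReadMaxRecordsSizeBytes() {
                return defaultBytes;
            }

            @Override
            public ReadCall read(long startOffsetInclusive, int maxTotalRecordsSizeBytes) {
                return new ReadCall(startOffsetInclusive, maxTotalRecordsSizeBytes);
            }
        };
    }

    public static void main(String[] args) {
        SimpleRaftLog log = logWithDefault(8 * 1024 * 1024);
        System.out.println(log.read(0L));      // local read: configured default
        System.out.println(log.read(0L, 512)); // fetch-driven read: request's limit
    }
}
```

With this shape, only the fetch path has to pass a size explicitly. Every other caller keeps the shorter signature.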
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +93,75 @@ private static void testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testFetchRequestObeysConfiguredMaximumBytesToFetch() throws Exception {
+ // Create an explicit test to check that controller.quorum.fetch.max.size.bytes is used to construct fetch
Review Comment:
Done! Let me know what you think :)
##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -1866,8 +1888,7 @@ FetchRequestData fetchRequest(
long fetchOffset,
int lastFetchedEpoch,
OptionalLong highWatermark,
- int maxWaitTimeMs
- ) {
+ int maxWaitTimeMs) {
Review Comment:
Good catch! Done!
##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
);
}
+ @Test
+ public void testReadOfDefaultLogValue() throws IOException {
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ 128,
+ 1
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ // Append twice to ensure we have two batches.
+ append(log, 1, 1);
+ append(log, 2, 1);
+
+ // If the default configured value of 1 is used we will read a single record.
+ LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+ // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+ // only 1 record should be returned.
+ assertRecords(info, 1, 1);
+ }
+
+ @Test
+ public void testNonDefaultReadFromLog() throws IOException {
+ int batchSizeBytes = 1024;
+ int maxSizeToReadBytes = 1;
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ batchSizeBytes,
+ maxSizeToReadBytes
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ int recordsPerBatch = 5;
+ append(log, recordsPerBatch, 1);
+ append(log, recordsPerBatch, 1);
+
+ // Default value of 1 is NOT used in this case.
+ LogFetchInfo info = log.read(0,
+ Isolation.UNCOMMITTED,
+ batchSizeBytes * 3);
+
+ assertRecords(info, recordsPerBatch * 2, recordsPerBatch);
+ }
+
+ private static void assertRecords(LogFetchInfo info, int numberExpected, int recordsPerBatch) {
Review Comment:
Hmmm. Since this is a `private static` method, and none of the other `private static` utility methods in this class have Javadoc, I don't think it's really needed.
##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
);
}
+ @Test
+ public void testReadOfDefaultLogValue() throws IOException {
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ 128,
+ 1
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ // Append twice to ensure we have two batches.
+ append(log, 1, 1);
+ append(log, 2, 1);
+
+ // If the default configured value of 1 is used we will read a single record.
+ LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+ // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+ // only 1 record should be returned.
+ assertRecords(info, 1, 1);
+ }
Review Comment:
Then 2 records will be returned. It seems like the LogSegment implementation
will return at least 1 *batch*. I changed the test slightly to illustrate this.
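The "at least one batch" behavior can be modeled as a soft limit that applies at batch granularity. The sketch below is simplified and assumes that reads return whole batches and drop any later batch that would push the total over the limit. `batchesRead` is a hypothetical helper and is not Kafka's `LogSegment` code.

```java
import java.util.List;

public class SoftMaxRead {
    // Simplified model (assumption): how many whole batches a read returns when
    // maxBytes is a soft limit. The first batch is always returned, even if it alone
    // exceeds maxBytes; later batches are included only while the running total fits.
    static int batchesRead(List<Integer> batchSizesBytes, int maxBytes) {
        if (batchSizesBytes.isEmpty()) {
            return 0;
        }
        int total = batchSizesBytes.get(0);
        int count = 1;
        for (int i = 1; i < batchSizesBytes.size(); i++) {
            int next = total + batchSizesBytes.get(i);
            if (next > maxBytes) {
                break; // the next batch would overflow the soft limit
            }
            total = next;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // A 1-byte limit is smaller than any batch, yet one whole batch is still read.
        System.out.println(batchesRead(List.of(80, 90), 1));
        // A limit large enough for both batches returns both.
        System.out.println(batchesRead(List.of(80, 90), 1024));
    }
}
```

Under this model, a first batch holding 2 records would return 2 records even with a 1-byte limit. That matches the reply above.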
##########
raft/src/main/java/org/apache/kafka/raft/RaftLog.java:
##########
@@ -58,8 +58,26 @@ public interface RaftLog extends AutoCloseable {
/**
* Read a set of records within a range of offsets.
+ * maxTotalRecordsSizeBytes specifies a "soft" max for total byte size of the records to read.
*/
- LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
+ LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int maxTotalRecordsSizeBytes);
+
+ /**
+ * Configures a soft max for total size of bytes read via default read function implementation.
+ * Reads which are called from Fetch requests have a configured soft-max at the Quorum level (KIP-1219).
+ * Most reads are, however, intended to be from a disk based raft log to memory.
+ * The default is intended for local reads with only reads intended to be transmitted as a response to fetch using
+ * the non-default implementation.
Review Comment:
Fair enough. I removed this comment because I felt there was too much exposition. The most important thing an implementer should know is that it should be a "soft-max" with respect to batches. I don't think additional information is needed.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java:
##########
@@ -1321,7 +1323,7 @@ public void testFetchSnapshotResponsePartialData(boolean withKip853Rpc) throws E
snapshotRequest,
context.metadataPartition,
localId,
- KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+ context.fetchSnapshotMaxSizeBytes
Review Comment:
The KIP configuration is `controller.quorum.fetchsnapshot.max.bytes`, so it is set up to resemble the configured value (although it is a bit "different" when compared with the old `KafkaRaftClient.MAX_FETCH_SIZE_BYTES`).
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java:
##########
@@ -91,6 +93,75 @@ private static void testFetchResponseWithInvalidRecord(MemoryRecords records, in
assertEquals(oldLogEndOffset, context.log.endOffset().offset());
}
+ @Test
+ void testFetchRequestObeysConfiguredMaximumBytesToFetch() throws Exception {
+ // Create an explicit test to check that controller.quorum.fetch.max.size.bytes is used to construct fetch
+ // requests.
+ int epoch = 2;
+ int localId = KafkaRaftClientTest.randomReplicaId();
+ ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+ ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, true);
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ local.id(),
+ local.directoryId().get()
+ )
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(local, electedLeader)), KRaftVersion.KRAFT_VERSION_1
+ )
+ .withElectedLeader(epoch, electedLeader.id())
+ // Explicitly change the configuration here.
+ .withFetchMaxSizeBytes(1024)
+ .build();
+
+ context.pollUntilRequest();
+ RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+ // assertFetchRequestData contains a check which verifies the SizeBytes field of the Fetch request.
+ context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, OptionalLong.empty());
+ }
+
+ @Test
+ public void testMaxBytesRequestedFromLogsRespectsValueInFetchRequest() throws Exception {
+ var epoch = 2;
+ var id = KafkaRaftClientTest.randomReplicaId();
+ var localKey = KafkaRaftClientTest.replicaKey(id, true);
+ var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+ var localMaxSizeBytes = 1024;
+ var remoteMaxSizeBytes = 512;
+
+ RaftClientTestContext context = new RaftClientTestContext.Builder(
+ localKey.id(),
+ localKey.directoryId().get()
+ )
+ .appendToLog(epoch, List.of("a", "b", "c"))
+ .appendToLog(epoch, List.of("d", "e", "f"))
+ .withStartingVoters(
+ VoterSetTest.voterSet(Stream.of(localKey, remoteKey)), KRaftVersion.KRAFT_VERSION_1
+ )
+ .withUnknownLeader(epoch)
+ .withFetchMaxSizeBytes(localMaxSizeBytes)
+ .build();
+
+ context.unattachedToLeader();
+ epoch = context.currentEpoch();
+
+ // The next read from MockLog will be intended for a fetch request.
+ // We wish to assert that it uses the value supplied from the fetch.
+ context.log.setExpectedMaxTotalRecordsSizeBytes(remoteMaxSizeBytes);
+
+ // Send a fetch request with max bytes that are different from the configured value.
+ FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+ request.setMaxBytes(remoteMaxSizeBytes);
+ context.deliverRequest(request);
+
+ context.pollUntilResponse();
+ FetchResponseData.PartitionData partitionData = context.assertSentFetchPartitionResponse();
+ // Failures for this test will appear in error-logs.
Review Comment:
Added comment which is a bit more clear.
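In this test, a mismatch is reported only through `MockLog`'s error logs. One alternative is to record the size passed to each read and assert on it directly. The following is a minimal sketch of that idea. `RecordingLog` and `handleFetch` are hypothetical stand-ins for `MockLog` and the leader's fetch handling, and they are not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

public class RecordingLogSketch {
    // Hypothetical test double: remembers the maxTotalRecordsSizeBytes of every read.
    static final class RecordingLog {
        private final List<Integer> requestedMaxBytes = new ArrayList<>();

        void read(long startOffsetInclusive, int maxTotalRecordsSizeBytes) {
            requestedMaxBytes.add(maxTotalRecordsSizeBytes);
        }

        OptionalInt lastRequestedMaxBytes() {
            return requestedMaxBytes.isEmpty()
                ? OptionalInt.empty()
                : OptionalInt.of(requestedMaxBytes.get(requestedMaxBytes.size() - 1));
        }
    }

    // Hypothetical leader-side handler: serves a fetch using the request's MaxBytes,
    // not the locally configured fetch size.
    static void handleFetch(RecordingLog log, long fetchOffset, int requestMaxBytes) {
        log.read(fetchOffset, requestMaxBytes);
    }

    public static void main(String[] args) {
        RecordingLog log = new RecordingLog();
        int localMaxSizeBytes = 1024;  // configured value; must not be used for this read
        int remoteMaxSizeBytes = 512;  // value carried in the fetch request
        handleFetch(log, 1L, remoteMaxSizeBytes);
        // Fail loudly instead of relying on an error log.
        if (log.lastRequestedMaxBytes().getAsInt() != remoteMaxSizeBytes) {
            throw new AssertionError("expected a read of " + remoteMaxSizeBytes + " bytes");
        }
        System.out.println("read used " + log.lastRequestedMaxBytes().getAsInt()
            + " bytes rather than the local " + localMaxSizeBytes);
    }
}
```

The trade-off is that the test double has to expose what it saw. In return, a regression fails the test instead of leaving only a log line behind.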
##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
);
}
+ @Test
+ public void testReadOfDefaultLogValue() throws IOException {
Review Comment:
Great suggestion! Naming things is hard 🤣
##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
);
}
+ @Test
+ public void testReadOfDefaultLogValue() throws IOException {
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ 128,
+ 1
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ // Append twice to ensure we have two batches.
+ append(log, 1, 1);
+ append(log, 2, 1);
+
+ // If the default configured value of 1 is used we will read a single record.
+ LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+ // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+ // only 1 record should be returned.
+ assertRecords(info, 1, 1);
+ }
+
+ @Test
+ public void testNonDefaultReadFromLog() throws IOException {
+ int batchSizeBytes = 1024;
+ int maxSizeToReadBytes = 1;
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ batchSizeBytes,
+ maxSizeToReadBytes
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ int recordsPerBatch = 5;
+ append(log, recordsPerBatch, 1);
+ append(log, recordsPerBatch, 1);
+
+ // Default value of 1 is NOT used in this case.
Review Comment:
Good catch! I removed the comment because I think your
[suggestion](https://github.com/apache/kafka/pull/21028#discussion_r2688313678)
made the code more clear so it is no longer needed.
##########
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java:
##########
@@ -1083,6 +1086,70 @@ public void testSegmentsLessThanLatestSnapshot() throws IOException {
);
}
+ @Test
+ public void testReadOfDefaultLogValue() throws IOException {
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ 128,
+ 1
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ // Append twice to ensure we have two batches.
+ append(log, 1, 1);
+ append(log, 2, 1);
+
+ // If the default configured value of 1 is used we will read a single record.
+ LogFetchInfo info = log.read(0, Isolation.UNCOMMITTED);
+
+ // Exactly 1 batch of records will be read. Since there are 2 batches, with the first batch having 1 record
+ // only 1 record should be returned.
+ assertRecords(info, 1, 1);
+ }
+
+ @Test
+ public void testNonDefaultReadFromLog() throws IOException {
+ int batchSizeBytes = 1024;
+ int maxSizeToReadBytes = 1;
+ MetadataLogConfig config = createMetadataLogConfig(
+ 10240,
+ 10 * 1000,
+ 10240,
+ 60 * 1000,
+ batchSizeBytes,
+ maxSizeToReadBytes
+ );
+ KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+ int recordsPerBatch = 5;
+ append(log, recordsPerBatch, 1);
+ append(log, recordsPerBatch, 1);
+
+ // Default value of 1 is NOT used in this case.
+ LogFetchInfo info = log.read(0,
+ Isolation.UNCOMMITTED,
+ batchSizeBytes * 3);
Review Comment:
Improved the test by parameterizing it, and worked out an exact, meaningful value for `batchSizeBytes`. Hopefully it is clearer now.
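If every batch has the same on-disk size, a soft limit makes the expected result easy to compute for each parameter. This is one way to pick "exact" values for a parameterized test. The sketch assumes equal-sized batches and at least one batch in the log. `expectedBatches` is a hypothetical helper, and the 1024-byte batch size is only illustrative. It is not the value derived in the PR.

```java
public class ParameterizedReadSketch {
    // Simplified model (assumption): with equally sized batches and a soft byte limit,
    // a read returns floor(maxBytes / batchSizeBytes) whole batches, but never fewer
    // than one and never more than the log holds. Assumes numBatches >= 1.
    static int expectedBatches(int numBatches, int batchSizeBytes, int maxBytes) {
        int fits = maxBytes / batchSizeBytes;
        return Math.max(1, Math.min(numBatches, fits));
    }

    public static void main(String[] args) {
        int numBatches = 2;
        int recordsPerBatch = 5;
        int batchSizeBytes = 1024; // hypothetical on-disk size of each batch
        // Each case is a multiple of the batch size, as a parameterized test might use.
        int[] multiples = {0, 1, 2, 3};
        for (int k : multiples) {
            int maxBytes = k * batchSizeBytes;
            int records = expectedBatches(numBatches, batchSizeBytes, maxBytes) * recordsPerBatch;
            System.out.println("maxBytes=" + maxBytes + " -> " + records + " records");
        }
    }
}
```

With `batchSizeBytes * 3` and two batches in the log, the model predicts both batches (10 records). This agrees with the `assertRecords(info, recordsPerBatch * 2, recordsPerBatch)` expectation in `testNonDefaultReadFromLog`.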
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]