kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598832131
##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
assertTrue(fetchResultOpt.isDefined)
+
+ val fetchResult = fetchResultOpt.get
+ assertEquals(Errors.NONE, fetchResult.error)
+ }
+
+ @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark
minBytes={0}")
+ @ValueSource(ints = Array(1, 2))
+ def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+ val fetchOffset = 450L
+ val logStartOffset = 5L
+ val currentLeaderEpoch = Optional.of[Integer](10)
+ val replicaId = 1
+
+ val fetchStatus = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+ fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId,
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500,
minBytes = minBytes)
+
+ var fetchResultOpt: Option[FetchPartitionData] = None
+ def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit
= {
+ fetchResultOpt = Some(responses.head._2)
+ }
+
+ val delayedFetch = new DelayedFetch(
+ params = fetchParams,
+ fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+ replicaManager = replicaManager,
+ quota = replicaQuota,
+ responseCallback = callback
+ )
+
+ val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+ // high-watermark is lesser than the log-start-offset
+ val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+ when(partition.fetchOffsetSnapshot(
+ currentLeaderEpoch,
+ fetchOnlyFromLeader = true))
+ .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata,
endOffsetMetadata, endOffsetMetadata))
+ when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+ expectReadFromReplica(fetchParams, topicIdPartition,
fetchStatus.fetchInfo, Errors.NONE)
+
+ val expected = minBytes == 1
+ assertEquals(expected, delayedFetch.tryComplete())
+ assertEquals(expected, delayedFetch.isCompleted)
Review Comment:
In the test, the
[LogOffsetSnapshot](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java)
contains message-only offset for logEndOffset, highWatermark, and
lastStableOffset in DelayedFetchTest.java#207. So, the test passed with the
newly added condition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]