junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225
##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -103,7 +103,7 @@ class DelayedFetch(
           // We will not force complete the fetch request if a replica should be throttled.
           if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
             return forceComplete()
-        } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+        } else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset) {
Review Comment:
Hmm, we assume that `fetchOffset.messageOffset > endOffset.messageOffset` is
the truncated leader case. In that case, we should always call
`forceComplete()` immediately, right? The current code only calls
`forceComplete()` when the offset metadata is available, but it would be more
consistent to do that regardless of whether the offset metadata is available.
Should we do something like the following?
```
if (fetchOffset.messageOffset > endOffset.messageOffset) {
  // Case F, this can happen when the new fetch operation is on a truncated leader
  debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
  return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
  if (fetchOffset.onOlderSegment(endOffset)) {
    // Case F, this can happen when the fetch operation is falling behind the current segment
    // or the partition has just rolled a new segment
    debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
    // We will not force complete the fetch request if a replica should be throttled.
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      return forceComplete()
  } else if (fetchOffset.onSameSegment(endOffset)) {
    // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
    val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
    if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
      accumulatedSize += bytesAvailable
  }
}
```
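To make the branch structure easier to check in isolation, here is a minimal, self-contained sketch of the same decision logic. It is not the Kafka implementation: `OffsetMeta`, `Outcome`, `DelayedFetchSketch` and `decide` are hypothetical names, `segmentBaseOffset = None` is an assumed way to model "offset metadata not available", and the single `throttled` flag stands in for `params.isFromFollower && replicaManager.shouldLeaderThrottle(...)`.

```scala
// Hypothetical, simplified stand-in for Kafka's LogOffsetMetadata.
// segmentBaseOffset = None models "offset metadata not available" (an assumption of this sketch).
final case class OffsetMeta(messageOffset: Long,
                            segmentBaseOffset: Option[Long] = None,
                            position: Int = 0) {
  // Both sides must carry segment metadata and point at the same segment.
  def onSameSegment(that: OffsetMeta): Boolean =
    segmentBaseOffset.isDefined && segmentBaseOffset == that.segmentBaseOffset

  // Returns false (rather than throwing) when either side lacks segment metadata.
  def onOlderSegment(that: OffsetMeta): Boolean =
    (for (a <- segmentBaseOffset; b <- that.segmentBaseOffset) yield a < b).getOrElse(false)

  // Byte distance between two positions; only meaningful on the same segment.
  def positionDiff(that: OffsetMeta): Int = position - that.position
}

// Hypothetical result type describing what one partition contributes to the check.
sealed trait Outcome
case object ForceComplete extends Outcome
final case class Accumulate(bytes: Int) extends Outcome
case object Skip extends Outcome

object DelayedFetchSketch {
  // `throttled` stands in for:
  //   params.isFromFollower && replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)
  def decide(fetchOffset: OffsetMeta, endOffset: OffsetMeta, maxBytes: Int, throttled: Boolean): Outcome =
    if (fetchOffset.messageOffset > endOffset.messageOffset) {
      // Truncated leader: complete immediately, with or without segment metadata.
      ForceComplete
    } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
      if (fetchOffset.onOlderSegment(endOffset)) {
        // Fetch is behind the current segment: complete unless the replica is throttled.
        if (throttled) Skip else ForceComplete
      } else if (fetchOffset.onSameSegment(endOffset)) {
        // Same segment: count available bytes, capped by the partition fetch size.
        if (throttled) Skip
        else Accumulate(math.min(endOffset.positionDiff(fetchOffset), maxBytes))
      } else {
        // Segment metadata missing on either side: nothing to accumulate.
        Skip
      }
    } else {
      // Fetch offset equals the end offset: no new data yet.
      Skip
    }
}
```

In this sketch the truncated-leader branch depends only on `messageOffset`, so it fires even when both offsets are message-offset-only, which is the consistency point raised above. The segment-based branches are reached only when the fetch offset is strictly behind the end offset.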