kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598841362
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
/**
* Given a message offset, find its corresponding offset metadata in the
log.
- * If the message offset is out of range, throw an OffsetOutOfRangeException
+ * 1. If the message offset is lesser than the log-start-offset, then throw
an OffsetOutOfRangeException
+ * 2. If the message offset is lesser than the local-log-start-offset, then
it returns the message-only metadata
+ * 3. If the message offset is greater than the log-end-offset, then it
returns the message-only metadata
*/
- private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata
= {
+ private[log] def convertToOffsetMetadataOrThrow(offset: Long):
LogOffsetMetadata = {
Review Comment:
To solve the infinite loop, instead of returning the message-only
`LogOffsetMetadata`, can we retain the same behavior without the loop?
Something like below:
```scala
/**
 * Reads records from the log starting at `startOffset`.
 *
 * The outcome depends on where `startOffset` sits:
 *  - beyond the log end offset, or with no segment at or below it: an
 *    `OffsetOutOfRangeException` is thrown;
 *  - equal to `maxOffsetMetadata.messageOffset`: an empty result carrying
 *    `maxOffsetMetadata` is returned;
 *  - greater than `maxOffsetMetadata.messageOffset`: an empty result is returned whose
 *    offset metadata comes from a small probe read, bounded by the log end offset;
 *  - otherwise: a regular read bounded by `maxOffsetMetadata`.
 *
 * @param startOffset        the offset to begin reading at
 * @param maxLength          the maximum number of bytes to read
 * @param minOneMessage      forwarded to the segment read for the regular read path
 * @param maxOffsetMetadata  the upper bound for the regular read
 * @param includeAbortedTxns whether aborted transaction information is attached to the result
 * @return the fetched data, or an empty result for the two boundary cases described above
 * @throws OffsetOutOfRangeException if `startOffset` is past the log end offset or no
 *                                   segment exists at or below `startOffset`
 */
def read(startOffset: Long,
         maxLength: Int,
         minOneMessage: Boolean,
         maxOffsetMetadata: LogOffsetMetadata,
         includeAbortedTxns: Boolean): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " +
      s"total length ${segments.sizeInBytes} bytes")

    // Take one snapshot of the log end and use it for every decision below.
    // NOTE(review): presumably nextOffsetMetadata can change while this read is in
    // progress - confirm against its declaration.
    val endOffsetMetadata = nextOffsetMetadata
    val endOffset = endOffsetMetadata.messageOffset
    val segmentOpt = segments.floorSegment(startOffset)

    // return error on attempt to read beyond the log end offset
    // (or when there is no segment at or below the requested offset)
    if (startOffset > endOffset || !segmentOpt.isPresent)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments upto $endOffset.")

    if (startOffset == maxOffsetMetadata.messageOffset) {
      emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
    } else if (startOffset > maxOffsetMetadata.messageOffset) {
      // Updated code to avoid the loop:
      // Probe the segment only to resolve the offset metadata for startOffset; the bytes
      // read are discarded. The probe is bounded by the end-offset snapshot taken above
      // (not a second read of nextOffsetMetadata) so that it sees the same log end that
      // startOffset was validated against.
      val tmpFetchDataInfo = readFetchDataInfo(segmentOpt.get, startOffset, maxLength = 1,
        minOneMessage = false, endOffsetMetadata, includeAbortedTxns = false)
      emptyFetchDataInfo(tmpFetchDataInfo.fetchOffsetMetadata, includeAbortedTxns)
    } else {
      readFetchDataInfo(segmentOpt.get, startOffset, maxLength, minOneMessage,
        maxOffsetMetadata, includeAbortedTxns)
    }
  }
}
/**
 * Reads from `segment` and, while a segment read yields `null`, moves on to the next
 * higher segment until some read returns a result or no higher segment remains.
 *
 * @param segment            the segment to start reading from
 * @param startOffset        the offset passed to each segment read
 * @param maxLength          the maximum number of bytes passed to each segment read
 * @param minOneMessage      forwarded unchanged to each segment read
 * @param maxOffsetMetadata  caps the read position on the segment whose base offset matches
 *                           `maxOffsetMetadata.segmentBaseOffset`; other segments are capped
 *                           by their own size
 * @param includeAbortedTxns whether a non-null result is passed through
 *                           `addAbortedTransactions` before being returned
 * @return the first non-null segment read result, or an empty `FetchDataInfo` carrying
 *         `nextOffsetMetadata` when every visited segment returned `null`
 */
private def readFetchDataInfo(segment: LogSegment,
                              startOffset: Long,
                              maxLength: Int,
                              minOneMessage: Boolean,
                              maxOffsetMetadata: LogOffsetMetadata,
                              includeAbortedTxns: Boolean): FetchDataInfo = {
  // Start at the given segment (its base offset is at or below the target offset per the
  // caller); if it holds nothing at or after the target offset, walk forward through the
  // following segments until data is found or the log is exhausted.
  var result: FetchDataInfo = null
  var candidate: Optional[LogSegment] = Optional.of(segment)

  while (result == null && candidate.isPresent) {
    val current = candidate.get
    val currentBase = current.baseOffset
    // The max-offset position only applies on the segment it belongs to; everywhere else
    // the segment size is the limit.
    val onMaxOffsetSegment = maxOffsetMetadata.segmentBaseOffset == currentBase
    val readLimit =
      if (onMaxOffsetSegment) maxOffsetMetadata.relativePositionInSegment
      else current.size

    result = current.read(startOffset, maxLength, readLimit, minOneMessage)
    if (result == null)
      candidate = segments.higherSegment(currentBase)
    else if (includeAbortedTxns)
      result = addAbortedTransactions(startOffset, current, result)
  }

  // Running past the last segment without data, even though the start offset is in range,
  // can happen when all messages above the start offset have been deleted; answer with an
  // empty record set tagged with the log end offset metadata.
  Option(result).getOrElse(new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY))
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]