kamalcph commented on code in PR #20428:
URL: https://github.com/apache/kafka/pull/20428#discussion_r2757639340
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -805,6 +819,29 @@ abstract class AbstractFetcherThread(name: String,
}
}
+ /**
+ * Determines the earliest offset for pending uploads, taking into account
+ * both local and remote storage conditions.
+ */
+ private def earliestPendingUploadOffset(topicPartition: TopicPartition,
currentLeaderEpoch: Int, leaderLogStartOffset: OffsetAndEpoch): OffsetAndEpoch
= {
+ val earliestPendingUploadOffset =
leader.fetchEarliestPendingUploadOffset(topicPartition, currentLeaderEpoch)
+ if (earliestPendingUploadOffset.offset == -1L) {
+ val leaderLocalStartOffset =
leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch)
+ if (leaderLocalStartOffset.offset == leaderLogStartOffset.offset) {
+ return leaderLocalStartOffset
+ }
+ throw new OffsetNotAvailableException("Segments are uploaded to remote
storage, but the leader does not have the information about the uploaded
segments")
Review Comment:
Shall we update the error message to be specific?
```
Segments are uploaded to remote storage, but the leader does not know the
earliest pending upload offset.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]