jiafu1115 commented on code in PR #21361:
URL: https://github.com/apache/kafka/pull/21361#discussion_r2739368431
##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:
##########
@@ -907,28 +908,69 @@ private void maybeUpdateCopiedOffset(UnifiedLog log)
throws RemoteStorageExcepti
}
}
+ /**
+ * Check if segment has already expired based on remote storage's
retention time.
+ */
+ private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment
segment, long retentionMs) throws IOException {
+ if (retentionMs <= 0) {
+ return false;
+ }
+ return time.milliseconds() - segment.largestTimestamp() >
retentionMs;
+ }
+
+ /**
+ * Check if segment has already expired based on remote storage's
retention size.
+ */
+ private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment
segment, long retentionBytes, long logSize, long accumulatedSkippedSize) {
+ if (retentionBytes <= 0) {
+ return false;
+ }
+ return (logSize - retentionBytes - accumulatedSkippedSize) >
segment.size();
+ }
+
/**
* Segments which match the following criteria are eligible for
copying to remote storage:
* 1) Segment is not the active segment and
* 2) Segment end-offset is less than the last-stable-offset as
remote storage should contain only
* committed/acked messages
+ *
+ * Additionally, if a segment has already expired based on remote
storage's retention configuration,
+ * it will be skipped from upload and logStartOffset will be updated
to allow local deletion.
+ *
* @param log The log from which the segments are to be copied
* @param fromOffset The offset from which the segments are to be
copied
* @param lastStableOffset The last stable offset of the log
* @return candidate log segments to be copied to remote storage
*/
- List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long
fromOffset, Long lastStableOffset) {
+ List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long
fromOffset, Long lastStableOffset) throws IOException {
List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
List<LogSegment> segments = log.logSegments(fromOffset,
Long.MAX_VALUE);
- if (!segments.isEmpty()) {
- for (int idx = 1; idx < segments.size(); idx++) {
- LogSegment previousSeg = segments.get(idx - 1);
- LogSegment currentSeg = segments.get(idx);
- if (currentSeg.baseOffset() <= lastStableOffset) {
- candidateLogSegments.add(new
EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
- }
+ if (segments.isEmpty()) {
+ return candidateLogSegments;
+ }
+ long retentionMs = log.config() != null ? log.config().retentionMs
: -1;
+ long retentionSize = log.config() != null ?
log.config().retentionSize : -1;
+ // Compute log.size() once when retention is size-based; skip when
not needed to avoid wasted work.
+ long logSize = retentionSize > 0 ? log.size() : -1;
+ long accumulatedSkippedSize = 0;
+ for (int idx = 1; idx < segments.size(); idx++) {
+ LogSegment previousSeg = segments.get(idx - 1);
+ LogSegment currentSeg = segments.get(idx);
+ if (currentSeg.baseOffset() > lastStableOffset) {
+ continue;
+ }
+
+ if (isSegmentExpiredByTimeForRemoteStorage(previousSeg,
retentionMs) ||
+ isSegmentExpiredBySizeForRemoteStorage(previousSeg,
retentionSize, logSize, accumulatedSkippedSize)) {
+ long newLogStartOffset = currentSeg.baseOffset();
+ log.maybeIncrementLogStartOffset(newLogStartOffset,
LogStartOffsetIncrementReason.SegmentExpiredByRemoteRetention);
+ logger.info("Segment {} has already expired based on
remote storage's retention configuration. Skipping upload and incrementing
logStartOffset to {} to allow local deletion.",
+ previousSeg, newLogStartOffset);
+ accumulatedSkippedSize += previousSeg.size();
+ continue;
Review Comment:
Note: A segment in the middle of the log should not be skipped. Expired segments
should be found starting from the oldest, stopping as soon as the user-supplied predicate is false.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]