malletgu commented on code in PR #16959:
URL: https://github.com/apache/kafka/pull/16959#discussion_r1727172794
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1254,6 +1255,13 @@ void cleanupExpiredRemoteLogSegments() throws
RemoteStorageException, ExecutionE
canProcess = false;
continue;
}
+
+ if
(RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state())) {
+ // If the state is COPY_SEGMENT_STARTED and it's not
under copying process, this must be the previously
+ // failed copied state. We should clean it up directly.
+ danglingSegments.add(metadata);
+ continue;
Review Comment:
I think there is a risk of a race condition here: if `segmentIdsBeingCopied` is
updated during the for loop, a segment that was being copied when the list was
built could be deleted by mistake.
For the race to happen, the following would need to occur:
- When the list of segments is created (L1244), a segment is being copied and
is present in `segmentIdsBeingCopied`.
- During the for loop, the copy finishes and the segment's id is removed from
`segmentIdsBeingCopied`.
- The check at L1259, `if
(RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state()))`, is then
reached with the stale state `COPY_SEGMENT_STARTED` for that segment, because
its id is no longer in `segmentIdsBeingCopied`. The segment is treated as
dangling and deleted.
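
To make the interleaving concrete, here is a minimal, single-threaded sketch
that replays the three steps above in order. Everything in it is a hypothetical
stand-in, not the real `RemoteLogManager` code: `SegmentSnapshot` plays the role
of the listed metadata, `beingCopied` the role of `segmentIdsBeingCopied`, and
`currentState` the role of the metadata store. The re-check variant is only one
possible way to handle this specific interleaving. I have not verified that it
closes every window or that it fits the real code.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DanglingSegmentRaceSketch {
    // Local stand-in for the two states that matter in this scenario.
    public enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED }

    // Hypothetical stand-in for listed metadata: the state is frozen at list time.
    public static final class SegmentSnapshot {
        final String id;
        final State state;

        SegmentSnapshot(String id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    // Assumption: mirrors segmentIdsBeingCopied (ids of in-flight copies).
    static final Set<String> beingCopied = ConcurrentHashMap.newKeySet();
    // Assumption: mirrors the latest state known to the metadata store.
    static final Map<String, State> currentState = new ConcurrentHashMap<>();

    public static void startCopy(String id) {
        beingCopied.add(id);
        currentState.put(id, State.COPY_SEGMENT_STARTED);
    }

    public static void finishCopy(String id) {
        currentState.put(id, State.COPY_SEGMENT_FINISHED);
        beingCopied.remove(id);
    }

    // Captures the state as it is right now, like building the segment list.
    public static SegmentSnapshot list(String id) {
        return new SegmentSnapshot(id, currentState.get(id));
    }

    // Decides from the state captured at list time, as the check in the diff does.
    public static boolean looksDanglingFromSnapshot(SegmentSnapshot s) {
        if (beingCopied.contains(s.id)) {
            return false;
        }
        return s.state == State.COPY_SEGMENT_STARTED;
    }

    // Hypothetical variant: re-reads the current state before deciding.
    public static boolean looksDanglingAfterRecheck(SegmentSnapshot s) {
        if (beingCopied.contains(s.id)) {
            return false;
        }
        return currentState.get(s.id) == State.COPY_SEGMENT_STARTED;
    }

    public static void main(String[] args) {
        startCopy("seg-1");                          // step 1: copy in flight
        SegmentSnapshot snapshot = list("seg-1");    // list built while copying
        finishCopy("seg-1");                         // step 2: copy finishes mid-loop
        // step 3: stale snapshot is misclassified as dangling
        System.out.println(looksDanglingFromSnapshot(snapshot)); // prints "true"
        System.out.println(looksDanglingAfterRecheck(snapshot)); // prints "false"
    }
}
```

The first result is the problematic one: a successfully copied segment would be
added to `danglingSegments`.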
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1374,10 +1387,17 @@ private Optional<RetentionSizeData>
buildRetentionSizeData(long retentionSize,
Iterator<RemoteLogSegmentMetadata> segmentsIterator =
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
while (segmentsIterator.hasNext()) {
RemoteLogSegmentMetadata segmentMetadata =
segmentsIterator.next();
- RemoteLogSegmentId segmentId =
segmentMetadata.remoteLogSegmentId();
- if (!visitedSegmentIds.contains(segmentId) &&
isRemoteSegmentWithinLeaderEpochs(segmentMetadata, logEndOffset, epochEntries))
{
- remoteLogSizeBytes +=
segmentMetadata.segmentSizeInBytes();
- visitedSegmentIds.add(segmentId);
+ // Only count the size of "COPY_SEGMENT_FINISHED" and
"DELETE_SEGMENT_STARTED" state segments
+ // because "COPY_SEGMENT_STARED" means copy didn't
complete, and "DELETE_SEGMENT_FINISHED" means delete completed.
+ // Note: there might be some "COPY_SEGMENT_STARED"
segments not counted here, but become "COPY_SEGMENT_FINISHED" soon.
+ // It's fine because the missed segment size will be
count in next time, and it won't cause more segment deletion.
Review Comment:
```suggestion
// Only count the size of "COPY_SEGMENT_FINISHED"
and "DELETE_SEGMENT_STARTED" state segments
// because "COPY_SEGMENT_STARTED" means copy didn't
complete, and "DELETE_SEGMENT_FINISHED" means delete did complete.
// Note: there might be some "COPY_SEGMENT_STARTED"
segments not counted here.
// Either they are being copied and will be counted
next time or they are dangling and will be cleaned
// elsewhere.
// Either way, this won't cause more segment
deletion.
```
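
For reference, the counting rule described in that comment can be sketched in
isolation as below. This is only an illustration under stated assumptions:
`State` and `Segment` are local, hypothetical stand-ins for
`RemoteLogSegmentState` and `RemoteLogSegmentMetadata`, and the real method also
applies the visited-id and leader-epoch checks, which are left out here.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class RetentionSizeSketch {
    // Local stand-in for the four segment states named in the comment.
    public enum State {
        COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED,
        DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED
    }

    // Hypothetical stand-in for segment metadata: just a state and a size.
    public static final class Segment {
        final State state;
        final long sizeInBytes;

        public Segment(State state, long sizeInBytes) {
            this.state = state;
            this.sizeInBytes = sizeInBytes;
        }
    }

    // Only segments that fully exist in remote storage count toward the size.
    static final Set<State> COUNTED =
        EnumSet.of(State.COPY_SEGMENT_FINISHED, State.DELETE_SEGMENT_STARTED);

    public static long remoteLogSizeBytes(List<Segment> segments) {
        long total = 0L;
        for (Segment segment : segments) {
            if (COUNTED.contains(segment.state)) {
                total += segment.sizeInBytes;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
            new Segment(State.COPY_SEGMENT_STARTED, 10L),    // skipped: copy incomplete
            new Segment(State.COPY_SEGMENT_FINISHED, 20L),   // counted
            new Segment(State.DELETE_SEGMENT_STARTED, 30L),  // counted
            new Segment(State.DELETE_SEGMENT_FINISHED, 40L)  // skipped: already deleted
        );
        System.out.println(remoteLogSizeBytes(segments)); // prints "50"
    }
}
```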
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.