junrao commented on code in PR #16614:
URL: https://github.com/apache/kafka/pull/16614#discussion_r1683340380
##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -348,7 +348,8 @@ public void truncateFromEndAsyncFlush(long endOffset) {
         //   - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
         //     * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by
         //       another truncateFromEnd call on log loading procedure, so it won't be a problem
-        scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+        List<EpochEntry> entries = new ArrayList<>(epochs.values());
+        scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
Review Comment:
This approach introduces a new correctness issue. With this change, it's
possible for older epoch entries to overwrite the newer epoch entries in the
leader epoch file. Consider the following sequence: we take a snapshot of the
epoch entries here; a new epoch entry is added and is flushed to disk; the
scheduler then writes the snapshot to disk. This can lead to the case where
the leader epoch file doesn't contain all entries up to the recovery point.
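   To make the interleaving concrete, here is a minimal stand-alone sketch of the race (the `file` and `epochs` fields are hypothetical stand-ins for the checkpoint file and the in-memory epoch cache, not the actual Kafka types):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicReference;

   public class StaleSnapshotRace {
       // Stand-in for the checkpoint file contents on disk.
       static final AtomicReference<List<Integer>> file = new AtomicReference<>(List.of());
       // Stand-in for the in-memory epoch entries.
       static final List<Integer> epochs = new ArrayList<>(List.of(1, 2));

       public static void main(String[] args) {
           // 1. truncateFromEndAsyncFlush captures a snapshot of the current entries...
           List<Integer> snapshot = new ArrayList<>(epochs);
           // 2. ...then a new epoch entry is assigned and flushed synchronously.
           epochs.add(3);
           file.set(new ArrayList<>(epochs));   // file now holds [1, 2, 3]
           // 3. The scheduled task finally runs and writes the stale snapshot.
           file.set(snapshot);                  // file now holds [1, 2]; entry 3 is lost
           System.out.println(file.get());      // prints [1, 2]
       }
   }
   ```

   After step 3 the on-disk state no longer contains the newest entry even though it was flushed in step 2.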
   Since the issue is only in the test, I am wondering if we could fix the test directly. For example, perhaps we could introduce a NoOpScheduler and use it in the test, since the test doesn't depend on the leader epoch entries actually being flushed to disk.
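   A rough sketch of what such a test-only scheduler could look like (the `Scheduler` interface below is a simplified stand-in for Kafka's actual scheduler interface, not its real signature):

   ```java
   // Simplified stand-in for the scheduler abstraction used by LeaderEpochFileCache.
   interface Scheduler {
       void scheduleOnce(String name, Runnable task);
   }

   // A no-op scheduler silently drops every submitted task, so the test never
   // performs the background flush it does not depend on.
   class NoOpScheduler implements Scheduler {
       @Override
       public void scheduleOnce(String name, Runnable task) {
           // Intentionally do nothing: the flush task is discarded.
       }
   }

   public class NoOpSchedulerExample {
       public static void main(String[] args) {
           Scheduler scheduler = new NoOpScheduler();
           scheduler.scheduleOnce("leader-epoch-cache-flush-tp-0", () -> {
               throw new AssertionError("flush task should never run");
           });
           System.out.println("task was dropped");
       }
   }
   ```

   With this, the snapshot-vs-flush ordering problem never arises in the test because the asynchronous write simply never happens.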
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]