chia7712 commented on code in PR #16614:
URL: https://github.com/apache/kafka/pull/16614#discussion_r1683591221
##########
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##########
@@ -348,7 +348,8 @@ public void truncateFromEndAsyncFlush(long endOffset) {
         //   - We still flush the change in #assign synchronously, meaning that it's guaranteed that the checkpoint file always has no missing entries.
         //     * Even when stale epochs are restored from the checkpoint file after the unclean shutdown, it will be handled by another truncateFromEnd call on log loading procedure, so it won't be a problem
-        scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, this::writeToFileForTruncation);
+        List<EpochEntry> entries = new ArrayList<>(epochs.values());
+        scheduler.scheduleOnce("leader-epoch-cache-flush-" + topicPartition, () -> checkpoint.writeForTruncation(entries));
Review Comment:
> Since the issue is only in the test, I am wondering if we could fix the test directly. For example, perhaps we could introduce a NoOpScheduler and use it in the test, since the test doesn't depend on the leader epoch entries to be actually flushed to disk.

This is another good approach.
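A `NoOpScheduler` for the test could be as small as the sketch below. Note this is only an illustration: `MiniScheduler` is a hypothetical stand-in for Kafka's `Scheduler` interface, reduced to the single method the cache uses here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Kafka's Scheduler interface, reduced to the one
// method LeaderEpochFileCache calls; the real interface has more methods.
interface MiniScheduler {
    void scheduleOnce(String name, Runnable task);
}

// A scheduler that records but never runs its tasks, so no background flush
// ever happens. Suitable when a test only checks in-memory state, not the file.
class NoOpScheduler implements MiniScheduler {
    final AtomicInteger dropped = new AtomicInteger();

    @Override
    public void scheduleOnce(String name, Runnable task) {
        dropped.incrementAndGet(); // count the task, but never execute it
    }
}
```

With this in place the test never touches the checkpoint file, so the deadlock between the scheduler thread and the cache lock cannot occur.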
> This approach introduces a new correctness issue. With this change, it's possible for older epoch entries to overwrite the newer epoch entries in the leader epoch file. Consider the following sequence: we take a snapshot of the epoch entries here; a new epoch entry is added and is flushed to disk; the scheduler then writes the snapshot to disk. This can lead to the case where the leader epoch file doesn't contain all entries up to the recovery point.

Sorry for possibly introducing a correctness issue. @FrankYang0529 and I had discussed the approach offline when I noticed that deadlock, and I suggested changing the production code directly. It seems to me that this PR does NOT change the execution order, because `writeToFileForTruncation` does not hold a single lock across the "snapshot" and the "flush":
```java
private void writeToFileForTruncation() {
    // phase 1: create a snapshot while holding the read lock
    List<EpochEntry> entries;
    lock.readLock().lock();
    try {
        entries = new ArrayList<>(epochs.values());
    } finally {
        lock.readLock().unlock();
    }
    // phase 2: flush the snapshot (the checkpoint takes its own write lock)
    checkpoint.writeForTruncation(entries);
}
```
Hence, the issue you mentioned can happen even if we revert this PR. For example:
1. `writeToFileForTruncation` (run by the scheduler) takes a snapshot of the epoch entries in phase 1 (see the comment in the code above)
2. a new epoch entry is added and is flushed to disk
3. `writeToFileForTruncation` (run by the scheduler) then writes the stale snapshot to disk in phase 2 (see the comment in the code above)
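The three steps above can be replayed deterministically. The sketch below uses plain collections in place of the epoch cache and the checkpoint file (all names are illustrative, not the real API) to show the stale snapshot clobbering the newer flush:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Deterministic replay of steps 1-3, with plain collections standing in for
// the epoch cache and the checkpoint file (all names are illustrative).
public class StaleSnapshotRace {

    // Returns the epochs left in the simulated checkpoint file at the end.
    static List<Integer> replay() {
        TreeMap<Integer, Long> epochs = new TreeMap<>();  // epoch -> start offset
        epochs.put(0, 0L);
        List<Integer> file = new ArrayList<>();           // simulated checkpoint file

        // step 1: the scheduled task snapshots the cache (phase 1 above)
        List<Integer> snapshot = new ArrayList<>(epochs.keySet());

        // step 2: a new epoch entry is assigned and flushed synchronously
        epochs.put(1, 100L);
        file.clear();
        file.addAll(epochs.keySet());                     // file now holds epochs 0 and 1

        // step 3: the scheduled task writes its stale snapshot (phase 2 above)
        file.clear();
        file.addAll(snapshot);                            // epoch 1 is lost again

        return file;
    }

    public static void main(String[] args) {
        System.out.println(replay());                     // prints [0]
    }
}
```

The file ends up missing epoch 1 even though that entry was flushed, which is exactly the interleaving described above.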
In summary, there are two follow-ups:
1. rewrite `testLogRecoveryMetrics` with a `NoOpScheduler`
2. add `writeToFileForTruncation` back, but without the "snapshot" phase. For example:
```java
private void writeToFileForTruncation() {
    lock.readLock().lock();
    try {
        checkpoint.writeForTruncation(epochs.values());
    } finally {
        lock.readLock().unlock();
    }
}
```
@junrao WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]