[
https://issues.apache.org/jira/browse/KAFKA-19571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18061703#comment-18061703
]
Ilyas Toumlilt commented on KAFKA-19571:
----------------------------------------
Updated the "Proposed fix" section in the description to match the actual
merged fix.
> Race condition between log segment flush and file deletion causing log dir to
> go offline
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-19571
> URL: https://issues.apache.org/jira/browse/KAFKA-19571
> Project: Kafka
> Issue Type: Bug
> Components: core, log
> Affects Versions: 3.7.1
> Reporter: Ilyas Toumlilt
> Assignee: Ilyas Toumlilt
> Priority: Major
> Fix For: 4.2.0, 4.3.0, 4.0.2, 4.1.2
>
>
> h1. Context
> We are using Kafka v3.7.1 with ZooKeeper. Our brokers are configured with
> multiple disks in a JBOD setup, and routine intra-broker data rebalancing is
> performed with Cruise Control to manage disk utilization. During these
> rebalance operations, a race condition occurs between a log segment flush
> operation and the file deletion that is part of the replica's directory move.
> This race leads to a {{NoSuchFileException}} when the flush operation targets
> a file path that has just been deleted by the rebalance process, and the
> exception incorrectly forces the broker to take the entire log directory
> offline.
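> The first half of the race is easy to reproduce outside Kafka. The sketch
> below (hypothetical temp paths, not Kafka code) shows that opening a
> {{FileChannel}} for flushing on a path that was deleted a moment earlier
> fails with {{NoSuchFileException}}:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushAfterDelete {
    // Returns the simple name of the exception hit when flushing a just-deleted path.
    static String tryFlushDeletedPath() throws IOException {
        Path snapshot = Files.createTempFile("producer-state", ".snapshot");
        Files.delete(snapshot); // stands in for the rebalance deleting the file
        try (FileChannel ch = FileChannel.open(snapshot, StandardOpenOption.WRITE)) {
            ch.force(true); // never reached: open() already failed
            return "no exception";
        } catch (NoSuchFileException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(tryFlushDeletedPath()); // NoSuchFileException
    }
}
```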
> h1. Logs / Stack trace
> {code:java}
> 2025-07-23 19:03:30,114 WARN Failed to flush file
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot (org.apache.kafka.
> common.utils.Utils)
> java.nio.file.NoSuchFileException:
> /var/lib/kafka-08/topic_01-12/00000000024420850595.snapshot
> at
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
> at
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
> at
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
> at
> java.base/sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:182)
> at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
> at java.base/java.nio.channels.FileChannel.open(FileChannel.java:345)
> at
> org.apache.kafka.common.utils.Utils.flushFileIfExists(Utils.java:1029)
> at
> kafka.log.UnifiedLog.$anonfun$flushProducerStateSnapshot$2(UnifiedLog.scala:1766)
> at
> kafka.log.UnifiedLog.flushProducerStateSnapshot(UnifiedLog.scala:1915)
> at kafka.log.UnifiedLog.$anonfun$roll$2(UnifiedLog.scala:1679)
> at java.base/java.util.Optional.ifPresent(Optional.java:183)
> at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1679)
> at
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,114 ERROR Error while flushing log for topic_01-12 in dir
> /var/lib/kafka-08 with offset 24420850595 (exclusive) and recovery point
> 24420850595
> (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
> at
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> at
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
> at
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
> at com.yammer.metrics.core.Timer.time(Timer.java:91)
> at
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
> at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
> at java.base/java.lang.Iterable.forEach(Iterable.java:75)
> at kafka.log.LocalLog.flush(LocalLog.scala:176)
> at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
> at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
> at
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
> at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
> at
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,115 ERROR Uncaught exception in scheduled task
> 'flush-log' (org.apache.kafka.server.util.KafkaScheduler)
> org.apache.kafka.common.errors.KafkaStorageException: Error while flushing
> log for topic_01-12 in dir /var/lib/kafka-08 with offset 24420850595
> (exclusive) and recovery point 24420850595
> Caused by: java.nio.channels.ClosedChannelException
> at
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> at
> java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
> at
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:631)
> at
> org.apache.kafka.storage.internals.log.LogSegment$2.call(LogSegment.java:627)
> at com.yammer.metrics.core.Timer.time(Timer.java:91)
> at
> org.apache.kafka.storage.internals.log.LogSegment.flush(LogSegment.java:627)
> at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:176)
> at java.base/java.lang.Iterable.forEach(Iterable.java:75)
> at kafka.log.LocalLog.flush(LocalLog.scala:176)
> at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1719)
> at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1915)
> at
> kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1700)
> at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1680)
> at
> org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> 2025-07-23 19:03:30,117 WARN [ReplicaManager broker=32] Stopping serving
> replicas in dir /var/lib/kafka-08 (kafka.server.ReplicaManager) {code}
> h1. Stack Trace Analysis
> The failure begins with a benign {{WARN}}: a scheduled task tries to flush a
> producer state snapshot that was moved during a disk rebalance. This
> {{NoSuchFileException}} is anticipated and swallowed gracefully, as
> implemented in https://issues.apache.org/jira/browse/KAFKA-13403.
> However, the same task then attempts to flush the actual log segment, which
> fails with a critical, unhandled {{ClosedChannelException}} because the file
> handles were invalidated by the directory move. This unhandled I/O error
> propagates up and terminates the background task, causing the
> {{KafkaScheduler}} to log it as an uncaught {{KafkaStorageException}}. As a
> direct consequence, the {{ReplicaManager}} detects this fatal storage error
> and triggers its safety mechanism, taking the entire log directory offline to
> prevent potential data corruption.
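> The fatal half of the failure is mechanical: {{FileChannel.force()}} on an
> already-closed channel always throws {{ClosedChannelException}}. A minimal
> sketch (hypothetical temp file, not Kafka code):

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ForceClosedChannel {
    // Returns the simple name of the exception hit when forcing a closed channel.
    static String forceAfterClose() throws IOException {
        Path segment = Files.createTempFile("segment", ".log");
        FileChannel ch = FileChannel.open(segment, StandardOpenOption.WRITE);
        ch.close(); // handle invalidated, as after the directory move
        try {
            ch.force(true); // the same call FileRecords.flush() ultimately makes
            return "no exception";
        } catch (ClosedChannelException e) {
            return e.getClass().getSimpleName();
        } finally {
            Files.deleteIfExists(segment);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(forceAfterClose()); // ClosedChannelException
    }
}
```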
> h1. Expected Behavior
> A {{NoSuchFileException}} in this context should not cause the entire log
> directory to be marked offline.
> h1. Workaround
> The current workaround is to manually restart the affected Kafka broker. The
> restart clears the in-memory state, and upon re-scanning the log directories,
> the broker marks the disk as healthy again.
> h1. Proposed fix
> The fix was implemented in {{LogManager.replaceCurrentWithFutureLog()}}.
> *Approach:* Do not close the source log when swapping the current log with
> the future log during replica rebalancing. Closing it is what caused pending
> flush/read operations to see closed channels and trigger the log directory
> failure.
> *Changes:*
> # Remove the explicit {{close()}} of the source log in
> {{replaceCurrentWithFutureLog()}}. After renaming the replica directory to the
> {{.delete}} suffix, the source log is no longer closed there.
> # Leave file handles open so that any in-flight operations (e.g. the log
> flusher, fetch requests) can still use the renamed files. On Unix, renamed
> files stay accessible until all open file handles are closed.
> # Rely on the existing async deletion: the source log is already scheduled for
> deletion via {{addLogToBeDeleted()}}. The background delete-logs thread closes
> the log and deletes the files after the configured delay
> ({{log.segment.delete.delay.ms}}). By then, no other operations should still be
> using those handles.
> This avoids the race for both the flush and read paths (unlike a fix only in
> {{LogSegment.flush()}}), and is consistent with {{LogManager.asyncDelete()}}.
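> The Unix rename semantics the fix relies on can be sketched as follows
> (hypothetical temp paths, not Kafka code): renaming a file does not
> invalidate an already-open {{FileChannel}}, so an in-flight flush completes
> even after the {{.delete}}-suffix rename:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RenameKeepsHandle {
    // Returns true if a flush on an open channel succeeds after the file is renamed.
    static boolean flushSurvivesRename() throws IOException {
        Path log = Files.createTempFile("topic_01-12", ".log");
        Path renamed = log.resolveSibling(log.getFileName() + ".delete");
        try (FileChannel ch = FileChannel.open(log, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap("record".getBytes(StandardCharsets.UTF_8)));
            Files.move(log, renamed); // the ".delete"-suffix rename in the swap
            ch.force(true);           // still succeeds: the open handle is unaffected
            return true;
        } finally {
            Files.deleteIfExists(renamed);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(flushSurvivesRename());
    }
}
```

> (On Windows the rename itself would typically fail while the handle is open,
> which is why the guarantee is stated for Unix.)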
>
> *Testing:*
> A new unit test {{testReplaceCurrentWithFutureLogDoesNotCloseSourceLog}} in
> {{LogManagerTest}} verifies that the source log is not closed during the swap
> and that {{flush()}} can be called without error afterward.
> h2. Related issues
> * https://issues.apache.org/jira/browse/KAFKA-13403 swallows the first
> {{NoSuchFileException}} WARN in the stack trace above, but not the underlying
> exception.
> * https://issues.apache.org/jira/browse/KAFKA-15391 is similar but distinct:
> it swallows {{NoSuchFileException}} for a race on the log directory
> move/delete, but not at the segment file level.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)