This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e2154fef858 MINOR: Cleanups in LogManager (#21916)
e2154fef858 is described below

commit e2154fef858dc0df684eeaddc48579db6c259991
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Mar 31 19:44:10 2026 +0200

    MINOR: Cleanups in LogManager (#21916)
    
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
---
 .../kafka/storage/internals/log/LogManager.java    | 99 ++++++++++++----------
 1 file changed, 52 insertions(+), 47 deletions(-)

diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
index 17ad60e455e..6aa3892a59f 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
@@ -672,44 +672,39 @@ public class LogManager {
                     uncleanLogDirs.add(logDirAbsolutePath);
                 }
 
-                List<Runnable> jobsForDir = logsToLoad.stream().map(logDir -> {
-                    // KafkaStorageException might be thrown, ex: during 
writing LeaderEpochFileCache
-                    // And while converting IOException to 
KafkaStorageException, we've already handled the exception. So we can ignore it 
here.
-                    // loadLog is completed for all logs under the logDir, 
mark it.
-                    return (Runnable) () -> {
-                        LOG.debug("Loading log {}", logDir);
-                        Optional<UnifiedLog> log = Optional.empty();
-                        long logLoadStartMs = time.hiResClockMs();
-                        try {
-                            log = Optional.of(loadLog(logDir, 
hadCleanShutdown.get(), recoveryPoints, logStartOffsets,
-                                    defaultConfig, topicConfigOverrides, 
numRemainingSegments, isStray));
-                        } catch (IOException ioe) {
-                            handleIOException(offlineDirs, logDirAbsolutePath, 
ioe);
-                        } catch (KafkaStorageException kse) {
-                            // KafkaStorageException might be thrown, ex: 
during writing LeaderEpochFileCache
-                            // And while converting IOException to 
KafkaStorageException, we've already handled the exception.
-                            // So only throw if it's not the case here.
-                            if (!(kse.getCause() instanceof IOException)) {
-                                throw kse;
-                            }
-                        } finally {
-                            long logLoadDurationMs = time.hiResClockMs() - 
logLoadStartMs;
-                            int remainingLogs = 
decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath);
-                            int currentNumLoaded = logsToLoad.size() - 
remainingLogs;
-                            if (log.isPresent()) {
-                                UnifiedLog loadedLog = log.get();
-                                LOG.info("Completed load of {} with {} 
segments, local-log-start-offset {} and log-end-offset {} in {}ms ({}/{} 
completed in {})",
-                                        loadedLog, 
loadedLog.numberOfSegments(), loadedLog.localLogStartOffset(), 
loadedLog.logEndOffset(), logLoadDurationMs, currentNumLoaded, 
logsToLoad.size(), logDirAbsolutePath);
-                            } else {
-                                LOG.info("Error while loading logs in {} in 
{}ms ({}/{} completed in {})", logDir, logLoadDurationMs, currentNumLoaded, 
logsToLoad.size(), logDirAbsolutePath);
-                            }
-
-                            if (remainingLogs == 0) {
-                                // loadLog is completed for all logs under the 
logDir, mark it.
-                                loadLogsCompletedFlags.put(logDirAbsolutePath, 
true);
-                            }
+                List<Runnable> jobsForDir = logsToLoad.stream().map(logDir -> 
(Runnable) () -> {
+                    LOG.debug("Loading log {}", logDir);
+                    Optional<UnifiedLog> log = Optional.empty();
+                    long logLoadStartMs = time.hiResClockMs();
+                    try {
+                        log = Optional.of(loadLog(logDir, 
hadCleanShutdown.get(), recoveryPoints, logStartOffsets,
+                                defaultConfig, topicConfigOverrides, 
numRemainingSegments, isStray));
+                    } catch (IOException ioe) {
+                        handleIOException(offlineDirs, logDirAbsolutePath, 
ioe);
+                    } catch (KafkaStorageException kse) {
+                        // KafkaStorageException might be thrown, ex: during 
writing LeaderEpochFileCache
+                        // And while converting IOException to 
KafkaStorageException, we've already handled the exception.
+                        // So only throw if it's not the case here.
+                        if (!(kse.getCause() instanceof IOException)) {
+                            throw kse;
+                        }
+                    } finally {
+                        long logLoadDurationMs = time.hiResClockMs() - 
logLoadStartMs;
+                        int remainingLogs = 
decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath);
+                        int currentNumLoaded = logsToLoad.size() - 
remainingLogs;
+                        if (log.isPresent()) {
+                            UnifiedLog loadedLog = log.get();
+                            LOG.info("Completed load of {} with {} segments, 
local-log-start-offset {} and log-end-offset {} in {}ms ({}/{} completed in 
{})",
+                                    loadedLog, loadedLog.numberOfSegments(), 
loadedLog.localLogStartOffset(), loadedLog.logEndOffset(), logLoadDurationMs, 
currentNumLoaded, logsToLoad.size(), logDirAbsolutePath);
+                        } else {
+                            LOG.info("Error while loading logs in {} in {}ms 
({}/{} completed in {})", logDir, logLoadDurationMs, currentNumLoaded, 
logsToLoad.size(), logDirAbsolutePath);
                         }
-                    };
+
+                        if (remainingLogs == 0) {
+                            // loadLog is completed for all logs under the 
logDir, mark it.
+                            loadLogsCompletedFlags.put(logDirAbsolutePath, 
true);
+                        }
+                    }
                 }).toList();
 
                 
jobs.add(jobsForDir.stream().map(pool::submit).collect(Collectors.toList()));
@@ -1477,16 +1472,26 @@ public class LogManager {
                                     "which is not allowed when running in 
KRaft mode."));
                     int partitionId = futureLog.topicPartition().partition();
 
-                    return Optional.ofNullable(tpToDirectoryUuid.get(topicId, 
partitionId, brokerId))
-                            .filter(dir -> 
directoryId(futureLog.parentDir()).map(id -> id.equals(dir)).orElse(false))
-                            .map(dir -> {
-                                Optional<UnifiedLog> currentLog = 
Optional.ofNullable(currentLogs.get(futureLog.topicPartition()))
-                                        .filter(log -> log.topicId()
-                                                .map(id -> id.equals(topicId))
-                                                .orElse(false));
-                                return Map.entry(futureLog, currentLog);
-                            })
-                            .stream();
+                    // Get the directory assigned by the controller for this 
partition
+                    Uuid assignedDirectory = tpToDirectoryUuid.get(topicId, 
partitionId, brokerId);
+                    if (assignedDirectory == null) {
+                        return Stream.empty();
+                    }
+
+                    // Check if the future log is in the assigned directory
+                    Optional<Uuid> futureLogDirectoryId = 
directoryId(futureLog.parentDir());
+                    if (futureLogDirectoryId.isEmpty() || 
!futureLogDirectoryId.get().equals(assignedDirectory)) {
+                        return Stream.empty();
+                    }
+
+                    // Find the matching current log (if it exists)
+                    Optional<UnifiedLog> currentLog = 
Optional.ofNullable(currentLogs.get(futureLog.topicPartition()))
+                            .filter(log -> {
+                                Optional<Uuid> currentLogTopicId = 
log.topicId();
+                                return currentLogTopicId.isPresent() && 
currentLogTopicId.get().equals(topicId);
+                            });
+
+                    return Stream.of(Map.entry(futureLog, currentLog));
                 })
                 .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
     }

Reply via email to