This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e2154fef858 MINOR: Cleanups in LogManager (#21916)
e2154fef858 is described below
commit e2154fef858dc0df684eeaddc48579db6c259991
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Mar 31 19:44:10 2026 +0200
MINOR: Cleanups in LogManager (#21916)
Reviewers: Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
---
.../kafka/storage/internals/log/LogManager.java | 99 ++++++++++++----------
1 file changed, 52 insertions(+), 47 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
index 17ad60e455e..6aa3892a59f 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
@@ -672,44 +672,39 @@ public class LogManager {
uncleanLogDirs.add(logDirAbsolutePath);
}
- List<Runnable> jobsForDir = logsToLoad.stream().map(logDir -> {
- // KafkaStorageException might be thrown, ex: during
writing LeaderEpochFileCache
- // And while converting IOException to
KafkaStorageException, we've already handled the exception. So we can ignore it
here.
- // loadLog is completed for all logs under the logDir,
mark it.
- return (Runnable) () -> {
- LOG.debug("Loading log {}", logDir);
- Optional<UnifiedLog> log = Optional.empty();
- long logLoadStartMs = time.hiResClockMs();
- try {
- log = Optional.of(loadLog(logDir,
hadCleanShutdown.get(), recoveryPoints, logStartOffsets,
- defaultConfig, topicConfigOverrides,
numRemainingSegments, isStray));
- } catch (IOException ioe) {
- handleIOException(offlineDirs, logDirAbsolutePath,
ioe);
- } catch (KafkaStorageException kse) {
- // KafkaStorageException might be thrown, ex:
during writing LeaderEpochFileCache
- // And while converting IOException to
KafkaStorageException, we've already handled the exception.
- // So only throw if it's not the case here.
- if (!(kse.getCause() instanceof IOException)) {
- throw kse;
- }
- } finally {
- long logLoadDurationMs = time.hiResClockMs() -
logLoadStartMs;
- int remainingLogs =
decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath);
- int currentNumLoaded = logsToLoad.size() -
remainingLogs;
- if (log.isPresent()) {
- UnifiedLog loadedLog = log.get();
- LOG.info("Completed load of {} with {}
segments, local-log-start-offset {} and log-end-offset {} in {}ms ({}/{}
completed in {})",
- loadedLog,
loadedLog.numberOfSegments(), loadedLog.localLogStartOffset(),
loadedLog.logEndOffset(), logLoadDurationMs, currentNumLoaded,
logsToLoad.size(), logDirAbsolutePath);
- } else {
- LOG.info("Error while loading logs in {} in
{}ms ({}/{} completed in {})", logDir, logLoadDurationMs, currentNumLoaded,
logsToLoad.size(), logDirAbsolutePath);
- }
-
- if (remainingLogs == 0) {
- // loadLog is completed for all logs under the
logDir, mark it.
- loadLogsCompletedFlags.put(logDirAbsolutePath,
true);
- }
+ List<Runnable> jobsForDir = logsToLoad.stream().map(logDir ->
(Runnable) () -> {
+ LOG.debug("Loading log {}", logDir);
+ Optional<UnifiedLog> log = Optional.empty();
+ long logLoadStartMs = time.hiResClockMs();
+ try {
+ log = Optional.of(loadLog(logDir,
hadCleanShutdown.get(), recoveryPoints, logStartOffsets,
+ defaultConfig, topicConfigOverrides,
numRemainingSegments, isStray));
+ } catch (IOException ioe) {
+ handleIOException(offlineDirs, logDirAbsolutePath,
ioe);
+ } catch (KafkaStorageException kse) {
+ // KafkaStorageException might be thrown, ex: during
writing LeaderEpochFileCache
+ // And while converting IOException to
KafkaStorageException, we've already handled the exception.
+ // So only throw if it's not the case here.
+ if (!(kse.getCause() instanceof IOException)) {
+ throw kse;
+ }
+ } finally {
+ long logLoadDurationMs = time.hiResClockMs() -
logLoadStartMs;
+ int remainingLogs =
decNumRemainingLogs(numRemainingLogs, logDirAbsolutePath);
+ int currentNumLoaded = logsToLoad.size() -
remainingLogs;
+ if (log.isPresent()) {
+ UnifiedLog loadedLog = log.get();
+ LOG.info("Completed load of {} with {} segments,
local-log-start-offset {} and log-end-offset {} in {}ms ({}/{} completed in
{})",
+ loadedLog, loadedLog.numberOfSegments(),
loadedLog.localLogStartOffset(), loadedLog.logEndOffset(), logLoadDurationMs,
currentNumLoaded, logsToLoad.size(), logDirAbsolutePath);
+ } else {
+ LOG.info("Error while loading logs in {} in {}ms
({}/{} completed in {})", logDir, logLoadDurationMs, currentNumLoaded,
logsToLoad.size(), logDirAbsolutePath);
}
- };
+
+ if (remainingLogs == 0) {
+ // loadLog is completed for all logs under the
logDir, mark it.
+ loadLogsCompletedFlags.put(logDirAbsolutePath,
true);
+ }
+ }
}).toList();
jobs.add(jobsForDir.stream().map(pool::submit).collect(Collectors.toList()));
@@ -1477,16 +1472,26 @@ public class LogManager {
"which is not allowed when running in
KRaft mode."));
int partitionId = futureLog.topicPartition().partition();
- return Optional.ofNullable(tpToDirectoryUuid.get(topicId,
partitionId, brokerId))
- .filter(dir ->
directoryId(futureLog.parentDir()).map(id -> id.equals(dir)).orElse(false))
- .map(dir -> {
- Optional<UnifiedLog> currentLog =
Optional.ofNullable(currentLogs.get(futureLog.topicPartition()))
- .filter(log -> log.topicId()
- .map(id -> id.equals(topicId))
- .orElse(false));
- return Map.entry(futureLog, currentLog);
- })
- .stream();
+ // Get the directory assigned by the controller for this
partition
+ Uuid assignedDirectory = tpToDirectoryUuid.get(topicId,
partitionId, brokerId);
+ if (assignedDirectory == null) {
+ return Stream.empty();
+ }
+
+ // Check if the future log is in the assigned directory
+ Optional<Uuid> futureLogDirectoryId =
directoryId(futureLog.parentDir());
+ if (futureLogDirectoryId.isEmpty() ||
!futureLogDirectoryId.get().equals(assignedDirectory)) {
+ return Stream.empty();
+ }
+
+ // Find the matching current log (if it exists)
+ Optional<UnifiedLog> currentLog =
Optional.ofNullable(currentLogs.get(futureLog.topicPartition()))
+ .filter(log -> {
+ Optional<Uuid> currentLogTopicId =
log.topicId();
+ return currentLogTopicId.isPresent() &&
currentLogTopicId.get().equals(topicId);
+ });
+
+ return Stream.of(Map.entry(futureLog, currentLog));
})
.collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
}