This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f0c3d93104c KAFKA-19597: Stop the RSM after closing the remote-log
reader threads to handle requests gracefully (#20342)
f0c3d93104c is described below
commit f0c3d93104caa33a2566eff0656de37573a3922a
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Aug 19 21:56:27 2025 +0530
KAFKA-19597: Stop the RSM after closing the remote-log reader threads to
handle requests gracefully (#20342)
During shutdown, if the RSM (RemoteStorageManager) is closed first, ongoing
requests might throw an error. To handle the ongoing requests gracefully,
close the RSM after closing the remote-log reader thread pools.
Reviewers: Satish Duggana <[email protected]>
---
.../apache/kafka/server/log/remote/storage/RemoteLogManager.java | 8 ++++----
.../kafka/server/log/remote/storage/RemoteLogManagerTest.java | 4 ++--
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 47b5417e25a..a9b2c67ba79 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -2038,9 +2038,6 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
- Utils.closeQuietly(remoteStorageManagerPlugin,
"remoteStorageManagerPlugin");
- Utils.closeQuietly(remoteLogMetadataManagerPlugin,
"remoteLogMetadataManagerPlugin");
- Utils.closeQuietly(indexCache, "RemoteIndexCache");
rlmCopyThreadPool.close();
rlmExpirationThreadPool.close();
@@ -2050,10 +2047,13 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
} finally {
removeMetrics();
}
-
leaderCopyRLMTasks.clear();
leaderExpirationRLMTasks.clear();
followerRLMTasks.clear();
+
+ Utils.closeQuietly(indexCache, "RemoteIndexCache");
+ Utils.closeQuietly(remoteLogMetadataManagerPlugin,
"remoteLogMetadataManagerPlugin");
+ Utils.closeQuietly(remoteStorageManagerPlugin,
"remoteStorageManagerPlugin");
closed = true;
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 5201511e75d..b45b1118f5c 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -1769,9 +1769,9 @@ public class RemoteLogManagerTest {
void testIdempotentClose() throws IOException {
remoteLogManager.close();
remoteLogManager.close();
- InOrder inorder = inOrder(remoteStorageManager,
remoteLogMetadataManager);
- inorder.verify(remoteStorageManager, times(1)).close();
+ InOrder inorder = inOrder(remoteLogMetadataManager,
remoteStorageManager);
inorder.verify(remoteLogMetadataManager, times(1)).close();
+ inorder.verify(remoteStorageManager, times(1)).close();
}
@Test