This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a332a845630 MINOR: Various code cleanups in server (#22021)
a332a845630 is described below
commit a332a8456309a9fe801f8a1e0d1d229dac9bccf0
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Apr 11 10:57:09 2026 +0200
MINOR: Various code cleanups in server (#22021)
Small Java cleanups fixing logging and types
Reviewers: Andrew Schofield <[email protected]>
---
.../main/java/org/apache/kafka/server/BrokerLifecycleManager.java | 4 ++--
.../src/main/java/org/apache/kafka/server/DelayedActionQueue.java | 2 +-
.../java/org/apache/kafka/server/share/session/ShareSession.java | 8 ++++----
.../org/apache/kafka/server/share/session/ShareSessionCache.java | 4 ++--
.../org/apache/kafka/server/share/session/ShareSessionTest.java | 4 ++--
5 files changed, 11 insertions(+), 11 deletions(-)
diff --git
a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
index 0ce78168704..f1a88a8fb9a 100644
--- a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
+++ b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
@@ -570,10 +570,10 @@ public class BrokerLifecycleManager {
return;
}
if (response.authenticationException() != null) {
- logger.error("Unable to register broker $nodeId because of an
authentication exception.", response.authenticationException());
+ logger.error("Unable to register broker {} because of an
authentication exception.", nodeId, response.authenticationException());
scheduleNextCommunicationAfterFailure();
} else if (response.versionMismatch() != null) {
- logger.error("Unable to register broker $nodeId because of an
API version problem.", response.versionMismatch());
+ logger.error("Unable to register broker {} because of an API
version problem.", nodeId, response.versionMismatch());
scheduleNextCommunicationAfterFailure();
} else if (response.responseBody() == null) {
logger.warn("Unable to register broker {}.", nodeId);
diff --git
a/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
b/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
index cf00dc1dcba..9703908a680 100644
--- a/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
+++ b/server/src/main/java/org/apache/kafka/server/DelayedActionQueue.java
@@ -43,7 +43,7 @@ public class DelayedActionQueue implements ActionQueue {
Runnable action = queue.poll();
if (action == null) return;
action.run();
- } catch (Throwable e) {
+ } catch (Exception e) {
LOGGER.error("failed to complete delayed actions", e);
}
}
diff --git
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
index 9df63c10b41..284f2abdfa2 100644
---
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
+++
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java
@@ -89,7 +89,7 @@ public class ShareSession {
return partitionMap.size();
}
- public synchronized Boolean isEmpty() {
+ public synchronized boolean isEmpty() {
return partitionMap.isEmpty();
}
@@ -137,11 +137,11 @@ public class ShareSession {
return previousSize != -1 ? cachedSize - previousSize : cachedSize;
}
- public static String partitionsToLogString(Collection<TopicIdPartition>
partitions, Boolean traceEnabled) {
+ public static String partitionsToLogString(Collection<TopicIdPartition>
partitions, boolean traceEnabled) {
if (traceEnabled) {
- return String.format("( %s )", String.join(", ",
partitions.toString()));
+ return partitions.toString();
}
- return String.format("%s partition(s)", partitions.size());
+ return partitions.size() + " partition(s)";
}
public String toString() {
diff --git
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
index 29a33146543..964836058c6 100644
---
a/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
+++
b/server/src/main/java/org/apache/kafka/server/share/session/ShareSessionCache.java
@@ -93,10 +93,10 @@ public class ShareSessionCache {
* Get a session by session key.
*
* @param key The share session key.
- * @return The session, or None if no such session was found.
+ * @return The session, or null if no such session was found.
*/
public synchronized ShareSession get(ShareSessionKey key) {
- return sessions.getOrDefault(key, null);
+ return sessions.get(key);
}
/**
diff --git
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
index 8ce4cfaa418..716cf6576a1 100644
---
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
@@ -39,7 +39,7 @@ public class ShareSessionTest {
assertEquals("2 partition(s)", response);
response = ShareSession.partitionsToLogString(partitions, true);
- assertEquals(String.format("( [%s:foo-0, %s:bar-1] )", uuid1, uuid2),
response);
+ assertEquals(String.format("[%s:foo-0, %s:bar-1]", uuid1, uuid2),
response);
}
@Test
@@ -48,6 +48,6 @@ public class ShareSessionTest {
assertEquals("0 partition(s)", response);
response = ShareSession.partitionsToLogString(List.of(), true);
- assertEquals("( [] )", response);
+ assertEquals("[]", response);
}
}