This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new cd35cc4011f KAFKA-19697 NPE Cannot invoke
org.apache.kafka.connect.runtime.ConnectMetrics$MetricGroup.close() (#21717)
cd35cc4011f is described below
commit cd35cc4011f0eb22bec48ac9430036f769dfc372
Author: Ken Huang <[email protected]>
AuthorDate: Mon Mar 16 23:33:54 2026 +0800
KAFKA-19697 NPE Cannot invoke
org.apache.kafka.connect.runtime.ConnectMetrics$MetricGroup.close() (#21717)
The PR fixes an NPE thrown when a task is unregistered from
connectorStatusMetrics. The issue happens if a connector has more than
one task on one worker and these tasks fail to start. For example, if
DNS can't resolve the producer URL, the tasks will fail to build. When
these tasks fail to start, ConnectorStatusMetricsGroup unregisters them.
After the first task is unregistered, the connector's entry is removed
from connectorStatusMetrics. When the second task calls
`recordTaskRemoved()`, connectorStatusMetrics no longer contains the
connector, so
`connectorStatusMetrics.get(connectorTaskId.connector()).close();`
throws an NPE. Similar to the check in `recordTaskAdded()`, this fix
makes `recordTaskRemoved()` check connectorStatusMetrics for the
connector's entry before closing its metric group.
Co-authored-by: Fan Yang <[email protected]>
Reviewers: Ding <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/connect/runtime/Worker.java | 4 +--
.../apache/kafka/connect/runtime/WorkerTest.java | 30 ++++++++++++++++++++++
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 2021d63f320..1ee6f925f4b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -2397,8 +2397,8 @@ public final class Worker {
protected synchronized void recordTaskRemoved(ConnectorTaskId
connectorTaskId) {
// Unregister connector task count metric if we remove the last
task of the connector
if (tasks.keySet().stream().noneMatch(id ->
id.connector().equals(connectorTaskId.connector()))) {
-
connectorStatusMetrics.get(connectorTaskId.connector()).close();
- connectorStatusMetrics.remove(connectorTaskId.connector());
+ MetricGroup metricGroup =
connectorStatusMetrics.remove(connectorTaskId.connector());
+ if (metricGroup != null) metricGroup.close();
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 64b6757f27d..ecc08f6159d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -147,6 +147,7 @@ import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.GRO
import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
import static
org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -897,6 +898,35 @@ public class WorkerTest {
verifyKafkaClusterId();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean
enableTopicCreation) {
+ setup(enableTopicCreation);
+ mockKafkaClusterId();
+ mockInternalConverters();
+ mockFileConfigProvider();
+
+ worker = new Worker(WORKER_ID,
+ new MockTime(),
+ plugins,
+ config,
+ offsetBackingStore,
+ noneConnectorClientConfigOverridePolicy);
+ worker.herder = herder;
+
+ // Pass an empty tasks map to simulate all tasks failing to start
+ Worker.ConnectorStatusMetricsGroup metricsGroup = new
Worker.ConnectorStatusMetricsGroup(
+ worker.metrics(), new ConcurrentHashMap<>(), herder
+ );
+
+ ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0);
+ ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1);
+ metricsGroup.recordTaskAdded(taskId1);
+ metricsGroup.recordTaskAdded(taskId2);
+ metricsGroup.recordTaskRemoved(taskId1);
+ assertDoesNotThrow(() -> metricsGroup.recordTaskRemoved(taskId2),
"should not throw NPE");
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testStartTaskFailure(boolean enableTopicCreation) {