This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new f631d2fa379 KAFKA-19697 NPE Cannot invoke 
org.apache.kafka.connect.runtime.ConnectMetrics$MetricGroup.close() (#21717)
f631d2fa379 is described below

commit f631d2fa37914ca63393568f00cf25ebf918c8dc
Author: Ken Huang <[email protected]>
AuthorDate: Mon Mar 16 23:33:54 2026 +0800

    KAFKA-19697 NPE Cannot invoke 
org.apache.kafka.connect.runtime.ConnectMetrics$MetricGroup.close() (#21717)
    
    This PR fixes an NPE thrown when ConnectorStatusMetricsGroup unregisters
    a task. The issue occurs when a connector has more than one task on a
    single worker and those tasks fail to start. For example, if DNS can't
    resolve the producer URL, the tasks fail to build. When tasks fail to
    start, ConnectorStatusMetricsGroup unregisters them. After the first
    task is unregistered, the connector's entry is removed from
    connectorStatusMetrics. When the second task calls
    `recordTaskRemoved()`, the entry is no longer present, so
    `connectorStatusMetrics.get(connectorTaskId.connector()).close();`
    throws an NPE. As with the check in `recordTaskAdded()`, the fix guards
    against a missing entry: `recordTaskRemoved()` now removes the entry
    from connectorStatusMetrics and closes the returned MetricGroup only if
    it is non-null.
    
    Co-authored-by: Fan Yang <[email protected]>
    
    Reviewers: Ding <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  4 +--
 .../apache/kafka/connect/runtime/WorkerTest.java   | 30 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 591e9816a7a..430285c1796 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -2330,8 +2330,8 @@ public final class Worker {
         protected synchronized void recordTaskRemoved(ConnectorTaskId 
connectorTaskId) {
             // Unregister connector task count metric if we remove the last 
task of the connector
             if (tasks.keySet().stream().noneMatch(id -> 
id.connector().equals(connectorTaskId.connector()))) {
-                
connectorStatusMetrics.get(connectorTaskId.connector()).close();
-                connectorStatusMetrics.remove(connectorTaskId.connector());
+                MetricGroup metricGroup = 
connectorStatusMetrics.remove(connectorTaskId.connector());
+                if (metricGroup != null) metricGroup.close();
             }
         }
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 65262983d9f..d5870f7643c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -145,6 +145,7 @@ import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.GRO
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
 import static org.apache.kafka.connect.sink.SinkTask.TOPICS_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -896,6 +897,35 @@ public class WorkerTest {
         verifyKafkaClusterId();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testConnectorStatusMetricsGroup_tasksFailedToStart(boolean 
enableTopicCreation) {
+        setup(enableTopicCreation);
+        mockKafkaClusterId();
+        mockInternalConverters();
+        mockFileConfigProvider();
+
+        worker = new Worker(WORKER_ID,
+                new MockTime(),
+                plugins,
+                config,
+                offsetBackingStore,
+                noneConnectorClientConfigOverridePolicy);
+        worker.herder = herder;
+
+        // Pass an empty tasks map to simulate all tasks failing to start
+        Worker.ConnectorStatusMetricsGroup metricsGroup = new 
Worker.ConnectorStatusMetricsGroup(
+                worker.metrics(), new ConcurrentHashMap<>(), herder
+        );
+
+        ConnectorTaskId taskId1 = new ConnectorTaskId("c1", 0);
+        ConnectorTaskId taskId2 = new ConnectorTaskId("c1", 1);
+        metricsGroup.recordTaskAdded(taskId1);
+        metricsGroup.recordTaskAdded(taskId2);
+        metricsGroup.recordTaskRemoved(taskId1);
+        assertDoesNotThrow(() -> metricsGroup.recordTaskRemoved(taskId2), 
"should not throw NPE");
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testStartTaskFailure(boolean enableTopicCreation) {

Reply via email to