This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf429241268 KAFKA-19572: Added check to prevent NPE logs during 
ShareConsumer::close (#20290)
bf429241268 is described below

commit bf429241268378ea41f3ffc3fb073aba495660fb
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Aug 7 15:33:28 2025 -0400

    KAFKA-19572: Added check to prevent NPE logs during ShareConsumer::close 
(#20290)
    
    *What*
    https://issues.apache.org/jira/browse/KAFKA-19572
    
    - If a `ShareConsumer` constructor failed due to any exception, then we
    call `close()` in the catch block.
    
    - If there were uninitialized members accessed during `close()`, then it
    would throw a NPE. Currently there are no null checks, hence we were
    attempting to use these fields during `close()` execution.
    
    - To avoid this, PR adds null checks in the `close()` function before we
    access fields which possibly could be null.
    
    Reviewers: Apoorv Mittal <[email protected]>, Lianet Magrans
     <[email protected]>
---
 .../kafka/clients/consumer/internals/ShareConsumerImpl.java    | 10 ++++++++++
 .../clients/consumer/internals/ShareConsumerImplTest.java      | 10 ++++++++++
 2 files changed, 20 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 33309ffb63d..32663249e7a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -928,6 +928,9 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     }
 
     private void stopFindCoordinatorOnClose() {
+        if (applicationEventHandler == null) {
+            return;
+        }
         log.debug("Stop finding coordinator during consumer close");
         applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
     }
@@ -944,6 +947,10 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
      * 2. leave the group
      */
     private void sendAcknowledgementsAndLeaveGroup(final Timer timer, final 
AtomicReference<Throwable> firstException) {
+        if (applicationEventHandler == null || backgroundEventProcessor == 
null ||
+            backgroundEventReaper == null || backgroundEventQueue == null) {
+            return;
+        }
         completeQuietly(
                 () -> applicationEventHandler.addAndGet(new 
ShareAcknowledgeOnCloseEvent(acknowledgementsToSend(), 
calculateDeadlineMs(timer))),
                 "Failed to send pending acknowledgements with a timeout(ms)=" 
+ timer.timeoutMs(), firstException);
@@ -1035,6 +1042,9 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
      * If the acknowledgement commit callback throws an exception, this method 
will throw an exception.
      */
     private void handleCompletedAcknowledgements(boolean onClose) {
+        if (backgroundEventQueue == null || backgroundEventReaper == null || 
backgroundEventProcessor == null) {
+            return;
+        }
         // If the user gets any fatal errors, they will get these exceptions 
in the background queue.
         // While closing, we ignore these exceptions so that the consumers 
close successfully.
         processBackgroundEvents(onClose ? e -> (e instanceof 
GroupAuthorizationException
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6880ad16b8d..09fc99d8e24 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -207,11 +209,19 @@ public class ShareConsumerImplTest {
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
         props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"an.invalid.class");
         final ConsumerConfig config = new ConsumerConfig(props);
+
+        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
         KafkaException ce = assertThrows(
                 KafkaException.class,
                 () -> newConsumer(config));
         assertTrue(ce.getMessage().contains("Failed to construct Kafka share 
consumer"), "Unexpected exception message: " + ce.getMessage());
         assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class 
cannot be found"), "Unexpected cause: " + ce.getCause());
+
+        boolean npeLogged = appender.getEvents().stream()
+                .flatMap(event -> event.getThrowableInfo().stream())
+                .anyMatch(str -> str.contains("NullPointerException"));
+
+        assertFalse(npeLogged, "Unexpected NullPointerException during 
consumer construction");
     }
 
     @Test

Reply via email to