This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bf429241268 KAFKA-19572: Added check to prevent NPE logs during
ShareConsumer::close (#20290)
bf429241268 is described below
commit bf429241268378ea41f3ffc3fb073aba495660fb
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Aug 7 15:33:28 2025 -0400
KAFKA-19572: Added check to prevent NPE logs during ShareConsumer::close
(#20290)
*What*
https://issues.apache.org/jira/browse/KAFKA-19572
- If the `ShareConsumer` constructor fails with any exception, we call
`close()` in the catch block.
- If uninitialized members were accessed during `close()`, it would
throw an NPE. Previously there were no null checks, so `close()`
attempted to use these fields even when they had not been initialized.
- To avoid this, this PR adds null checks on the `close()` path before
accessing fields that could be null.
Reviewers: Apoorv Mittal <[email protected]>, Lianet Magrans
<[email protected]>
---
.../kafka/clients/consumer/internals/ShareConsumerImpl.java | 10 ++++++++++
.../clients/consumer/internals/ShareConsumerImplTest.java | 10 ++++++++++
2 files changed, 20 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 33309ffb63d..32663249e7a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -928,6 +928,9 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
}
private void stopFindCoordinatorOnClose() {
+ if (applicationEventHandler == null) {
+ return;
+ }
log.debug("Stop finding coordinator during consumer close");
applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
}
@@ -944,6 +947,10 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
* 2. leave the group
*/
private void sendAcknowledgementsAndLeaveGroup(final Timer timer, final
AtomicReference<Throwable> firstException) {
+ if (applicationEventHandler == null || backgroundEventProcessor ==
null ||
+ backgroundEventReaper == null || backgroundEventQueue == null) {
+ return;
+ }
completeQuietly(
() -> applicationEventHandler.addAndGet(new
ShareAcknowledgeOnCloseEvent(acknowledgementsToSend(),
calculateDeadlineMs(timer))),
"Failed to send pending acknowledgements with a timeout(ms)="
+ timer.timeoutMs(), firstException);
@@ -1035,6 +1042,9 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
* If the acknowledgement commit callback throws an exception, this method
will throw an exception.
*/
private void handleCompletedAcknowledgements(boolean onClose) {
+ if (backgroundEventQueue == null || backgroundEventReaper == null ||
backgroundEventProcessor == null) {
+ return;
+ }
// If the user gets any fatal errors, they will get these exceptions
in the background queue.
// While closing, we ignore these exceptions so that the consumers
close successfully.
processBackgroundEvents(onClose ? e -> (e instanceof
GroupAuthorizationException
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6880ad16b8d..09fc99d8e24 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -77,6 +78,7 @@ import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -207,11 +209,19 @@ public class ShareConsumerImplTest {
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
"an.invalid.class");
final ConsumerConfig config = new ConsumerConfig(props);
+
+ LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
KafkaException ce = assertThrows(
KafkaException.class,
() -> newConsumer(config));
assertTrue(ce.getMessage().contains("Failed to construct Kafka share
consumer"), "Unexpected exception message: " + ce.getMessage());
assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class
cannot be found"), "Unexpected cause: " + ce.getCause());
+
+ boolean npeLogged = appender.getEvents().stream()
+ .flatMap(event -> event.getThrowableInfo().stream())
+ .anyMatch(str -> str.contains("NullPointerException"));
+
+ assertFalse(npeLogged, "Unexpected NullPointerException during
consumer construction");
}
@Test