This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d0e0ec478ca KAFKA-20312: Handle null leader during OffsetFetcher 
regroup safely (#21760)
d0e0ec478ca is described below

commit d0e0ec478ca736895d4ff418002e93f1ff16ebe8
Author: nileshkumar3 <[email protected]>
AuthorDate: Mon Apr 27 07:11:19 2026 -0500

    KAFKA-20312: Handle null leader during OffsetFetcher regroup safely (#21760)
    
    Description:
    
    This PR fixes a potential NullPointerException in
    OffsetFetcherUtils.regroupPartitionMapByNode when regrouping partitions
    by leader during offset reset / list-offsets.
    
    Background
    
    Partitions are grouped by leader via metadata.fetch().leaderFor(tp). If
    metadata changes between the initial leader lookup and the regroup step
    (e.g. after a leadership change or with stale metadata), leaderFor(tp)
    can return null. The previous implementation passed leaderFor(...) as the
    classifier of Collectors.groupingBy(...), which throws a
    NullPointerException when the classifier returns null.
    
    Fix
    
    OffsetFetcherUtils.regroupPartitionMapByNode: replaced the stream-based
    grouping with a loop over a single metadata.fetch() snapshot. Partitions
    whose leader is null are skipped and added to a caller-provided
    partitionsToRetry set; the helper itself does not trigger a metadata
    refresh (callers are responsible for retries and metadata updates).
    
    Callers
    
    OffsetFetcher (classic consumer): passes partitionsToRetry into the
    helper; in resetPositionsAsync, when the set is non-empty, calls
    metadata.requestUpdate(false). Skipped partitions remain pending reset
    and are picked up again by a later resetPositionsIfNeeded() call.

    OffsetsRequestManager (new consumer): passes a local retry set into the
    helper; when the set is non-empty, calls metadata.requestUpdate(false)
    and, if a list-offsets request state is present, adds the skipped
    partitions (with their timestamps) to state.remainingToSearch.

    This keeps the existing retry semantics and avoids the NPE.
    
    Tests
    
    
    
OffsetFetcherTest.testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup
    Simulates leaderFor(tp) returning null during regroup: the first
    metadata.fetch() call is stubbed to return a cluster with no partition
    metadata, and later calls use the real method. Asserts that no exception
    is thrown and the partition stays pending reset, and that after the
    backoff a second attempt with valid metadata completes the offset reset.
    
    
    
OffsetsRequestManagerTest.testFetchOffsetsRegroupSkipsNullLeaderPartitionNoNPE
    Simulates the same scenario in the fetch-offsets path: currentLeader()
    returns a leader for both partitions, but metadata.fetch() returns a
    cluster in which one of them has no leader. Asserts that no NPE is
    thrown, that only one request is sent (for the partition with a leader),
    and that the skipped partition is retried after a metadata update and
    completes successfully.
    
    Reviewers: TengYao Chi <[email protected]>
    
    ---------
    
    Co-authored-by: TengYao Chi <[email protected]>
---
 .../clients/consumer/internals/OffsetFetcher.java  |  8 ++-
 .../consumer/internals/OffsetFetcherUtils.java     | 24 +++++++--
 .../consumer/internals/OffsetsRequestManager.java  | 11 +++-
 .../consumer/internals/OffsetFetcherTest.java      | 59 +++++++++++++++++++++
 .../internals/OffsetsRequestManagerTest.java       | 60 ++++++++++++++++++++++
 5 files changed, 154 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index 446f39686de..b97a4a3cde7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -247,8 +247,12 @@ public class OffsetFetcher {
     private void resetPositionsAsync(Map<TopicPartition, 
AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
         Map<TopicPartition, Long> partitionResetTimestamps = 
partitionAutoOffsetResetStrategyMap.entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().timestamp().get()));
+        Set<TopicPartition> partitionsToRetry = new HashSet<>();
         Map<Node, Map<TopicPartition, ListOffsetsPartition>> 
timestampsToSearchByNode =
-                groupListOffsetRequests(partitionResetTimestamps, new 
HashSet<>());
+                groupListOffsetRequests(partitionResetTimestamps, 
partitionsToRetry);
+        if (!partitionsToRetry.isEmpty()) {
+            metadata.requestUpdate(false);
+        }
         for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry 
: timestampsToSearchByNode.entrySet()) {
             Node node = entry.getKey();
             final Map<TopicPartition, ListOffsetsPartition> resetTimestamps = 
entry.getValue();
@@ -413,7 +417,7 @@ public class OffsetFetcher {
                 }
             }
         }
-        return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
+        return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap, 
partitionsToRetry);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index d2277882277..7a3d535782f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -165,11 +165,25 @@ class OffsetFetcherUtils {
             return new OffsetFetcherUtils.ListOffsetResult(fetchedOffsets, 
partitionsToRetry);
     }
 
-    <T> Map<Node, Map<TopicPartition, T>> 
regroupPartitionMapByNode(Map<TopicPartition, T> partitionMap) {
-        return partitionMap.entrySet()
-                .stream()
-                .collect(Collectors.groupingBy(entry -> 
metadata.fetch().leaderFor(entry.getKey()),
-                        Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+    <T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(
+            Map<TopicPartition, T> partitionMap,
+            Set<TopicPartition> partitionsToRetry) {
+        Map<Node, Map<TopicPartition, T>> partitionsByNode = new HashMap<>();
+
+        final var cluster = metadata.fetch();
+
+        partitionMap.forEach((tp, value) -> {
+            Node leader = cluster.leaderFor(tp);
+            if (leader == null) {
+                log.debug("Leader for partition {} is unknown while regrouping 
partition map by node", tp);
+                partitionsToRetry.add(tp);
+                return;
+            }
+            partitionsByNode.computeIfAbsent(leader, __ -> new HashMap<>())
+                .put(tp, value);
+        });
+
+        return partitionsByNode;
     }
 
     Map<TopicPartition, SubscriptionState.FetchPosition> 
refreshAndGetPartitionsToValidate() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 4574bef9442..23d95811555 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -911,7 +911,16 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
                         .setCurrentLeaderEpoch(currentLeaderEpoch));
             }
         }
-        return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
+        Set<TopicPartition> partitionsSkippedInRegroup = new HashSet<>();
+        Map<Node, Map<TopicPartition, 
ListOffsetsRequestData.ListOffsetsPartition>> result =
+                offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap, 
partitionsSkippedInRegroup);
+        if (!partitionsSkippedInRegroup.isEmpty()) {
+            metadata.requestUpdate(false);
+            listOffsetsRequestState.ifPresent(state ->
+                    partitionsSkippedInRegroup.forEach(tp ->
+                            state.remainingToSearch.put(tp, 
timestampsToSearch.get(tp))));
+        }
+        return result;
     }
 
     // Visible for testing
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 4f917b66167..a62a9145bd7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -94,6 +95,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 public class OffsetFetcherTest {
@@ -350,6 +352,63 @@ public class OffsetFetcherTest {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    /**
+     * Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition 
with null leader
+     * (e.g. leader was known when building partitionDataMap but metadata 
changed before regroup),
+     * we should not throw NPE; partition is skipped and retried after backoff.
+     */
+    @Test
+    public void 
testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST);
+
+        // Cluster with no partition info so leaderFor(tp0) returns null 
during regroup
+        Cluster clusterWithNoLeader = new Cluster(
+                "mockClusterId",
+                Collections.singletonList(new Node(0, "localhost", 9092)),
+                Collections.emptyList(),
+                Collections.emptySet(),
+                Collections.emptySet());
+
+        // First fetch() (during regroup) returns cluster with no leader; 
subsequent calls use real metadata
+        ConsumerMetadata metadataSpy = spy(metadata);
+        
when(metadataSpy.fetch()).thenReturn(clusterWithNoLeader).thenAnswer(InvocationOnMock::callRealMethod);
+
+        LogContext logContext = new LogContext();
+        offsetFetcher = new OffsetFetcher(logContext,
+                consumerClient,
+                metadataSpy,
+                subscriptions,
+                time,
+                retryBackoffMs,
+                requestTimeoutMs,
+                IsolationLevel.READ_UNCOMMITTED,
+                apiVersions);
+
+        offsetFetcher.resetPositionsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        // Should not crash; partition still needs reset (skipped in regroup)
+        assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+        assertFalse(subscriptions.hasValidPosition(tp0));
+
+        // Metadata refresh, then retry after backoff with valid metadata and 
successful response
+        client.prepareMetadataUpdate(initialUpdateResponse);
+        consumerClient.pollNoWakeup();
+
+        time.sleep(retryBackoffMs);
+        
client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP,
 validLeaderEpoch),
+                listOffsetResponse(Errors.NONE, 1L, 5L));
+
+        offsetFetcher.resetPositionsIfNeeded();
+        consumerClient.pollNoWakeup();
+
+        assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+        assertTrue(subscriptions.isFetchable(tp0));
+        assertEquals(5L, subscriptions.position(tp0).offset);
+    }
+
     @Test
     public void testListOffsetNoUpdateMissingEpoch() {
         buildFetcher();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 1da988864e5..309fd938620 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -258,6 +258,66 @@ public class OffsetsRequestManagerTest {
         verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, 
expectedOffsets);
     }
 
+    /**
+     * Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition 
with null leader
+     * (e.g. metadata race), it should skip that partition and add it to 
remainingToSearch instead
+     * of throwing NullPointerException.
+     */
+    @Test
+    public void testFetchOffsetsRegroupSkipsNullLeaderPartitionNoNPE() throws 
ExecutionException,
+            InterruptedException {
+        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+        timestampsToSearch.put(TEST_PARTITION_1, 
ListOffsetsRequest.EARLIEST_TIMESTAMP);
+        timestampsToSearch.put(TEST_PARTITION_2, 
ListOffsetsRequest.EARLIEST_TIMESTAMP);
+
+        // currentLeader returns a leader for both partitions (so both enter 
partitionDataMap)
+        
when(metadata.currentLeader(TEST_PARTITION_1)).thenReturn(testLeaderEpoch(LEADER_1,
 Optional.empty()));
+        
when(metadata.currentLeader(TEST_PARTITION_2)).thenReturn(testLeaderEpoch(LEADER_2,
 Optional.empty()));
+        
when(subscriptionState.isAssigned(any(TopicPartition.class))).thenReturn(true);
+
+        // metadata.fetch() returns a cluster where PARTITION_2 has null 
leader (e.g. race: leader lost)
+        List<PartitionInfo> partitions = new ArrayList<>();
+        partitions.add(new PartitionInfo(TEST_TOPIC, 1, LEADER_1, null, null));
+        partitions.add(new PartitionInfo(TEST_TOPIC, 2, null, null, null));
+        Cluster clusterWithNullLeader = new Cluster("clusterId", 
Collections.singletonList(LEADER_1),
+                partitions, Collections.emptySet(), Collections.emptySet());
+        when(metadata.fetch()).thenReturn(clusterWithNullLeader);
+
+        CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> 
fetchOffsetsFuture =
+                assertDoesNotThrow(
+                        () -> requestManager.fetchOffsets(timestampsToSearch, 
false),
+                        "Should not throw NPE; only PARTITION_1 has a leader 
in regroup, so one request for LEADER_1");
+        assertEquals(1, requestManager.requestsToSend());
+        // requestsToRetry is populated when the in-flight request completes 
and remainingToSearch is non-empty, not yet
+        assertEquals(0, requestManager.requestsToRetry());
+
+        // Complete request for PARTITION_1
+        NetworkClientDelegate.PollResult res = 
requestManager.poll(time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
+        NetworkClientDelegate.UnsentRequest unsentRequest = 
res.unsentRequests.get(0);
+        ClientResponse clientResponse = buildClientResponse(unsentRequest,
+                Collections.singletonMap(TEST_PARTITION_1, new 
OffsetAndTimestampInternal(5L, -1, Optional.empty())));
+        clientResponse.onComplete();
+        assertFalse(fetchOffsetsFuture.isDone());
+
+        // Metadata update: now both partitions have leaders; retry should 
send request for PARTITION_2
+        mockSuccessfulRequest(Map.of(TEST_PARTITION_1, LEADER_1, 
TEST_PARTITION_2, LEADER_2));
+        requestManager.onUpdate(new ClusterResource(""));
+        assertEquals(1, requestManager.requestsToSend());
+
+        // Complete the retry request (only PARTITION_2 in this batch)
+        NetworkClientDelegate.PollResult retryPoll = 
requestManager.poll(time.milliseconds());
+        assertEquals(1, retryPoll.unsentRequests.size());
+        ClientResponse retryResponse = 
buildClientResponse(retryPoll.unsentRequests.get(0),
+                Collections.singletonMap(TEST_PARTITION_2, new 
OffsetAndTimestampInternal(10L, -1, Optional.empty())));
+        retryResponse.onComplete();
+
+        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = new 
HashMap<>();
+        expectedOffsets.put(TEST_PARTITION_1, new 
OffsetAndTimestampInternal(5L, -1, Optional.empty()));
+        expectedOffsets.put(TEST_PARTITION_2, new 
OffsetAndTimestampInternal(10L, -1, Optional.empty()));
+        verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, 
expectedOffsets);
+    }
+
     @ParameterizedTest
     @MethodSource("retriableErrors")
     public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error) 
throws ExecutionException, InterruptedException {

Reply via email to