This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d0e0ec478ca KAFKA-20312: Handle null leader during OffsetFetcher
regroup safely (#21760)
d0e0ec478ca is described below
commit d0e0ec478ca736895d4ff418002e93f1ff16ebe8
Author: nileshkumar3 <[email protected]>
AuthorDate: Mon Apr 27 07:11:19 2026 -0500
KAFKA-20312: Handle null leader during OffsetFetcher regroup safely (#21760)
Description:
This PR fixes a potential NullPointerException in
OffsetFetcherUtils.regroupPartitionMapByNode when regrouping partitions
by leader during offset reset / list-offsets.
Background
Partitions are grouped by leader via metadata.fetch().leaderFor(tp). If
metadata changes between the initial leader lookup and the regroup step
(e.g. a leadership change or stale metadata), leaderFor(tp) can return
null. The previous implementation passed leaderFor(...) as the
classifier to Collectors.groupingBy(...), which throws an NPE when the
classifier returns null.
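The failure mode described above can be reproduced outside Kafka. The
following is a minimal sketch, not the Kafka code: the class name, the
string partition names and the leaderFor stand-in are hypothetical and
only mimic a leader lookup that returns null for one partition.

```java
import java.util.List;
import java.util.stream.Collectors;

public class GroupingByNullKeyDemo {

    // Hypothetical stand-in for metadata.fetch().leaderFor(tp):
    // returns null when the leader of a partition is unknown.
    static String leaderFor(String partition) {
        return partition.equals("topic-1") ? null : "node-0";
    }

    // Returns true if grouping the given partitions by leader throws an NPE.
    static boolean groupingThrowsNpe(List<String> partitions) {
        try {
            // Collectors.groupingBy rejects null keys produced by the classifier.
            partitions.stream()
                    .collect(Collectors.groupingBy(GroupingByNullKeyDemo::leaderFor));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // One partition has no known leader, so the grouping fails.
        System.out.println(groupingThrowsNpe(List.of("topic-0", "topic-1")));
        // All leaders are known, so the grouping succeeds.
        System.out.println(groupingThrowsNpe(List.of("topic-0")));
    }
}
```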
Fix
OffsetFetcherUtils.regroupPartitionMapByNode: replaced the stream-based
grouping with a loop that skips partitions whose leader is null and adds
them to a caller-provided partitionsToRetry set. The helper does not
trigger a metadata refresh itself; callers are responsible for retry
and metadata updates.
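The shape of the fix can be illustrated with a generic sketch. The
class name, the type parameters and the leaderLookup function are
assumptions made for illustration; the actual helper reads the leader
from the Kafka cluster metadata, as shown in the diff below.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class RegroupSketch {

    // Groups entries by leader. Entries whose leader lookup returns null are
    // skipped and recorded in toRetry instead of causing an exception.
    static <K, N, V> Map<N, Map<K, V>> regroupByLeader(
            Map<K, V> partitionMap,
            Function<K, N> leaderLookup, // hypothetical stand-in for cluster.leaderFor
            Set<K> toRetry) {
        Map<N, Map<K, V>> byNode = new HashMap<>();
        partitionMap.forEach((tp, value) -> {
            N leader = leaderLookup.apply(tp);
            if (leader == null) {
                toRetry.add(tp); // the caller decides how and when to retry
                return;
            }
            byNode.computeIfAbsent(leader, ignored -> new HashMap<>()).put(tp, value);
        });
        return byNode;
    }

    public static void main(String[] args) {
        Map<String, String> leaders = new HashMap<>();
        leaders.put("t-0", "node-0"); // "t-1" has no known leader

        Map<String, Long> timestamps = new HashMap<>();
        timestamps.put("t-0", -1L);
        timestamps.put("t-1", -1L);

        Set<String> toRetry = new HashSet<>();
        Map<String, Map<String, Long>> grouped =
                regroupByLeader(timestamps, leaders::get, toRetry);

        System.out.println("grouped: " + grouped);
        System.out.println("to retry: " + toRetry);
    }
}
```

Passing the retry set in as a parameter keeps the helper free of side
effects on metadata, which matches the division of responsibility
described above.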
Callers
OffsetFetcher (classic consumer): passes partitionsToRetry into the
helper; in resetPositionsAsync, when the set is non-empty, calls
metadata.requestUpdate(false). Skipped partitions stay pending reset
and are handled by a later reset attempt.
OffsetsRequestManager (new consumer): passes a local retry set into the
helper; when the set is non-empty, calls metadata.requestUpdate(false)
and adds the skipped partitions to state.remainingToSearch with their
timestamps.
This keeps the existing retry semantics and avoids the NPE.
Tests
OffsetFetcherTest.testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup
Simulates leaderFor(tp) returning null during regroup (the first
metadata.fetch() call is stubbed to return a cluster with no partition
info; later calls use the real method). Asserts that no exception is
thrown, that the partition stays pending reset, and that after backoff
a second attempt with valid metadata resets the offset successfully.
OffsetsRequestManagerTest.testFetchOffsetsRegroupSkipsNullLeaderPartitionNoNPE
Simulates the same scenario in the fetch-offsets path: currentLeader has
a leader but metadata.fetch() returns a cluster where one partition has
no leader. Asserts no NPE, one request sent (for the partition with a
leader), and that the skipped partition is retried after metadata update
and completes successfully.
Reviewers: TengYao Chi <[email protected]>
---------
Co-authored-by: TengYao Chi <[email protected]>
---
.../clients/consumer/internals/OffsetFetcher.java | 8 ++-
.../consumer/internals/OffsetFetcherUtils.java | 24 +++++++--
.../consumer/internals/OffsetsRequestManager.java | 11 +++-
.../consumer/internals/OffsetFetcherTest.java | 59 +++++++++++++++++++++
.../internals/OffsetsRequestManagerTest.java | 60 ++++++++++++++++++++++
5 files changed, 154 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
index 446f39686de..b97a4a3cde7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java
@@ -247,8 +247,12 @@ public class OffsetFetcher {
private void resetPositionsAsync(Map<TopicPartition,
AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> partitionResetTimestamps =
partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().timestamp().get()));
+ Set<TopicPartition> partitionsToRetry = new HashSet<>();
Map<Node, Map<TopicPartition, ListOffsetsPartition>>
timestampsToSearchByNode =
- groupListOffsetRequests(partitionResetTimestamps, new
HashSet<>());
+ groupListOffsetRequests(partitionResetTimestamps,
partitionsToRetry);
+ if (!partitionsToRetry.isEmpty()) {
+ metadata.requestUpdate(false);
+ }
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry
: timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
final Map<TopicPartition, ListOffsetsPartition> resetTimestamps =
entry.getValue();
@@ -413,7 +417,7 @@ public class OffsetFetcher {
}
}
}
- return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
+ return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap,
partitionsToRetry);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
index d2277882277..7a3d535782f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java
@@ -165,11 +165,25 @@ class OffsetFetcherUtils {
return new OffsetFetcherUtils.ListOffsetResult(fetchedOffsets,
partitionsToRetry);
}
- <T> Map<Node, Map<TopicPartition, T>>
regroupPartitionMapByNode(Map<TopicPartition, T> partitionMap) {
- return partitionMap.entrySet()
- .stream()
- .collect(Collectors.groupingBy(entry ->
metadata.fetch().leaderFor(entry.getKey()),
- Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue)));
+ <T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(
+ Map<TopicPartition, T> partitionMap,
+ Set<TopicPartition> partitionsToRetry) {
+ Map<Node, Map<TopicPartition, T>> partitionsByNode = new HashMap<>();
+
+ final var cluster = metadata.fetch();
+
+ partitionMap.forEach((tp, value) -> {
+ Node leader = cluster.leaderFor(tp);
+ if (leader == null) {
+ log.debug("Leader for partition {} is unknown while regrouping
partition map by node", tp);
+ partitionsToRetry.add(tp);
+ return;
+ }
+ partitionsByNode.computeIfAbsent(leader, __ -> new HashMap<>())
+ .put(tp, value);
+ });
+
+ return partitionsByNode;
}
Map<TopicPartition, SubscriptionState.FetchPosition>
refreshAndGetPartitionsToValidate() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 4574bef9442..23d95811555 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -911,7 +911,16 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
.setCurrentLeaderEpoch(currentLeaderEpoch));
}
}
- return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
+ Set<TopicPartition> partitionsSkippedInRegroup = new HashSet<>();
+ Map<Node, Map<TopicPartition,
ListOffsetsRequestData.ListOffsetsPartition>> result =
+ offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap,
partitionsSkippedInRegroup);
+ if (!partitionsSkippedInRegroup.isEmpty()) {
+ metadata.requestUpdate(false);
+ listOffsetsRequestState.ifPresent(state ->
+ partitionsSkippedInRegroup.forEach(tp ->
+ state.remainingToSearch.put(tp,
timestampsToSearch.get(tp))));
+ }
+ return result;
}
// Visible for testing
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 4f917b66167..a62a9145bd7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -62,6 +62,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
import java.time.Duration;
import java.time.Instant;
@@ -94,6 +95,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class OffsetFetcherTest {
@@ -350,6 +352,63 @@ public class OffsetFetcherTest {
assertEquals(5, subscriptions.position(tp0).offset);
}
+ /**
+ * Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition
with null leader
+ * (e.g. leader was known when building partitionDataMap but metadata
changed before regroup),
+ * we should not throw NPE; partition is skipped and retried after backoff.
+ */
+ @Test
+ public void
testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup() {
+ buildFetcher();
+ assignFromUser(singleton(tp0));
+ subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST);
+
+ // Cluster with no partition info so leaderFor(tp0) returns null
during regroup
+ Cluster clusterWithNoLeader = new Cluster(
+ "mockClusterId",
+ Collections.singletonList(new Node(0, "localhost", 9092)),
+ Collections.emptyList(),
+ Collections.emptySet(),
+ Collections.emptySet());
+
+ // First fetch() (during regroup) returns cluster with no leader;
subsequent calls use real metadata
+ ConsumerMetadata metadataSpy = spy(metadata);
+
when(metadataSpy.fetch()).thenReturn(clusterWithNoLeader).thenAnswer(InvocationOnMock::callRealMethod);
+
+ LogContext logContext = new LogContext();
+ offsetFetcher = new OffsetFetcher(logContext,
+ consumerClient,
+ metadataSpy,
+ subscriptions,
+ time,
+ retryBackoffMs,
+ requestTimeoutMs,
+ IsolationLevel.READ_UNCOMMITTED,
+ apiVersions);
+
+ offsetFetcher.resetPositionsIfNeeded();
+ consumerClient.pollNoWakeup();
+
+ // Should not crash; partition still needs reset (skipped in regroup)
+ assertTrue(subscriptions.isOffsetResetNeeded(tp0));
+ assertFalse(subscriptions.hasValidPosition(tp0));
+
+ // Metadata refresh, then retry after backoff with valid metadata and
successful response
+ client.prepareMetadataUpdate(initialUpdateResponse);
+ consumerClient.pollNoWakeup();
+
+ time.sleep(retryBackoffMs);
+
client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP,
validLeaderEpoch),
+ listOffsetResponse(Errors.NONE, 1L, 5L));
+
+ offsetFetcher.resetPositionsIfNeeded();
+ consumerClient.pollNoWakeup();
+
+ assertFalse(subscriptions.isOffsetResetNeeded(tp0));
+ assertTrue(subscriptions.isFetchable(tp0));
+ assertEquals(5L, subscriptions.position(tp0).offset);
+ }
+
@Test
public void testListOffsetNoUpdateMissingEpoch() {
buildFetcher();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 1da988864e5..309fd938620 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -258,6 +258,66 @@ public class OffsetsRequestManagerTest {
verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture,
expectedOffsets);
}
+ /**
+ * Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition
with null leader
+ * (e.g. metadata race), it should skip that partition and add it to
remainingToSearch instead
+ * of throwing NullPointerException.
+ */
+ @Test
+ public void testFetchOffsetsRegroupSkipsNullLeaderPartitionNoNPE() throws
ExecutionException,
+ InterruptedException {
+ Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
+ timestampsToSearch.put(TEST_PARTITION_1,
ListOffsetsRequest.EARLIEST_TIMESTAMP);
+ timestampsToSearch.put(TEST_PARTITION_2,
ListOffsetsRequest.EARLIEST_TIMESTAMP);
+
+ // currentLeader returns a leader for both partitions (so both enter
partitionDataMap)
+
when(metadata.currentLeader(TEST_PARTITION_1)).thenReturn(testLeaderEpoch(LEADER_1,
Optional.empty()));
+
when(metadata.currentLeader(TEST_PARTITION_2)).thenReturn(testLeaderEpoch(LEADER_2,
Optional.empty()));
+
when(subscriptionState.isAssigned(any(TopicPartition.class))).thenReturn(true);
+
+ // metadata.fetch() returns a cluster where PARTITION_2 has null
leader (e.g. race: leader lost)
+ List<PartitionInfo> partitions = new ArrayList<>();
+ partitions.add(new PartitionInfo(TEST_TOPIC, 1, LEADER_1, null, null));
+ partitions.add(new PartitionInfo(TEST_TOPIC, 2, null, null, null));
+ Cluster clusterWithNullLeader = new Cluster("clusterId",
Collections.singletonList(LEADER_1),
+ partitions, Collections.emptySet(), Collections.emptySet());
+ when(metadata.fetch()).thenReturn(clusterWithNullLeader);
+
+ CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>>
fetchOffsetsFuture =
+ assertDoesNotThrow(
+ () -> requestManager.fetchOffsets(timestampsToSearch,
false),
+ "Should not throw NPE; only PARTITION_1 has a leader
in regroup, so one request for LEADER_1");
+ assertEquals(1, requestManager.requestsToSend());
+ // requestsToRetry is populated when the in-flight request completes
and remainingToSearch is non-empty, not yet
+ assertEquals(0, requestManager.requestsToRetry());
+
+ // Complete request for PARTITION_1
+ NetworkClientDelegate.PollResult res =
requestManager.poll(time.milliseconds());
+ assertEquals(1, res.unsentRequests.size());
+ NetworkClientDelegate.UnsentRequest unsentRequest =
res.unsentRequests.get(0);
+ ClientResponse clientResponse = buildClientResponse(unsentRequest,
+ Collections.singletonMap(TEST_PARTITION_1, new
OffsetAndTimestampInternal(5L, -1, Optional.empty())));
+ clientResponse.onComplete();
+ assertFalse(fetchOffsetsFuture.isDone());
+
+ // Metadata update: now both partitions have leaders; retry should
send request for PARTITION_2
+ mockSuccessfulRequest(Map.of(TEST_PARTITION_1, LEADER_1,
TEST_PARTITION_2, LEADER_2));
+ requestManager.onUpdate(new ClusterResource(""));
+ assertEquals(1, requestManager.requestsToSend());
+
+ // Complete the retry request (only PARTITION_2 in this batch)
+ NetworkClientDelegate.PollResult retryPoll =
requestManager.poll(time.milliseconds());
+ assertEquals(1, retryPoll.unsentRequests.size());
+ ClientResponse retryResponse =
buildClientResponse(retryPoll.unsentRequests.get(0),
+ Collections.singletonMap(TEST_PARTITION_2, new
OffsetAndTimestampInternal(10L, -1, Optional.empty())));
+ retryResponse.onComplete();
+
+ Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = new
HashMap<>();
+ expectedOffsets.put(TEST_PARTITION_1, new
OffsetAndTimestampInternal(5L, -1, Optional.empty()));
+ expectedOffsets.put(TEST_PARTITION_2, new
OffsetAndTimestampInternal(10L, -1, Optional.empty()));
+ verifyRequestSuccessfullyCompleted(fetchOffsetsFuture,
expectedOffsets);
+ }
+
@ParameterizedTest
@MethodSource("retriableErrors")
public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error)
throws ExecutionException, InterruptedException {