This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 5610f3af0cd KAFKA-20165: Handle retriable partition errors gracefully 
on OffsetFetch (#21464)
5610f3af0cd is described below

commit 5610f3af0cd39377f5077cd2eef05965a89fd49e
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Apr 2 10:35:45 2026 -0400

    KAFKA-20165: Handle retriable partition errors gracefully on OffsetFetch 
(#21464)
    
    Fix to ensure that consumer poll() and committed() do not fail fatally on the
    retriable UNKNOWN_TOPIC_ID partition-level error when fetching committed offsets.
    
    This issue did not occur before topic IDs were used in OffsetFetch, because
    when fetching by topic name the group coordinator (GC) returns no error for
    unknown partitions (it just returns offset -1). With topic IDs (OffsetFetch
    v10+), however, if a topic is not known to the GC, the OffsetFetch response
    includes an UNKNOWN_TOPIC_ID error for that topic's partitions.
    
    The fix is to let poll() and committed() retry while there is time left. If
    the error does not resolve, fetching offsets still completes gracefully: it
    returns offsets for the partitions that have them, and null for the
    partitions that were deleted. This keeps the contract of the committed() API,
    and ensures poll() uses whatever committed offsets are available (partitions
    that still need positions fall back to fetching partition offsets).
    
    Added integration tests that fail without the fix, to show the gaps.
    
    Reviewers: David Jacot <[email protected]>, Nikita Shupletsov
     <[email protected]>
---
 .../consumer/PlaintextConsumerAssignTest.java      |  57 ++++++
 .../consumer/PlaintextConsumerCommitTest.java      |  61 ++++++
 .../consumer/internals/CommitRequestManager.java   | 208 ++++++++++++++++++---
 .../consumer/internals/OffsetsRequestManager.java  |   5 +-
 .../events/ApplicationEventProcessor.java          |  11 +-
 .../common/message/OffsetFetchResponse.json        |   1 +
 .../internals/CommitRequestManagerTest.java        |  40 ++--
 .../internals/OffsetsRequestManagerTest.java       |  20 +-
 .../events/ApplicationEventProcessorTest.java      |   6 +-
 9 files changed, 344 insertions(+), 65 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
index b12b6dacb4d..b1699851a18 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 
@@ -38,6 +39,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_IN
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -298,6 +300,61 @@ public class PlaintextConsumerAssignTest {
         }
     }
 
+    @ClusterTest
+    public void testAsyncPollAfterTopicDeleted() throws Exception {
+        testPollAfterTopicDeleted(GroupProtocol.CONSUMER);
+    }
+
+    @ClusterTest
+    public void testClassicPollAfterTopicDeleted() throws Exception {
+        testPollAfterTopicDeleted(GroupProtocol.CLASSIC);
+    }
+
+    /**
+     * Test that poll() doesn't fail to retrieve committed offsets after the 
assigned topic is deleted.
+     * Validates fix for KAFKA-20165.
+     */
+    private void testPollAfterTopicDeleted(GroupProtocol groupProtocol) throws 
Exception {
+        String topicToDelete = "topic-to-delete-assign";
+        clusterInstance.createTopic(topicToDelete, 1, (short) BROKER_COUNT);
+        TopicPartition tpToDelete = new TopicPartition(topicToDelete, 0);
+
+        int numRecords = 10;
+        long startingTimestamp = System.currentTimeMillis();
+
+        Map<String, Object> consumerConfig = Map.of(
+            GROUP_PROTOCOL_CONFIG, groupProtocol.name,
+            GROUP_ID_CONFIG, "test-group",
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        try (Consumer<byte[], byte[]> consumer = 
clusterInstance.consumer(consumerConfig);
+             var admin = clusterInstance.admin()) {
+
+            // Send records and consume them (this caches the topic ID in the 
consumer)
+            ClientsTestUtils.sendRecords(clusterInstance, tpToDelete, 
numRecords, startingTimestamp);
+            consumer.assign(List.of(tpToDelete));
+            consumer.seek(tpToDelete, 0);
+            ClientsTestUtils.consumeAndVerifyRecords(consumer, tpToDelete, 
numRecords, 0, 0, startingTimestamp);
+            consumer.commitSync();
+
+            // Delete the topic and wait for deletion to propagate to metadata
+            admin.deleteTopics(List.of(topicToDelete)).all().get();
+            TestUtils.waitForCondition(
+                () -> 
!admin.listTopics().names().get().contains(topicToDelete),
+                10000,
+                "Topic should be removed from metadata");
+
+            // Change assignment to force the consumer to fetch committed 
offsets on next poll()
+            // The consumer still has the topic ID cached, so it will use 
version 10+
+            // and get UNKNOWN_TOPIC_ID from the broker
+            consumer.unsubscribe();
+            consumer.assign(List.of(tpToDelete));
+
+            // poll() should not throw - internally fetches committed offsets 
for deleted topic and recovers
+            assertDoesNotThrow(() -> 
consumer.poll(java.time.Duration.ofMillis(5000)));
+        }
+    }
+
     private static class CountConsumerCommitCallback implements 
OffsetCommitCallback {
         int successCount = 0;
         int failCount = 0;
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
index 5ff54f5e888..ef4c9c9552d 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -208,6 +208,67 @@ public class PlaintextConsumerCommitTest {
         }
     }
 
+    /**
+     * Test that calling consumer.committed() for a deleted topic returns null 
for each partition.
+     * This validates the fix for KAFKA-20165.
+     */
+    @ClusterTest
+    public void testClassicConsumerCommittedDeletedTopic() throws Exception {
+        testConsumerCommittedDeletedTopic(GroupProtocol.CLASSIC);
+    }
+
+    /**
+     * Test that calling consumer.committed() for a deleted topic returns null 
for each partition.
+     * This validates the fix for KAFKA-20165.
+     */
+    @ClusterTest
+    public void testAsyncConsumerCommittedDeletedTopic() throws Exception {
+        testConsumerCommittedDeletedTopic(GroupProtocol.CONSUMER);
+    }
+
+    /**
+     * Common test logic for both Classic and Async consumer.
+     * Tests that calling consumer.committed() for a deleted topic returns 
null.
+     * As per the Javadoc for committed(): "If any of the partitions requested 
do not exist, the result
+     * map will contain null as the value for that partition."
+     *
+     * For Classic consumer (topic names only), the GroupCoordinator returns 
no error for deleted topics (offsets -1).
+     *
+     * For Async consumer (topic IDs and topic names), the GroupCoordinator 
returns UNKNOWN_TOPIC_ID for the deleted topic
+     * when the client uses topic IDs.
+     * The consumer handles this as a retriable partition error and eventually 
returns null,
+     * keeping the same contract as the ClassicConsumer.
+     */
+    private void testConsumerCommittedDeletedTopic(GroupProtocol 
groupProtocol) throws Exception {
+        String topicToDelete = "topic-to-delete";
+        cluster.createTopic(topicToDelete, 1, (short) BROKER_COUNT);
+        TopicPartition tpToDelete = new TopicPartition(topicToDelete, 0);
+
+        try (var consumer = createConsumer(groupProtocol, false);
+             var admin = cluster.admin()) {
+            consumer.assign(List.of(tpToDelete));
+
+            // Commit an offset to ensure the consumer has the topic ID cached 
and there's data to fetch
+            consumer.commitSync(Map.of(tpToDelete, new OffsetAndMetadata(0)));
+
+            // Verify the commit was successful
+            assertEquals(0, 
consumer.committed(Set.of(tpToDelete)).get(tpToDelete).offset());
+
+            // Delete the topic
+            admin.deleteTopics(List.of(topicToDelete)).all().get();
+
+            // Eventually, the response should return null for the deleted 
topic partition.
+            TestUtils.waitForCondition(
+                () -> {
+                    var committed = consumer.committed(Set.of(tpToDelete), 
Duration.ofMillis(5000));
+                    return committed.get(tpToDelete) == null;
+                },
+                10000,
+                "Expected null for deleted topic partition"
+            );
+        }
+    }
+
     @ClusterTest
     public void testClassicConsumerAsyncCommit() throws InterruptedException {
         testAsyncCommit(GroupProtocol.CLASSIC);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index d3a13f66bca..00c5812a5fa 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -502,21 +502,26 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
 
     /**
      * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
+     * This is used when initializing positions on poll,
+     * and for fetching committed offsets from API calls to consumer.committed.
+     * This function will retry upon expected retriable errors while the 
timeout hasn't expired.
+     * It will fail fatally with KafkaException for all unexpected errors.
      *
-     * @param partitions       Partitions to fetch offsets for.
-     * @param deadlineMs       Time until which the request should be retried 
if it fails
-     *                         with expected retriable errors.
+     * @param partitions Partitions to fetch offsets for.
+     * @param deadlineMs Time until which the request should be retried if it 
fails
+     *                   with expected retriable errors.
      * @return Future that will complete when a successful response is 
received, or the request
-     * fails and cannot be retried. Note that the request is retried whenever 
it fails with
-     * retriable expected error and the retry time hasn't expired.
+     * fails and cannot be retried. The result contains both successful 
offsets and any retriable
+     * partition errors (like UNKNOWN_TOPIC_ID) that occurred. Note that the 
request is retried
+     * whenever it fails with expected retriable errors and the retry timeout 
hasn't expired.
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
fetchOffsets(
+    public CompletableFuture<OffsetFetchResult> fetchOffsets(
         final Set<TopicPartition> partitions,
         final long deadlineMs) {
         if (partitions.isEmpty()) {
-            return CompletableFuture.completedFuture(Collections.emptyMap());
+            return CompletableFuture.completedFuture(new 
OffsetFetchResult(Collections.emptyMap(), Collections.emptyMap()));
         }
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new 
CompletableFuture<>();
+        CompletableFuture<OffsetFetchResult> result = new 
CompletableFuture<>();
         OffsetFetchRequestState request = createOffsetFetchRequest(partitions, 
deadlineMs);
         fetchOffsetsWithRetries(request, result);
         return result;
@@ -542,8 +547,8 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
     }
 
     private void fetchOffsetsWithRetries(final OffsetFetchRequestState 
fetchRequest,
-                                         final 
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
currentResult = pendingRequests.addOffsetFetchRequest(fetchRequest);
+                                         final 
CompletableFuture<OffsetFetchResult> result) {
+        CompletableFuture<OffsetFetchResult> currentResult = 
pendingRequests.addOffsetFetchRequest(fetchRequest);
 
         // Retry the same fetch request while it fails with RetriableException 
and the retry timeout hasn't expired.
         currentResult.whenComplete((res, error) -> {
@@ -552,24 +557,99 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 log.warn("A duplicated, inflight, request was identified, but 
unable to find it in the " +
                     "outbound buffer: {}", fetchRequest);
             }
-            if (error == null) {
-                maybeUpdateLastSeenEpochIfNewer(res);
-                result.complete(res);
-            } else {
-                if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-                    if (fetchRequest.isExpired()) {
-                        log.debug("OffsetFetch request for {} timed out and 
won't be retried anymore", fetchRequest.requestedPartitions);
-                        
result.completeExceptionally(maybeWrapAsTimeoutException(error));
-                    } else {
-                        fetchRequest.resetFuture();
-                        fetchOffsetsWithRetries(fetchRequest, result);
-                    }
-                } else
-                    result.completeExceptionally(error);
+
+            // Group-level error
+            if (error != null) {
+                handleGroupLevelError(fetchRequest, result, error);
+                return;
+            }
+
+            // Partition-level errors
+            if (res.hasRetriablePartitionErrors()) {
+                handleRetriablePartitionErrors(fetchRequest, result, res);
+                return;
             }
+
+            handleSuccessfulOffsetFetch(result, res);
+
         });
     }
 
+    /**
+     * Handles a successful offset fetch response with no errors.
+     */
+    private void handleSuccessfulOffsetFetch(final 
CompletableFuture<OffsetFetchResult> result,
+                                             final OffsetFetchResult res) {
+        maybeUpdateLastSeenEpochIfNewer(res.offsets());
+        result.complete(res);
+    }
+
+    /**
+     * Handles group-level errors from an offset fetch request.
+     * Group-level errors indicate the entire request failed (e.g., 
coordinator unavailable).
+     */
+    private void handleGroupLevelError(final OffsetFetchRequestState 
fetchRequest,
+                                       final 
CompletableFuture<OffsetFetchResult> result,
+                                       final Throwable error) {
+        boolean isRetriable = (error instanceof RetriableException) ||
+            isStaleEpochErrorAndValidEpochAvailable(error);
+
+        if (!isRetriable) {
+            result.completeExceptionally(error);
+            return;
+        }
+
+        if (fetchRequest.isExpired()) {
+            log.debug("OffsetFetch request for {} timed out and won't be 
retried anymore",
+                fetchRequest.requestedPartitions);
+            result.completeExceptionally(maybeWrapAsTimeoutException(error));
+            return;
+        }
+
+        retryOffsetFetchOnError(fetchRequest, result, "retriable error: " + 
error.getMessage());
+    }
+
+    /**
+     * Handles retriable partition-level errors from an offset fetch response.
+     *
+     * <p>The only retriable partition errors are UNKNOWN_TOPIC_ID and 
UNKNOWN_TOPIC_OR_PARTITION.
+     * When expired or not enough time for another retry, we return partial 
results with null for
+     * the errored partitions. We check against {@code remainingBackoffMs} to 
ensure we complete
+     * before the {@link 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper}
+     * expires the event with TimeoutException.
+     */
+    private void handleRetriablePartitionErrors(final OffsetFetchRequestState 
fetchRequest,
+                                            final 
CompletableFuture<OffsetFetchResult> result,
+                                            final OffsetFetchResult res) {
+        long currentTimeMs = time.milliseconds();
+
+        // Return partial results if there is no time for another retry.
+        // We check against remainingBackoffMs to ensure we complete before the
+        // CompletableEventReaper expires the event with TimeoutException.
+        if (fetchRequest.isExpired() || fetchRequest.remainingMs() <= 
fetchRequest.remainingBackoffMs(currentTimeMs)) {
+            log.debug("OffsetFetch request for partitions {} returning partial 
results with some partition errors {}",
+                fetchRequest.requestedPartitions, 
res.retriablePartitionErrors().keySet());
+            maybeUpdateLastSeenEpochIfNewer(res.offsets());
+            result.complete(res);
+            return;
+        }
+
+        retryOffsetFetchOnError(fetchRequest, result,
+            "retriable partition errors: " + 
res.retriablePartitionErrors().keySet());
+    }
+
+    /**
+     * Retries an offset fetch request after an error.
+     */
+    private void retryOffsetFetchOnError(final OffsetFetchRequestState 
fetchRequest,
+                                         final 
CompletableFuture<OffsetFetchResult> result,
+                                         final String reason) {
+        log.debug("OffsetFetch request for {} retrying due to {}",
+            fetchRequest.requestedPartitions, reason);
+        fetchRequest.resetFuture();
+        fetchOffsetsWithRetries(fetchRequest, result);
+    }
+
     private boolean isStaleEpochErrorAndValidEpochAvailable(Throwable error) {
         return error instanceof StaleMemberEpochException && 
memberInfo.memberEpoch.isPresent();
     }
@@ -963,6 +1043,66 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         abstract void removeRequest();
     }
 
+    /**
+     * Result of an offset fetch request.
+     * Contains the successfully fetched offsets and any
+     * retriable partition errors (e.g., UNKNOWN_TOPIC_ID).
+     * This allows returning partial results, including offsets and partition 
errors.
+     */
+    public static class OffsetFetchResult {
+        /**
+         * Partitions with offsets successfully retrieved
+         */
+        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+
+        /**
+         * Partitions with retriable errors
+         */
+        private final Map<TopicPartition, Errors> retriablePartitionErrors;
+
+        public OffsetFetchResult(Map<TopicPartition, OffsetAndMetadata> 
offsets,
+                                  Map<TopicPartition, Errors> 
retriablePartitionErrors) {
+            this.offsets = offsets;
+            this.retriablePartitionErrors = retriablePartitionErrors;
+        }
+
+        public Map<TopicPartition, OffsetAndMetadata> offsets() {
+            return offsets;
+        }
+
+        public Map<TopicPartition, Errors> retriablePartitionErrors() {
+            return retriablePartitionErrors;
+        }
+
+        public boolean hasRetriablePartitionErrors() {
+            return !retriablePartitionErrors.isEmpty();
+        }
+
+        /**
+         * Converts this result to a map of offsets, using null for partitions 
that had retriable errors.
+         * This is expected to be used when the caller wants to return partial 
results to the user, where null indicates
+         * that the offset for that partition could not be fetched.
+         *
+         * @return A new map containing all successfully fetched offsets, plus 
null entries for partitions
+         *         that had retriable errors.
+         */
+        public Map<TopicPartition, OffsetAndMetadata> toOffsetMapWithNulls() {
+            Map<TopicPartition, OffsetAndMetadata> result = new 
HashMap<>(offsets);
+            for (TopicPartition tp : retriablePartitionErrors.keySet()) {
+                result.put(tp, null);
+            }
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            return "OffsetFetchResult{" +
+                "offsets=" + offsets +
+                ", retriablePartitionErrors=" + retriablePartitionErrors +
+                '}';
+        }
+    }
+
     class OffsetFetchRequestState extends RetriableRequestState {
 
         /**
@@ -981,7 +1121,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          * Future with the result of the request. This can be reset using 
{@link #resetFuture()}
          * to get a new result when the request is retried.
          */
-        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
+        private CompletableFuture<OffsetFetchResult> future;
 
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
                                        final long retryBackoffMs,
@@ -1131,6 +1271,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         private void onSuccess(final long currentTimeMs,
                                final 
OffsetFetchResponseData.OffsetFetchResponseGroup response) {
             var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
+            var retriablePartitionErrors = new HashMap<TopicPartition, 
Errors>();
             var unstableTxnOffsetTopicPartitions = new 
HashSet<TopicPartition>();
             var unauthorizedTopics = new HashSet<String>();
             var failedRequestRegistered = false;
@@ -1153,8 +1294,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                         }
 
                         if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION || 
error == Errors.UNKNOWN_TOPIC_ID) {
-                            future.completeExceptionally(new 
KafkaException("Topic does not exist"));
-                            return;
+                            // Track retriable partition error. Continue 
processing other partitions.
+                            // The caller can decide to retry and eventually 
return the partial results only.
+                            retriablePartitionErrors.put(tp, error);
                         } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) 
{
                             unauthorizedTopics.add(tp.topic());
                         } else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
@@ -1194,13 +1336,17 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 future.completeExceptionally(new 
UnstableOffsetCommitException("There are " +
                     "unstable offsets for the requested topic partitions"));
             } else {
-                onSuccessfulAttempt(currentTimeMs);
-                future.complete(offsets);
+                if (retriablePartitionErrors.isEmpty()) {
+                    // Register success if there are no partition errors.
+                    // If there were partition errors, a failed attempt has 
been already registered above.
+                    onSuccessfulAttempt(currentTimeMs);
+                }
+                future.complete(new OffsetFetchResult(offsets, 
retriablePartitionErrors));
             }
         }
 
         private void chainFuture(
-            final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
otherFuture) {
+            final CompletableFuture<OffsetFetchResult> otherFuture) {
             this.future.whenComplete((r, t) -> {
                 if (t != null) {
                     otherFuture.completeExceptionally(t);
@@ -1255,7 +1401,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          * <p>If the request is new, it invokes a callback to remove itself 
from the {@code inflightOffsetFetches}
          * upon completion.
          */
-        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final OffsetFetchRequestState request) {
+        private CompletableFuture<OffsetFetchResult> 
addOffsetFetchRequest(final OffsetFetchRequestState request) {
             Optional<OffsetFetchRequestState> dupe =
                     unsentOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
             Optional<OffsetFetchRequestState> inflight =
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 080dfa5cd96..4574bef9442 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -377,10 +377,11 @@ public final class OffsetsRequestManager implements 
RequestManager, ClusterResou
         if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
             // Generate a new OffsetFetch request and update positions when a 
response is received
             final long fetchCommittedDeadlineMs = Math.max(deadlineMs, 
time.milliseconds() + defaultApiTimeoutMs);
-            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
fetchOffsets =
+            CompletableFuture<CommitRequestManager.OffsetFetchResult> 
fetchOffsets =
                     commitRequestManager.fetchOffsets(initializingPartitions, 
fetchCommittedDeadlineMs);
             CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
fetchOffsetsAndRefresh =
-                    fetchOffsets.whenComplete((offsets, error) -> {
+                    
fetchOffsets.thenApply(CommitRequestManager.OffsetFetchResult::toOffsetMapWithNulls)
+                    .whenComplete((offsets, error) -> {
                         pendingOffsetFetchEvent = null;
                         // Update positions with the retrieved offsets
                         refreshOffsets(offsets, error, result);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index fb6134c909d..02208f39dee 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -293,8 +293,15 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
             return;
         }
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = 
manager.fetchOffsets(event.partitions(), event.deadlineMs());
-        future.whenComplete(complete(event.future()));
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> future = 
manager.fetchOffsets(event.partitions(), event.deadlineMs());
+        future.whenComplete((result, error) -> {
+            if (error != null) {
+                event.future().completeExceptionally(error);
+            } else {
+                Map<TopicPartition, OffsetAndMetadata> offsetMap = 
result.toOffsetMapWithNulls();
+                event.future().complete(offsetMap);
+            }
+        });
     }
 
     /**
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json 
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index e92590e38e1..91de93441e5 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -40,6 +40,7 @@
   // protocol is used.
   //
   // Version 10 adds support for topic ids and removes support for topic names 
(KIP-848).
+  // It can return UNKNOWN_TOPIC_ID if topic IDs are used and the topic is not 
found in metadata.
   "validVersions": "1-10",
   "flexibleVersions": "6+",
   // Supported errors:
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 540c2902384..fe337f79bd7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -737,7 +737,7 @@ public class CommitRequestManagerTest {
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
-        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+        List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
                 commitRequestManager,
                 partitions,
                 2,
@@ -761,7 +761,7 @@ public class CommitRequestManagerTest {
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
 
-        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+        List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
             commitRequestManager,
             partitions,
             2,
@@ -791,7 +791,7 @@ public class CommitRequestManagerTest {
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
 
-        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+        List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
             commitRequestManager,
             partitions,
             1,
@@ -804,7 +804,7 @@ public class CommitRequestManagerTest {
             try {
                 // The topic received in response should be included in the 
result even
                 // if it's not in the consumer metadata anymore.
-                assertTrue(f.get().containsKey(new TopicPartition("t1", 0)));
+                assertTrue(f.get().offsets().containsKey(new 
TopicPartition("t1", 0)));
             } catch (InterruptedException | ExecutionException e) {
                 fail();
             }
@@ -825,7 +825,7 @@ public class CommitRequestManagerTest {
 
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
-        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+        List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
             commitRequestManager,
             partitions,
             1,
@@ -848,7 +848,7 @@ public class CommitRequestManagerTest {
 
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
-        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+        List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
                 commitRequestManager,
                 partitions,
                 1,
@@ -878,7 +878,7 @@ public class CommitRequestManagerTest {
 
         TopicPartition tp = new TopicPartition("topic1", 0);
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult =
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult =
             commitManager.fetchOffsets(Collections.singleton(tp), deadlineMs);
 
         // Send fetch request
@@ -911,7 +911,7 @@ public class CommitRequestManagerTest {
         assertFalse(fetchResult.isCompletedExceptionally());
         Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = null;
         try {
-            offsetsAndMetadata = fetchResult.get();
+            offsetsAndMetadata = fetchResult.get().offsets();
         } catch (InterruptedException | ExecutionException e) {
             fail(e);
         }
@@ -935,7 +935,7 @@ public class CommitRequestManagerTest {
         partitions.add(new TopicPartition("t1", 0));
 
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.fetchOffsets(partitions, deadlineMs);
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> result = 
commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         completeOffsetFetchRequestWithError(commitRequestManager, partitions, 
error);
 
@@ -962,7 +962,7 @@ public class CommitRequestManagerTest {
         partitions.add(new TopicPartition("t1", 0));
 
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.fetchOffsets(partitions, deadlineMs);
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> result = 
commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1249,7 +1249,7 @@ public class CommitRequestManagerTest {
 
         // Send request that is expected to fail with invalid epoch.
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
requestResult =
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> 
requestResult =
             commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         // Mock member not having a valid epoch anymore (left/failed/fenced).
@@ -1417,7 +1417,7 @@ public class CommitRequestManagerTest {
     }
 
     private void testRetriable(final CommitRequestManager commitRequestManager,
-                               final 
List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures,
+                               final 
List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> futures,
                                final Errors error
     ) {
         futures.forEach(f -> assertFalse(f.isDone()));
@@ -1448,7 +1448,7 @@ public class CommitRequestManagerTest {
         
poll.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(poll.unsentRequests.get(0),
 new HashSet<>(), error));
     }
 
-    private void testNonRetriable(final 
List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+    private void testNonRetriable(final 
List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> futures) {
         futures.forEach(f -> assertTrue(f.isCompletedExceptionally()));
     }
 
@@ -1530,13 +1530,13 @@ public class CommitRequestManagerTest {
         partitions.add(tp2);
         partitions.add(tp3);
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> future =
                 commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
 
-        // Setting 1 partition with error
+        // Setting 2 partitions (tp1, tp3) with error, tp2 succeeds
         OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
             .setGroupId(DEFAULT_GROUP_ID)
             .setTopics(List.of(
@@ -1665,13 +1665,13 @@ public class CommitRequestManagerTest {
     private static Stream<Arguments> partitionDataErrorSupplier() {
         return Stream.of(
             Arguments.of(Errors.UNSTABLE_OFFSET_COMMIT, true),
-            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
-            Arguments.of(Errors.UNKNOWN_TOPIC_ID, false),
+            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, true),
+            Arguments.of(Errors.UNKNOWN_TOPIC_ID, true),
             Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
             Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false));
     }
 
-    private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
sendAndVerifyDuplicatedOffsetFetchRequests(
+    private List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
sendAndVerifyDuplicatedOffsetFetchRequests(
             final CommitRequestManager commitRequestManager,
             final Set<TopicPartition> partitions,
             int numRequest,
@@ -1679,14 +1679,14 @@ public class CommitRequestManagerTest {
         return 
sendAndVerifyDuplicatedOffsetFetchRequests(commitRequestManager, partitions, 
numRequest, error, false, Uuid.ZERO_UUID);
     }
 
-    private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
sendAndVerifyDuplicatedOffsetFetchRequests(
+    private List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
sendAndVerifyDuplicatedOffsetFetchRequests(
             final CommitRequestManager commitRequestManager,
             final Set<TopicPartition> partitions,
             int numRequest,
             final Errors error,
             final boolean shouldUseTopicIds,
             final Uuid topicId) {
-        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = new ArrayList<>();
+        List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> 
futures = new ArrayList<>();
         long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
         for (int i = 0; i < numRequest; i++) {
             futures.add(commitRequestManager.fetchOffsets(partitions, 
deadlineMs));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 2251c0f138a..1da988864e5 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -675,7 +675,7 @@ public class OffsetsRequestManagerTest {
         mockAssignedPartitionsMissingPositions(initPartitions1, 
initPartitions1, leaderAndEpoch);
 
         // Call to updateFetchPositions. Should send an OffsetFetch request 
and use the response to set positions
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult 
= new CompletableFuture<>();
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult 
= new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, 
internalFetchCommittedTimeout)).thenReturn(fetchResult);
         CompletableFuture<Void> updatePositions1 = 
requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone(), "Update positions should wait 
for the OffsetFetch request");
@@ -685,7 +685,8 @@ public class OffsetsRequestManagerTest {
         // of initializing partitions hasn't changed)
         
when(subscriptionState.initializingPartitions()).thenReturn(initPartitions1);
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, 
Optional.of(1), "");
-        fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+        fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+            Collections.singletonMap(tp1, offsetAndMetadata), 
Collections.emptyMap()));
 
         assertTrue(updatePositions1.isDone(), "Update positions should 
complete after the OffsetFetch response");
         SubscriptionState.FetchPosition expectedPosition = new 
SubscriptionState.FetchPosition(
@@ -704,7 +705,7 @@ public class OffsetsRequestManagerTest {
         mockAssignedPartitionsMissingPositions(initPartitions1, 
initPartitions1, leaderAndEpoch);
 
         // call to updateFetchPositions. Should send an OffsetFetch request
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult 
= new CompletableFuture<>();
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult 
= new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, 
internalFetchCommittedTimeout)).thenReturn(fetchResult);
         CompletableFuture<Void> updatePositions1 = 
requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone(), "Update positions should wait 
for the OffsetFetch request");
@@ -717,7 +718,8 @@ public class OffsetsRequestManagerTest {
 
         // Receive response with committed offsets, should complete both calls
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, 
Optional.of(1), "");
-        fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+        fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+            Collections.singletonMap(tp1, offsetAndMetadata), 
Collections.emptyMap()));
 
         assertTrue(updatePositions1.isDone());
         assertTrue(updatePositions2.isDone());
@@ -737,7 +739,7 @@ public class OffsetsRequestManagerTest {
         mockAssignedPartitionsMissingPositions(initPartitions1, 
initPartitions1, leaderAndEpoch);
 
         // call to updateFetchPositions will trigger an OffsetFetch request 
for tp1 (won't complete just yet)
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult 
= new CompletableFuture<>();
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult 
= new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, 
internalFetchCommittedTimeout)).thenReturn(fetchResult);
         CompletableFuture<Void> updatePositions1 = 
requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone());
@@ -748,7 +750,8 @@ public class OffsetsRequestManagerTest {
         // seek). When the OffsetFetch response is received, it should not 
update the position for tp1 to the
         // committed offset
         
when(subscriptionState.initializingPartitions()).thenReturn(Collections.emptySet());
-        fetchResult.complete(Collections.singletonMap(tp1, new 
OffsetAndMetadata(5)));
+        fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+            Collections.singletonMap(tp1, new OffsetAndMetadata(5)), 
Collections.emptyMap()));
         verify(subscriptionState, never()).seekUnvalidated(any(), any());
     }
 
@@ -765,7 +768,7 @@ public class OffsetsRequestManagerTest {
         mockAssignedPartitionsMissingPositions(initPartitions1, 
initPartitions1, leaderAndEpoch);
 
         // call to updateFetchPositions will trigger an OffsetFetch request 
for tp1 (won't complete just yet)
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult 
= new CompletableFuture<>();
+        CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult 
= new CompletableFuture<>();
         when(commitRequestManager.fetchOffsets(initPartitions1, 
internalFetchCommittedTimeout)).thenReturn(fetchResult);
         CompletableFuture<Void> updatePositions1 = 
requestManager.updateFetchPositions(time.milliseconds());
         assertFalse(updatePositions1.isDone());
@@ -781,7 +784,8 @@ public class OffsetsRequestManagerTest {
         // include the requested partition tp1
         
when(subscriptionState.initializingPartitions()).thenReturn(initPartitions2);
         OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, 
Optional.empty(), "");
-        fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+        fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+            Collections.singletonMap(tp1, offsetAndMetadata), 
Collections.emptyMap()));
 
         // Position should have been updated for tp1 using the committed offset
         SubscriptionState.FetchPosition expectedPosition = new 
SubscriptionState.FetchPosition(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index e3a9a05c4cd..00b1f6f3587 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -86,7 +86,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("ClassDataAbstractionCoupling")
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
 public class ApplicationEventProcessorTest {
     private final Time time = new MockTime();
     private final CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);
@@ -352,7 +352,9 @@ public class ApplicationEventProcessorTest {
         FetchCommittedOffsetsEvent event = new 
FetchCommittedOffsetsEvent(partitions, 12345);
 
         setupProcessor(true);
-        when(commitRequestManager.fetchOffsets(partitions, 
12345)).thenReturn(CompletableFuture.completedFuture(topicPartitionOffsets));
+        CommitRequestManager.OffsetFetchResult fetchResult = new 
CommitRequestManager.OffsetFetchResult(
+            topicPartitionOffsets, Collections.emptyMap());
+        when(commitRequestManager.fetchOffsets(partitions, 
12345)).thenReturn(CompletableFuture.completedFuture(fetchResult));
         processor.process(event);
 
         verify(commitRequestManager).fetchOffsets(partitions, 12345);


Reply via email to