This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 5610f3af0cd KAFKA-20165: Handle retriable partition errors gracefully
on OffsetFetch (#21464)
5610f3af0cd is described below
commit 5610f3af0cd39377f5077cd2eef05965a89fd49e
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Apr 2 10:35:45 2026 -0400
KAFKA-20165: Handle retriable partition errors gracefully on OffsetFetch
(#21464)
Fix to ensure consumer poll/committed do not fail fatally on a retriable
UNKNOWN_TOPIC_ID partition-level error when fetching committed offsets.
This issue didn't happen before topic IDs were used on offset fetch,
because when fetching with topic names the group coordinator (GC) returns
no error for unknown partitions (it just includes offset -1). But with
topic IDs (OffsetFetch v10+), if a topic is not known to the GC, the
OffsetFetch response includes UNKNOWN_TOPIC_ID as the error.
The fix is to let poll/committed retry while there is time. If the error
does not recover, fetch offsets still returns gracefully (offsets for
the partitions that have them, null for the partitions that were deleted).
This keeps the contract of the committed API, and ensures poll uses the
committed offsets available (it will use partition offsets for
partitions that still need positions).
Added integration tests that fail without the fix, to show the gaps.
Reviewers: David Jacot <[email protected]>, Nikita Shupletsov
<[email protected]>
---
.../consumer/PlaintextConsumerAssignTest.java | 57 ++++++
.../consumer/PlaintextConsumerCommitTest.java | 61 ++++++
.../consumer/internals/CommitRequestManager.java | 208 ++++++++++++++++++---
.../consumer/internals/OffsetsRequestManager.java | 5 +-
.../events/ApplicationEventProcessor.java | 11 +-
.../common/message/OffsetFetchResponse.json | 1 +
.../internals/CommitRequestManagerTest.java | 40 ++--
.../internals/OffsetsRequestManagerTest.java | 20 +-
.../events/ApplicationEventProcessorTest.java | 6 +-
9 files changed, 344 insertions(+), 65 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
index b12b6dacb4d..b1699851a18 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -38,6 +39,7 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_IN
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -298,6 +300,61 @@ public class PlaintextConsumerAssignTest {
}
}
+ @ClusterTest
+ public void testAsyncPollAfterTopicDeleted() throws Exception {
+ testPollAfterTopicDeleted(GroupProtocol.CONSUMER);
+ }
+
+ @ClusterTest
+ public void testClassicPollAfterTopicDeleted() throws Exception {
+ testPollAfterTopicDeleted(GroupProtocol.CLASSIC);
+ }
+
+ /**
+ * Test that poll() doesn't fail to retrieve committed offsets after the
assigned topic is deleted.
+ * Validates fix for KAFKA-20165.
+ */
+ private void testPollAfterTopicDeleted(GroupProtocol groupProtocol) throws
Exception {
+ String topicToDelete = "topic-to-delete-assign";
+ clusterInstance.createTopic(topicToDelete, 1, (short) BROKER_COUNT);
+ TopicPartition tpToDelete = new TopicPartition(topicToDelete, 0);
+
+ int numRecords = 10;
+ long startingTimestamp = System.currentTimeMillis();
+
+ Map<String, Object> consumerConfig = Map.of(
+ GROUP_PROTOCOL_CONFIG, groupProtocol.name,
+ GROUP_ID_CONFIG, "test-group",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ try (Consumer<byte[], byte[]> consumer =
clusterInstance.consumer(consumerConfig);
+ var admin = clusterInstance.admin()) {
+
+ // Send records and consume them (this caches the topic ID in the
consumer)
+ ClientsTestUtils.sendRecords(clusterInstance, tpToDelete,
numRecords, startingTimestamp);
+ consumer.assign(List.of(tpToDelete));
+ consumer.seek(tpToDelete, 0);
+ ClientsTestUtils.consumeAndVerifyRecords(consumer, tpToDelete,
numRecords, 0, 0, startingTimestamp);
+ consumer.commitSync();
+
+ // Delete the topic and wait for deletion to propagate to metadata
+ admin.deleteTopics(List.of(topicToDelete)).all().get();
+ TestUtils.waitForCondition(
+ () ->
!admin.listTopics().names().get().contains(topicToDelete),
+ 10000,
+ "Topic should be removed from metadata");
+
+ // Change assignment to force the consumer to fetch committed
offsets on next poll()
+ // The consumer still has the topic ID cached, so it will use
version 10+
+ // and get UNKNOWN_TOPIC_ID from the broker
+ consumer.unsubscribe();
+ consumer.assign(List.of(tpToDelete));
+
+ // poll() should not throw - internally fetches committed offsets
for deleted topic and recovers
+ assertDoesNotThrow(() ->
consumer.poll(java.time.Duration.ofMillis(5000)));
+ }
+ }
+
private static class CountConsumerCommitCallback implements
OffsetCommitCallback {
int successCount = 0;
int failCount = 0;
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
index 5ff54f5e888..ef4c9c9552d 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -208,6 +208,67 @@ public class PlaintextConsumerCommitTest {
}
}
+ /**
+ * Test that calling consumer.committed() for a deleted topic returns null
for each partition.
+ * This validates the fix for KAFKA-20165.
+ */
+ @ClusterTest
+ public void testClassicConsumerCommittedDeletedTopic() throws Exception {
+ testConsumerCommittedDeletedTopic(GroupProtocol.CLASSIC);
+ }
+
+ /**
+ * Test that calling consumer.committed() for a deleted topic returns null
for each partition.
+ * This validates the fix for KAFKA-20165.
+ */
+ @ClusterTest
+ public void testAsyncConsumerCommittedDeletedTopic() throws Exception {
+ testConsumerCommittedDeletedTopic(GroupProtocol.CONSUMER);
+ }
+
+ /**
+ * Common test logic for both Classic and Async consumer.
+ * Tests that calling consumer.committed() for a deleted topic returns
null.
+ * As per the Javadoc for committed(): "If any of the partitions requested
do not exist, the result
+ * map will contain null as the value for that partition."
+ *
+ * For Classic consumer (topic names only), the GroupCoordinator returns
no error for deleted topics (offsets -1).
+ *
+ * For Async consumer (topic IDs and topic names), the GroupCoordinator
returns UNKNOWN_TOPIC_ID for the deleted topic
+ * when the client uses topic IDs.
+ * The consumer handles this as a retriable partition error and eventually
returns null,
+ * keeping the same contract as the ClassicConsumer.
+ */
+ private void testConsumerCommittedDeletedTopic(GroupProtocol
groupProtocol) throws Exception {
+ String topicToDelete = "topic-to-delete";
+ cluster.createTopic(topicToDelete, 1, (short) BROKER_COUNT);
+ TopicPartition tpToDelete = new TopicPartition(topicToDelete, 0);
+
+ try (var consumer = createConsumer(groupProtocol, false);
+ var admin = cluster.admin()) {
+ consumer.assign(List.of(tpToDelete));
+
+ // Commit an offset to ensure the consumer has the topic ID cached
and there's data to fetch
+ consumer.commitSync(Map.of(tpToDelete, new OffsetAndMetadata(0)));
+
+ // Verify the commit was successful
+ assertEquals(0,
consumer.committed(Set.of(tpToDelete)).get(tpToDelete).offset());
+
+ // Delete the topic
+ admin.deleteTopics(List.of(topicToDelete)).all().get();
+
+ // Eventually, the response should return null for the deleted
topic partition.
+ TestUtils.waitForCondition(
+ () -> {
+ var committed = consumer.committed(Set.of(tpToDelete),
Duration.ofMillis(5000));
+ return committed.get(tpToDelete) == null;
+ },
+ 10000,
+ "Expected null for deleted topic partition"
+ );
+ }
+ }
+
@ClusterTest
public void testClassicConsumerAsyncCommit() throws InterruptedException {
testAsyncCommit(GroupProtocol.CLASSIC);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index d3a13f66bca..00c5812a5fa 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -502,21 +502,26 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
/**
* Enqueue a request to fetch committed offsets, that will be sent on the
next call to {@link #poll(long)}.
+ * This is used when initializing positions on poll,
+ * and for fetching committed offsets from API calls to consumer.committed.
+ * This function will retry upon expected retriable errors while the
timeout hasn't expired.
+ * It will fail fatally with KafkaException for all unexpected errors.
*
- * @param partitions Partitions to fetch offsets for.
- * @param deadlineMs Time until which the request should be retried
if it fails
- * with expected retriable errors.
+ * @param partitions Partitions to fetch offsets for.
+ * @param deadlineMs Time until which the request should be retried if it
fails
+ * with expected retriable errors.
* @return Future that will complete when a successful response is
received, or the request
- * fails and cannot be retried. Note that the request is retried whenever
it fails with
- * retriable expected error and the retry time hasn't expired.
+ * fails and cannot be retried. The result contains both successful
offsets and any retriable
+ * partition errors (like UNKNOWN_TOPIC_ID) that occurred. Note that the
request is retried
+ * whenever it fails with expected retriable errors and the retry timeout
hasn't expired.
*/
- public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
fetchOffsets(
+ public CompletableFuture<OffsetFetchResult> fetchOffsets(
final Set<TopicPartition> partitions,
final long deadlineMs) {
if (partitions.isEmpty()) {
- return CompletableFuture.completedFuture(Collections.emptyMap());
+ return CompletableFuture.completedFuture(new
OffsetFetchResult(Collections.emptyMap(), Collections.emptyMap()));
}
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new
CompletableFuture<>();
+ CompletableFuture<OffsetFetchResult> result = new
CompletableFuture<>();
OffsetFetchRequestState request = createOffsetFetchRequest(partitions,
deadlineMs);
fetchOffsetsWithRetries(request, result);
return result;
@@ -542,8 +547,8 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
}
private void fetchOffsetsWithRetries(final OffsetFetchRequestState
fetchRequest,
- final
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
currentResult = pendingRequests.addOffsetFetchRequest(fetchRequest);
+ final
CompletableFuture<OffsetFetchResult> result) {
+ CompletableFuture<OffsetFetchResult> currentResult =
pendingRequests.addOffsetFetchRequest(fetchRequest);
// Retry the same fetch request while it fails with RetriableException
and the retry timeout hasn't expired.
currentResult.whenComplete((res, error) -> {
@@ -552,24 +557,99 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
log.warn("A duplicated, inflight, request was identified, but
unable to find it in the " +
"outbound buffer: {}", fetchRequest);
}
- if (error == null) {
- maybeUpdateLastSeenEpochIfNewer(res);
- result.complete(res);
- } else {
- if (error instanceof RetriableException ||
isStaleEpochErrorAndValidEpochAvailable(error)) {
- if (fetchRequest.isExpired()) {
- log.debug("OffsetFetch request for {} timed out and
won't be retried anymore", fetchRequest.requestedPartitions);
-
result.completeExceptionally(maybeWrapAsTimeoutException(error));
- } else {
- fetchRequest.resetFuture();
- fetchOffsetsWithRetries(fetchRequest, result);
- }
- } else
- result.completeExceptionally(error);
+
+ // Group-level error
+ if (error != null) {
+ handleGroupLevelError(fetchRequest, result, error);
+ return;
+ }
+
+ // Partition-level errors
+ if (res.hasRetriablePartitionErrors()) {
+ handleRetriablePartitionErrors(fetchRequest, result, res);
+ return;
}
+
+ handleSuccessfulOffsetFetch(result, res);
+
});
}
+ /**
+ * Handles a successful offset fetch response with no errors.
+ */
+ private void handleSuccessfulOffsetFetch(final
CompletableFuture<OffsetFetchResult> result,
+ final OffsetFetchResult res) {
+ maybeUpdateLastSeenEpochIfNewer(res.offsets());
+ result.complete(res);
+ }
+
+ /**
+ * Handles group-level errors from an offset fetch request.
+ * Group-level errors indicate the entire request failed (e.g.,
coordinator unavailable).
+ */
+ private void handleGroupLevelError(final OffsetFetchRequestState
fetchRequest,
+ final
CompletableFuture<OffsetFetchResult> result,
+ final Throwable error) {
+ boolean isRetriable = (error instanceof RetriableException) ||
+ isStaleEpochErrorAndValidEpochAvailable(error);
+
+ if (!isRetriable) {
+ result.completeExceptionally(error);
+ return;
+ }
+
+ if (fetchRequest.isExpired()) {
+ log.debug("OffsetFetch request for {} timed out and won't be
retried anymore",
+ fetchRequest.requestedPartitions);
+ result.completeExceptionally(maybeWrapAsTimeoutException(error));
+ return;
+ }
+
+ retryOffsetFetchOnError(fetchRequest, result, "retriable error: " +
error.getMessage());
+ }
+
+ /**
+ * Handles retriable partition-level errors from an offset fetch response.
+ *
+ * <p>The only retriable partition errors are UNKNOWN_TOPIC_ID and
UNKNOWN_TOPIC_OR_PARTITION.
+ * When expired or not enough time for another retry, we return partial
results with null for
+ * the errored partitions. We check against {@code remainingBackoffMs} to
ensure we complete
+ * before the {@link
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper}
+ * expires the event with TimeoutException.
+ */
+ private void handleRetriablePartitionErrors(final OffsetFetchRequestState
fetchRequest,
+ final
CompletableFuture<OffsetFetchResult> result,
+ final OffsetFetchResult res) {
+ long currentTimeMs = time.milliseconds();
+
+ // Return partial results if there is no time for another retry.
+ // We check against remainingBackoffMs to ensure we complete before the
+ // CompletableEventReaper expires the event with TimeoutException.
+ if (fetchRequest.isExpired() || fetchRequest.remainingMs() <=
fetchRequest.remainingBackoffMs(currentTimeMs)) {
+ log.debug("OffsetFetch request for partitions {} returning partial
results with some partition errors {}",
+ fetchRequest.requestedPartitions,
res.retriablePartitionErrors().keySet());
+ maybeUpdateLastSeenEpochIfNewer(res.offsets());
+ result.complete(res);
+ return;
+ }
+
+ retryOffsetFetchOnError(fetchRequest, result,
+ "retriable partition errors: " +
res.retriablePartitionErrors().keySet());
+ }
+
+ /**
+ * Retries an offset fetch request after an error.
+ */
+ private void retryOffsetFetchOnError(final OffsetFetchRequestState
fetchRequest,
+ final
CompletableFuture<OffsetFetchResult> result,
+ final String reason) {
+ log.debug("OffsetFetch request for {} retrying due to {}",
+ fetchRequest.requestedPartitions, reason);
+ fetchRequest.resetFuture();
+ fetchOffsetsWithRetries(fetchRequest, result);
+ }
+
private boolean isStaleEpochErrorAndValidEpochAvailable(Throwable error) {
return error instanceof StaleMemberEpochException &&
memberInfo.memberEpoch.isPresent();
}
@@ -963,6 +1043,66 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
abstract void removeRequest();
}
+ /**
+ * Result of an offset fetch request.
+ * Contains the successfully fetched offsets and any
+ * retriable partition errors (e.g., UNKNOWN_TOPIC_ID).
+ * This allows returning partial results, including offsets and partition
errors.
+ */
+ public static class OffsetFetchResult {
+ /**
+ * Partitions with offsets successfully retrieved
+ */
+ private final Map<TopicPartition, OffsetAndMetadata> offsets;
+
+ /**
+ * Partitions with retriable errors
+ */
+ private final Map<TopicPartition, Errors> retriablePartitionErrors;
+
+ public OffsetFetchResult(Map<TopicPartition, OffsetAndMetadata>
offsets,
+ Map<TopicPartition, Errors>
retriablePartitionErrors) {
+ this.offsets = offsets;
+ this.retriablePartitionErrors = retriablePartitionErrors;
+ }
+
+ public Map<TopicPartition, OffsetAndMetadata> offsets() {
+ return offsets;
+ }
+
+ public Map<TopicPartition, Errors> retriablePartitionErrors() {
+ return retriablePartitionErrors;
+ }
+
+ public boolean hasRetriablePartitionErrors() {
+ return !retriablePartitionErrors.isEmpty();
+ }
+
+ /**
+ * Converts this result to a map of offsets, using null for partitions
that had retriable errors.
+ * This is expected to be used when the caller wants to return partial
results to the user, where null indicates
+ * that the offset for that partition could not be fetched.
+ *
+ * @return A new map containing all successfully fetched offsets, plus
null entries for partitions
+ * that had retriable errors.
+ */
+ public Map<TopicPartition, OffsetAndMetadata> toOffsetMapWithNulls() {
+ Map<TopicPartition, OffsetAndMetadata> result = new
HashMap<>(offsets);
+ for (TopicPartition tp : retriablePartitionErrors.keySet()) {
+ result.put(tp, null);
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "OffsetFetchResult{" +
+ "offsets=" + offsets +
+ ", retriablePartitionErrors=" + retriablePartitionErrors +
+ '}';
+ }
+ }
+
class OffsetFetchRequestState extends RetriableRequestState {
/**
@@ -981,7 +1121,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* Future with the result of the request. This can be reset using
{@link #resetFuture()}
* to get a new result when the request is retried.
*/
- private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
future;
+ private CompletableFuture<OffsetFetchResult> future;
public OffsetFetchRequestState(final Set<TopicPartition> partitions,
final long retryBackoffMs,
@@ -1131,6 +1271,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
private void onSuccess(final long currentTimeMs,
final
OffsetFetchResponseData.OffsetFetchResponseGroup response) {
var offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
+ var retriablePartitionErrors = new HashMap<TopicPartition,
Errors>();
var unstableTxnOffsetTopicPartitions = new
HashSet<TopicPartition>();
var unauthorizedTopics = new HashSet<String>();
var failedRequestRegistered = false;
@@ -1153,8 +1294,9 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
}
if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION ||
error == Errors.UNKNOWN_TOPIC_ID) {
- future.completeExceptionally(new
KafkaException("Topic does not exist"));
- return;
+ // Track retriable partition error. Continue
processing other partitions.
+ // The caller can decide to retry and eventually
return the partial results only.
+ retriablePartitionErrors.put(tp, error);
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
{
unauthorizedTopics.add(tp.topic());
} else if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
@@ -1194,13 +1336,17 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
future.completeExceptionally(new
UnstableOffsetCommitException("There are " +
"unstable offsets for the requested topic partitions"));
} else {
- onSuccessfulAttempt(currentTimeMs);
- future.complete(offsets);
+ if (retriablePartitionErrors.isEmpty()) {
+ // Register success if there are no partition errors.
+ // If there were partition errors, a failed attempt has
been already registered above.
+ onSuccessfulAttempt(currentTimeMs);
+ }
+ future.complete(new OffsetFetchResult(offsets,
retriablePartitionErrors));
}
}
private void chainFuture(
- final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
otherFuture) {
+ final CompletableFuture<OffsetFetchResult> otherFuture) {
this.future.whenComplete((r, t) -> {
if (t != null) {
otherFuture.completeExceptionally(t);
@@ -1255,7 +1401,7 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
* <p>If the request is new, it invokes a callback to remove itself
from the {@code inflightOffsetFetches}
* upon completion.
*/
- private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
addOffsetFetchRequest(final OffsetFetchRequestState request) {
+ private CompletableFuture<OffsetFetchResult>
addOffsetFetchRequest(final OffsetFetchRequestState request) {
Optional<OffsetFetchRequestState> dupe =
unsentOffsetFetches.stream().filter(r ->
r.sameRequest(request)).findAny();
Optional<OffsetFetchRequestState> inflight =
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 080dfa5cd96..4574bef9442 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -377,10 +377,11 @@ public final class OffsetsRequestManager implements
RequestManager, ClusterResou
if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
// Generate a new OffsetFetch request and update positions when a
response is received
final long fetchCommittedDeadlineMs = Math.max(deadlineMs,
time.milliseconds() + defaultApiTimeoutMs);
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
fetchOffsets =
+ CompletableFuture<CommitRequestManager.OffsetFetchResult>
fetchOffsets =
commitRequestManager.fetchOffsets(initializingPartitions,
fetchCommittedDeadlineMs);
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
fetchOffsetsAndRefresh =
- fetchOffsets.whenComplete((offsets, error) -> {
+
fetchOffsets.thenApply(CommitRequestManager.OffsetFetchResult::toOffsetMapWithNulls)
+ .whenComplete((offsets, error) -> {
pendingOffsetFetchEvent = null;
// Update positions with the retrieved offsets
refreshOffsets(offsets, error, result);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index fb6134c909d..02208f39dee 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -293,8 +293,15 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
return;
}
CommitRequestManager manager =
requestManagers.commitRequestManager.get();
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
manager.fetchOffsets(event.partitions(), event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> future =
manager.fetchOffsets(event.partitions(), event.deadlineMs());
+ future.whenComplete((result, error) -> {
+ if (error != null) {
+ event.future().completeExceptionally(error);
+ } else {
+ Map<TopicPartition, OffsetAndMetadata> offsetMap =
result.toOffsetMapWithNulls();
+ event.future().complete(offsetMap);
+ }
+ });
}
/**
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json
b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index e92590e38e1..91de93441e5 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -40,6 +40,7 @@
// protocol is used.
//
// Version 10 adds support for topic ids and removes support for topic names
(KIP-848).
+ // It can return UNKNOWN_TOPIC_ID if topic IDs used and the topic is not
found in metadata.
"validVersions": "1-10",
"flexibleVersions": "6+",
// Supported errors:
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 540c2902384..fe337f79bd7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -737,7 +737,7 @@ public class CommitRequestManagerTest {
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("t1", 0));
- List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+ List<CompletableFuture<CommitRequestManager.OffsetFetchResult>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
commitRequestManager,
partitions,
2,
@@ -761,7 +761,7 @@ public class CommitRequestManagerTest {
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("t1", 0));
- List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+ List<CompletableFuture<CommitRequestManager.OffsetFetchResult>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
commitRequestManager,
partitions,
2,
@@ -791,7 +791,7 @@ public class CommitRequestManagerTest {
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("t1", 0));
- List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+ List<CompletableFuture<CommitRequestManager.OffsetFetchResult>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
commitRequestManager,
partitions,
1,
@@ -804,7 +804,7 @@ public class CommitRequestManagerTest {
try {
// The topic received in response should be included in the
result even
// if it's not in the consumer metadata anymore.
- assertTrue(f.get().containsKey(new TopicPartition("t1", 0)));
+ assertTrue(f.get().offsets().containsKey(new
TopicPartition("t1", 0)));
} catch (InterruptedException | ExecutionException e) {
fail();
}
@@ -825,7 +825,7 @@ public class CommitRequestManagerTest {
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("t1", 0));
- List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+ List<CompletableFuture<CommitRequestManager.OffsetFetchResult>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
commitRequestManager,
partitions,
1,
@@ -848,7 +848,7 @@ public class CommitRequestManagerTest {
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(new TopicPartition("t1", 0));
- List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+ List<CompletableFuture<CommitRequestManager.OffsetFetchResult>>
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
commitRequestManager,
partitions,
1,
@@ -878,7 +878,7 @@ public class CommitRequestManagerTest {
TopicPartition tp = new TopicPartition("topic1", 0);
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult =
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult =
commitManager.fetchOffsets(Collections.singleton(tp), deadlineMs);
// Send fetch request
@@ -911,7 +911,7 @@ public class CommitRequestManagerTest {
assertFalse(fetchResult.isCompletedExceptionally());
Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = null;
try {
- offsetsAndMetadata = fetchResult.get();
+ offsetsAndMetadata = fetchResult.get().offsets();
} catch (InterruptedException | ExecutionException e) {
fail(e);
}
@@ -935,7 +935,7 @@ public class CommitRequestManagerTest {
partitions.add(new TopicPartition("t1", 0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> result =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
completeOffsetFetchRequestWithError(commitRequestManager, partitions,
error);
@@ -962,7 +962,7 @@ public class CommitRequestManagerTest {
partitions.add(new TopicPartition("t1", 0));
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> result =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
@@ -1249,7 +1249,7 @@ public class CommitRequestManagerTest {
// Send request that is expected to fail with invalid epoch.
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
requestResult =
+ CompletableFuture<CommitRequestManager.OffsetFetchResult>
requestResult =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
// Mock member not having a valid epoch anymore (left/failed/fenced).
@@ -1417,7 +1417,7 @@ public class CommitRequestManagerTest {
}
private void testRetriable(final CommitRequestManager commitRequestManager,
- final
List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures,
+ final
List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> futures,
final Errors error
) {
futures.forEach(f -> assertFalse(f.isDone()));
@@ -1448,7 +1448,7 @@ public class CommitRequestManagerTest {
poll.unsentRequests.get(0).handler().onComplete(buildOffsetFetchClientResponse(poll.unsentRequests.get(0),
new HashSet<>(), error));
}
- private void testNonRetriable(final
List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+ private void testNonRetriable(final
List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> futures) {
futures.forEach(f -> assertTrue(f.isCompletedExceptionally()));
}
@@ -1530,13 +1530,13 @@ public class CommitRequestManagerTest {
partitions.add(tp2);
partitions.add(tp3);
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> future =
commitRequestManager.fetchOffsets(partitions, deadlineMs);
NetworkClientDelegate.PollResult res =
commitRequestManager.poll(time.milliseconds());
assertEquals(1, res.unsentRequests.size());
- // Setting 1 partition with error
+ // Setting 2 partitions (tp1, tp3) with error, tp2 succeeds
OffsetFetchResponseData.OffsetFetchResponseGroup groupResponse = new
OffsetFetchResponseData.OffsetFetchResponseGroup()
.setGroupId(DEFAULT_GROUP_ID)
.setTopics(List.of(
@@ -1665,13 +1665,13 @@ public class CommitRequestManagerTest {
private static Stream<Arguments> partitionDataErrorSupplier() {
return Stream.of(
Arguments.of(Errors.UNSTABLE_OFFSET_COMMIT, true),
- Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
- Arguments.of(Errors.UNKNOWN_TOPIC_ID, false),
+ Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, true),
+ Arguments.of(Errors.UNKNOWN_TOPIC_ID, true),
Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false));
}
- private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedOffsetFetchRequests(
+ private List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> sendAndVerifyDuplicatedOffsetFetchRequests(
final CommitRequestManager commitRequestManager,
final Set<TopicPartition> partitions,
int numRequest,
@@ -1679,14 +1679,14 @@ public class CommitRequestManagerTest {
return
sendAndVerifyDuplicatedOffsetFetchRequests(commitRequestManager, partitions,
numRequest, error, false, Uuid.ZERO_UUID);
}
- private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedOffsetFetchRequests(
+ private List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> sendAndVerifyDuplicatedOffsetFetchRequests(
final CommitRequestManager commitRequestManager,
final Set<TopicPartition> partitions,
int numRequest,
final Errors error,
final boolean shouldUseTopicIds,
final Uuid topicId) {
- List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<>();
+ List<CompletableFuture<CommitRequestManager.OffsetFetchResult>> futures = new ArrayList<>();
long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
for (int i = 0; i < numRequest; i++) {
futures.add(commitRequestManager.fetchOffsets(partitions,
deadlineMs));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 2251c0f138a..1da988864e5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -675,7 +675,7 @@ public class OffsetsRequestManagerTest {
mockAssignedPartitionsMissingPositions(initPartitions1,
initPartitions1, leaderAndEpoch);
// Call to updateFetchPositions. Should send an OffsetFetch request
and use the response to set positions
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult = new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone(), "Update positions should wait
for the OffsetFetch request");
@@ -685,7 +685,8 @@ public class OffsetsRequestManagerTest {
// of initializing partitions hasn't changed)
when(subscriptionState.initializingPartitions()).thenReturn(initPartitions1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10,
Optional.of(1), "");
- fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+ fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+ Collections.singletonMap(tp1, offsetAndMetadata), Collections.emptyMap()));
assertTrue(updatePositions1.isDone(), "Update positions should
complete after the OffsetFetch response");
SubscriptionState.FetchPosition expectedPosition = new
SubscriptionState.FetchPosition(
@@ -704,7 +705,7 @@ public class OffsetsRequestManagerTest {
mockAssignedPartitionsMissingPositions(initPartitions1,
initPartitions1, leaderAndEpoch);
// call to updateFetchPositions. Should send an OffsetFetch request
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult = new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone(), "Update positions should wait
for the OffsetFetch request");
@@ -717,7 +718,8 @@ public class OffsetsRequestManagerTest {
// Receive response with committed offsets, should complete both calls
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10,
Optional.of(1), "");
- fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+ fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+ Collections.singletonMap(tp1, offsetAndMetadata), Collections.emptyMap()));
assertTrue(updatePositions1.isDone());
assertTrue(updatePositions2.isDone());
@@ -737,7 +739,7 @@ public class OffsetsRequestManagerTest {
mockAssignedPartitionsMissingPositions(initPartitions1,
initPartitions1, leaderAndEpoch);
// call to updateFetchPositions will trigger an OffsetFetch request
for tp1 (won't complete just yet)
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult = new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone());
@@ -748,7 +750,8 @@ public class OffsetsRequestManagerTest {
// seek). When the OffsetFetch response is received, it should not
update the position for tp1 to the
// committed offset
when(subscriptionState.initializingPartitions()).thenReturn(Collections.emptySet());
- fetchResult.complete(Collections.singletonMap(tp1, new OffsetAndMetadata(5)));
+ fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+ Collections.singletonMap(tp1, new OffsetAndMetadata(5)), Collections.emptyMap()));
verify(subscriptionState, never()).seekUnvalidated(any(), any());
}
@@ -765,7 +768,7 @@ public class OffsetsRequestManagerTest {
mockAssignedPartitionsMissingPositions(initPartitions1,
initPartitions1, leaderAndEpoch);
// call to updateFetchPositions will trigger an OffsetFetch request
for tp1 (won't complete just yet)
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult = new CompletableFuture<>();
+ CompletableFuture<CommitRequestManager.OffsetFetchResult> fetchResult = new CompletableFuture<>();
when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
CompletableFuture<Void> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
assertFalse(updatePositions1.isDone());
@@ -781,7 +784,8 @@ public class OffsetsRequestManagerTest {
// include the requested partition tp1
when(subscriptionState.initializingPartitions()).thenReturn(initPartitions2);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10,
Optional.empty(), "");
- fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+ fetchResult.complete(new CommitRequestManager.OffsetFetchResult(
+ Collections.singletonMap(tp1, offsetAndMetadata), Collections.emptyMap()));
// Position should have been updated for tp1 using the committed offset
SubscriptionState.FetchPosition expectedPosition = new
SubscriptionState.FetchPosition(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index e3a9a05c4cd..00b1f6f3587 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -86,7 +86,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-@SuppressWarnings("ClassDataAbstractionCoupling")
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
public class ApplicationEventProcessorTest {
private final Time time = new MockTime();
private final CommitRequestManager commitRequestManager =
mock(CommitRequestManager.class);
@@ -352,7 +352,9 @@ public class ApplicationEventProcessorTest {
FetchCommittedOffsetsEvent event = new
FetchCommittedOffsetsEvent(partitions, 12345);
setupProcessor(true);
- when(commitRequestManager.fetchOffsets(partitions, 12345)).thenReturn(CompletableFuture.completedFuture(topicPartitionOffsets));
+ CommitRequestManager.OffsetFetchResult fetchResult = new CommitRequestManager.OffsetFetchResult(
+ topicPartitionOffsets, Collections.emptyMap());
+ when(commitRequestManager.fetchOffsets(partitions, 12345)).thenReturn(CompletableFuture.completedFuture(fetchResult));
processor.process(event);
verify(commitRequestManager).fetchOffsets(partitions, 12345);