This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b954b35d0ae KAFKA-20332: Fix to ensure app thread not collecting
records for partitions being revoked (#21897)
b954b35d0ae is described below
commit b954b35d0aeaf66d6bedccc50b4ddae28f2afe87
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Apr 7 21:22:44 2026 -0400
KAFKA-20332: Fix to ensure app thread not collecting records for partitions
being revoked (#21897)
This addresses race conditions where the app thread could collect/return
records for revoked partitions.
Fix by ensuring that the app thread does not return buffered records if
it hasn't checked pending reconciliations. Once it has checked pending
reconciliations, we know that partitions being revoked were marked as
non-fetchable (so that is when we can safely move on to fetching/collecting
in the app thread). Also ensure that background reconciliations do not
trigger revocations (the app thread could already have records in
memory, collected from the buffer, for those partitions, which would
lead to the consumer returning records for revoked partitions if the
background completes the revocation before the app thread returns).
With these fixes we are sure that the app thread only collects/returns
records after it has marked revoked partitions as non-fetchable.
This fix applies to the consumer only (the share consumer remains unchanged
with this PR: it can still trigger full reconciliation and assignment
updates from the background).
Reviewers: Andrew Schofield <[email protected]>, nileshkumar3
<[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>, Kirk True <[email protected]>
---
.../internals/AbstractMembershipManager.java | 11 ++--
.../consumer/internals/AsyncKafkaConsumer.java | 28 ++++++++--
.../consumer/internals/ShareMembershipManager.java | 13 +++++
.../events/ApplicationEventProcessor.java | 4 ++
.../consumer/internals/events/AsyncPollEvent.java | 37 ++++++++++++-
.../consumer/internals/AsyncKafkaConsumerTest.java | 62 ++++++++++++++++++++++
6 files changed, 148 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c1972900f7f..5782e138202 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -838,9 +838,6 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
return;
}
- if (autoCommitEnabled && !canCommit) return;
- markReconciliationInProgress();
-
// Keep copy of assigned TopicPartitions created from the
TopicIdPartitions that are
// being reconciled. Needed for interactions with the centralized
subscription state that
// does not support topic IDs yet, and for the callbacks.
@@ -858,6 +855,14 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
revokedPartitions.addAll(ownedPartitions);
revokedPartitions.removeAll(assignedTopicPartitions);
+ // If canCommit is false (called from background poll(), not from
AsyncPollEvent), skip
+ // reconciliation if it would involve revocation or auto-commit.
+ // Reconciliations revoking partitions cannot be triggered from the
background because the app thread could be returning records for those
partitions already.
+ // Reconciliations just adding new partitions are safe to trigger from
the background thread since new partitions won't have buffered records.
+ if (!canCommit && (autoCommitEnabled || !revokedPartitions.isEmpty()))
return;
+
+ markReconciliationInProgress();
+
log.info("Reconciling assignment with local epoch {}\n" +
"\tMember: {}\n" +
"\tAssigned partitions: {}\n" +
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 17330650e29..8adc71b904b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1958,9 +1958,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
private Fetch<K, V> pollForFetches(Timer timer) {
- long pollTimeout = isCommittedOffsetsManagementEnabled()
- ? Math.min(applicationEventHandler.maximumTimeToWait(),
timer.remainingMs())
- : timer.remainingMs();
// if data is available already, return it immediately
final Fetch<K, V> fetch = collectFetch();
@@ -1968,6 +1965,9 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
return fetch;
}
+ long pollTimeout = isCommittedOffsetsManagementEnabled()
+ ? Math.min(applicationEventHandler.maximumTimeToWait(),
timer.remainingMs())
+ : timer.remainingMs();
// With the non-blocking poll design, it's possible that at this point
the background thread is
// concurrently working to update positions. Therefore, a _copy_ of
the current assignment is retrieved
// and iterated looking for any partitions with invalid positions.
This is done to avoid being stuck
@@ -2019,6 +2019,28 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* for returning.
*/
private Fetch<K, V> collectFetch() {
+ // Do not return buffered records if the background hasn't checked for
pending reconciliations
+ // for the inflight poll event.
+ // This is key because partitions may need revocation, so we need to
wait for the reconciliation check
+ // that triggers commits and marks partitions as pending revocation,
before we can
+ // safely collect records from the buffer.
+ if (inflightPoll != null &&
!inflightPoll.isReconciliationCheckComplete()) {
+ // If the background hasn't had the time to check for pending
reconciliation,
+ // we need to wait for that check before moving on (instead of
returning empty right away,
+ // which will lead to blocking on buffer data)
+ long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
+ if (timeoutMs > 0) {
+ try {
+
ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
+ } catch (TimeoutException e) {
+ return Fetch.empty();
+ }
+ } else {
+ // No time to wait and reconciliation check not complete
+ return Fetch.empty();
+ }
+ }
+
// With the non-blocking async poll, it's critical that the
application thread wait until the background
// thread has completed the stage of validating positions. This
prevents a race condition where both
// threads may attempt to update the SubscriptionState.position() for
a given partition. So if the background
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
index dfdbd958401..e37c30f2934 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import
org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -199,4 +200,16 @@ public class ShareMembershipManager extends
AbstractMembershipManager<ShareGroup
public int leaveGroupEpoch() {
return ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
}
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * For the ShareConsumer, full reconciliations can always be triggered
from the background thread
+ * (fully updates assignment).
+ */
+ @Override
+ public PollResult poll(final long currentTimeMs) {
+ maybeReconcile(true);
+ return PollResult.EMPTY;
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 02208f39dee..d024d3736a4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -760,6 +760,10 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
+ // We completed checking pending reconciliations (commits triggered,
revoked partitions marked to prevent fetching)
+ // so the application thread poll loop can safely continue progress
now (fetching)
+ event.markReconciliationCheckComplete();
+
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager =
requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
index 068193ca498..cffd0607f06 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Time;
import java.time.Duration;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
/**
* This class represents the non-blocking event that executes logic
functionally equivalent to the following:
@@ -47,6 +48,7 @@ public class AsyncPollEvent extends ApplicationEvent
implements MetadataErrorNot
private volatile KafkaException error;
private volatile boolean isComplete;
private volatile boolean isValidatePositionsComplete;
+ private final CompletableFuture<Void> reconciliationCheckFuture = new
CompletableFuture<>();
/**
* Creates a new event to signify a multi-stage processing of {@link
Consumer#poll(Duration)} logic.
@@ -85,15 +87,47 @@ public class AsyncPollEvent extends ApplicationEvent
implements MetadataErrorNot
this.isValidatePositionsComplete = true;
}
+ /**
+ * @return the future that completes when the background thread has
checked any pending reconciliation
+ * for this poll event. Once complete, revocations have been handled
(commit triggered and partitions
+ * marked as pending revocation), so the app thread can safely proceed to
fetch/collect records.
+ */
+ public CompletableFuture<Void> reconciliationCheckFuture() {
+ return reconciliationCheckFuture;
+ }
+
+ /**
+ * @return true if the background already checked any pending
reconciliation when processing this poll event.
+ * If it completed the check, we know that revocations were handled
(commit triggered and partitions marked as pending revocation),
+ * so the app thread can safely proceed to fetch/collect records.
+ */
+ public boolean isReconciliationCheckComplete() {
+ return reconciliationCheckFuture.isDone();
+ }
+
+ /**
+ * Mark that reconciliation check is complete for this poll event.
+ * This should be called after the background has checked pending
reconciliations when processing this poll event
+ * (triggered commits, and marked partitions as pending revocation if
needed)
+ */
+ public void markReconciliationCheckComplete() {
+ reconciliationCheckFuture.complete(null);
+ }
+
public boolean isComplete() {
return isComplete;
}
public void completeSuccessfully() {
+ // Complete reconciliation future as safety net in case it wasn't
already marked complete
+ reconciliationCheckFuture.complete(null);
isComplete = true;
}
public void completeExceptionally(KafkaException e) {
+ // Complete reconciliation future to unblock any waiters - the error
will be surfaced
+ // through the normal checkInflightPoll() mechanism via the error field
+ reconciliationCheckFuture.complete(null);
error = e;
isComplete = true;
}
@@ -110,6 +144,7 @@ public class AsyncPollEvent extends ApplicationEvent
implements MetadataErrorNot
", pollTimeMs=" + pollTimeMs +
", error=" + error +
", isComplete=" + isComplete +
- ", isValidatePositionsComplete=" + isValidatePositionsComplete;
+ ", isValidatePositionsComplete=" + isValidatePositionsComplete +
+ ", isReconciliationCheckComplete=" +
isReconciliationCheckComplete();
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 9743da654b2..801942225b7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -535,6 +535,68 @@ public class AsyncKafkaConsumerTest {
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
+ /**
+ * Test that poll() does not return records until the reconciliation check
is complete.
+ * This prevents a race condition where records could be returned for
partitions that
+ * are being revoked (see KAFKA-20332).
+ */
+ @Test
+ public void testPollWaitsForReconciliationCheckComplete() {
+ final String topicName = "foo";
+ final int partition = 3;
+ final TopicPartition tp = new TopicPartition(topicName, partition);
+ final List<ConsumerRecord<String, String>> records = asList(
+ new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
+ new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
+ );
+
+ SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
+ consumer = newConsumer(
+ mock(FetchBuffer.class),
+ new ConsumerInterceptors<>(Collections.emptyList(), metrics),
+ mock(ConsumerRebalanceListenerInvoker.class),
+ subscriptions);
+
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ // PositionsValidator starts with metadataUpdateVersion=-1. Stub
metadata.updateVersion() to match,
+ // so canSkipUpdateFetchPositions() passes and we test the
reconciliation check path.
+ doReturn(-1).when(metadata).updateVersion();
+
+ completeTopicSubscriptionChangeEventSuccessfully();
+ consumer.subscribe(singleton(topicName),
mock(ConsumerRebalanceListener.class));
+ // Simulate partition assignment from group coordinator
+ subscriptions.assignFromSubscribed(singleton(tp));
+
+ // Set up position so canSkipUpdateFetchPositions() returns true
(partition in FETCHING state)
+ completeSeekUnvalidatedEventSuccessfully();
+ subscriptions.seek(tp, 0);
+
+ // Set up fetch collector to return records when called
+ doReturn(Fetch.forPartition(tp, records, true, new
OffsetAndMetadata(4, Optional.of(0), "")))
+ .when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
+ // Capture the AsyncPollEvent to manually control when reconciliation
check is marked complete
+ AtomicReference<AsyncPollEvent> capturedEvent = new
AtomicReference<>();
+ doAnswer(invocation -> {
+ AsyncPollEvent event = invocation.getArgument(0);
+ assertTrue(capturedEvent.compareAndSet(null, event));
+ // Do NOT mark reconciliation check complete - simulating
background hasn't processed it yet
+ return null;
+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
+
+ // Poll should return empty because reconciliation check is not
complete.
+ ConsumerRecords<?, ?> result1 = consumer.poll(Duration.ZERO);
+ assertTrue(result1.isEmpty(), "Poll should not return records if it
hasn't completed checking and triggering pending reconciliations.");
+
+ // Now mark reconciliation check complete on the captured event
+ assertNotNull(capturedEvent.get(), "AsyncPollEvent should have been
captured");
+ capturedEvent.get().markReconciliationCheckComplete();
+
+ // Next poll should return the records since reconciliation check is
now complete
+ ConsumerRecords<?, ?> result2 = consumer.poll(Duration.ZERO);
+ assertEquals(2, result2.count(), "Expected 2 records after
reconciliation check is complete");
+ }
+
@Test
public void testEnsureCallbackExecutedByApplicationThread() {
consumer = newConsumer();