This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f2d9b0172e KAFKA-20309: Limit SharePollEvent to single instance (#21757)
2f2d9b0172e is described below

commit 2f2d9b0172ec6bfee6d332ff8b7cd39f2660a964
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 16 14:25:09 2026 +0000

    KAFKA-20309: Limit SharePollEvent to single instance (#21757)
    
    When a share consumer is initially joining a group but not yet fetching
    records, the poll loop has no records to wait for. Each iteration of the
    loop was creating an instance of `SharePollEvent` without considering
    whether an in-flight event existed. As a result, many events could
    briefly queue up for no good reason, only to be drained once the
    consumer successfully joined the group.
    
    This PR follows a design similar to the one `AsyncKafkaConsumer` uses
    for managing its poll event, but without that code's extra complication
    of validating the position.
    
    Reviewers: TaiJuWu <[email protected]>, Lianet Magrans
     <[email protected]>
---
 .../consumer/internals/ShareConsumerImpl.java      | 54 ++++++++++++++++++++--
 .../events/ApplicationEventProcessor.java          | 15 ++++--
 .../consumer/internals/events/SharePollEvent.java  | 36 ++++++++++++++-
 .../events/ApplicationEventProcessorTest.java      |  6 +--
 4 files changed, 98 insertions(+), 13 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index bc7e591d78b..448e3436bd9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -204,6 +204,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     // Init value is needed to avoid NPE in case of exception raised in the 
constructor
     private Optional<ClientTelemetryReporter> clientTelemetryReporter = 
Optional.empty();
 
+    private SharePollEvent inFlightPoll;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
 
     // currentThread holds the threadId of the current thread accessing the 
KafkaShareConsumer
@@ -614,15 +615,19 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
 
             shouldSendShareFetchEvent = true;
 
-            do {
-                // Make sure the network thread can tell the application is 
actively polling
-                applicationEventHandler.add(new 
SharePollEvent(timer.currentTimeMs()));
+            // This distinguishes the first pass of the inner do/while loop 
from subsequent passes for the
+            // in-flight poll event logic.
+            boolean firstPass = true;
 
+            do {
                 // We must not allow wake-ups between polling for fetches and 
returning the records.
                 // A wake-up between returned fetches and returning records 
would lead to never
                 // returning the records in the fetches. Thus, we trigger a 
possible wake-up before we poll fetches.
                 wakeupTrigger.maybeTriggerWakeup();
 
+                // Make sure the network thread can tell the application is 
actively polling
+                checkInFlightPoll(timer, firstPass);
+                firstPass = false;
                 final ShareFetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
                     currentFetch = fetch;
@@ -649,6 +654,49 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         }
     }
 
+    private void checkInFlightPoll(Timer timer, boolean firstPass) {
+        if (firstPass && inFlightPoll != null) {
+            maybeClearPreviousInFlightPoll();
+        }
+
+        boolean newlySubmittedEvent = false;
+
+        if (inFlightPoll == null) {
+            inFlightPoll = new SharePollEvent(calculateDeadlineMs(timer), 
timer.currentTimeMs());
+            newlySubmittedEvent = true;
+            log.trace("In-flight event {} submitted", inFlightPoll);
+            applicationEventHandler.add(inFlightPoll);
+        }
+
+        timer.update();
+
+        if (inFlightPoll != null) {
+            maybeClearCurrentInFlightPoll(newlySubmittedEvent);
+        }
+    }
+
+    private void maybeClearPreviousInFlightPoll() {
+        if (inFlightPoll.isComplete()) {
+            log.trace("Previous in-flight event {} completed, clearing", 
inFlightPoll);
+            inFlightPoll = null;
+        } else if (inFlightPoll.isExpired(time)) {
+            log.trace("Previous in-flight event {} expired without completing, 
clearing", inFlightPoll);
+            inFlightPoll = null;
+        }
+    }
+
+    private void maybeClearCurrentInFlightPoll(boolean newlySubmittedEvent) {
+        if (inFlightPoll.isComplete()) {
+            log.trace("In-flight event {} completed without error, clearing", 
inFlightPoll);
+            inFlightPoll = null;
+        } else if (!newlySubmittedEvent) {
+            if (inFlightPoll.isExpired(time)) {
+                log.trace("In-flight event {} expired without completing, 
clearing", inFlightPoll);
+                inFlightPoll = null;
+            }
+        }
+    }
+
     private ShareFetch<K, V> pollForFetches(final Timer timer) {
         long pollTimeout = 
Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs());
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 314684b6129..5e11b3a0ef6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -28,6 +28,7 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
 import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
 import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
@@ -228,12 +229,16 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final SharePollEvent event) {
-        
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> {
-            shareMembershipManager.maybeReconcile(true);
-            shareMembershipManager.onConsumerPoll();
+        
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager ->
+            shareMembershipManager.maybeReconcile(true));
+
+        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
+            ShareMembershipManager membershipManager = hrm.membershipManager();
+            membershipManager.onConsumerPoll();
+            hrm.resetPollTimer(event.pollTimeMs());
         });
-        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm ->
-            hrm.resetPollTimer(event.pollTimeMs()));
+
+        event.completeSuccessfully();
     }
 
     private void process(final CreateFetchRequestsEvent event) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
index 2db7b18173c..577f95a4837 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
@@ -16,21 +16,53 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.ShareConsumer;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+
 public class SharePollEvent extends ApplicationEvent {
 
+    private final long deadlineMs;
     private final long pollTimeMs;
+    private volatile boolean isComplete;
 
-    public SharePollEvent(final long pollTimeMs) {
+    /**
+     * @param deadlineMs        Time, in milliseconds, at which point the 
event must be completed; based on the
+     *                          {@link Duration} passed to {@link 
ShareConsumer#poll(Duration)}
+     * @param pollTimeMs        Time, in milliseconds, at which point the 
event was created
+     */
+    public SharePollEvent(final long deadlineMs, final long pollTimeMs) {
         super(Type.SHARE_POLL);
+        this.deadlineMs = deadlineMs;
         this.pollTimeMs = pollTimeMs;
     }
 
+    public long deadlineMs() {
+        return deadlineMs;
+    }
+
     public long pollTimeMs() {
         return pollTimeMs;
     }
 
+    public boolean isExpired(final Time time) {
+        return time.milliseconds() >= deadlineMs();
+    }
+
+    public boolean isComplete() {
+        return isComplete;
+    }
+
+    public void completeSuccessfully() {
+        isComplete = true;
+    }
+
     @Override
     public String toStringBase() {
-        return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
+        return super.toStringBase() +
+            ", deadlineMs=" + deadlineMs +
+            ", pollTimeMs=" + pollTimeMs +
+            ", isComplete=" + isComplete;
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 938a4b86618..e3a9a05c4cd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -306,14 +306,14 @@ public class ApplicationEventProcessorTest {
 
     @Test
     public void testSharePollEventCallsShareManagers() {
-        SharePollEvent event = new SharePollEvent(12345);
+        SharePollEvent event = new SharePollEvent(12346, 12345);
 
         setupShareProcessor();
+        
when(shareHeartbeatRequestManager.membershipManager()).thenReturn(shareMembershipManager);
         processor.process(event);
-
+        assertTrue(event.isComplete());
         verify(shareMembershipManager).maybeReconcile(true);
         verify(shareMembershipManager).onConsumerPoll();
-
         
verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs());
     }
 

Reply via email to