This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new b29efde02aa KAFKA-20309: Limit SharePollEvent to single instance
(#21757)
b29efde02aa is described below
commit b29efde02aacbbebba7549774dcfb74f25862077
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 16 14:25:09 2026 +0000
KAFKA-20309: Limit SharePollEvent to single instance (#21757)
When a share consumer is initially joining a group but not yet fetching
records, the poll loop has no records to wait for. Each iteration of the
loop was creating an instance of `SharePollEvent` without considering
whether an in-flight event existed. As a result, many events could
briefly queue up for no good reason, only to be drained once the
consumer successfully joined the group.
The PR follows a similar design as used in the `AsyncKafkaConsumer` for
managing its poll event, without that code's extra complication of
validating the position.
Reviewers: TaiJuWu <[email protected]>, Lianet Magrans
<[email protected]>
---
.../consumer/internals/ShareConsumerImpl.java | 54 ++++++++++++++++++++--
.../events/ApplicationEventProcessor.java | 15 ++++--
.../consumer/internals/events/SharePollEvent.java | 36 ++++++++++++++-
.../events/ApplicationEventProcessorTest.java | 6 +--
4 files changed, 98 insertions(+), 13 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 3a7eb81df60..7bc5e08501f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -205,6 +205,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
// Init value is needed to avoid NPE in case of exception raised in the
constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter =
Optional.empty();
+ private SharePollEvent inFlightPoll;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
// currentThread holds the threadId of the current thread accessing the
KafkaShareConsumer
@@ -615,15 +616,19 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
shouldSendShareFetchEvent = true;
- do {
- // Make sure the network thread can tell the application is
actively polling
- applicationEventHandler.add(new
SharePollEvent(timer.currentTimeMs()));
+ // This distinguishes the first pass of the inner do/while loop
from subsequent passes for the
+ // in-flight poll event logic.
+ boolean firstPass = true;
+ do {
// We must not allow wake-ups between polling for fetches and
returning the records.
// A wake-up between returned fetches and returning records
would lead to never
// returning the records in the fetches. Thus, we trigger a
possible wake-up before we poll fetches.
wakeupTrigger.maybeTriggerWakeup();
+ // Make sure the network thread can tell the application is
actively polling
+ checkInFlightPoll(timer, firstPass);
+ firstPass = false;
final ShareFetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
currentFetch = fetch;
@@ -650,6 +655,49 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
}
}
+ private void checkInFlightPoll(Timer timer, boolean firstPass) {
+ if (firstPass && inFlightPoll != null) {
+ maybeClearPreviousInFlightPoll();
+ }
+
+ boolean newlySubmittedEvent = false;
+
+ if (inFlightPoll == null) {
+ inFlightPoll = new SharePollEvent(calculateDeadlineMs(timer),
timer.currentTimeMs());
+ newlySubmittedEvent = true;
+ log.trace("In-flight event {} submitted", inFlightPoll);
+ applicationEventHandler.add(inFlightPoll);
+ }
+
+ timer.update();
+
+ if (inFlightPoll != null) {
+ maybeClearCurrentInFlightPoll(newlySubmittedEvent);
+ }
+ }
+
+ private void maybeClearPreviousInFlightPoll() {
+ if (inFlightPoll.isComplete()) {
+ log.trace("Previous in-flight event {} completed, clearing",
inFlightPoll);
+ inFlightPoll = null;
+ } else if (inFlightPoll.isExpired(time)) {
+ log.trace("Previous in-flight event {} expired without completing,
clearing", inFlightPoll);
+ inFlightPoll = null;
+ }
+ }
+
+ private void maybeClearCurrentInFlightPoll(boolean newlySubmittedEvent) {
+ if (inFlightPoll.isComplete()) {
+ log.trace("In-flight event {} completed without error, clearing",
inFlightPoll);
+ inFlightPoll = null;
+ } else if (!newlySubmittedEvent) {
+ if (inFlightPoll.isExpired(time)) {
+ log.trace("In-flight event {} expired without completing,
clearing", inFlightPoll);
+ inFlightPoll = null;
+ }
+ }
+ }
+
private ShareFetch<K, V> pollForFetches(final Timer timer) {
long pollTimeout =
Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 314684b6129..5e11b3a0ef6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -28,6 +28,7 @@ import
org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
@@ -228,12 +229,16 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final SharePollEvent event) {
-
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> {
- shareMembershipManager.maybeReconcile(true);
- shareMembershipManager.onConsumerPoll();
+
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager ->
+ shareMembershipManager.maybeReconcile(true));
+
+ requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
+ ShareMembershipManager membershipManager = hrm.membershipManager();
+ membershipManager.onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
});
- requestManagers.shareHeartbeatRequestManager.ifPresent(hrm ->
- hrm.resetPollTimer(event.pollTimeMs()));
+
+ event.completeSuccessfully();
}
private void process(final CreateFetchRequestsEvent event) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
index 2db7b18173c..577f95a4837 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java
@@ -16,21 +16,53 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.ShareConsumer;
+import org.apache.kafka.common.utils.Time;
+
+import java.time.Duration;
+
public class SharePollEvent extends ApplicationEvent {
+ private final long deadlineMs;
private final long pollTimeMs;
+ private volatile boolean isComplete;
- public SharePollEvent(final long pollTimeMs) {
+ /**
+ * @param deadlineMs Time, in milliseconds, at which point the
event must be completed; based on the
+ * {@link Duration} passed to {@link
ShareConsumer#poll(Duration)}
+ * @param pollTimeMs Time, in milliseconds, at which point the
event was created
+ */
+ public SharePollEvent(final long deadlineMs, final long pollTimeMs) {
super(Type.SHARE_POLL);
+ this.deadlineMs = deadlineMs;
this.pollTimeMs = pollTimeMs;
}
+ public long deadlineMs() {
+ return deadlineMs;
+ }
+
public long pollTimeMs() {
return pollTimeMs;
}
+ public boolean isExpired(final Time time) {
+ return time.milliseconds() >= deadlineMs();
+ }
+
+ public boolean isComplete() {
+ return isComplete;
+ }
+
+ public void completeSuccessfully() {
+ isComplete = true;
+ }
+
@Override
public String toStringBase() {
- return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
+ return super.toStringBase() +
+ ", deadlineMs=" + deadlineMs +
+ ", pollTimeMs=" + pollTimeMs +
+ ", isComplete=" + isComplete;
}
}
\ No newline at end of file
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index e41e3c23054..5e8dfc7a5bb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -306,14 +306,14 @@ public class ApplicationEventProcessorTest {
@Test
public void testSharePollEventCallsShareManagers() {
- SharePollEvent event = new SharePollEvent(12345);
+ SharePollEvent event = new SharePollEvent(12346, 12345);
setupShareProcessor();
+
when(shareHeartbeatRequestManager.membershipManager()).thenReturn(shareMembershipManager);
processor.process(event);
-
+ assertTrue(event.isComplete());
verify(shareMembershipManager).maybeReconcile(true);
verify(shareMembershipManager).onConsumerPoll();
-
verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs());
}