This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 3f84d38e8edac0ea87e57b1948883d97251044d3
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Mar 31 17:22:51 2026 +0100

    MINOR: fix share poll event to call the share membership manager (#21701)
    
        Fix the processing of a share poll event to call maybeReconcile on the
        shareMembershipManager (not the consumerMembershipManager).
    
        No change in logic, because maybeReconcile is implemented in the parent
        class AbstractMembershipManager, but calling it on the wrong manager is
        confusing and could lead to errors if maybeReconcile is ever overridden.
    
        Reviewers: Andrew Schofield <[email protected]>
---
 .../events/ApplicationEventProcessor.java          | 10 +++---
 .../events/ApplicationEventProcessorTest.java      | 36 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index daa9fe6a3a9..314684b6129 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -228,12 +228,12 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
     }
 
     private void process(final SharePollEvent event) {
-        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
-            consumerMembershipManager.maybeReconcile(true));
-        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
-            hrm.membershipManager().onConsumerPoll();
-            hrm.resetPollTimer(event.pollTimeMs());
+        
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> {
+            shareMembershipManager.maybeReconcile(true);
+            shareMembershipManager.onConsumerPoll();
         });
+        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm ->
+            hrm.resetPollTimer(event.pollTimeMs()));
     }
 
     private void process(final CreateFetchRequestsEvent event) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index f07a9da5ab3..e41e3c23054 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -31,6 +31,9 @@ import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import 
org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager;
+import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
 import 
org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
 import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@@ -83,7 +86,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("ClassDataAbstractionCoupling")
+@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
 public class ApplicationEventProcessorTest {
     private final Time time = new MockTime();
     private final CommitRequestManager commitRequestManager = 
mock(CommitRequestManager.class);
@@ -95,6 +98,8 @@ public class ApplicationEventProcessorTest {
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private final StreamsGroupHeartbeatRequestManager 
streamsGroupHeartbeatRequestManager = 
mock(StreamsGroupHeartbeatRequestManager.class);
     private final StreamsMembershipManager streamsMembershipManager = 
mock(StreamsMembershipManager.class);
+    private final ShareHeartbeatRequestManager shareHeartbeatRequestManager = 
mock(ShareHeartbeatRequestManager.class);
+    private final ShareMembershipManager shareMembershipManager = 
mock(ShareMembershipManager.class);
     private ApplicationEventProcessor processor;
 
     private void setupProcessor(boolean withGroupId) {
@@ -139,6 +144,22 @@ public class ApplicationEventProcessorTest {
         );
     }
 
+    private void setupShareProcessor() {
+        RequestManagers requestManagers = new RequestManagers(
+            new LogContext(),
+            mock(ShareConsumeRequestManager.class),
+            Optional.of(mock(CoordinatorRequestManager.class)),
+            Optional.of(shareHeartbeatRequestManager),
+            Optional.of(shareMembershipManager)
+        );
+        processor = new ApplicationEventProcessor(
+            new LogContext(),
+            requestManagers,
+            metadata,
+            subscriptionState
+        );
+    }
+
     @Test
     public void testPrepClosingCommitEvents() {
         setupProcessor(true);
@@ -283,6 +304,19 @@ public class ApplicationEventProcessorTest {
         verify(fetchRequestManager).createFetchRequests();
     }
 
+    @Test
+    public void testSharePollEventCallsShareManagers() {
+        SharePollEvent event = new SharePollEvent(12345);
+
+        setupShareProcessor();
+        processor.process(event);
+
+        verify(shareMembershipManager).maybeReconcile(true);
+        verify(shareMembershipManager).onConsumerPoll();
+
+        
verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs());
+    }
+
     @Test
     public void testTopicSubscriptionChangeEvent() {
         Set<String> topics = Set.of("topic1", "topic2");

Reply via email to