This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 3f84d38e8edac0ea87e57b1948883d97251044d3 Author: Andrew Schofield <[email protected]> AuthorDate: Tue Mar 31 17:22:51 2026 +0100 MINOR: fix share poll event to call the share membership manager (#21701) Fix to call the shareMembershipMgr reconcile when processing a share poll event (not the consumerMembershipManager) No changes in logic because maybeReconcile is implemented in the parent class AbstractMembershipMgr, but it's confusing and could lead to errors if ever we override the maybeReconcile. Reviewers: Andrew Schofield <[email protected]> --- .../events/ApplicationEventProcessor.java | 10 +++--- .../events/ApplicationEventProcessorTest.java | 36 +++++++++++++++++++++- 2 files changed, 40 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index daa9fe6a3a9..314684b6129 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -228,12 +228,12 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven } private void process(final SharePollEvent event) { - requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> - consumerMembershipManager.maybeReconcile(true)); - requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); + requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> { + shareMembershipManager.maybeReconcile(true); + shareMembershipManager.onConsumerPoll(); }); + requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> + hrm.resetPollTimer(event.pollTimeMs())); } private void process(final CreateFetchRequestsEvent event) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index f07a9da5ab3..e41e3c23054 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -31,6 +31,9 @@ import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager; import org.apache.kafka.clients.consumer.internals.RequestManagers; +import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager; +import org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager; +import org.apache.kafka.clients.consumer.internals.ShareMembershipManager; import org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager; import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager; import org.apache.kafka.clients.consumer.internals.SubscriptionState; @@ -83,7 +86,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@SuppressWarnings("ClassDataAbstractionCoupling") +@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"}) public class ApplicationEventProcessorTest { private final Time time = new MockTime(); private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class); @@ -95,6 +98,8 @@ public class ApplicationEventProcessorTest { private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); private final StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = mock(StreamsGroupHeartbeatRequestManager.class); private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class); + private final ShareHeartbeatRequestManager shareHeartbeatRequestManager = mock(ShareHeartbeatRequestManager.class); + private final ShareMembershipManager shareMembershipManager = mock(ShareMembershipManager.class); private ApplicationEventProcessor processor; private void setupProcessor(boolean withGroupId) { @@ -139,6 +144,22 @@ public class ApplicationEventProcessorTest { ); } + private void setupShareProcessor() { + RequestManagers requestManagers = new RequestManagers( + new LogContext(), + mock(ShareConsumeRequestManager.class), + Optional.of(mock(CoordinatorRequestManager.class)), + Optional.of(shareHeartbeatRequestManager), + Optional.of(shareMembershipManager) + ); + processor = new ApplicationEventProcessor( + new LogContext(), + requestManagers, + metadata, + subscriptionState + ); + } + @Test public void testPrepClosingCommitEvents() { setupProcessor(true); @@ -283,6 +304,19 @@ public class ApplicationEventProcessorTest { verify(fetchRequestManager).createFetchRequests(); } + @Test + public void testSharePollEventCallsShareManagers() { + SharePollEvent event = new SharePollEvent(12345); + + setupShareProcessor(); + processor.process(event); + + verify(shareMembershipManager).maybeReconcile(true); + verify(shareMembershipManager).onConsumerPoll(); + + verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs()); + } + @Test public void testTopicSubscriptionChangeEvent() { Set<String> topics = Set.of("topic1", "topic2");
