This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new dd15ae62f20 KAFKA-20330: Ack handling improvement on broker restart 
(#21824)
dd15ae62f20 is described below

commit dd15ae62f20e9526591fb24e46b01125ed5f8cfa
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 30 12:50:31 2026 +0100

    KAFKA-20330: Ack handling improvement on broker restart (#21824)
    
    A share consumer uses a share session to keep track of its acquired
    records with each share-partition leader it is talking to. When the
    connection breaks, the share session is lost and acknowledgements fail.
    
    When the connection to a share-partition leader failed while there was
    an outstanding request, the share consumer noticed the disconnection and
    failed the acknowledgements as expected. Also, if the leader changed to
    a different broker, again the share consumer noticed the leadership
    change and failed the acknowledgements as expected.
    
    In the situation where the share-partition leadership did not change
    when the broker restarted AND there was no in-flight request, the share
    consumer did not notice the disconnection and would try to continue the
    share session. There were also a few situations in which
    acknowledgements could be lost and not notified to the acknowledgement
    commit callback to do with this kind of leadership non-transition.
    
    This PR improves the situation by using a consistent exception
    `NotLeaderOrFollowerException` regardless of when the disconnection was
    noticed. It also makes sure that acknowledgements which cannot be sent
    are completed properly in all cases.
    
    It is possible that a tweak to the protocol will be needed to eliminate
    `ShareSessionNotFoundException` in all edge cases, but that would take a
    KIP.
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 141 +++++++++
 .../internals/ShareConsumeRequestManager.java      | 320 ++++++++++++---------
 .../clients/consumer/internals/ShareFetch.java     |   5 +-
 .../common/requests/ShareRequestMetadata.java      |   2 +
 .../internals/ShareConsumeRequestManagerTest.java  | 182 +++++++++++-
 5 files changed, 498 insertions(+), 152 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 89ea1b38c50..34d072072ba 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -48,8 +48,10 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.ShareSessionNotFoundException;
 import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Header;
@@ -1301,6 +1303,145 @@ public class ShareConsumerTest {
             "Consumer close should not wait for full timeout when broker is 
already shut down");
     }
 
+    @ClusterTest
+    public void 
testLeaderRestartWithoutLeadershipChangeExplicitAcknowledgementSync() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))) {
+
+            AtomicBoolean callbackCalled = new AtomicBoolean(false);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) -> {
+                assertInstanceOf(NotLeaderOrFollowerException.class, 
exception);
+                callbackCalled.set(true);
+            });
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            producer.send(record);
+            producer.flush();
+
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(20000));
+            assertEquals(1, records.count());
+            ConsumerRecord<byte[], byte[]> consumerRecord = 
records.iterator().next();
+
+            // Shutdown the broker
+            assertEquals(1, cluster.brokers().size());
+            KafkaBroker broker = cluster.brokers().get(0);
+            cluster.shutdownBroker(0);
+
+            broker.awaitShutdown();
+
+            // Restart the broker
+            cluster.startBroker(0);
+
+            shareConsumer.acknowledge(consumerRecord);
+            Map<TopicIdPartition, Optional<KafkaException>> commitResult = 
shareConsumer.commitSync(Duration.ofMillis(30000));
+            assertEquals(1, commitResult.size());
+            TopicIdPartition tidp = commitResult.keySet().iterator().next();
+            assertTrue(commitResult.get(tidp).isPresent());
+            assertInstanceOf(NotLeaderOrFollowerException.class, 
commitResult.get(tidp).get());
+
+            assertTrue(callbackCalled.get());
+        }
+    }
+
+    @ClusterTest
+    public void 
testLeaderRestartWithoutLeadershipChangeExplicitAcknowledgementAsync() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))) {
+
+            AtomicBoolean callbackCalled = new AtomicBoolean(false);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) -> {
+                assertInstanceOf(NotLeaderOrFollowerException.class, 
exception);
+                callbackCalled.set(true);
+            });
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            producer.send(record);
+            producer.flush();
+
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(20000));
+            assertEquals(1, records.count());
+            ConsumerRecord<byte[], byte[]> consumerRecord = 
records.iterator().next();
+
+            // Shutdown the broker
+            assertEquals(1, cluster.brokers().size());
+            KafkaBroker broker = cluster.brokers().get(0);
+            cluster.shutdownBroker(0);
+
+            broker.awaitShutdown();
+
+            // Restart the broker
+            cluster.startBroker(0);
+
+            shareConsumer.acknowledge(consumerRecord);
+            shareConsumer.commitAsync();
+
+            int maxRetries = 15;
+            int retries = 0;
+            while (retries < maxRetries) {
+                shareConsumer.poll(Duration.ofMillis(2000));
+                if (callbackCalled.get()) {
+                    break;
+                }
+                retries++;
+            }
+
+            assertTrue(callbackCalled.get());
+        }
+    }
+
+    @ClusterTest
+    public void 
testLeaderRestartWithoutLeadershipChangeImplicitAcknowledgement() {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+
+            AtomicBoolean callbackCalled = new AtomicBoolean(false);
+            
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition, 
exception) -> {
+                assertInstanceOf(ShareSessionNotFoundException.class, 
exception);
+                callbackCalled.set(true);
+            });
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            producer.send(record);
+            producer.flush();
+
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(20000));
+            assertEquals(1, records.count());
+
+            // Shutdown the broker
+            assertEquals(1, cluster.brokers().size());
+            KafkaBroker broker = cluster.brokers().get(0);
+            cluster.shutdownBroker(0);
+
+            broker.awaitShutdown();
+
+            // Restart the broker
+            cluster.startBroker(0);
+
+            int maxRetries = 15;
+            int retries = 0;
+            while (retries < maxRetries) {
+                shareConsumer.poll(Duration.ofMillis(2000));
+                if (callbackCalled.get()) {
+                    break;
+                }
+                retries++;
+            }
+
+            assertTrue(callbackCalled.get());
+        }
+    }
+
     @ClusterTests({
         @ClusterTest(serverProperties = {
             @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index dd1b829bcb3..104199a9d3b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -255,7 +255,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
             // For record_limit mode, we only send a full ShareFetch to a 
single node at a time.
             // We prepare to build ShareFetch requests for all nodes with 
session handlers to permit
-            // piggy-backing of acknowledgements, and also to adjust the 
topic-partitions
+            // piggybacking of acknowledgements, and also to adjust the 
topic-partitions
             // in the share session, but if the request would contain neither 
of those, it can be skipped.
             boolean canSkipIfRequestEmpty = isShareAcquireModeRecordLimit() && 
target.id() != fetchRecordsNodeId.get();
 
@@ -288,9 +288,11 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
     /**
      * Add acknowledgements for a topic-partition to the node's in-flight 
acknowledgements.
+     * If we cannot add acknowledgements, they are completed with {@link 
Errors#NOT_LEADER_OR_FOLLOWER} exception.
+     * This probably indicates the connection to the leader broker was lost, 
but then re-established without a
+     * leadership change, in which case the acknowledgements fail.
      *
      * @return True if we can add acknowledgements to the share session.
-     * If we cannot add acknowledgements, they are completed with {@link 
Errors#INVALID_SHARE_SESSION_EPOCH} exception.
      */
     private boolean maybeAddAcknowledgements(ShareSessionHandler handler,
                                              Node node,
@@ -299,7 +301,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         if (handler.isNewSession()) {
             // Failing the acknowledgements as we cannot have piggybacked 
acknowledgements in the initial ShareFetchRequest.
             log.debug("Cannot send acknowledgements on initial epoch for 
ShareSession for partition {}", tip);
-            
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
+            
acknowledgements.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
             maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements), 
true, Optional.empty());
             return false;
         } else {
@@ -540,49 +542,75 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
     public CompletableFuture<Map<TopicIdPartition, Acknowledgements>> 
commitSync(
             final Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap,
             final long deadlineMs) {
+        final Cluster cluster = metadata.fetch();
         final AtomicInteger resultCount = new AtomicInteger();
         final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> 
future = new CompletableFuture<>();
         final ResultHandler resultHandler = new ResultHandler(resultCount, 
Optional.of(future));
 
-        final Cluster cluster = metadata.fetch();
+        Map<Integer, Map<TopicIdPartition, Acknowledgements>> 
acknowledgementsMapAllNodes = new HashMap<>();
+        Map<TopicIdPartition, Acknowledgements> acknowledgementsMapCannotSend 
= new HashMap<>();
+        acknowledgementsMap.forEach((tip, nodeAcks) -> {
+            if ((cluster.nodeById(nodeAcks.nodeId()) == null) || 
isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+                Acknowledgements prevAcks = 
acknowledgementsMapCannotSend.putIfAbsent(tip, nodeAcks.acknowledgements());
+                if (prevAcks != null) {
+                    prevAcks.merge(nodeAcks.acknowledgements());
+                }
+            } else {
+                Map<TopicIdPartition, Acknowledgements> acksMap = 
acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new 
HashMap<>());
+                Acknowledgements prevAcks = acksMap.putIfAbsent(tip, 
nodeAcks.acknowledgements());
+                if (prevAcks != null) {
+                    prevAcks.merge(nodeAcks.acknowledgements());
+                }
+            }
+        });
+
+        resultCount.addAndGet(acknowledgementsMapCannotSend.size());
 
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
-            Node node = cluster.nodeById(nodeId);
-            if (node != null) {
-                acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, 
null, null));
-
-                // Add the incoming commitSync() request to the queue.
-                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
-                for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-                    NodeAcknowledgements nodeAcknowledgements = 
acknowledgementsMap.get(tip);
-                    if ((nodeAcknowledgements != null) && 
(nodeAcknowledgements.nodeId() == node.id())) {
-                        if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
-                            acknowledgementsMapForNode.put(tip, 
nodeAcknowledgements.acknowledgements());
-
-                            
metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
-                            log.debug("Added sync acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
-                            resultCount.incrementAndGet();
-                        } else {
-                            
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true, Optional.empty());
-                        }
-                    }
-                }
+            Map<TopicIdPartition, Acknowledgements> nodeAcknowledgements = 
acknowledgementsMapAllNodes.get(nodeId);
+            if (nodeAcknowledgements == null)
+                return;
 
-                if (!acknowledgementsMapForNode.isEmpty()) {
-                    acknowledgeRequestStates.get(nodeId).addSyncRequest(new 
AcknowledgeRequestState(logContext,
-                        ShareConsumeRequestManager.class.getSimpleName() + 
":1",
-                        deadlineMs,
-                        retryBackoffMs,
-                        retryBackoffMaxMs,
-                        sessionHandler,
-                        nodeId,
-                        acknowledgementsMapForNode,
-                        resultHandler,
-                        AcknowledgeRequestType.COMMIT_SYNC
-                    ));
+            Map<TopicIdPartition, Acknowledgements> acknowledgementsMapToSend 
= new HashMap<>();
+
+            acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, 
null, null));
+
+            // Add the incoming commitSync() request to the queue.
+            for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
+                Acknowledgements acknowledgements = 
nodeAcknowledgements.remove(tip);
+                if (acknowledgements != null) {
+                    acknowledgementsMapToSend.put(tip, acknowledgements);
+                    resultCount.incrementAndGet();
+
+                    
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                    log.debug("Added sync acknowledge request for partition {} 
to node {}", tip.topicPartition(), nodeId);
                 }
             }
+
+            resultCount.addAndGet(nodeAcknowledgements.size());
+            nodeAcknowledgements.forEach((tip, acks) -> {
+                acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                resultHandler.complete(tip, acks, 
AcknowledgeRequestType.COMMIT_SYNC, true, Optional.empty());
+            });
+
+            if (!acknowledgementsMapToSend.isEmpty()) {
+                acknowledgeRequestStates.get(nodeId).addSyncRequest(new 
AcknowledgeRequestState(logContext,
+                    ShareConsumeRequestManager.class.getSimpleName() + ":1",
+                    deadlineMs,
+                    retryBackoffMs,
+                    retryBackoffMaxMs,
+                    sessionHandler,
+                    nodeId,
+                    acknowledgementsMapToSend,
+                    resultHandler,
+                    AcknowledgeRequestType.COMMIT_SYNC
+                ));
+            }
+        });
+
+        acknowledgementsMapCannotSend.forEach((tip, acks) -> {
+            acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+            resultHandler.complete(tip, acks, 
AcknowledgeRequestType.COMMIT_SYNC, true, Optional.empty());
         });
 
         resultHandler.completeIfEmpty();
@@ -602,48 +630,63 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         final Cluster cluster = metadata.fetch();
         final ResultHandler resultHandler = new 
ResultHandler(Optional.empty());
 
+        Map<Integer, Map<TopicIdPartition, Acknowledgements>> 
acknowledgementsMapAllNodes = new HashMap<>();
+        acknowledgementsMap.forEach((tip, nodeAcks) -> {
+            if ((cluster.nodeById(nodeAcks.nodeId()) == null) || 
isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+                log.debug("Leader for the partition is down or has changed, 
failing acknowledgements for partition {}", tip);
+                
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcks.acknowledgements()), true, Optional.empty());
+            } else {
+                Map<TopicIdPartition, Acknowledgements> acksMap = 
acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new 
HashMap<>());
+                Acknowledgements prevAcks = acksMap.putIfAbsent(tip, 
nodeAcks.acknowledgements());
+                if (prevAcks != null) {
+                    prevAcks.merge(nodeAcks.acknowledgements());
+                }
+            }
+        });
+
         sessionHandlers.forEach((nodeId, sessionHandler) -> {
-            Node node = cluster.nodeById(nodeId);
-            if (node != null) {
-                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = new HashMap<>();
-
-                acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, 
null, null));
-
-                for (TopicIdPartition tip : 
sessionHandler.sessionPartitions()) {
-                    NodeAcknowledgements nodeAcknowledgements = 
acknowledgementsMap.get(tip);
-                    if ((nodeAcknowledgements != null) && 
(nodeAcknowledgements.nodeId() == node.id())) {
-                        if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
-                            Acknowledgements acknowledgements = 
nodeAcknowledgements.acknowledgements();
-                            acknowledgementsMapForNode.put(tip, 
acknowledgements);
-
-                            
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                            log.debug("Added async acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
-                            AcknowledgeRequestState asyncRequestState = 
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
-                            if (asyncRequestState == null) {
-                                
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new 
AcknowledgeRequestState(logContext,
-                                        
ShareConsumeRequestManager.class.getSimpleName() + ":2",
-                                        deadlineMs,
-                                        retryBackoffMs,
-                                        retryBackoffMaxMs,
-                                        sessionHandler,
-                                        nodeId,
-                                        acknowledgementsMapForNode,
-                                        resultHandler,
-                                        AcknowledgeRequestType.COMMIT_ASYNC
-                                ));
-                            } else {
-                                Acknowledgements prevAcks = 
asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
-                                if (prevAcks != null) {
-                                    
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
-                                }
-                            }
-                        } else {
-                            
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcknowledgements.acknowledgements()), true, Optional.empty());
+            Map<TopicIdPartition, Acknowledgements> nodeAcknowledgements = 
acknowledgementsMapAllNodes.get(nodeId);
+            if (nodeAcknowledgements == null)
+                return;
+
+            Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode 
= new HashMap<>();
+
+            acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, 
null, null));
+
+            for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
+                Acknowledgements acknowledgements = 
nodeAcknowledgements.remove(tip);
+                if (acknowledgements != null) {
+                    acknowledgementsMapForNode.put(tip, acknowledgements);
+
+                    
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                    log.debug("Added async acknowledge request for partition 
{} to node {}", tip.topicPartition(), nodeId);
+                    AcknowledgeRequestState asyncRequestState = 
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
+                    if (asyncRequestState == null) {
+                        
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new 
AcknowledgeRequestState(logContext,
+                            ShareConsumeRequestManager.class.getSimpleName() + 
":2",
+                            deadlineMs,
+                            retryBackoffMs,
+                            retryBackoffMaxMs,
+                            sessionHandler,
+                            nodeId,
+                            acknowledgementsMapForNode,
+                            resultHandler,
+                            AcknowledgeRequestType.COMMIT_ASYNC
+                        ));
+                    } else {
+                        Acknowledgements prevAcks = 
asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
+                        if (prevAcks != null) {
+                            
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
                         }
                     }
                 }
             }
+
+            nodeAcknowledgements.forEach((tip, acks) -> {
+                acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true, 
Optional.empty());
+            });
         });
 
         resultHandler.completeIfEmpty();
@@ -659,82 +702,83 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
      *
      * @return The future which completes when the acknowledgements finished
      */
-    public CompletableFuture<Void> acknowledgeOnClose(final 
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
-                                                      final long deadlineMs) {
+    public CompletableFuture<Void> acknowledgeOnClose(
+            final Map<TopicIdPartition, NodeAcknowledgements> 
acknowledgementsMap,
+            final long deadlineMs) {
         final Cluster cluster = metadata.fetch();
         final AtomicInteger resultCount = new AtomicInteger();
         final ResultHandler resultHandler = new ResultHandler(resultCount, 
Optional.empty());
 
         closing = true;
-        Map<Integer, Map<TopicIdPartition, Acknowledgements>> 
acknowledgementsMapAllNodes = new HashMap<>();
 
+        Map<Integer, Map<TopicIdPartition, Acknowledgements>> 
acknowledgementsMapAllNodes = new HashMap<>();
         acknowledgementsMap.forEach((tip, nodeAcks) -> {
-            if (!isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+            if ((cluster.nodeById(nodeAcks.nodeId()) == null) || 
isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+                
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcks.acknowledgements()), true, Optional.empty());
+            } else {
                 Map<TopicIdPartition, Acknowledgements> acksMap = 
acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new 
HashMap<>());
                 Acknowledgements prevAcks = acksMap.putIfAbsent(tip, 
nodeAcks.acknowledgements());
                 if (prevAcks != null) {
-                    acksMap.get(tip).merge(nodeAcks.acknowledgements());
+                    prevAcks.merge(nodeAcks.acknowledgements());
                 }
-            } else {
-                
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                maybeSendShareAcknowledgementEvent(Map.of(tip, 
nodeAcks.acknowledgements()), true, Optional.empty());
             }
         });
 
-        sessionHandlers.forEach((nodeId, sessionHandler) -> {
-            Node node = cluster.nodeById(nodeId);
-            if (node != null) {
-                //Add any waiting piggyback acknowledgements for the node.
-                Map<TopicIdPartition, Acknowledgements> fetchAcks = 
fetchAcknowledgementsToSend.remove(nodeId);
-                if (fetchAcks != null) {
-                    fetchAcks.forEach((tip, acks) -> {
-                        if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
-                            Map<TopicIdPartition, Acknowledgements> acksMap = 
acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>());
-                            Acknowledgements prevAcks = 
acksMap.putIfAbsent(tip, acks);
-                            if (prevAcks != null) {
-                                acksMap.get(tip).merge(acks);
-                            }
-                        } else {
-                            
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-                            maybeSendShareAcknowledgementEvent(Map.of(tip, 
acks), true, Optional.empty());
-                        }
-                    });
-                }
-
-                Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId);
-                if (acknowledgementsMapForNode != null) {
-                    acknowledgementsMapForNode.forEach((tip, acknowledgements) 
-> {
-                        
metricsManager.recordAcknowledgementSent(acknowledgements.size());
-                        log.debug("Added closing acknowledge request for 
partition {} to node {}", tip.topicPartition(), node.id());
-                        resultCount.incrementAndGet();
-                    });
+        // Add any waiting piggyback acknowledgements.
+        fetchAcknowledgementsToSend.forEach((nodeId, nodeAcks) ->
+            nodeAcks.forEach((tip, acks) -> {
+                if ((cluster.nodeById(nodeId) == null) || 
isLeaderKnownToHaveChanged(nodeId, tip)) {
+                    acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                    maybeSendShareAcknowledgementEvent(Map.of(tip, acks), 
true, Optional.empty());
                 } else {
-                    acknowledgementsMapForNode = new HashMap<>();
+                    Map<TopicIdPartition, Acknowledgements> acksMap = 
acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>());
+                    Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks);
+                    if (prevAcks != null) {
+                        prevAcks.merge(acks);
+                    }
                 }
+            })
+        );
 
-                acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, 
null, null));
+        sessionHandlers.forEach((nodeId, sessionHandler) -> {
+            Map<TopicIdPartition, Acknowledgements> nodeAcknowledgements = 
acknowledgementsMapAllNodes.get(nodeId);
+            if (nodeAcknowledgements != null) {
+                nodeAcknowledgements.forEach((tip, acknowledgements) -> {
+                    resultCount.incrementAndGet();
 
-                // Ensure there is no close() request already present as they 
are blocking calls
-                // and only one request can be active at a time.
-                if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != 
null && 
isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest()))
 {
-                    log.error("Attempt to call close() when there is an 
existing close request for node {}-{}", node.id(), 
acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
-                    closeFuture.completeExceptionally(
-                            new IllegalStateException("Attempt to call close() 
when there is an existing close request for node : " + node.id()));
-                } else {
-                    // There can only be one close() happening at a time. So 
per node, there will be one acknowledge request state.
-                    acknowledgeRequestStates.get(nodeId).setCloseRequest(
-                        new AcknowledgeRequestState(logContext,
-                            ShareConsumeRequestManager.class.getSimpleName() + 
":3",
-                            deadlineMs,
-                            retryBackoffMs,
-                            retryBackoffMaxMs,
-                            sessionHandler,
-                            nodeId,
-                            acknowledgementsMapForNode,
-                            resultHandler,
-                            AcknowledgeRequestType.CLOSE
-                    ));
-                }
+                    
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+                    log.debug("Added closing acknowledge request for partition 
{} to node {}", tip.topicPartition(), nodeId);
+                });
+            } else {
+                nodeAcknowledgements = new HashMap<>();
+            }
+
+            acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null, 
null, null));
+
+            // Ensure there is no close() request already present as they are 
blocking calls
+            // and only one request can be active at a time.
+            if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null 
&& 
isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest()))
 {
+                log.error("Attempt to call close() when there is an existing 
close request for node {}-{}", nodeId, 
acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
+                nodeAcknowledgements.forEach((tip, acks) -> {
+                    acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+                    maybeSendShareAcknowledgementEvent(Map.of(tip, acks), 
true, Optional.empty());
+                });
+                closeFuture.completeExceptionally(new 
IllegalStateException("Attempt to call close() when there is an existing close 
request for node " + nodeId));
+            } else {
+                // There can only be one close() happening at a time. So per 
node, there will be one acknowledge request state.
+                acknowledgeRequestStates.get(nodeId).setCloseRequest(
+                    new AcknowledgeRequestState(logContext,
+                        ShareConsumeRequestManager.class.getSimpleName() + 
":3",
+                        deadlineMs,
+                        retryBackoffMs,
+                        retryBackoffMaxMs,
+                        sessionHandler,
+                        nodeId,
+                        nodeAcknowledgements,
+                        resultHandler,
+                        AcknowledgeRequestType.CLOSE
+                ));
             }
         });
 
@@ -744,6 +788,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
 
     /**
      * The method checks whether the leader for a topicIdPartition has changed.
+     *
      * @param nodeId The previous leader for the partition.
      * @param topicIdPartition The TopicIdPartition to check.
      * @return Returns true if leader information is available and leader has 
changed.
@@ -1243,7 +1288,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             Node nodeToSend = metadata.fetch().nodeById(nodeId);
 
             if (requestBuilder == null) {
-                handleAcknowledgeShareSessionNotFound();
+                handleNewShareSessionNotLeaderOrFollower();
                 return null;
             } else if (nodeToSend != null) {
                 nodesWithPendingRequests.add(nodeId);
@@ -1344,17 +1389,16 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
         }
 
         /**
-         * Set the error code for all remaining acknowledgements in the event
-         * of a share session not found error which prevents the remaining 
acknowledgements from
-         * being sent.
+         * Set the error code for all remaining acknowledgements in the event 
that a new share session
+         * needs to be started which prevents the remaining acknowledgements 
from being sent.
          */
-        void handleAcknowledgeShareSessionNotFound() {
+        void handleNewShareSessionNotLeaderOrFollower() {
             Map<TopicIdPartition, Acknowledgements> acknowledgementsMapToClear 
=
                 incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend 
: incompleteAcknowledgements;
 
             acknowledgementsMapToClear.forEach((tip, acks) -> {
                 if (acks != null) {
-                    acks.complete(Errors.SHARE_SESSION_NOT_FOUND.exception());
+                    acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
                 }
                 // We do not know whether this is a renew ack, but handling 
the error as if it were, will ensure
                 // that we do not leave dangling acknowledgements
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index a15231512fc..1f81e356343 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -249,7 +249,10 @@ public class ShareFetch<K, V> {
     public int renew(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap, Optional<Integer> acquisitionLockTimeoutMs) {
         int recordsRenewed = 0;
         for (Map.Entry<TopicIdPartition, Acknowledgements> entry : 
acknowledgementsMap.entrySet()) {
-            recordsRenewed += 
batches.get(entry.getKey()).renew(entry.getValue());
+            ShareInFlightBatch<K, V> batch = batches.get(entry.getKey());
+            if (batch != null) {
+                recordsRenewed += batch.renew(entry.getValue());
+            }
         }
         acquisitionLockTimeoutMsRenewed = acquisitionLockTimeoutMs;
         return recordsRenewed;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
index 1af62e44681..8cda95de0ee 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
@@ -33,7 +33,9 @@ public class ShareRequestMetadata {
     public static final int FINAL_EPOCH = -1;
 
     /**
+     * Whether this session is a new session.
      *
+     * @return Whether the session epoch is the initial epoch.
      */
     public boolean isNewSession() {
         return epoch == INITIAL_EPOCH;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index f3b2063c136..bdad2512e18 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
-import org.apache.kafka.common.errors.ShareSessionNotFoundException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.header.Header;
@@ -369,7 +369,7 @@ public class ShareConsumeRequestManagerTest {
         assertNull(shareConsumeRequestManager.requestStates(0));
         // The callback for these unsent acknowledgements will be invoked with 
an error code.
         assertEquals(Map.of(tip0, acknowledgements2), 
completedAcknowledgements.get(0));
-        assertInstanceOf(ShareSessionNotFoundException.class, 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+        assertInstanceOf(NotLeaderOrFollowerException.class, 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
 
         // Attempt a normal fetch to check if nodesWithPendingRequests is 
empty.
         assertEquals(1, sendFetches());
@@ -1490,7 +1490,7 @@ public class ShareConsumeRequestManagerTest {
         assertEquals(0, 
builder.data().topics().find(tip0.topicId()).partitions().find(0).acknowledgementBatches().size());
 
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
-        assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
     }
 
     @Test
@@ -1526,7 +1526,7 @@ public class ShareConsumeRequestManagerTest {
 
         // We should fail any waiting acknowledgements for tip-0 as it would 
have a share session epoch equal to 0.
         assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
-        assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
     }
 
     @Test
@@ -2226,24 +2226,22 @@ public class ShareConsumeRequestManagerTest {
         // We fail the acknowledgements for records which were received from 
node0 with NOT_LEADER_OR_FOLLOWER exception.
         shareConsumeRequestManager.commitSync(commitAcks, 
calculateDeadlineMs(time.timer(100)));
 
-        // Verify if the callback was invoked with the failed acknowledgements.
-        assertEquals(1, completedAcknowledgements.get(0).size());
-        assertEquals(acknowledgementsTp0, 
completedAcknowledgements.get(0).get(tip0));
-        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
-
         // We only send acknowledgements for tip1 to node1.
         assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
 
         client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
         networkClientDelegate.poll(time.timer(0));
 
-        assertEquals(1, completedAcknowledgements.get(1).size());
-        assertEquals(acknowledgementsTp1, 
completedAcknowledgements.get(1).get(tip1));
-        
assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+        // Verify if the callback was invoked with the failed 
acknowledgements. The callback is called with the commitSync processing.
+        assertEquals(2, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0, 
completedAcknowledgements.get(0).get(tip0));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+        assertEquals(acknowledgementsTp1, 
completedAcknowledgements.get(0).get(tip1));
+        
assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
     }
 
     @Test
-    void testLeadershipChangeAfterFetchBeforeClose() {
+    void testLeadershipChangeAfterFetchBeforeCloseMove() {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
@@ -2320,6 +2318,164 @@ public class ShareConsumeRequestManagerTest {
         
assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
     }
 
+    @Test
+    void testLeadershipChangeAfterFetchMoveBeforeClose() {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch();
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData =
+            buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
+        partitionData = buildPartitionDataMap(tip1, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = 
partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
+            AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.fetch(Map.of(tip1, new 
NodeAcknowledgements(1, acknowledgementsTp1)));
+
+        // Move the leadership of tp1 onto node 0
+        metadata.updatePartitionLeadership(Map.of(tp1, new 
Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), 
Optional.of(validLeaderEpoch + 1))), List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+        // We fail the acknowledgements for records which were received from 
node0 with NOT_LEADER_OR_FOLLOWER exception.
+        shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgementsTp0)),
+            calculateDeadlineMs(time.timer(100)));
+
+        // Verify if the callback was invoked with the failed acknowledgements.
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp1.getAcknowledgementsTypeMap(), 
completedAcknowledgements.get(0).get(tip1).getAcknowledgementsTypeMap());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
+        completedAcknowledgements.clear();
+
+        // As we are closing, we still send the request to both the nodes, but 
with empty acknowledgements to node1, as it is no longer the leader.
+        assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponseFrom(fullAcknowledgeResponse(tip0, Errors.NONE), 
nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+
+        client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0, 
completedAcknowledgements.get(0).get(tip0));
+        
assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+    }
+
+    @Test
+    void testLeadershipChangeAfterFetchMoveBeforeCloseMove() {
+        buildRequestManager();
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(tp0);
+        partitions.add(tp1);
+        subscriptions.assignFromSubscribed(partitions);
+
+        client.updateMetadata(
+            RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
+                tp -> validLeaderEpoch, topicIds, false));
+        Node nodeId0 = metadata.fetch().nodeById(0);
+        Node nodeId1 = metadata.fetch().nodeById(1);
+
+        Cluster startingClusterMetadata = metadata.fetch();
+        assertFalse(metadata.updateRequested());
+
+        assertEquals(2, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData =
+            buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
+        partitionData = buildPartitionDataMap(tip1, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+        assertTrue(partitionRecords.containsKey(tp1));
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = 
partitionRecords.get(tp0);
+        assertEquals(1, fetchedRecords.size());
+
+        fetchedRecords = partitionRecords.get(tp1);
+        assertEquals(2, fetchedRecords.size());
+
+        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+        Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
+            AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
+
+        shareConsumeRequestManager.fetch(Map.of(tip1, new 
NodeAcknowledgements(1, acknowledgementsTp1)));
+
+        // Move the leadership of tp1 onto node 0, and tp0 onto node 1
+        metadata.updatePartitionLeadership(Map.of(tp1, new 
Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), 
Optional.of(validLeaderEpoch + 1))), List.of());
+        metadata.updatePartitionLeadership(Map.of(tp0, new 
Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), 
Optional.of(validLeaderEpoch + 1))), List.of());
+
+        assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+        // We fail the acknowledgements for records which were received from 
node0 and node 1 with NOT_LEADER_OR_FOLLOWER exception.
+        shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgementsTp0)),
+            calculateDeadlineMs(time.timer(100)));
+
+        // Verify if the callback was invoked with the failed acknowledgements.
+        assertEquals(1, completedAcknowledgements.get(0).size());
+        assertEquals(acknowledgementsTp0.getAcknowledgementsTypeMap(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+        assertEquals(1, completedAcknowledgements.get(1).size());
+        assertEquals(acknowledgementsTp1.getAcknowledgementsTypeMap(), 
completedAcknowledgements.get(1).get(tip1).getAcknowledgementsTypeMap());
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), 
completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+        completedAcknowledgements.clear();
+
+        // As we are closing, we still send the request to both the nodes, but 
with empty acknowledgements to node 0 and node1, as they are no longer the 
leader.
+        assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId0);
+        networkClientDelegate.poll(time.timer(0));
+
+        client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId1);
+        networkClientDelegate.poll(time.timer(0));
+
+        assertTrue(completedAcknowledgements.isEmpty());
+    }
+
     @Test
     void testWhenLeadershipChangedAfterDisconnected() {
         buildRequestManager();

Reply via email to