This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61299dd01a0 KAFKA-20330: Ack handling improvement on broker restart
(#21824)
61299dd01a0 is described below
commit 61299dd01a067968d98532f92d0ed6ed0cbc4817
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 30 12:50:31 2026 +0100
KAFKA-20330: Ack handling improvement on broker restart (#21824)
A share consumer uses a share session to keep track of its acquired
records with each share-partition leader it is talking to. When the
connection breaks, the share session is lost and acknowledgements fail.
When the connection to a share-partition leader failed while there was
an outstanding request, the share consumer noticed the disconnection and
failed the acknowledgements as expected. Also, if the leader changed to
a different broker, again the share consumer noticed the leadership
change and failed the acknowledgements as expected.
In the situation where the share-partition leadership did not change
when the broker restarted AND there was no in-flight request, the share
consumer did not notice the disconnection and would try to continue the
share session. There were also a few situations, related to this kind of
leadership non-transition, in which acknowledgements could be lost without
the acknowledgement commit callback being notified.
This PR improves the situation by consistently using
`NotLeaderOrFollowerException`, regardless of when the disconnection was
noticed. It also makes sure that acknowledgements which cannot be sent
are completed properly in all cases.
It is possible that a tweak to the protocol will be needed to eliminate
`ShareSessionNotFoundException` in all edge cases, but that would take a
KIP.
---
.../kafka/clients/consumer/ShareConsumerTest.java | 141 +++++++++
.../internals/ShareConsumeRequestManager.java | 320 ++++++++++++---------
.../clients/consumer/internals/ShareFetch.java | 5 +-
.../common/requests/ShareRequestMetadata.java | 2 +
.../internals/ShareConsumeRequestManagerTest.java | 182 +++++++++++-
5 files changed, 498 insertions(+), 152 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 89ea1b38c50..34d072072ba 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -48,8 +48,10 @@ import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
@@ -1301,6 +1303,145 @@ public class ShareConsumerTest {
"Consumer close should not wait for full timeout when broker is
already shut down");
}
+ @ClusterTest
+ public void
testLeaderRestartWithoutLeadershipChangeExplicitAcknowledgementSync() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))) {
+
+ AtomicBoolean callbackCalled = new AtomicBoolean(false);
+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition,
exception) -> {
+ assertInstanceOf(NotLeaderOrFollowerException.class,
exception);
+ callbackCalled.set(true);
+ });
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ producer.send(record);
+ producer.flush();
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(20000));
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> consumerRecord =
records.iterator().next();
+
+ // Shutdown the broker
+ assertEquals(1, cluster.brokers().size());
+ KafkaBroker broker = cluster.brokers().get(0);
+ cluster.shutdownBroker(0);
+
+ broker.awaitShutdown();
+
+ // Restart the broker
+ cluster.startBroker(0);
+
+ shareConsumer.acknowledge(consumerRecord);
+ Map<TopicIdPartition, Optional<KafkaException>> commitResult =
shareConsumer.commitSync(Duration.ofMillis(30000));
+ assertEquals(1, commitResult.size());
+ TopicIdPartition tidp = commitResult.keySet().iterator().next();
+ assertTrue(commitResult.get(tidp).isPresent());
+ assertInstanceOf(NotLeaderOrFollowerException.class,
commitResult.get(tidp).get());
+
+ assertTrue(callbackCalled.get());
+ }
+ }
+
+ @ClusterTest
+ public void
testLeaderRestartWithoutLeadershipChangeExplicitAcknowledgementAsync() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))) {
+
+ AtomicBoolean callbackCalled = new AtomicBoolean(false);
+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition,
exception) -> {
+ assertInstanceOf(NotLeaderOrFollowerException.class,
exception);
+ callbackCalled.set(true);
+ });
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ producer.send(record);
+ producer.flush();
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(20000));
+ assertEquals(1, records.count());
+ ConsumerRecord<byte[], byte[]> consumerRecord =
records.iterator().next();
+
+ // Shutdown the broker
+ assertEquals(1, cluster.brokers().size());
+ KafkaBroker broker = cluster.brokers().get(0);
+ cluster.shutdownBroker(0);
+
+ broker.awaitShutdown();
+
+ // Restart the broker
+ cluster.startBroker(0);
+
+ shareConsumer.acknowledge(consumerRecord);
+ shareConsumer.commitAsync();
+
+ int maxRetries = 15;
+ int retries = 0;
+ while (retries < maxRetries) {
+ shareConsumer.poll(Duration.ofMillis(2000));
+ if (callbackCalled.get()) {
+ break;
+ }
+ retries++;
+ }
+
+ assertTrue(callbackCalled.get());
+ }
+ }
+
+ @ClusterTest
+ public void
testLeaderRestartWithoutLeadershipChangeImplicitAcknowledgement() {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
+ AtomicBoolean callbackCalled = new AtomicBoolean(false);
+
shareConsumer.setAcknowledgementCommitCallback((offsetsByTopicPartition,
exception) -> {
+ assertInstanceOf(ShareSessionNotFoundException.class,
exception);
+ callbackCalled.set(true);
+ });
+
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ producer.send(record);
+ producer.flush();
+
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(20000));
+ assertEquals(1, records.count());
+
+ // Shutdown the broker
+ assertEquals(1, cluster.brokers().size());
+ KafkaBroker broker = cluster.brokers().get(0);
+ cluster.shutdownBroker(0);
+
+ broker.awaitShutdown();
+
+ // Restart the broker
+ cluster.startBroker(0);
+
+ int maxRetries = 15;
+ int retries = 0;
+ while (retries < maxRetries) {
+ shareConsumer.poll(Duration.ofMillis(2000));
+ if (callbackCalled.get()) {
+ break;
+ }
+ retries++;
+ }
+
+ assertTrue(callbackCalled.get());
+ }
+ }
+
@ClusterTests({
@ClusterTest(serverProperties = {
@ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index dd1b829bcb3..104199a9d3b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -255,7 +255,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
// For record_limit mode, we only send a full ShareFetch to a
single node at a time.
// We prepare to build ShareFetch requests for all nodes with
session handlers to permit
- // piggy-backing of acknowledgements, and also to adjust the
topic-partitions
+ // piggybacking of acknowledgements, and also to adjust the
topic-partitions
// in the share session, but if the request would contain neither
of those, it can be skipped.
boolean canSkipIfRequestEmpty = isShareAcquireModeRecordLimit() &&
target.id() != fetchRecordsNodeId.get();
@@ -288,9 +288,11 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
/**
* Add acknowledgements for a topic-partition to the node's in-flight
acknowledgements.
+ * If we cannot add acknowledgements, they are completed with {@link
Errors#NOT_LEADER_OR_FOLLOWER} exception.
+ * This probably indicates the connection to the leader broker was lost,
but then re-established without a
+ * leadership change, in which case the acknowledgements fail.
*
* @return True if we can add acknowledgements to the share session.
- * If we cannot add acknowledgements, they are completed with {@link
Errors#INVALID_SHARE_SESSION_EPOCH} exception.
*/
private boolean maybeAddAcknowledgements(ShareSessionHandler handler,
Node node,
@@ -299,7 +301,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
if (handler.isNewSession()) {
// Failing the acknowledgements as we cannot have piggybacked
acknowledgements in the initial ShareFetchRequest.
log.debug("Cannot send acknowledgements on initial epoch for
ShareSession for partition {}", tip);
-
acknowledgements.complete(Errors.INVALID_SHARE_SESSION_EPOCH.exception());
+
acknowledgements.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
maybeSendShareAcknowledgementEvent(Map.of(tip, acknowledgements),
true, Optional.empty());
return false;
} else {
@@ -540,49 +542,75 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
public CompletableFuture<Map<TopicIdPartition, Acknowledgements>>
commitSync(
final Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap,
final long deadlineMs) {
+ final Cluster cluster = metadata.fetch();
final AtomicInteger resultCount = new AtomicInteger();
final CompletableFuture<Map<TopicIdPartition, Acknowledgements>>
future = new CompletableFuture<>();
final ResultHandler resultHandler = new ResultHandler(resultCount,
Optional.of(future));
- final Cluster cluster = metadata.fetch();
+ Map<Integer, Map<TopicIdPartition, Acknowledgements>>
acknowledgementsMapAllNodes = new HashMap<>();
+ Map<TopicIdPartition, Acknowledgements> acknowledgementsMapCannotSend
= new HashMap<>();
+ acknowledgementsMap.forEach((tip, nodeAcks) -> {
+ if ((cluster.nodeById(nodeAcks.nodeId()) == null) ||
isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+ Acknowledgements prevAcks =
acknowledgementsMapCannotSend.putIfAbsent(tip, nodeAcks.acknowledgements());
+ if (prevAcks != null) {
+ prevAcks.merge(nodeAcks.acknowledgements());
+ }
+ } else {
+ Map<TopicIdPartition, Acknowledgements> acksMap =
acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new
HashMap<>());
+ Acknowledgements prevAcks = acksMap.putIfAbsent(tip,
nodeAcks.acknowledgements());
+ if (prevAcks != null) {
+ prevAcks.merge(nodeAcks.acknowledgements());
+ }
+ }
+ });
+
+ resultCount.addAndGet(acknowledgementsMapCannotSend.size());
sessionHandlers.forEach((nodeId, sessionHandler) -> {
- Node node = cluster.nodeById(nodeId);
- if (node != null) {
- acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null,
null, null));
-
- // Add the incoming commitSync() request to the queue.
- Map<TopicIdPartition, Acknowledgements>
acknowledgementsMapForNode = new HashMap<>();
- for (TopicIdPartition tip :
sessionHandler.sessionPartitions()) {
- NodeAcknowledgements nodeAcknowledgements =
acknowledgementsMap.get(tip);
- if ((nodeAcknowledgements != null) &&
(nodeAcknowledgements.nodeId() == node.id())) {
- if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
- acknowledgementsMapForNode.put(tip,
nodeAcknowledgements.acknowledgements());
-
-
metricsManager.recordAcknowledgementSent(nodeAcknowledgements.acknowledgements().size());
- log.debug("Added sync acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
- resultCount.incrementAndGet();
- } else {
-
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
- maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()), true, Optional.empty());
- }
- }
- }
+ Map<TopicIdPartition, Acknowledgements> nodeAcknowledgements =
acknowledgementsMapAllNodes.get(nodeId);
+ if (nodeAcknowledgements == null)
+ return;
- if (!acknowledgementsMapForNode.isEmpty()) {
- acknowledgeRequestStates.get(nodeId).addSyncRequest(new
AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() +
":1",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- resultHandler,
- AcknowledgeRequestType.COMMIT_SYNC
- ));
+ Map<TopicIdPartition, Acknowledgements> acknowledgementsMapToSend
= new HashMap<>();
+
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null,
null, null));
+
+ // Add the incoming commitSync() request to the queue.
+ for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
+ Acknowledgements acknowledgements =
nodeAcknowledgements.remove(tip);
+ if (acknowledgements != null) {
+ acknowledgementsMapToSend.put(tip, acknowledgements);
+ resultCount.incrementAndGet();
+
+
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+ log.debug("Added sync acknowledge request for partition {}
to node {}", tip.topicPartition(), nodeId);
}
}
+
+ resultCount.addAndGet(nodeAcknowledgements.size());
+ nodeAcknowledgements.forEach((tip, acks) -> {
+ acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ resultHandler.complete(tip, acks,
AcknowledgeRequestType.COMMIT_SYNC, true, Optional.empty());
+ });
+
+ if (!acknowledgementsMapToSend.isEmpty()) {
+ acknowledgeRequestStates.get(nodeId).addSyncRequest(new
AcknowledgeRequestState(logContext,
+ ShareConsumeRequestManager.class.getSimpleName() + ":1",
+ deadlineMs,
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ sessionHandler,
+ nodeId,
+ acknowledgementsMapToSend,
+ resultHandler,
+ AcknowledgeRequestType.COMMIT_SYNC
+ ));
+ }
+ });
+
+ acknowledgementsMapCannotSend.forEach((tip, acks) -> {
+ acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ resultHandler.complete(tip, acks,
AcknowledgeRequestType.COMMIT_SYNC, true, Optional.empty());
});
resultHandler.completeIfEmpty();
@@ -602,48 +630,63 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
final Cluster cluster = metadata.fetch();
final ResultHandler resultHandler = new
ResultHandler(Optional.empty());
+ Map<Integer, Map<TopicIdPartition, Acknowledgements>>
acknowledgementsMapAllNodes = new HashMap<>();
+ acknowledgementsMap.forEach((tip, nodeAcks) -> {
+ if ((cluster.nodeById(nodeAcks.nodeId()) == null) ||
isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+ log.debug("Leader for the partition is down or has changed,
failing acknowledgements for partition {}", tip);
+
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcks.acknowledgements()), true, Optional.empty());
+ } else {
+ Map<TopicIdPartition, Acknowledgements> acksMap =
acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new
HashMap<>());
+ Acknowledgements prevAcks = acksMap.putIfAbsent(tip,
nodeAcks.acknowledgements());
+ if (prevAcks != null) {
+ prevAcks.merge(nodeAcks.acknowledgements());
+ }
+ }
+ });
+
sessionHandlers.forEach((nodeId, sessionHandler) -> {
- Node node = cluster.nodeById(nodeId);
- if (node != null) {
- Map<TopicIdPartition, Acknowledgements>
acknowledgementsMapForNode = new HashMap<>();
-
- acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null,
null, null));
-
- for (TopicIdPartition tip :
sessionHandler.sessionPartitions()) {
- NodeAcknowledgements nodeAcknowledgements =
acknowledgementsMap.get(tip);
- if ((nodeAcknowledgements != null) &&
(nodeAcknowledgements.nodeId() == node.id())) {
- if (!isLeaderKnownToHaveChanged(node.id(), tip)) {
- Acknowledgements acknowledgements =
nodeAcknowledgements.acknowledgements();
- acknowledgementsMapForNode.put(tip,
acknowledgements);
-
-
metricsManager.recordAcknowledgementSent(acknowledgements.size());
- log.debug("Added async acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
- AcknowledgeRequestState asyncRequestState =
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
- if (asyncRequestState == null) {
-
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new
AcknowledgeRequestState(logContext,
-
ShareConsumeRequestManager.class.getSimpleName() + ":2",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- resultHandler,
- AcknowledgeRequestType.COMMIT_ASYNC
- ));
- } else {
- Acknowledgements prevAcks =
asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
- if (prevAcks != null) {
-
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
- }
- }
- } else {
-
nodeAcknowledgements.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
- maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcknowledgements.acknowledgements()), true, Optional.empty());
+ Map<TopicIdPartition, Acknowledgements> nodeAcknowledgements =
acknowledgementsMapAllNodes.get(nodeId);
+ if (nodeAcknowledgements == null)
+ return;
+
+ Map<TopicIdPartition, Acknowledgements> acknowledgementsMapForNode
= new HashMap<>();
+
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null,
null, null));
+
+ for (TopicIdPartition tip : sessionHandler.sessionPartitions()) {
+ Acknowledgements acknowledgements =
nodeAcknowledgements.remove(tip);
+ if (acknowledgements != null) {
+ acknowledgementsMapForNode.put(tip, acknowledgements);
+
+
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+ log.debug("Added async acknowledge request for partition
{} to node {}", tip.topicPartition(), nodeId);
+ AcknowledgeRequestState asyncRequestState =
acknowledgeRequestStates.get(nodeId).getAsyncRequest();
+ if (asyncRequestState == null) {
+
acknowledgeRequestStates.get(nodeId).setAsyncRequest(new
AcknowledgeRequestState(logContext,
+ ShareConsumeRequestManager.class.getSimpleName() +
":2",
+ deadlineMs,
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ sessionHandler,
+ nodeId,
+ acknowledgementsMapForNode,
+ resultHandler,
+ AcknowledgeRequestType.COMMIT_ASYNC
+ ));
+ } else {
+ Acknowledgements prevAcks =
asyncRequestState.acknowledgementsToSend.putIfAbsent(tip, acknowledgements);
+ if (prevAcks != null) {
+
asyncRequestState.acknowledgementsToSend.get(tip).merge(acknowledgements);
}
}
}
}
+
+ nodeAcknowledgements.forEach((tip, acks) -> {
+ acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ maybeSendShareAcknowledgementEvent(Map.of(tip, acks), true,
Optional.empty());
+ });
});
resultHandler.completeIfEmpty();
@@ -659,82 +702,83 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
*
* @return The future which completes when the acknowledgements finished
*/
- public CompletableFuture<Void> acknowledgeOnClose(final
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap,
- final long deadlineMs) {
+ public CompletableFuture<Void> acknowledgeOnClose(
+ final Map<TopicIdPartition, NodeAcknowledgements>
acknowledgementsMap,
+ final long deadlineMs) {
final Cluster cluster = metadata.fetch();
final AtomicInteger resultCount = new AtomicInteger();
final ResultHandler resultHandler = new ResultHandler(resultCount,
Optional.empty());
closing = true;
- Map<Integer, Map<TopicIdPartition, Acknowledgements>>
acknowledgementsMapAllNodes = new HashMap<>();
+ Map<Integer, Map<TopicIdPartition, Acknowledgements>>
acknowledgementsMapAllNodes = new HashMap<>();
acknowledgementsMap.forEach((tip, nodeAcks) -> {
- if (!isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+ if ((cluster.nodeById(nodeAcks.nodeId()) == null) ||
isLeaderKnownToHaveChanged(nodeAcks.nodeId(), tip)) {
+
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcks.acknowledgements()), true, Optional.empty());
+ } else {
Map<TopicIdPartition, Acknowledgements> acksMap =
acknowledgementsMapAllNodes.computeIfAbsent(nodeAcks.nodeId(), k -> new
HashMap<>());
Acknowledgements prevAcks = acksMap.putIfAbsent(tip,
nodeAcks.acknowledgements());
if (prevAcks != null) {
- acksMap.get(tip).merge(nodeAcks.acknowledgements());
+ prevAcks.merge(nodeAcks.acknowledgements());
}
- } else {
-
nodeAcks.acknowledgements().complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
- maybeSendShareAcknowledgementEvent(Map.of(tip,
nodeAcks.acknowledgements()), true, Optional.empty());
}
});
- sessionHandlers.forEach((nodeId, sessionHandler) -> {
- Node node = cluster.nodeById(nodeId);
- if (node != null) {
- //Add any waiting piggyback acknowledgements for the node.
- Map<TopicIdPartition, Acknowledgements> fetchAcks =
fetchAcknowledgementsToSend.remove(nodeId);
- if (fetchAcks != null) {
- fetchAcks.forEach((tip, acks) -> {
- if (!isLeaderKnownToHaveChanged(nodeId, tip)) {
- Map<TopicIdPartition, Acknowledgements> acksMap =
acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>());
- Acknowledgements prevAcks =
acksMap.putIfAbsent(tip, acks);
- if (prevAcks != null) {
- acksMap.get(tip).merge(acks);
- }
- } else {
-
acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
- maybeSendShareAcknowledgementEvent(Map.of(tip,
acks), true, Optional.empty());
- }
- });
- }
-
- Map<TopicIdPartition, Acknowledgements>
acknowledgementsMapForNode = acknowledgementsMapAllNodes.get(nodeId);
- if (acknowledgementsMapForNode != null) {
- acknowledgementsMapForNode.forEach((tip, acknowledgements)
-> {
-
metricsManager.recordAcknowledgementSent(acknowledgements.size());
- log.debug("Added closing acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
- resultCount.incrementAndGet();
- });
+ // Add any waiting piggyback acknowledgements.
+ fetchAcknowledgementsToSend.forEach((nodeId, nodeAcks) ->
+ nodeAcks.forEach((tip, acks) -> {
+ if ((cluster.nodeById(nodeId) == null) ||
isLeaderKnownToHaveChanged(nodeId, tip)) {
+ acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ maybeSendShareAcknowledgementEvent(Map.of(tip, acks),
true, Optional.empty());
} else {
- acknowledgementsMapForNode = new HashMap<>();
+ Map<TopicIdPartition, Acknowledgements> acksMap =
acknowledgementsMapAllNodes.computeIfAbsent(nodeId, k -> new HashMap<>());
+ Acknowledgements prevAcks = acksMap.putIfAbsent(tip, acks);
+ if (prevAcks != null) {
+ prevAcks.merge(acks);
+ }
}
+ })
+ );
- acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null,
null, null));
+ sessionHandlers.forEach((nodeId, sessionHandler) -> {
+ Map<TopicIdPartition, Acknowledgements> nodeAcknowledgements =
acknowledgementsMapAllNodes.get(nodeId);
+ if (nodeAcknowledgements != null) {
+ nodeAcknowledgements.forEach((tip, acknowledgements) -> {
+ resultCount.incrementAndGet();
- // Ensure there is no close() request already present as they
are blocking calls
- // and only one request can be active at a time.
- if (acknowledgeRequestStates.get(nodeId).getCloseRequest() !=
null &&
isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest()))
{
- log.error("Attempt to call close() when there is an
existing close request for node {}-{}", node.id(),
acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
- closeFuture.completeExceptionally(
- new IllegalStateException("Attempt to call close()
when there is an existing close request for node : " + node.id()));
- } else {
- // There can only be one close() happening at a time. So
per node, there will be one acknowledge request state.
- acknowledgeRequestStates.get(nodeId).setCloseRequest(
- new AcknowledgeRequestState(logContext,
- ShareConsumeRequestManager.class.getSimpleName() +
":3",
- deadlineMs,
- retryBackoffMs,
- retryBackoffMaxMs,
- sessionHandler,
- nodeId,
- acknowledgementsMapForNode,
- resultHandler,
- AcknowledgeRequestType.CLOSE
- ));
- }
+
metricsManager.recordAcknowledgementSent(acknowledgements.size());
+ log.debug("Added closing acknowledge request for partition
{} to node {}", tip.topicPartition(), nodeId);
+ });
+ } else {
+ nodeAcknowledgements = new HashMap<>();
+ }
+
+ acknowledgeRequestStates.putIfAbsent(nodeId, new Tuple<>(null,
null, null));
+
+ // Ensure there is no close() request already present as they are
blocking calls
+ // and only one request can be active at a time.
+ if (acknowledgeRequestStates.get(nodeId).getCloseRequest() != null
&&
isRequestStateInProgress(acknowledgeRequestStates.get(nodeId).getCloseRequest()))
{
+ log.error("Attempt to call close() when there is an existing
close request for node {}-{}", nodeId,
acknowledgeRequestStates.get(nodeId).getSyncRequestQueue());
+ nodeAcknowledgements.forEach((tip, acks) -> {
+ acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+ maybeSendShareAcknowledgementEvent(Map.of(tip, acks),
true, Optional.empty());
+ });
+ closeFuture.completeExceptionally(new
IllegalStateException("Attempt to call close() when there is an existing close
request for node " + nodeId));
+ } else {
+ // There can only be one close() happening at a time. So per
node, there will be one acknowledge request state.
+ acknowledgeRequestStates.get(nodeId).setCloseRequest(
+ new AcknowledgeRequestState(logContext,
+ ShareConsumeRequestManager.class.getSimpleName() +
":3",
+ deadlineMs,
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ sessionHandler,
+ nodeId,
+ nodeAcknowledgements,
+ resultHandler,
+ AcknowledgeRequestType.CLOSE
+ ));
}
});
@@ -744,6 +788,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
/**
* The method checks whether the leader for a topicIdPartition has changed.
+ *
* @param nodeId The previous leader for the partition.
* @param topicIdPartition The TopicIdPartition to check.
* @return Returns true if leader information is available and leader has
changed.
@@ -1243,7 +1288,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
Node nodeToSend = metadata.fetch().nodeById(nodeId);
if (requestBuilder == null) {
- handleAcknowledgeShareSessionNotFound();
+ handleNewShareSessionNotLeaderOrFollower();
return null;
} else if (nodeToSend != null) {
nodesWithPendingRequests.add(nodeId);
@@ -1344,17 +1389,16 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
}
/**
- * Set the error code for all remaining acknowledgements in the event
- * of a share session not found error which prevents the remaining
acknowledgements from
- * being sent.
+ * Set the error code for all remaining acknowledgements in the event
that a new share session
+ * needs to be started which prevents the remaining acknowledgements
from being sent.
*/
- void handleAcknowledgeShareSessionNotFound() {
+ void handleNewShareSessionNotLeaderOrFollower() {
Map<TopicIdPartition, Acknowledgements> acknowledgementsMapToClear
=
incompleteAcknowledgements.isEmpty() ? acknowledgementsToSend
: incompleteAcknowledgements;
acknowledgementsMapToClear.forEach((tip, acks) -> {
if (acks != null) {
- acks.complete(Errors.SHARE_SESSION_NOT_FOUND.exception());
+ acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception());
}
// We do not know whether this is a renew ack, but handling
the error as if it were, will ensure
// that we do not leave dangling acknowledgements
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index a15231512fc..1f81e356343 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -249,7 +249,10 @@ public class ShareFetch<K, V> {
public int renew(Map<TopicIdPartition, Acknowledgements>
acknowledgementsMap, Optional<Integer> acquisitionLockTimeoutMs) {
int recordsRenewed = 0;
for (Map.Entry<TopicIdPartition, Acknowledgements> entry :
acknowledgementsMap.entrySet()) {
- recordsRenewed +=
batches.get(entry.getKey()).renew(entry.getValue());
+ ShareInFlightBatch<K, V> batch = batches.get(entry.getKey());
+ if (batch != null) {
+ recordsRenewed += batch.renew(entry.getValue());
+ }
}
acquisitionLockTimeoutMsRenewed = acquisitionLockTimeoutMs;
return recordsRenewed;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
index 1af62e44681..8cda95de0ee 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareRequestMetadata.java
@@ -33,7 +33,9 @@ public class ShareRequestMetadata {
public static final int FINAL_EPOCH = -1;
/**
+ * Whether this session is a new session.
*
+ * @return Whether the session epoch is the initial epoch.
*/
public boolean isNewSession() {
return epoch == INITIAL_EPOCH;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index f3b2063c136..bdad2512e18 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
-import org.apache.kafka.common.errors.ShareSessionNotFoundException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Header;
@@ -369,7 +369,7 @@ public class ShareConsumeRequestManagerTest {
assertNull(shareConsumeRequestManager.requestStates(0));
// The callback for these unsent acknowledgements will be invoked with
an error code.
assertEquals(Map.of(tip0, acknowledgements2),
completedAcknowledgements.get(0));
- assertInstanceOf(ShareSessionNotFoundException.class,
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+ assertInstanceOf(NotLeaderOrFollowerException.class,
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
// Attempt a normal fetch to check if nodesWithPendingRequests is
empty.
assertEquals(1, sendFetches());
@@ -1490,7 +1490,7 @@ public class ShareConsumeRequestManagerTest {
assertEquals(0,
builder.data().topics().find(tip0.topicId()).partitions().find(0).acknowledgementBatches().size());
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
- assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}
@Test
@@ -1526,7 +1526,7 @@ public class ShareConsumeRequestManagerTest {
// We should fail any waiting acknowledgements for tip-0 as it would
have a share session epoch equal to 0.
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
- assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
}
@Test
@@ -2226,24 +2226,22 @@ public class ShareConsumeRequestManagerTest {
// We fail the acknowledgements for records which were received from
node0 with NOT_LEADER_OR_FOLLOWER exception.
shareConsumeRequestManager.commitSync(commitAcks,
calculateDeadlineMs(time.timer(100)));
- // Verify if the callback was invoked with the failed acknowledgements.
- assertEquals(1, completedAcknowledgements.get(0).size());
- assertEquals(acknowledgementsTp0,
completedAcknowledgements.get(0).get(tip0));
- assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
-
// We only send acknowledgements for tip1 to node1.
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
- assertEquals(1, completedAcknowledgements.get(1).size());
- assertEquals(acknowledgementsTp1,
completedAcknowledgements.get(1).get(tip1));
-
assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+ // Verify that the callback was invoked as part of the commitSync
processing, with the failed acknowledgements for tip0 and the successful
acknowledgements for tip1 delivered together.
+ assertEquals(2, completedAcknowledgements.get(0).size());
+ assertEquals(acknowledgementsTp0,
completedAcknowledgements.get(0).get(tip0));
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+ assertEquals(acknowledgementsTp1,
completedAcknowledgements.get(0).get(tip1));
+
assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
}
@Test
- void testLeadershipChangeAfterFetchBeforeClose() {
+ void testLeadershipChangeAfterFetchBeforeCloseMove() {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
@@ -2320,6 +2318,164 @@ public class ShareConsumeRequestManagerTest {
assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
}
+ @Test
+ void testLeadershipChangeAfterFetchMoveBeforeClose() {
+ buildRequestManager();
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp0);
+ partitions.add(tp1);
+ subscriptions.assignFromSubscribed(partitions);
+
+ client.updateMetadata(
+ RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
+ tp -> validLeaderEpoch, topicIds, false));
+ Node nodeId0 = metadata.fetch().nodeById(0);
+ Node nodeId1 = metadata.fetch().nodeById(1);
+
+ Cluster startingClusterMetadata = metadata.fetch();
+ assertFalse(metadata.updateRequested());
+
+ assertEquals(2, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
+ buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
+ partitionData = buildPartitionDataMap(tip1, records,
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
+ assertTrue(partitionRecords.containsKey(tp0));
+ assertTrue(partitionRecords.containsKey(tp1));
+
+ List<ConsumerRecord<byte[], byte[]>> fetchedRecords =
partitionRecords.get(tp0);
+ assertEquals(1, fetchedRecords.size());
+
+ fetchedRecords = partitionRecords.get(tp1);
+ assertEquals(2, fetchedRecords.size());
+
+ Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+ acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+ Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
+ AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
+
+ shareConsumeRequestManager.fetch(Map.of(tip1, new
NodeAcknowledgements(1, acknowledgementsTp1)));
+
+ // Move the leadership of tp1 onto node 0
+ metadata.updatePartitionLeadership(Map.of(tp1, new
Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()),
Optional.of(validLeaderEpoch + 1))), List.of());
+
+ assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+ // We fail the acknowledgements for records which were received from
node1 with NOT_LEADER_OR_FOLLOWER exception.
+ shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgementsTp0)),
+ calculateDeadlineMs(time.timer(100)));
+
+ // Verify if the callback was invoked with the failed acknowledgements.
+ assertEquals(1, completedAcknowledgements.get(0).size());
+ assertEquals(acknowledgementsTp1.getAcknowledgementsTypeMap(),
completedAcknowledgements.get(0).get(tip1).getAcknowledgementsTypeMap());
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(),
completedAcknowledgements.get(0).get(tip1).getAcknowledgeException());
+ completedAcknowledgements.clear();
+
+ // As we are closing, we still send the request to both the nodes, but
with empty acknowledgements to node1, as it is no longer the leader.
+ assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());
+
+ client.prepareResponseFrom(fullAcknowledgeResponse(tip0, Errors.NONE),
nodeId0);
+ networkClientDelegate.poll(time.timer(0));
+
+ client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId1);
+ networkClientDelegate.poll(time.timer(0));
+
+ assertEquals(1, completedAcknowledgements.get(0).size());
+ assertEquals(acknowledgementsTp0,
completedAcknowledgements.get(0).get(tip0));
+
assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+ }
+
+ @Test
+ void testLeadershipChangeAfterFetchMoveBeforeCloseMove() {
+ buildRequestManager();
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
+ Set<TopicPartition> partitions = new HashSet<>();
+ partitions.add(tp0);
+ partitions.add(tp1);
+ subscriptions.assignFromSubscribed(partitions);
+
+ client.updateMetadata(
+ RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
+ tp -> validLeaderEpoch, topicIds, false));
+ Node nodeId0 = metadata.fetch().nodeById(0);
+ Node nodeId1 = metadata.fetch().nodeById(1);
+
+ Cluster startingClusterMetadata = metadata.fetch();
+ assertFalse(metadata.updateRequested());
+
+ assertEquals(2, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
+ buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
+ partitionData = buildPartitionDataMap(tip1, records,
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
+ assertTrue(partitionRecords.containsKey(tp0));
+ assertTrue(partitionRecords.containsKey(tp1));
+
+ List<ConsumerRecord<byte[], byte[]>> fetchedRecords =
partitionRecords.get(tp0);
+ assertEquals(1, fetchedRecords.size());
+
+ fetchedRecords = partitionRecords.get(tp1);
+ assertEquals(2, fetchedRecords.size());
+
+ Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
+ acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
+
+ Acknowledgements acknowledgementsTp1 = getAcknowledgements(1,
+ AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
+
+ shareConsumeRequestManager.fetch(Map.of(tip1, new
NodeAcknowledgements(1, acknowledgementsTp1)));
+
+ // Move the leadership of tp1 onto node 0, and tp0 onto node 1
+ metadata.updatePartitionLeadership(Map.of(tp1, new
Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()),
Optional.of(validLeaderEpoch + 1))), List.of());
+ metadata.updatePartitionLeadership(Map.of(tp0, new
Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()),
Optional.of(validLeaderEpoch + 1))), List.of());
+
+ assertNotEquals(startingClusterMetadata, metadata.fetch());
+
+ // We fail the acknowledgements for records which were received from
node0 and node1 with NOT_LEADER_OR_FOLLOWER exception.
+ shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgementsTp0)),
+ calculateDeadlineMs(time.timer(100)));
+
+ // Verify if the callback was invoked with the failed acknowledgements.
+ assertEquals(1, completedAcknowledgements.get(0).size());
+ assertEquals(acknowledgementsTp0.getAcknowledgementsTypeMap(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(),
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
+ assertEquals(1, completedAcknowledgements.get(1).size());
+ assertEquals(acknowledgementsTp1.getAcknowledgementsTypeMap(),
completedAcknowledgements.get(1).get(tip1).getAcknowledgementsTypeMap());
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(),
completedAcknowledgements.get(1).get(tip1).getAcknowledgeException());
+ completedAcknowledgements.clear();
+
+ // As we are closing, we still send the request to both the nodes, but
with empty acknowledgements to node0 and node1, as neither is still the
leader of the partition whose records it delivered.
+ assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());
+
+ client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId0);
+ networkClientDelegate.poll(time.timer(0));
+
+ client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId1);
+ networkClientDelegate.poll(time.timer(0));
+
+ assertTrue(completedAcknowledgements.isEmpty());
+ }
+
@Test
void testWhenLeadershipChangedAfterDisconnected() {
buildRequestManager();