This is an automated email from the ASF dual-hosted git repository.

AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27689488232 KAFKA-20575: Add onPartitionsLost support for MockConsumer 
(#22273)
27689488232 is described below

commit 2768948823287b81fba91d7c071caf2d4839fe4f
Author: Aditya Kousik <[email protected]>
AuthorDate: Fri May 15 02:40:18 2026 -0700

    KAFKA-20575: Add onPartitionsLost support for MockConsumer (#22273)
    
    MockConsumer.rebalance() only supports the graceful path with
    onPartitionsRevoked. There is no way to test partitions lost due to
    a session timeout, a failed heartbeat, etc.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/clients/consumer/MockConsumer.java       |  28 ++++++
 .../kafka/clients/consumer/MockConsumerTest.java   | 105 +++++++++++++++++++++
 2 files changed, 133 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9702cd24220..2a3e0cdc53d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -142,6 +142,34 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         this.subscriptions.rebalanceListener().ifPresent(crl -> 
crl.onPartitionsAssigned(added));
     }
 
+    /**
+     * Simulates a partition loss event. Calls {@link 
ConsumerRebalanceListener#onPartitionsLost}
+     * for the specified partitions and removes them from the current 
assignment. Unlike
+     * {@link #rebalance(Collection)}, which calls {@link 
ConsumerRebalanceListener#onPartitionsRevoked},
+     * this method models the case where the consumer loses partitions without 
a graceful revoke.
+     *
+     * <p>Only records belonging to the lost partitions are cleared; records 
for retained
+     * partitions are unaffected.
+     *
+     * @param partitionsLost the partitions to lose; all must be currently 
assigned
+     * @throws IllegalStateException if any partition is not currently assigned
+     */
+    public synchronized void losePartitions(Collection<TopicPartition> 
partitionsLost) {
+        Set<TopicPartition> currentAssignment = 
this.subscriptions.assignedPartitions();
+        Set<TopicPartition> lost = new HashSet<>(partitionsLost);
+        List<TopicPartition> notAssigned = lost.stream()
+            .filter(tp -> !currentAssignment.contains(tp))
+            .collect(Collectors.toList());
+        if (!notAssigned.isEmpty())
+            throw new IllegalStateException("Cannot lose partitions that are 
not currently assigned: " + notAssigned);
+        lost.forEach(records::remove);
+        this.subscriptions.rebalanceListener().ifPresent(crl -> 
crl.onPartitionsLost(lost));
+        Set<TopicPartition> remaining = currentAssignment.stream()
+            .filter(tp -> !lost.contains(tp))
+            .collect(Collectors.toSet());
+        this.subscriptions.assignFromSubscribed(remaining);
+    }
+
     @Override
     public synchronized Set<String> subscription() {
         return this.subscriptions.subscription();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 6968b45a57b..d3fa859cf9f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -230,4 +231,108 @@ public class MockConsumerTest {
         assertTrue(records.isEmpty());
     }
 
+    @Test
+    public void testLosePartitionsCallsOnPartitionsLost() {
+        TopicPartition tp0 = new TopicPartition("test", 0);
+        TopicPartition tp1 = new TopicPartition("test", 1);
+        List<TopicPartition> assigned = List.of(tp0, tp1);
+
+        List<TopicPartition> lost = new ArrayList<>();
+        List<TopicPartition> revoked = new ArrayList<>();
+        consumer.subscribe(List.of("test"), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
+                revoked.addAll(partitions);
+            }
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {}
+            @Override
+            public void onPartitionsLost(Collection<TopicPartition> 
partitions) {
+                lost.addAll(partitions);
+            }
+        });
+
+        consumer.rebalance(assigned);
+        consumer.losePartitions(List.of(tp0));
+
+        assertEquals(List.of(tp0), lost);
+        assertTrue(revoked.isEmpty());
+    }
+
+    @Test
+    public void testLosePartitionsRemovesFromAssignment() {
+        TopicPartition tp0 = new TopicPartition("test", 0);
+        TopicPartition tp1 = new TopicPartition("test", 1);
+
+        consumer.subscribe(List.of("test"));
+        consumer.rebalance(List.of(tp0, tp1));
+        consumer.losePartitions(List.of(tp0));
+
+        assertFalse(consumer.assignment().contains(tp0));
+        assertTrue(consumer.assignment().contains(tp1));
+    }
+
+    @Test
+    public void testLosePartitionsThrowsIfNotAssigned() {
+        TopicPartition tp0 = new TopicPartition("test", 0);
+        TopicPartition tp1 = new TopicPartition("test", 1);
+
+        consumer.subscribe(List.of("test"));
+        consumer.rebalance(List.of(tp0));
+
+        assertThrows(IllegalStateException.class,
+                () -> consumer.losePartitions(List.of(tp1)));
+    }
+
+    @Test
+    public void testLosePartitionsClearsOnlyLostRecords() {
+        TopicPartition tp0 = new TopicPartition("test", 0);
+        TopicPartition tp1 = new TopicPartition("test", 1);
+
+        consumer.subscribe(List.of("test"));
+        consumer.rebalance(List.of(tp0, tp1));
+        consumer.updateBeginningOffsets(new HashMap<>() {{
+                put(tp0, 0L);
+                put(tp1, 0L);
+            }});
+        consumer.seek(tp0, 0);
+        consumer.seek(tp1, 0);
+
+        consumer.addRecord(new ConsumerRecord<>("test", 0, 0, null, null));
+        consumer.addRecord(new ConsumerRecord<>("test", 1, 0, null, null));
+
+        consumer.losePartitions(List.of(tp0));
+
+        ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(1));
+        assertEquals(1, records.count());
+
+        var record = records.iterator().next();
+        assertEquals(tp1, new TopicPartition(record.topic(), 
record.partition()));
+    }
+
+    @Test
+    public void testLosePartitionsThenRebalance() {
+        TopicPartition tp0 = new TopicPartition("test", 0);
+        TopicPartition tp1 = new TopicPartition("test", 1);
+        TopicPartition tp2 = new TopicPartition("test", 2);
+
+        List<TopicPartition> assigned = new ArrayList<>();
+        consumer.subscribe(List.of("test"), new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {}
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
+                assigned.addAll(partitions);
+            }
+        });
+
+        consumer.rebalance(List.of(tp0, tp1));
+        assigned.clear();
+
+        consumer.losePartitions(List.of(tp0));
+        consumer.rebalance(Arrays.asList(tp1, tp2));
+
+        assertEquals(List.of(tp2), assigned);
+        assertEquals(Set.of(tp1, tp2), consumer.assignment());
+    }
 }

Reply via email to