This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b96273ba00 KAFKA-19345: Use ShareFetchUtils mock for 
DelayedShareFetchTest tests (#20765)
6b96273ba00 is described below

commit 6b96273ba007562fc1eb3af6bcc33abc92782b3d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Oct 29 19:03:18 2025 +0800

    KAFKA-19345: Use ShareFetchUtils mock for DelayedShareFetchTest tests 
(#20765)
    
    This patch wraps the ShareFetchUtils static method `processFetchResponse`
    with MockedStatic to improve test isolation and also fixes some
    incorrect test results.
    
    Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java |   1 -
 .../kafka/server/share/DelayedShareFetchTest.java  | 290 +++++++++++----------
 2 files changed, 154 insertions(+), 137 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 7b061a28bd5..bc8146910ad 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -526,7 +526,6 @@ public class DelayedShareFetch extends DelayedOperation {
             return offsetSnapshot.highWatermark();
         else
             return offsetSnapshot.lastStableOffset();
-
     }
 
     private LinkedHashMap<TopicIdPartition, LogReadResult> 
readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 9585714deb8..0f7a82f9e15 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -42,7 +42,6 @@ import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchParams;
-import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.server.util.timer.SystemTimer;
 import org.apache.kafka.server.util.timer.SystemTimerReaper;
@@ -88,7 +87,6 @@ import static 
kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_P
 import static 
kafka.server.share.SharePartitionManagerTest.REMOTE_FETCH_MAX_WAIT_MS;
 import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
 import static 
kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
-import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -98,11 +96,11 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
@@ -212,17 +210,14 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
 
         // We are testing the case when the share partition is getting fetched 
for the first time, so for the first time
-        // the fetchOffsetMetadata will return empty. Post the readFromLog 
call, the fetchOffsetMetadata will be
+        // the fetchOffsetMetadata will return empty. Post the first 
readFromLog call, the fetchOffsetMetadata will be
         // populated for the share partition, which has 1 as the positional 
difference, so it doesn't satisfy the minBytes(2).
         when(sp0.fetchOffsetMetadata(anyLong()))
             .thenReturn(Optional.empty())
             .thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
         LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
-        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
 
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
@@ -230,6 +225,7 @@ public class DelayedShareFetchTest {
         PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Set.of(tp0));
 
         Partition p0 = mock(Partition.class);
+        mockTopicIdPartitionFetchBytes(hwmOffsetMetadata, p0);
         when(p0.isLeader()).thenReturn(true);
 
         Partition p1 = mock(Partition.class);
@@ -258,7 +254,7 @@ public class DelayedShareFetchTest {
 
         assertFalse(delayedShareFetch.isCompleted());
 
-        // Since sp1 cannot be acquired, tryComplete should return false.
+        // Since minBytes(2) is not satisfied (only 1 byte available from 
sp0), and sp1 cannot be acquired, tryComplete should return false.
         assertFalse(delayedShareFetch.tryComplete());
         assertFalse(delayedShareFetch.isCompleted());
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
@@ -270,7 +266,7 @@ public class DelayedShareFetchTest {
         assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
 
         delayedShareFetch.lock().unlock();
-        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        Mockito.verify(exceptionHandler, never()).accept(any(), any());
     }
 
     @Test
@@ -296,18 +292,16 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
 
         // We are testing the case when the share partition has been fetched 
before, hence we are mocking positionDiff
         // functionality to give the file position difference as 1 byte, so it 
doesn't satisfy the minBytes(2).
         LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class);
         when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1);
         
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
-        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
 
         Partition p0 = mock(Partition.class);
+        mockTopicIdPartitionFetchBytes(hwmOffsetMetadata, p0);
         when(p0.isLeader()).thenReturn(true);
 
         Partition p1 = mock(Partition.class);
@@ -336,7 +330,7 @@ public class DelayedShareFetchTest {
         Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
         assertTrue(delayedShareFetch.lock().tryLock());
         delayedShareFetch.lock().unlock();
-        Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+        Mockito.verify(exceptionHandler, never()).accept(any(), any());
     }
 
     @Test
@@ -360,8 +354,6 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
 
         when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new 
LogOffsetMetadata(0, 1, 0)));
@@ -373,6 +365,7 @@ public class DelayedShareFetchTest {
         when(time.hiResClockMs()).thenReturn(120L).thenReturn(140L);
         ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
         Uuid fetchId = Uuid.randomUuid();
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
@@ -381,6 +374,7 @@ public class DelayedShareFetchTest {
             .withShareGroupMetrics(shareGroupMetrics)
             .withTime(time)
             .withFetchId(fetchId)
+            .withExceptionHandler(exceptionHandler)
             .build());
 
         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -388,17 +382,23 @@ public class DelayedShareFetchTest {
 
         assertFalse(delayedShareFetch.isCompleted());
 
-        // Since sp1 can be acquired, tryComplete should return true.
-        assertTrue(delayedShareFetch.tryComplete());
-        assertTrue(delayedShareFetch.isCompleted());
-        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
-        assertTrue(delayedShareFetch.lock().tryLock());
-        assertEquals(1, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
-        assertEquals(20, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
-        assertEquals(1, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
-        assertEquals(50, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
+        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = 
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+            mockedShareFetchUtils.when(() -> 
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+                .thenReturn(Map.of(tp0, 
mock(ShareFetchResponseData.PartitionData.class)));
 
-        delayedShareFetch.lock().unlock();
+            // Since sp0 can be acquired, tryComplete should return true.
+            assertTrue(delayedShareFetch.tryComplete());
+            assertTrue(delayedShareFetch.isCompleted());
+            Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+            assertTrue(delayedShareFetch.lock().tryLock());
+            assertEquals(1, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
+            assertEquals(20, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
+            assertEquals(1, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
+            assertEquals(50, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
+            Mockito.verify(exceptionHandler, never()).accept(any(), any());
+
+            delayedShareFetch.lock().unlock();
+        }
     }
 
     @Test
@@ -479,8 +479,6 @@ public class DelayedShareFetchTest {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
 
         PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mockPartitionMaxBytes(Set.of(tp0));
@@ -489,6 +487,7 @@ public class DelayedShareFetchTest {
         when(time.hiResClockMs()).thenReturn(10L).thenReturn(140L);
         ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
         Uuid fetchId = Uuid.randomUuid();
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withReplicaManager(replicaManager)
@@ -497,29 +496,36 @@ public class DelayedShareFetchTest {
             .withShareGroupMetrics(shareGroupMetrics)
             .withTime(time)
             .withFetchId(fetchId)
+            .withExceptionHandler(exceptionHandler)
             .build());
 
         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
         when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);
 
-        assertFalse(delayedShareFetch.isCompleted());
-        delayedShareFetch.forceComplete();
+        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = 
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+            mockedShareFetchUtils.when(() -> 
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+                .thenReturn(Map.of(tp0, 
mock(ShareFetchResponseData.PartitionData.class)));
 
-        // Since we can acquire records from sp0, replicaManager.readFromLog 
should be called once and only for sp0.
-        Mockito.verify(replicaManager, times(1)).readFromLog(
+            assertFalse(delayedShareFetch.isCompleted());
+            delayedShareFetch.forceComplete();
+
+            Mockito.verify(exceptionHandler, never()).accept(any(), any());
+            // Since we can acquire records from sp0, 
replicaManager.readFromLog should be called once and only for sp0.
+            Mockito.verify(replicaManager, times(1)).readFromLog(
                 any(), any(), any(ReplicaQuota.class), anyBoolean());
-        Mockito.verify(sp0, times(1)).nextFetchOffset();
-        Mockito.verify(sp1, times(0)).nextFetchOffset();
-        assertTrue(delayedShareFetch.isCompleted());
-        assertTrue(shareFetch.isCompleted());
-        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
-        assertTrue(delayedShareFetch.lock().tryLock());
-        assertEquals(1, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
-        assertEquals(130, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
-        assertEquals(1, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
-        assertEquals(50, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
+            Mockito.verify(sp0, times(1)).nextFetchOffset();
+            Mockito.verify(sp1, times(0)).nextFetchOffset();
+            assertTrue(delayedShareFetch.isCompleted());
+            assertTrue(shareFetch.isCompleted());
+            Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(any());
+            assertTrue(delayedShareFetch.lock().tryLock());
+            assertEquals(1, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
+            assertEquals(130, 
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
+            assertEquals(1, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
+            assertEquals(50, 
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
 
-        delayedShareFetch.lock().unlock();
+            delayedShareFetch.lock().unlock();
+        }
     }
 
     @Test
@@ -658,33 +664,38 @@ public class DelayedShareFetchTest {
         sharePartitions2.put(tp2, sp2);
 
         Uuid fetchId2 = Uuid.randomUuid();
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         DelayedShareFetch delayedShareFetch2 = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch2)
             .withReplicaManager(replicaManager)
             .withSharePartitions(sharePartitions2)
             .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
             .withFetchId(fetchId2)
+            .withExceptionHandler(exceptionHandler)
             .build());
 
         // sp1 can be acquired now
         when(sp1.maybeAcquireFetchLock(fetchId2)).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(true);
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-
-        // when forceComplete is called for delayedShareFetch2, since tp1 is 
common in between delayed share fetch
-        // requests, it should add a "check and complete" action for request 
key tp1 on the purgatory.
-        delayedShareFetch2.forceComplete();
-        assertTrue(delayedShareFetch2.isCompleted());
-        assertTrue(shareFetch2.isCompleted());
-        Mockito.verify(replicaManager, times(1)).readFromLog(
-            any(), any(), any(ReplicaQuota.class), anyBoolean());
-        assertFalse(delayedShareFetch1.isCompleted());
-        Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
-        Mockito.verify(replicaManager, times(0)).tryCompleteActions();
-        Mockito.verify(delayedShareFetch2, 
times(1)).releasePartitionLocks(any());
-        assertTrue(delayedShareFetch2.lock().tryLock());
-        delayedShareFetch2.lock().unlock();
+        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = 
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+            mockedShareFetchUtils.when(() -> 
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+                .thenReturn(Map.of(tp0, 
mock(ShareFetchResponseData.PartitionData.class)));
+
+            // when forceComplete is called for delayedShareFetch2, since tp1 
is common in between delayed share fetch
+            // requests, it should add a "check and complete" action for 
request key tp1 on the purgatory.
+            delayedShareFetch2.forceComplete();
+            assertTrue(delayedShareFetch2.isCompleted());
+            assertTrue(shareFetch2.isCompleted());
+            Mockito.verify(exceptionHandler, never()).accept(any(), any());
+            Mockito.verify(replicaManager, times(1)).readFromLog(
+                any(), any(), any(ReplicaQuota.class), anyBoolean());
+            assertFalse(delayedShareFetch1.isCompleted());
+            Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
+            Mockito.verify(replicaManager, times(0)).tryCompleteActions();
+            Mockito.verify(delayedShareFetch2, 
times(1)).releasePartitionLocks(any());
+            assertTrue(delayedShareFetch2.lock().tryLock());
+            delayedShareFetch2.lock().unlock();
+        }
     }
 
     @Test
@@ -763,8 +774,6 @@ public class DelayedShareFetchTest {
             BROKER_TOPIC_STATS);
 
         when(sp0.canAcquireRecords()).thenReturn(true);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
         doAnswer(invocation -> 
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), 
any(), any(ReplicaQuota.class), anyBoolean());
 
         // Mocking partition object to throw an exception during min bytes 
calculation while calling fetchOffsetSnapshot
@@ -1033,17 +1042,6 @@ public class DelayedShareFetchTest {
             Uuid.randomUuid().toString(), new CompletableFuture<>(), 
List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE,
             MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
 
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-
         // All 5 partitions are acquirable.
         doAnswer(invocation -> 
buildLogReadResult(sharePartitions.keySet().stream().toList())).when(replicaManager).readFromLog(any(),
 any(), any(ReplicaQuota.class), anyBoolean());
 
@@ -1060,12 +1058,14 @@ public class DelayedShareFetchTest {
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp4, 
1);
 
         Uuid fetchId = Uuid.randomUuid();
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
             .withFetchId(fetchId)
+            .withExceptionHandler(exceptionHandler)
             .build());
 
         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -1074,29 +1074,40 @@ public class DelayedShareFetchTest {
         when(sp3.maybeAcquireFetchLock(fetchId)).thenReturn(true);
         when(sp4.maybeAcquireFetchLock(fetchId)).thenReturn(true);
 
-        assertTrue(delayedShareFetch.tryComplete());
-        assertTrue(delayedShareFetch.isCompleted());
+        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = 
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+            mockedShareFetchUtils.when(() -> 
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+                .thenReturn(Map.of(
+                    tp0, mock(ShareFetchResponseData.PartitionData.class),
+                    tp1, mock(ShareFetchResponseData.PartitionData.class),
+                    tp2, mock(ShareFetchResponseData.PartitionData.class),
+                    tp3, mock(ShareFetchResponseData.PartitionData.class),
+                    tp4, mock(ShareFetchResponseData.PartitionData.class)));
 
-        // Since all partitions are acquirable, maxbytes per partition = 
requestMaxBytes(i.e. 1024*1020) / acquiredTopicPartitions(i.e. 5)
-        int expectedPartitionMaxBytes = 1024 * 1020 / 5;
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
expectedReadPartitionInfo = new LinkedHashMap<>();
-        sharePartitions.keySet().forEach(topicIdPartition -> 
expectedReadPartitionInfo.put(topicIdPartition,
-            new FetchRequest.PartitionData(
-                topicIdPartition.topicId(),
-                0,
-                0,
-                expectedPartitionMaxBytes,
-                Optional.empty()
-            )));
+            assertTrue(delayedShareFetch.tryComplete());
+            assertTrue(delayedShareFetch.isCompleted());
 
-        Mockito.verify(replicaManager, times(1)).readFromLog(
-            shareFetch.fetchParams(),
-            CollectionConverters.asScala(
-                sharePartitions.keySet().stream().map(topicIdPartition ->
-                    new Tuple2<>(topicIdPartition, 
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
-            ),
-            QuotaFactory.UNBOUNDED_QUOTA,
-            true);
+            // Since all partitions are acquirable, maxbytes per partition = 
requestMaxBytes(i.e. 1024*1020) / acquiredTopicPartitions(i.e. 5)
+            int expectedPartitionMaxBytes = 1024 * 1020 / 5;
+            LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
expectedReadPartitionInfo = new LinkedHashMap<>();
+            sharePartitions.keySet().forEach(topicIdPartition -> 
expectedReadPartitionInfo.put(topicIdPartition,
+                new FetchRequest.PartitionData(
+                    topicIdPartition.topicId(),
+                    0,
+                    0,
+                    expectedPartitionMaxBytes,
+                    Optional.empty()
+                )));
+
+            Mockito.verify(exceptionHandler, never()).accept(any(), any());
+            Mockito.verify(replicaManager, times(1)).readFromLog(
+                shareFetch.fetchParams(),
+                CollectionConverters.asScala(
+                    sharePartitions.keySet().stream().map(topicIdPartition ->
+                        new Tuple2<>(topicIdPartition, 
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
+                ),
+                QuotaFactory.UNBOUNDED_QUOTA,
+                true);
+        }
     }
 
     @Test
@@ -1132,11 +1143,6 @@ public class DelayedShareFetchTest {
             new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4), 
BATCH_SIZE, MAX_FETCH_RECORDS,
             BROKER_TOPIC_STATS);
 
-        when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-        when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(), 
any(FetchPartitionData.class), any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
-
         // Only 2 out of 5 partitions are acquirable.
         Set<TopicIdPartition> acquirableTopicPartitions = new 
LinkedHashSet<>();
         acquirableTopicPartitions.add(tp0);
@@ -1150,12 +1156,14 @@ public class DelayedShareFetchTest {
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 
1);
 
         Uuid fetchId = Uuid.randomUuid();
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
             .withFetchId(fetchId)
+            .withExceptionHandler(exceptionHandler)
             .build());
 
         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -1164,29 +1172,37 @@ public class DelayedShareFetchTest {
         when(sp3.maybeAcquireFetchLock(fetchId)).thenReturn(true);
         when(sp4.maybeAcquireFetchLock(fetchId)).thenReturn(false);
 
-        assertTrue(delayedShareFetch.tryComplete());
-        assertTrue(delayedShareFetch.isCompleted());
+        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = 
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+            mockedShareFetchUtils.when(() -> 
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+                .thenReturn(Map.of(
+                    tp0, mock(ShareFetchResponseData.PartitionData.class),
+                    tp1, mock(ShareFetchResponseData.PartitionData.class)));
 
-        // Since only 2 partitions are acquirable, maxbytes per partition = 
requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 2)
-        int expectedPartitionMaxBytes = 1024 * 1024 / 2;
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
expectedReadPartitionInfo = new LinkedHashMap<>();
-        acquirableTopicPartitions.forEach(topicIdPartition -> 
expectedReadPartitionInfo.put(topicIdPartition,
-            new FetchRequest.PartitionData(
-                topicIdPartition.topicId(),
-                0,
-                0,
-                expectedPartitionMaxBytes,
-                Optional.empty()
-            )));
+            assertTrue(delayedShareFetch.tryComplete());
+            assertTrue(delayedShareFetch.isCompleted());
 
-        Mockito.verify(replicaManager, times(1)).readFromLog(
-            shareFetch.fetchParams(),
-            CollectionConverters.asScala(
-                acquirableTopicPartitions.stream().map(topicIdPartition ->
-                    new Tuple2<>(topicIdPartition, 
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
-            ),
-            QuotaFactory.UNBOUNDED_QUOTA,
-            true);
+            // Since only 2 partitions are acquirable, maxbytes per partition 
= requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 2)
+            int expectedPartitionMaxBytes = 1024 * 1024 / 2;
+            LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
expectedReadPartitionInfo = new LinkedHashMap<>();
+            acquirableTopicPartitions.forEach(topicIdPartition -> 
expectedReadPartitionInfo.put(topicIdPartition,
+                new FetchRequest.PartitionData(
+                    topicIdPartition.topicId(),
+                    0,
+                    0,
+                    expectedPartitionMaxBytes,
+                    Optional.empty()
+                )));
+
+            Mockito.verify(replicaManager, times(1)).readFromLog(
+                shareFetch.fetchParams(),
+                CollectionConverters.asScala(
+                    acquirableTopicPartitions.stream().map(topicIdPartition ->
+                        new Tuple2<>(topicIdPartition, 
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
+                ),
+                QuotaFactory.UNBOUNDED_QUOTA,
+                true);
+            Mockito.verify(exceptionHandler, never()).accept(any(), any());
+        }
     }
 
     @Test
@@ -1665,34 +1681,39 @@ public class DelayedShareFetchTest {
         
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
         
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
 
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         DelayedShareFetch delayedShareFetch = 
spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withReplicaManager(replicaManager)
             .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, 
tp1)))
             .withFetchId(fetchId)
+            .withExceptionHandler(exceptionHandler)
             .build());
 
         // sp0 is acquirable, sp1 is not acquirable.
         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
         when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(false);
 
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), 
any())).thenReturn(
-            createShareAcquiredRecords(new 
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
 1)));
+        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = 
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+            mockedShareFetchUtils.when(() -> 
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+                .thenReturn(Map.of(tp0, new 
ShareFetchResponseData.PartitionData().setErrorCode(Errors.REQUEST_TIMED_OUT.code())));
 
-        assertFalse(delayedShareFetch.isCompleted());
-        assertTrue(delayedShareFetch.tryComplete());
+            assertFalse(delayedShareFetch.isCompleted());
+            assertTrue(delayedShareFetch.tryComplete());
 
-        assertTrue(delayedShareFetch.isCompleted());
-        // Pending remote fetch object gets created for delayed share fetch.
-        assertNotNull(delayedShareFetch.pendingRemoteFetches());
-        // Verify the locks are released for tp0.
-        Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
-        assertTrue(shareFetch.isCompleted());
-        assertEquals(Set.of(tp0), future.join().keySet());
-        assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
-        assertTrue(delayedShareFetch.lock().tryLock());
-        delayedShareFetch.lock().unlock();
+            assertTrue(delayedShareFetch.isCompleted());
+            // Pending remote fetch object gets created for delayed share 
fetch.
+            assertNotNull(delayedShareFetch.pendingRemoteFetches());
+            // Verify the locks are released for tp0.
+            Mockito.verify(delayedShareFetch, 
times(1)).releasePartitionLocks(Set.of(tp0));
+            Mockito.verify(exceptionHandler, never()).accept(any(), any());
+            assertTrue(shareFetch.isCompleted());
+            assertEquals(Set.of(tp0), future.join().keySet());
+            assertEquals(Errors.REQUEST_TIMED_OUT.code(), 
future.join().get(tp0).errorCode());
+            assertTrue(delayedShareFetch.lock().tryLock());
+            delayedShareFetch.lock().unlock();
+        }
     }
 
     @Test
@@ -1702,7 +1723,6 @@ public class DelayedShareFetchTest {
 
         SharePartition sp0 = mock(SharePartition.class);
 
-
         when(sp0.canAcquireRecords()).thenReturn(true);
 
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
@@ -2151,12 +2171,10 @@ public class DelayedShareFetchTest {
         
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
     }
 
-    private void mockTopicIdPartitionFetchBytes(ReplicaManager replicaManager, 
TopicIdPartition topicIdPartition, LogOffsetMetadata hwmOffsetMetadata) {
+    private void mockTopicIdPartitionFetchBytes(LogOffsetMetadata 
hwmOffsetMetadata, Partition partition) {
         LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, 
mock(LogOffsetMetadata.class),
             hwmOffsetMetadata, mock(LogOffsetMetadata.class));
-        Partition partition = mock(Partition.class);
         when(partition.fetchOffsetSnapshot(any(), 
anyBoolean())).thenReturn(endOffsetSnapshot);
-        
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
     }
 
     private PartitionMaxBytesStrategy 
mockPartitionMaxBytes(Set<TopicIdPartition> partitions) {


Reply via email to