This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b96273ba00 KAFKA-19345: Use ShareFetchUtils mock for
DelayedShareFetchTest tests (#20765)
6b96273ba00 is described below
commit 6b96273ba007562fc1eb3af6bcc33abc92782b3d
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Oct 29 19:03:18 2025 +0800
KAFKA-19345: Use ShareFetchUtils mock for DelayedShareFetchTest tests
(#20765)
This patch wraps the static method `ShareFetchUtils.processFetchResponse`
with Mockito's MockedStatic to improve test isolation, and also corrects
some test expectations that were previously wrong.
Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield
<[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 1 -
.../kafka/server/share/DelayedShareFetchTest.java | 290 +++++++++++----------
2 files changed, 154 insertions(+), 137 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 7b061a28bd5..bc8146910ad 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -526,7 +526,6 @@ public class DelayedShareFetch extends DelayedOperation {
return offsetSnapshot.highWatermark();
else
return offsetSnapshot.lastStableOffset();
-
}
private LinkedHashMap<TopicIdPartition, LogReadResult>
readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 9585714deb8..0f7a82f9e15 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -42,7 +42,6 @@ import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
-import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
@@ -88,7 +87,6 @@ import static
kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_P
import static
kafka.server.share.SharePartitionManagerTest.REMOTE_FETCH_MAX_WAIT_MS;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static
kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
-import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -98,11 +96,11 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;
@@ -212,17 +210,14 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
- when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
// We are testing the case when the share partition is getting fetched
for the first time, so for the first time
- // the fetchOffsetMetadata will return empty. Post the readFromLog
call, the fetchOffsetMetadata will be
+ // the fetchOffsetMetadata will return empty. Post the first
readFromLog call, the fetchOffsetMetadata will be
// populated for the share partition, which has 1 as the positional
difference, so it doesn't satisfy the minBytes(2).
when(sp0.fetchOffsetMetadata(anyLong()))
.thenReturn(Optional.empty())
.thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
- mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
doAnswer(invocation ->
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
@@ -230,6 +225,7 @@ public class DelayedShareFetchTest {
PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Set.of(tp0));
Partition p0 = mock(Partition.class);
+ mockTopicIdPartitionFetchBytes(hwmOffsetMetadata, p0);
when(p0.isLeader()).thenReturn(true);
Partition p1 = mock(Partition.class);
@@ -258,7 +254,7 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.isCompleted());
- // Since sp1 cannot be acquired, tryComplete should return false.
+ // Since minBytes(2) is not satisfied (only 1 byte available from
sp0), and sp1 cannot be acquired, tryComplete should return false.
assertFalse(delayedShareFetch.tryComplete());
assertFalse(delayedShareFetch.isCompleted());
Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
@@ -270,7 +266,7 @@ public class DelayedShareFetchTest {
assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
delayedShareFetch.lock().unlock();
- Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
}
@Test
@@ -296,18 +292,16 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
- when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
// We are testing the case when the share partition has been fetched
before, hence we are mocking positionDiff
// functionality to give the file position difference as 1 byte, so it
doesn't satisfy the minBytes(2).
LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class);
when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1);
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
- mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
Partition p0 = mock(Partition.class);
+ mockTopicIdPartitionFetchBytes(hwmOffsetMetadata, p0);
when(p0.isLeader()).thenReturn(true);
Partition p1 = mock(Partition.class);
@@ -336,7 +330,7 @@ public class DelayedShareFetchTest {
Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
assertTrue(delayedShareFetch.lock().tryLock());
delayedShareFetch.lock().unlock();
- Mockito.verify(exceptionHandler, times(1)).accept(any(), any());
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
}
@Test
@@ -360,8 +354,6 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
- when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
doAnswer(invocation ->
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
@@ -373,6 +365,7 @@ public class DelayedShareFetchTest {
when(time.hiResClockMs()).thenReturn(120L).thenReturn(140L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
Uuid fetchId = Uuid.randomUuid();
+ BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
@@ -381,6 +374,7 @@ public class DelayedShareFetchTest {
.withShareGroupMetrics(shareGroupMetrics)
.withTime(time)
.withFetchId(fetchId)
+ .withExceptionHandler(exceptionHandler)
.build());
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -388,17 +382,23 @@ public class DelayedShareFetchTest {
assertFalse(delayedShareFetch.isCompleted());
- // Since sp1 can be acquired, tryComplete should return true.
- assertTrue(delayedShareFetch.tryComplete());
- assertTrue(delayedShareFetch.isCompleted());
- Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
- assertTrue(delayedShareFetch.lock().tryLock());
- assertEquals(1,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
- assertEquals(20,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
- assertEquals(1,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
- assertEquals(50,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+ .thenReturn(Map.of(tp0,
mock(ShareFetchResponseData.PartitionData.class)));
- delayedShareFetch.lock().unlock();
+ // Since sp0 can be acquired, tryComplete should return true.
+ assertTrue(delayedShareFetch.tryComplete());
+ assertTrue(delayedShareFetch.isCompleted());
+ Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ assertEquals(1,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
+ assertEquals(20,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
+ assertEquals(1,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
+ assertEquals(50,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
+
+ delayedShareFetch.lock().unlock();
+ }
}
@Test
@@ -479,8 +479,6 @@ public class DelayedShareFetchTest {
when(sp0.canAcquireRecords()).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(false);
- when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
doAnswer(invocation ->
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Set.of(tp0));
@@ -489,6 +487,7 @@ public class DelayedShareFetchTest {
when(time.hiResClockMs()).thenReturn(10L).thenReturn(140L);
ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
Uuid fetchId = Uuid.randomUuid();
+ BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withReplicaManager(replicaManager)
@@ -497,29 +496,36 @@ public class DelayedShareFetchTest {
.withShareGroupMetrics(shareGroupMetrics)
.withTime(time)
.withFetchId(fetchId)
+ .withExceptionHandler(exceptionHandler)
.build());
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);
- assertFalse(delayedShareFetch.isCompleted());
- delayedShareFetch.forceComplete();
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+ .thenReturn(Map.of(tp0,
mock(ShareFetchResponseData.PartitionData.class)));
- // Since we can acquire records from sp0, replicaManager.readFromLog
should be called once and only for sp0.
- Mockito.verify(replicaManager, times(1)).readFromLog(
+ assertFalse(delayedShareFetch.isCompleted());
+ delayedShareFetch.forceComplete();
+
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
+ // Since we can acquire records from sp0,
replicaManager.readFromLog should be called once and only for sp0.
+ Mockito.verify(replicaManager, times(1)).readFromLog(
any(), any(), any(ReplicaQuota.class), anyBoolean());
- Mockito.verify(sp0, times(1)).nextFetchOffset();
- Mockito.verify(sp1, times(0)).nextFetchOffset();
- assertTrue(delayedShareFetch.isCompleted());
- assertTrue(shareFetch.isCompleted());
- Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
- assertTrue(delayedShareFetch.lock().tryLock());
- assertEquals(1,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
- assertEquals(130,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
- assertEquals(1,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
- assertEquals(50,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
+ Mockito.verify(sp0, times(1)).nextFetchOffset();
+ Mockito.verify(sp1, times(0)).nextFetchOffset();
+ assertTrue(delayedShareFetch.isCompleted());
+ assertTrue(shareFetch.isCompleted());
+ Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ assertEquals(1,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
+ assertEquals(130,
shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
+ assertEquals(1,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
+ assertEquals(50,
shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
- delayedShareFetch.lock().unlock();
+ delayedShareFetch.lock().unlock();
+ }
}
@Test
@@ -658,33 +664,38 @@ public class DelayedShareFetchTest {
sharePartitions2.put(tp2, sp2);
Uuid fetchId2 = Uuid.randomUuid();
+ BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
DelayedShareFetch delayedShareFetch2 =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch2)
.withReplicaManager(replicaManager)
.withSharePartitions(sharePartitions2)
.withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
.withFetchId(fetchId2)
+ .withExceptionHandler(exceptionHandler)
.build());
// sp1 can be acquired now
when(sp1.maybeAcquireFetchLock(fetchId2)).thenReturn(true);
when(sp1.canAcquireRecords()).thenReturn(true);
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
-
- // when forceComplete is called for delayedShareFetch2, since tp1 is
common in between delayed share fetch
- // requests, it should add a "check and complete" action for request
key tp1 on the purgatory.
- delayedShareFetch2.forceComplete();
- assertTrue(delayedShareFetch2.isCompleted());
- assertTrue(shareFetch2.isCompleted());
- Mockito.verify(replicaManager, times(1)).readFromLog(
- any(), any(), any(ReplicaQuota.class), anyBoolean());
- assertFalse(delayedShareFetch1.isCompleted());
- Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
- Mockito.verify(replicaManager, times(0)).tryCompleteActions();
- Mockito.verify(delayedShareFetch2,
times(1)).releasePartitionLocks(any());
- assertTrue(delayedShareFetch2.lock().tryLock());
- delayedShareFetch2.lock().unlock();
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+ .thenReturn(Map.of(tp0,
mock(ShareFetchResponseData.PartitionData.class)));
+
+ // when forceComplete is called for delayedShareFetch2, since tp1
is common in between delayed share fetch
+ // requests, it should add a "check and complete" action for
request key tp1 on the purgatory.
+ delayedShareFetch2.forceComplete();
+ assertTrue(delayedShareFetch2.isCompleted());
+ assertTrue(shareFetch2.isCompleted());
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
+ Mockito.verify(replicaManager, times(1)).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+ assertFalse(delayedShareFetch1.isCompleted());
+ Mockito.verify(replicaManager, times(1)).addToActionQueue(any());
+ Mockito.verify(replicaManager, times(0)).tryCompleteActions();
+ Mockito.verify(delayedShareFetch2,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch2.lock().tryLock());
+ delayedShareFetch2.lock().unlock();
+ }
}
@Test
@@ -763,8 +774,6 @@ public class DelayedShareFetchTest {
BROKER_TOPIC_STATS);
when(sp0.canAcquireRecords()).thenReturn(true);
- when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
doAnswer(invocation ->
buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
// Mocking partition object to throw an exception during min bytes
calculation while calling fetchOffsetSnapshot
@@ -1033,17 +1042,6 @@ public class DelayedShareFetchTest {
Uuid.randomUuid().toString(), new CompletableFuture<>(),
List.of(tp0, tp1, tp2, tp3, tp4), BATCH_SIZE,
MAX_FETCH_RECORDS, BROKER_TOPIC_STATS);
- when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp2.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp3.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp4.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
-
// All 5 partitions are acquirable.
doAnswer(invocation ->
buildLogReadResult(sharePartitions.keySet().stream().toList())).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
@@ -1060,12 +1058,14 @@ public class DelayedShareFetchTest {
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp4,
1);
Uuid fetchId = Uuid.randomUuid();
+ BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
.withFetchId(fetchId)
+ .withExceptionHandler(exceptionHandler)
.build());
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -1074,29 +1074,40 @@ public class DelayedShareFetchTest {
when(sp3.maybeAcquireFetchLock(fetchId)).thenReturn(true);
when(sp4.maybeAcquireFetchLock(fetchId)).thenReturn(true);
- assertTrue(delayedShareFetch.tryComplete());
- assertTrue(delayedShareFetch.isCompleted());
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+ .thenReturn(Map.of(
+ tp0, mock(ShareFetchResponseData.PartitionData.class),
+ tp1, mock(ShareFetchResponseData.PartitionData.class),
+ tp2, mock(ShareFetchResponseData.PartitionData.class),
+ tp3, mock(ShareFetchResponseData.PartitionData.class),
+ tp4, mock(ShareFetchResponseData.PartitionData.class)));
- // Since all partitions are acquirable, maxbytes per partition =
requestMaxBytes(i.e. 1024*1020) / acquiredTopicPartitions(i.e. 5)
- int expectedPartitionMaxBytes = 1024 * 1020 / 5;
- LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
expectedReadPartitionInfo = new LinkedHashMap<>();
- sharePartitions.keySet().forEach(topicIdPartition ->
expectedReadPartitionInfo.put(topicIdPartition,
- new FetchRequest.PartitionData(
- topicIdPartition.topicId(),
- 0,
- 0,
- expectedPartitionMaxBytes,
- Optional.empty()
- )));
+ assertTrue(delayedShareFetch.tryComplete());
+ assertTrue(delayedShareFetch.isCompleted());
- Mockito.verify(replicaManager, times(1)).readFromLog(
- shareFetch.fetchParams(),
- CollectionConverters.asScala(
- sharePartitions.keySet().stream().map(topicIdPartition ->
- new Tuple2<>(topicIdPartition,
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
- ),
- QuotaFactory.UNBOUNDED_QUOTA,
- true);
+ // Since all partitions are acquirable, maxbytes per partition =
requestMaxBytes(i.e. 1024*1020) / acquiredTopicPartitions(i.e. 5)
+ int expectedPartitionMaxBytes = 1024 * 1020 / 5;
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
expectedReadPartitionInfo = new LinkedHashMap<>();
+ sharePartitions.keySet().forEach(topicIdPartition ->
expectedReadPartitionInfo.put(topicIdPartition,
+ new FetchRequest.PartitionData(
+ topicIdPartition.topicId(),
+ 0,
+ 0,
+ expectedPartitionMaxBytes,
+ Optional.empty()
+ )));
+
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
+ Mockito.verify(replicaManager, times(1)).readFromLog(
+ shareFetch.fetchParams(),
+ CollectionConverters.asScala(
+ sharePartitions.keySet().stream().map(topicIdPartition ->
+ new Tuple2<>(topicIdPartition,
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
+ ),
+ QuotaFactory.UNBOUNDED_QUOTA,
+ true);
+ }
}
@Test
@@ -1132,11 +1143,6 @@ public class DelayedShareFetchTest {
new CompletableFuture<>(), List.of(tp0, tp1, tp2, tp3, tp4),
BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
- when(sp0.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
- when(sp1.acquire(anyString(), anyInt(), anyInt(), anyLong(),
any(FetchPartitionData.class), any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
-
// Only 2 out of 5 partitions are acquirable.
Set<TopicIdPartition> acquirableTopicPartitions = new
LinkedHashSet<>();
acquirableTopicPartitions.add(tp0);
@@ -1150,12 +1156,14 @@ public class DelayedShareFetchTest {
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1,
1);
Uuid fetchId = Uuid.randomUuid();
+ BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
.withFetchId(fetchId)
+ .withExceptionHandler(exceptionHandler)
.build());
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -1164,29 +1172,37 @@ public class DelayedShareFetchTest {
when(sp3.maybeAcquireFetchLock(fetchId)).thenReturn(true);
when(sp4.maybeAcquireFetchLock(fetchId)).thenReturn(false);
- assertTrue(delayedShareFetch.tryComplete());
- assertTrue(delayedShareFetch.isCompleted());
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+ .thenReturn(Map.of(
+ tp0, mock(ShareFetchResponseData.PartitionData.class),
+ tp1, mock(ShareFetchResponseData.PartitionData.class)));
- // Since only 2 partitions are acquirable, maxbytes per partition =
requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 2)
- int expectedPartitionMaxBytes = 1024 * 1024 / 2;
- LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
expectedReadPartitionInfo = new LinkedHashMap<>();
- acquirableTopicPartitions.forEach(topicIdPartition ->
expectedReadPartitionInfo.put(topicIdPartition,
- new FetchRequest.PartitionData(
- topicIdPartition.topicId(),
- 0,
- 0,
- expectedPartitionMaxBytes,
- Optional.empty()
- )));
+ assertTrue(delayedShareFetch.tryComplete());
+ assertTrue(delayedShareFetch.isCompleted());
- Mockito.verify(replicaManager, times(1)).readFromLog(
- shareFetch.fetchParams(),
- CollectionConverters.asScala(
- acquirableTopicPartitions.stream().map(topicIdPartition ->
- new Tuple2<>(topicIdPartition,
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
- ),
- QuotaFactory.UNBOUNDED_QUOTA,
- true);
+ // Since only 2 partitions are acquirable, maxbytes per partition
= requestMaxBytes(i.e. 1024*1024) / acquiredTopicPartitions(i.e. 2)
+ int expectedPartitionMaxBytes = 1024 * 1024 / 2;
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
expectedReadPartitionInfo = new LinkedHashMap<>();
+ acquirableTopicPartitions.forEach(topicIdPartition ->
expectedReadPartitionInfo.put(topicIdPartition,
+ new FetchRequest.PartitionData(
+ topicIdPartition.topicId(),
+ 0,
+ 0,
+ expectedPartitionMaxBytes,
+ Optional.empty()
+ )));
+
+ Mockito.verify(replicaManager, times(1)).readFromLog(
+ shareFetch.fetchParams(),
+ CollectionConverters.asScala(
+ acquirableTopicPartitions.stream().map(topicIdPartition ->
+ new Tuple2<>(topicIdPartition,
expectedReadPartitionInfo.get(topicIdPartition))).collect(Collectors.toList())
+ ),
+ QuotaFactory.UNBOUNDED_QUOTA,
+ true);
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
+ }
}
@Test
@@ -1665,34 +1681,39 @@ public class DelayedShareFetchTest {
when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+ BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
.withSharePartitions(sharePartitions)
.withReplicaManager(replicaManager)
.withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0,
tp1)))
.withFetchId(fetchId)
+ .withExceptionHandler(exceptionHandler)
.build());
// sp0 is acquirable, sp1 is not acquirable.
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(false);
- when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(),
any())).thenReturn(
- createShareAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+ .thenReturn(Map.of(tp0, new
ShareFetchResponseData.PartitionData().setErrorCode(Errors.REQUEST_TIMED_OUT.code())));
- assertFalse(delayedShareFetch.isCompleted());
- assertTrue(delayedShareFetch.tryComplete());
+ assertFalse(delayedShareFetch.isCompleted());
+ assertTrue(delayedShareFetch.tryComplete());
- assertTrue(delayedShareFetch.isCompleted());
- // Pending remote fetch object gets created for delayed share fetch.
- assertNotNull(delayedShareFetch.pendingRemoteFetches());
- // Verify the locks are released for tp0.
- Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(Set.of(tp0));
- assertTrue(shareFetch.isCompleted());
- assertEquals(Set.of(tp0), future.join().keySet());
- assertEquals(Errors.REQUEST_TIMED_OUT.code(),
future.join().get(tp0).errorCode());
- assertTrue(delayedShareFetch.lock().tryLock());
- delayedShareFetch.lock().unlock();
+ assertTrue(delayedShareFetch.isCompleted());
+ // Pending remote fetch object gets created for delayed share
fetch.
+ assertNotNull(delayedShareFetch.pendingRemoteFetches());
+ // Verify the locks are released for tp0.
+ Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(Set.of(tp0));
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
+ assertTrue(shareFetch.isCompleted());
+ assertEquals(Set.of(tp0), future.join().keySet());
+ assertEquals(Errors.REQUEST_TIMED_OUT.code(),
future.join().get(tp0).errorCode());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
+ }
}
@Test
@@ -1702,7 +1723,6 @@ public class DelayedShareFetchTest {
SharePartition sp0 = mock(SharePartition.class);
-
when(sp0.canAcquireRecords()).thenReturn(true);
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new
LinkedHashMap<>();
@@ -2151,12 +2171,10 @@ public class DelayedShareFetchTest {
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
}
- private void mockTopicIdPartitionFetchBytes(ReplicaManager replicaManager,
TopicIdPartition topicIdPartition, LogOffsetMetadata hwmOffsetMetadata) {
+ private void mockTopicIdPartitionFetchBytes(LogOffsetMetadata
hwmOffsetMetadata, Partition partition) {
LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1,
mock(LogOffsetMetadata.class),
hwmOffsetMetadata, mock(LogOffsetMetadata.class));
- Partition partition = mock(Partition.class);
when(partition.fetchOffsetSnapshot(any(),
anyBoolean())).thenReturn(endOffsetSnapshot);
-
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
}
private PartitionMaxBytesStrategy
mockPartitionMaxBytes(Set<TopicIdPartition> partitions) {