This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29254d6816a KAFKA-20425: Optimize purgatory triggers when no data is
available for share fetch (#22005)
29254d6816a is described below
commit 29254d6816a97854e7ceaf4e94d8e564e47a4443
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Apr 9 10:09:32 2026 +0100
KAFKA-20425: Optimize purgatory triggers when no data is available for
share fetch (#22005)
The delayed share fetch triggers waiting requests for same topic
partition in purgatory for which the request has been completed. This is
needed as for same sahre partition there can be requests in purgatory
waiting to acquire lock. However, the triggers are always not needed
i.e. when no data is available in partition hence these triggers can be
avoided.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 49 ++++--
.../kafka/server/share/DelayedShareFetchTest.java | 192 +++++++++++++++++++++
.../server/share/SharePartitionManagerTest.java | 13 +-
3 files changed, 238 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index bc8146910ad..af37723c421 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -55,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -263,6 +264,7 @@ public class DelayedShareFetch extends DelayedOperation {
}
private void
processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicIdPartition,
Long> topicPartitionData) {
+ List<ShareFetchPartitionData> shareFetchPartitionDataList = new
ArrayList<>();
try {
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (localPartitionsAlreadyFetched.isEmpty())
@@ -276,7 +278,6 @@ public class DelayedShareFetch extends DelayedOperation {
resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData,
responseData);
- List<ShareFetchPartitionData> shareFetchPartitionDataList = new
ArrayList<>();
responseData.forEach((topicIdPartition, logReadResult) -> {
if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {
shareFetchPartitionDataList.add(new
ShareFetchPartitionData(
@@ -298,7 +299,7 @@ public class DelayedShareFetch extends DelayedOperation {
log.error("Error processing delayed share fetch request", e);
handleFetchException(shareFetch, topicPartitionData.keySet(), e);
} finally {
-
releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet());
+
releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet(),
partitionsWithData(shareFetchPartitionDataList));
}
}
@@ -691,7 +692,7 @@ public class DelayedShareFetch extends DelayedOperation {
});
// Release fetch lock for the topic partitions that were acquired but
were not a part of remote fetch and add
// them to the delayed actions queue.
-
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
+
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions,
nonRemoteFetchTopicPartitions);
processRemoteFetchOrException(remoteStorageFetchInfoMap);
// Check if remote fetch can be completed.
return maybeCompletePendingRemoteFetch();
@@ -809,18 +810,38 @@ public class DelayedShareFetch extends DelayedOperation {
try {
handleFetchException(shareFetch, partitionsAcquired.keySet(),
remoteStorageFetchException.get());
} finally {
-
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet(),
partitionsAcquired.keySet());
}
}
- private void
releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition>
topicIdPartitions) {
- if (topicIdPartitions.isEmpty()) {
+ private Set<TopicIdPartition>
partitionsWithData(List<ShareFetchPartitionData> shareFetchPartitionDataList) {
+ if (shareFetchPartitionDataList == null ||
shareFetchPartitionDataList.isEmpty()) {
+ return Set.of();
+ }
+ Set<TopicIdPartition> partitionsWithData = new HashSet<>();
+ shareFetchPartitionDataList.forEach(shareFetchPartitionData -> {
+ if (shareFetchPartitionData.fetchPartitionData() != null &&
+ shareFetchPartitionData.fetchPartitionData().records != null &&
+
shareFetchPartitionData.fetchPartitionData().records.sizeInBytes() > 0) {
+
partitionsWithData.add(shareFetchPartitionData.topicIdPartition());
+ }
+ });
+ return partitionsWithData;
+ }
+
+ private void
releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition>
allAcquiredTopicIdPartitions,
+
Set<TopicIdPartition> topicIdPartitionsWithData) {
+ if (allAcquiredTopicIdPartitions.isEmpty()) {
+ // topicIdPartitionsWithData set should be a subset of
allAcquiredTopicIdPartitions, hence it is safe to return.
return;
}
// Releasing the lock to move ahead with the next request in queue.
- releasePartitionLocks(topicIdPartitions);
- replicaManager.addToActionQueue(() ->
topicIdPartitions.forEach(topicIdPartition -> {
+ releasePartitionLocks(allAcquiredTopicIdPartitions);
+ if (topicIdPartitionsWithData.isEmpty()) {
+ return;
+ }
+ replicaManager.addToActionQueue(() ->
topicIdPartitionsWithData.forEach(topicIdPartition -> {
// If we have a fetch request completed for a share-partition, we
release the locks for that partition,
// then we should check if there is a pending share fetch request
for the share-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite
call stack, which could happen if
@@ -844,15 +865,15 @@ public class DelayedShareFetch extends DelayedOperation {
*/
private void completeRemoteStorageShareFetchRequest() {
LinkedHashMap<TopicIdPartition, Long>
acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
+ List<ShareFetchPartitionData> shareFetchPartitionDataList = new
ArrayList<>();
try {
- List<ShareFetchPartitionData> shareFetchPartitionData = new
ArrayList<>();
int readableBytes = 0;
for (RemoteFetch remoteFetch :
pendingRemoteFetchesOpt.get().remoteFetches()) {
if (remoteFetch.remoteFetchResult().isDone()) {
RemoteLogReadResult remoteLogReadResult =
remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error().isPresent()) {
// If there is any error for the remote fetch topic
partition, we populate the error accordingly.
- shareFetchPartitionData.add(
+ shareFetchPartitionDataList.add(
new ShareFetchPartitionData(
remoteFetch.topicIdPartition(),
partitionsAcquired.get(remoteFetch.topicIdPartition()),
@@ -863,7 +884,7 @@ public class DelayedShareFetch extends DelayedOperation {
FetchDataInfo info =
remoteLogReadResult.fetchDataInfo().get();
TopicIdPartition topicIdPartition =
remoteFetch.topicIdPartition();
LogReadResult logReadResult =
remoteFetch.logReadResult();
- shareFetchPartitionData.add(
+ shareFetchPartitionDataList.add(
new ShareFetchPartitionData(
topicIdPartition,
partitionsAcquired.get(remoteFetch.topicIdPartition()),
@@ -907,7 +928,7 @@ public class DelayedShareFetch extends DelayedOperation {
resetFetchOffsetMetadataForRemoteFetchPartitions(acquiredNonRemoteFetchTopicPartitionData,
responseData);
for (Map.Entry<TopicIdPartition, LogReadResult> entry :
responseData.entrySet()) {
if
(entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
- shareFetchPartitionData.add(
+ shareFetchPartitionDataList.add(
new ShareFetchPartitionData(
entry.getKey(),
acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey()),
@@ -925,7 +946,7 @@ public class DelayedShareFetch extends DelayedOperation {
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int)
(acquiredRatio * 100));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
remoteFetchResponse = ShareFetchUtils.processFetchResponse(
- shareFetch, shareFetchPartitionData, sharePartitions,
replicaManager, exceptionHandler);
+ shareFetch, shareFetchPartitionDataList, sharePartitions,
replicaManager, exceptionHandler);
shareFetch.maybeComplete(remoteFetchResponse);
log.trace("Remote share fetch request completed successfully,
response: {}", remoteFetchResponse);
} catch (InterruptedException | ExecutionException e) {
@@ -937,7 +958,7 @@ public class DelayedShareFetch extends DelayedOperation {
} finally {
Set<TopicIdPartition> topicIdPartitions = new
LinkedHashSet<>(partitionsAcquired.keySet());
topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet());
- releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
+ releasePartitionLocksAndAddToActionQueue(topicIdPartitions,
partitionsWithData(shareFetchPartitionDataList));
}
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 0f5c3b2a545..e34589f5a7b 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -86,6 +86,7 @@ import scala.jdk.javaapi.CollectionConverters;
import static kafka.server.share.PendingRemoteFetches.RemoteFetch;
import static
kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
import static
kafka.server.share.SharePartitionManagerTest.REMOTE_FETCH_MAX_WAIT_MS;
+import static
kafka.server.share.SharePartitionManagerTest.buildEmptyLogReadResult;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static
kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -700,6 +701,112 @@ public class DelayedShareFetchTest {
}
}
+ @Test
+ public void
testForceCompleteNotTriggersDelayedActionsQueueWhenFetchDataIsEmpty() {
+ String groupId = "grp";
+ Uuid topicId = Uuid.randomUuid();
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ TopicIdPartition tp0 = new TopicIdPartition(topicId, new
TopicPartition("foo", 0));
+ TopicIdPartition tp1 = new TopicIdPartition(topicId, new
TopicPartition("foo", 1));
+ TopicIdPartition tp2 = new TopicIdPartition(topicId, new
TopicPartition("foo", 2));
+ List<TopicIdPartition> topicIdPartitions1 = List.of(tp0, tp1);
+
+ SharePartition sp0 = mock(SharePartition.class);
+ SharePartition sp1 = mock(SharePartition.class);
+ SharePartition sp2 = mock(SharePartition.class);
+
+ LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions1 = new
LinkedHashMap<>();
+ sharePartitions1.put(tp0, sp0);
+ sharePartitions1.put(tp1, sp1);
+ sharePartitions1.put(tp2, sp2);
+
+ ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId,
Uuid.randomUuid().toString(),
+ new CompletableFuture<>(), topicIdPartitions1, BATCH_OPTIMIZED,
BATCH_SIZE, MAX_FETCH_RECORDS,
+ BROKER_TOPIC_STATS);
+
+ DelayedOperationPurgatory<DelayedShareFetch>
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+ "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+ DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true);
+ mockReplicaManagerDelayedShareFetch(replicaManager,
delayedShareFetchPurgatory);
+
+ List<DelayedOperationKey> delayedShareFetchWatchKeys = new
ArrayList<>();
+ topicIdPartitions1.forEach(topicIdPartition ->
delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId,
topicIdPartition.topicId(), topicIdPartition.partition())));
+
+ Uuid fetchId1 = Uuid.randomUuid();
+ DelayedShareFetch delayedShareFetch1 =
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
+ .withShareFetchData(shareFetch1)
+ .withReplicaManager(replicaManager)
+ .withSharePartitions(sharePartitions1)
+ .withFetchId(fetchId1)
+ .build();
+
+ // No share partition is available for acquiring initially.
+ when(sp0.maybeAcquireFetchLock(fetchId1)).thenReturn(false);
+ when(sp1.maybeAcquireFetchLock(fetchId1)).thenReturn(false);
+ when(sp2.maybeAcquireFetchLock(fetchId1)).thenReturn(false);
+
+ // We add a delayed share fetch entry to the purgatory which will be
waiting for completion since neither of the
+ // partitions in the share fetch request can be acquired.
+ delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1,
delayedShareFetchWatchKeys);
+
+ assertEquals(2, delayedShareFetchPurgatory.watched());
+ assertFalse(shareFetch1.isCompleted());
+ assertTrue(delayedShareFetch1.lock().tryLock());
+ delayedShareFetch1.lock().unlock();
+
+ ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId,
Uuid.randomUuid().toString(),
+ new CompletableFuture<>(), List.of(tp0, tp1), BATCH_OPTIMIZED,
BATCH_SIZE, MAX_FETCH_RECORDS,
+ BROKER_TOPIC_STATS);
+
+ doAnswer(invocation ->
buildEmptyLogReadResult(List.of(tp1))).when(replicaManager).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+ PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mockPartitionMaxBytes(Set.of(tp1));
+
+ LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions2 = new
LinkedHashMap<>();
+ sharePartitions2.put(tp0, sp0);
+ sharePartitions2.put(tp1, sp1);
+ sharePartitions2.put(tp2, sp2);
+
+ Uuid fetchId2 = Uuid.randomUuid();
+ BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
+ DelayedShareFetch delayedShareFetch2 =
spy(DelayedShareFetchBuilder.builder()
+ .withShareFetchData(shareFetch2)
+ .withReplicaManager(replicaManager)
+ .withSharePartitions(sharePartitions2)
+ .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
+ .withFetchId(fetchId2)
+ .withExceptionHandler(exceptionHandler)
+ .build());
+
+ // sp1 can be acquired now.
+ when(sp1.maybeAcquireFetchLock(fetchId2)).thenReturn(true);
+ when(sp1.canAcquireRecords()).thenReturn(true);
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ ShareFetchResponseData.PartitionData mockedPartitionData =
mock(ShareFetchResponseData.PartitionData.class);
+ // Empty fetched data.
+
when(mockedPartitionData.records()).thenReturn(MemoryRecords.EMPTY);
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))
+ .thenReturn(Map.of(tp0, mockedPartitionData));
+
+ // when forceComplete is called for delayedShareFetch2, since tp1
is common in between delayed share fetch
+ // requests, it will not add a "check and complete" action for
request key tp1 on the purgatory since no
+ // data was returned in tp1.
+ delayedShareFetch2.forceComplete();
+ assertTrue(delayedShareFetch2.isCompleted());
+ assertTrue(shareFetch2.isCompleted());
+ Mockito.verify(exceptionHandler, never()).accept(any(), any());
+ Mockito.verify(replicaManager, times(1)).readFromLog(
+ any(), any(), any(ReplicaQuota.class), anyBoolean());
+ assertFalse(delayedShareFetch1.isCompleted());
+ Mockito.verify(replicaManager, times(0)).addToActionQueue(any());
+ Mockito.verify(replicaManager, times(0)).tryCompleteActions();
+ Mockito.verify(delayedShareFetch2,
times(1)).releasePartitionLocks(any());
+ assertTrue(delayedShareFetch2.lock().tryLock());
+ delayedShareFetch2.lock().unlock();
+ }
+ }
+
@Test
public void testCombineLogReadResponse() {
String groupId = "grp";
@@ -2162,6 +2269,91 @@ public class DelayedShareFetchTest {
}
}
+ @Test
+ public void testRemoteStorageFetchCompletionNotTriggerActionsQueue() {
+ ReplicaManager replicaManager = mock(ReplicaManager.class);
+ TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new
TopicPartition("foo", 0));
+ SharePartition sp0 = mock(SharePartition.class);
+
+ when(sp0.canAcquireRecords()).thenReturn(true);
+ when(sp0.nextFetchOffset()).thenReturn(10L);
+
+ LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new
LinkedHashMap<>();
+ sharePartitions.put(tp0, sp0);
+
+ CompletableFuture<Map<TopicIdPartition,
ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>();
+ ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp",
Uuid.randomUuid().toString(),
+ future, List.of(tp0), BATCH_OPTIMIZED, BATCH_SIZE,
MAX_FETCH_RECORDS,
+ BROKER_TOPIC_STATS);
+
+ PendingRemoteFetches pendingRemoteFetches =
mock(PendingRemoteFetches.class);
+ Uuid fetchId = Uuid.randomUuid();
+ DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
+ .withShareFetchData(shareFetch)
+ .withReplicaManager(replicaManager)
+ .withSharePartitions(sharePartitions)
+
.withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
+ .withPendingRemoteFetches(pendingRemoteFetches)
+ .withFetchId(fetchId)
+ .build());
+
+ LinkedHashMap<TopicIdPartition, Long> partitionsAcquired = new
LinkedHashMap<>();
+ partitionsAcquired.put(tp0, 10L);
+ // Manually update acquired partitions maps.
+ delayedShareFetch.updatePartitionsAcquired(partitionsAcquired);
+
+ // Mock remote fetch result.
+ RemoteFetch remoteFetch = mock(RemoteFetch.class);
+ when(remoteFetch.topicIdPartition()).thenReturn(tp0);
+
when(remoteFetch.remoteFetchResult()).thenReturn(CompletableFuture.completedFuture(
+ new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO),
Optional.empty()))
+ );
+ when(remoteFetch.logReadResult()).thenReturn(new LogReadResult(
+ REMOTE_FETCH_INFO,
+ Optional.empty(),
+ -1L,
+ -1L,
+ -1L,
+ -1L,
+ -1L,
+ OptionalLong.empty(),
+ OptionalInt.empty(),
+ Errors.NONE
+ ));
+
when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
+ when(pendingRemoteFetches.isDone()).thenReturn(false);
+
+ doAnswer(invocationOnMock ->
null).when(pendingRemoteFetches).invokeCallbackOnCompletion(any());
+ // Mock the behaviour of replica manager such that remote storage
fetch completion timer task completes on adding it to the watch queue.
+ doAnswer(invocationOnMock -> {
+ TimerTask timerTask = invocationOnMock.getArgument(0);
+ timerTask.run();
+ return null;
+ }).when(replicaManager).addShareFetchTimerRequest(any());
+
+ try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils =
Mockito.mockStatic(ShareFetchUtils.class)) {
+ Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionDataMap = new LinkedHashMap<>();
+ // 0 bytes fetched data for tp0.
+ ShareFetchResponseData.PartitionData mockedPartitionData0 =
mock(ShareFetchResponseData.PartitionData.class);
+
when(mockedPartitionData0.records()).thenReturn(MemoryRecords.EMPTY);
+ partitionDataMap.put(tp0, mockedPartitionData0);
+ mockedShareFetchUtils.when(() ->
ShareFetchUtils.processFetchResponse(any(), any(), any(), any(),
any())).thenReturn(partitionDataMap);
+
+ assertFalse(delayedShareFetch.isCompleted());
+ delayedShareFetch.forceComplete();
+ assertTrue(delayedShareFetch.isCompleted());
+ // The future of shareFetch completes.
+ assertTrue(shareFetch.isCompleted());
+ assertEquals(Set.of(tp0), future.join().keySet());
+ // Verify the locks are released for tp0 but not added to action
queue since no records are fetched from remote storage.
+ Mockito.verify(delayedShareFetch,
times(1)).releasePartitionLocks(Set.of(tp0));
+ Mockito.verify(replicaManager, times(0)).addToActionQueue(any());
+ assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());
+ assertTrue(delayedShareFetch.lock().tryLock());
+ delayedShareFetch.lock().unlock();
+ }
+ }
+
static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager
replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1,
minBytes);
LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1,
mock(LogOffsetMetadata.class),
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index d4699b5dc09..c6c96f79230 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
@@ -3185,10 +3186,18 @@ public class SharePartitionManagerTest {
}
static Seq<Tuple2<TopicIdPartition, LogReadResult>>
buildLogReadResult(List<TopicIdPartition> topicIdPartitions) {
+ return buildLogReadResult(topicIdPartitions, MemoryRecords.withRecords(
+ Compression.NONE, new SimpleRecord("test-key".getBytes(),
"test-value".getBytes())));
+ }
+
+ static Seq<Tuple2<TopicIdPartition, LogReadResult>>
buildEmptyLogReadResult(List<TopicIdPartition> topicIdPartitions) {
+ return buildLogReadResult(topicIdPartitions, MemoryRecords.EMPTY);
+ }
+
+ static Seq<Tuple2<TopicIdPartition, LogReadResult>>
buildLogReadResult(List<TopicIdPartition> topicIdPartitions, Records records) {
List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new
ArrayList<>();
topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new
Tuple2<>(topicIdPartition, new LogReadResult(
- new FetchDataInfo(new LogOffsetMetadata(0, 0, 0),
MemoryRecords.withRecords(
- Compression.NONE, new SimpleRecord("test-key".getBytes(),
"test-value".getBytes()))),
+ new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), records),
Optional.empty(),
-1L,
-1L,