apoorvmittal10 commented on code in PR #16274:
URL: https://github.com/apache/kafka/pull/16274#discussion_r1636612042
##########
core/src/main/java/kafka/server/SharePartition.java:
##########
@@ -257,11 +333,110 @@ public CompletableFuture<List<AcquiredRecords>> acquire(
FetchPartitionData fetchPartitionData
) {
log.trace("Received acquire request for share partition: {}-{}",
memberId, fetchPartitionData);
+ RecordBatch lastBatch =
fetchPartitionData.records.lastBatch().orElse(null);
+ if (lastBatch == null) {
+ // Nothing to acquire.
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
- CompletableFuture<List<AcquiredRecords>> future = new
CompletableFuture<>();
- future.completeExceptionally(new UnsupportedOperationException("Not
implemented"));
+ // We require the first batch of records to get the base offset. Stop
parsing further
+ // batches.
+ RecordBatch firstBatch =
fetchPartitionData.records.batches().iterator().next();
+ lock.writeLock().lock();
+ try {
+ long baseOffset = firstBatch.baseOffset();
+ // Find the floor batch record for the request batch. The request
batch could be
+ // for a subset of the in-flight batch i.e. cached batch of offset
10-14 and request batch
+ // of 12-13. Hence, floor entry is fetched to find the sub-map.
+ Map.Entry<Long, InFlightBatch> floorOffset =
cachedState.floorEntry(baseOffset);
+ // We might find a batch with floor entry but not necessarily that
batch has an overlap,
+ // if the request batch base offset is ahead of last offset from
floor entry i.e. cached
+ // batch of 10-14 and request batch of 15-18, though floor entry
is found but no overlap.
+ if (floorOffset != null && floorOffset.getValue().lastOffset >=
baseOffset) {
+ baseOffset = floorOffset.getKey();
+ }
+ // Validate if the fetch records are already part of existing
batches and if available.
+ NavigableMap<Long, InFlightBatch> subMap =
cachedState.subMap(baseOffset, true, lastBatch.lastOffset(), true);
+ // No overlap with request offsets in the cache for in-flight
records. Acquire the complete
+ // batch.
+ if (subMap.isEmpty()) {
+ log.trace("No cached data exists for the share partition for
requested fetch batch: {}-{}",
+ groupId, topicIdPartition);
+ return
CompletableFuture.completedFuture(Collections.singletonList(
+ acquireNewBatchRecords(memberId, firstBatch.baseOffset(),
lastBatch.lastOffset())));
+ }
- return future;
+ log.trace("Overlap exists with in-flight records. Acquire the
records if available for"
+ + " the share group: {}-{}", groupId, topicIdPartition);
+ List<AcquiredRecords> result = new ArrayList<>();
+ // The fetched records are already part of the in-flight records.
The records might
+ // be available for re-delivery hence try acquiring same. The
request batches could
+ // be an exact match, subset or span over multiple already fetched
batches.
+ for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+ InFlightBatch inFlightBatch = entry.getValue();
+ // Compute if the batch is a full match.
+ boolean fullMatch = checkForFullMatch(inFlightBatch,
firstBatch.baseOffset(), lastBatch.lastOffset());
+
+ if (!fullMatch || inFlightBatch.offsetState != null) {
+ log.trace("Subset or offset tracked batch record found for
share partition,"
+ + " batch: {} request offsets - first: {}, last:
{} for the share"
+ + " group: {}-{}", inFlightBatch,
firstBatch.baseOffset(),
+ lastBatch.lastOffset(), groupId, topicIdPartition);
+ if (inFlightBatch.offsetState == null) {
+ // The request is a subset of the in-flight batch, but
the offset
+ // tracking has not been initialized yet, which means
that we can only
+ // acquire a subset of offsets from the in-flight batch
if the
+ // complete batch is still available. Hence, do a
pre-check to avoid exploding
+ // the in-flight offset tracking unnecessarily.
+ if (inFlightBatch.batchState() !=
RecordState.AVAILABLE) {
+ log.trace("The batch is not available to acquire
in share group: {}-{}, skipping: {}"
+ + " skipping offset tracking for batch as
well.", groupId,
+ topicIdPartition, inFlightBatch);
+ continue;
+ }
+ // The request batch is a subset or per offset state
is managed hence update
+ // the offsets state in the in-flight batch.
+ inFlightBatch.maybeInitializeOffsetStateUpdate();
+ }
+ acquireSubsetBatchRecords(memberId,
firstBatch.baseOffset(), lastBatch.lastOffset(), inFlightBatch, result);
+ continue;
+ }
+
+ // The in-flight batch is a full match hence change the state
of the complete batch.
Review Comment:
Missed it, thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]