apoorvmittal10 commented on code in PR #20253:
URL: https://github.com/apache/kafka/pull/20253#discussion_r2237892481
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1449,6 +1461,46 @@ private void maybeUpdateReadGapFetchOffset(long offset) {
}
}
+ private LastOffsetAndMaxRecords lastOffsetAndMaxRecordsToAcquire(long fetchOffset, int maxFetchRecords, long lastOffset) {
+ // There can always be records fetched exceeding the max in-flight messages limit. Hence,
+ // we need to check if the share partition has reached the max in-flight messages limit
+ // and only acquire limited records.
+ int maxRecordsToAcquire;
+ long lastOffsetToAcquire = lastOffset;
+ lock.readLock().lock();
+ try {
+ int inFlightRecordsCount = numInFlightRecords();
+ // Take minimum of maxFetchRecords and remaining capacity to fill max in-flight messages limit.
+ maxRecordsToAcquire = Math.min(maxFetchRecords, maxInFlightMessages - inFlightRecordsCount);
Review Comment:
I have noted this and will send a separate patch for it. It requires changing
a lot of comments and will be a big refactor, so I am avoiding it in this PR;
otherwise the main change of the PR will be hard to determine.
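
For readers following the hunk above, here is a minimal standalone sketch of the capping arithmetic on the commented line. It is not code from the PR: the class name `AcquireCapSketch`, the helper's parameter list, and the clamp to zero when the partition is already at or over the limit are all assumptions made for illustration.

```java
public class AcquireCapSketch {

    // Hypothetical helper (not the PR's method): how many records may still be
    // acquired, given the fetch cap and the in-flight limit.
    static int maxRecordsToAcquire(int maxFetchRecords, int maxInFlightMessages, int inFlightRecordsCount) {
        // Remaining room before the in-flight limit is reached; this can be
        // negative if more records are in flight than the limit allows.
        int remainingCapacity = maxInFlightMessages - inFlightRecordsCount;
        // Same Math.min as in the hunk. Clamping at zero is an assumption of
        // this sketch, so a caller never sees a negative count.
        return Math.max(0, Math.min(maxFetchRecords, remainingCapacity));
    }

    public static void main(String[] args) {
        // Fetch cap is the tighter bound.
        System.out.println(maxRecordsToAcquire(500, 1000, 100));
        // Remaining in-flight capacity is the tighter bound.
        System.out.println(maxRecordsToAcquire(500, 1000, 800));
        // Already over the limit: nothing more should be acquired.
        System.out.println(maxRecordsToAcquire(500, 1000, 1200));
    }
}
```

The point of the sketch is only that whichever bound is tighter wins, which is what the `Math.min` call in the hunk expresses.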