This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f12a9d84137 KAFKA-19464: Remove unnecessary update for find next fetch
offset (#20315)
f12a9d84137 is described below
commit f12a9d84137c31e401ef3dd7959df0b51b6f7ae9
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Aug 7 13:11:07 2025 +0100
KAFKA-19464: Remove unnecessary update for find next fetch offset (#20315)
The PR removes unnecessary updates for find next fetch offset. When the
state is in transition and not yet completed then anyways respective
offsets should not be considered for acquisition. The find next fetch
offset is updated finally when transition is completed.
Reviewers: Manikumar Reddy <[email protected]>, Abhinav Dixit
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 33 ++++------------------
1 file changed, 6 insertions(+), 27 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index db9b862839c..b55c990fd4b 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -803,7 +803,7 @@ public class SharePartition {
}
InFlightState updateResult =
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE, maxDeliveryCount, memberId);
- if (updateResult == null) {
+ if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.info("Unable to acquire records for the batch: {} in
share partition: {}-{}",
inFlightBatch, groupId, topicIdPartition);
continue;
@@ -1009,12 +1009,7 @@ public class SharePartition {
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(),
offsetState.getKey(),
updateResult.state().id(), (short)
updateResult.deliveryCount()));
-
- // If the maxDeliveryCount limit has been exceeded, the record
will be transitioned to ARCHIVED state.
- // This should not change the next fetch offset because the
record is not available for acquisition
- if (updateResult.state() != RecordState.ARCHIVED) {
- updateFindNextFetchOffset(true);
- }
+ // Do not update the next fetch offset as the offset has not
completed the transition yet.
}
}
return Optional.empty();
@@ -1054,12 +1049,7 @@ public class SharePartition {
updatedStates.add(updateResult);
stateBatches.add(new
PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state().id(), (short)
updateResult.deliveryCount()));
-
- // If the maxDeliveryCount limit has been exceeded, the record
will be transitioned to ARCHIVED state.
- // This should not change the next fetch offset because the record
is not available for acquisition
- if (updateResult.state() != RecordState.ARCHIVED) {
- updateFindNextFetchOffset(true);
- }
+ // Do not update the next fetch offset as the batch has not
completed the transition yet.
}
return Optional.empty();
}
@@ -1641,7 +1631,7 @@ public class SharePartition {
InFlightState updateResult =
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED,
DeliveryCountOps.INCREASE,
maxDeliveryCount, memberId);
- if (updateResult == null) {
+ if (updateResult == null || updateResult.state() !=
RecordState.ACQUIRED) {
log.trace("Unable to acquire records for the offset: {} in
batch: {}"
+ " for the share partition: {}-{}",
offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition);
@@ -1941,12 +1931,7 @@ public class SharePartition {
updatedStates.add(updateResult);
stateBatches.add(new PersisterStateBatch(offsetState.getKey(),
offsetState.getKey(),
updateResult.state().id(), (short)
updateResult.deliveryCount()));
- // If the maxDeliveryCount limit has been exceeded, the record
will be transitioned to ARCHIVED state.
- // This should not change the next fetch offset because the
record is not available for acquisition
- if (recordState == RecordState.AVAILABLE
- && updateResult.state() != RecordState.ARCHIVED) {
- updateFindNextFetchOffset(true);
- }
+ // Do not update the nextFetchOffset as the offset has not
completed the transition yet.
}
} finally {
lock.writeLock().unlock();
@@ -1996,13 +1981,7 @@ public class SharePartition {
stateBatches.add(
new PersisterStateBatch(inFlightBatch.firstOffset(),
inFlightBatch.lastOffset(),
updateResult.state().id(), (short)
updateResult.deliveryCount()));
-
- // If the maxDeliveryCount limit has been exceeded, the record
will be transitioned to ARCHIVED state.
- // This should not change the nextFetchOffset because the record
is not available for acquisition
- if (recordState == RecordState.AVAILABLE
- && updateResult.state() != RecordState.ARCHIVED) {
- updateFindNextFetchOffset(true);
- }
+ // Do not update the next fetch offset as the batch has not
completed the transition yet.
} finally {
lock.writeLock().unlock();
}