This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f12a9d84137 KAFKA-19464: Remove unnecessary update for find next fetch 
offset (#20315)
f12a9d84137 is described below

commit f12a9d84137c31e401ef3dd7959df0b51b6f7ae9
Author: Apoorv Mittal <[email protected]>
AuthorDate: Thu Aug 7 13:11:07 2025 +0100

    KAFKA-19464: Remove unnecessary update for find next fetch offset (#20315)
    
    This PR removes unnecessary updates of the find next fetch offset.
    While a state transition is in progress and has not yet completed, the
    respective offsets should not be considered for acquisition anyway. The
    find next fetch offset is updated once the transition has completed.
    
    Reviewers: Manikumar Reddy <[email protected]>, Abhinav Dixit
    <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 33 ++++------------------
 1 file changed, 6 insertions(+), 27 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index db9b862839c..b55c990fd4b 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -803,7 +803,7 @@ public class SharePartition {
                 }
 
                 InFlightState updateResult = 
inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE, maxDeliveryCount, memberId);
-                if (updateResult == null) {
+                if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.info("Unable to acquire records for the batch: {} in 
share partition: {}-{}",
                         inFlightBatch, groupId, topicIdPartition);
                     continue;
@@ -1009,12 +1009,7 @@ public class SharePartition {
                 updatedStates.add(updateResult);
                 stateBatches.add(new PersisterStateBatch(offsetState.getKey(), 
offsetState.getKey(),
                         updateResult.state().id(), (short) 
updateResult.deliveryCount()));
-
-                // If the maxDeliveryCount limit has been exceeded, the record 
will be transitioned to ARCHIVED state.
-                // This should not change the next fetch offset because the 
record is not available for acquisition
-                if (updateResult.state() != RecordState.ARCHIVED) {
-                    updateFindNextFetchOffset(true);
-                }
+                // Do not update the next fetch offset as the offset has not 
completed the transition yet.
             }
         }
         return Optional.empty();
@@ -1054,12 +1049,7 @@ public class SharePartition {
             updatedStates.add(updateResult);
             stateBatches.add(new 
PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
                     updateResult.state().id(), (short) 
updateResult.deliveryCount()));
-
-            // If the maxDeliveryCount limit has been exceeded, the record 
will be transitioned to ARCHIVED state.
-            // This should not change the next fetch offset because the record 
is not available for acquisition
-            if (updateResult.state() != RecordState.ARCHIVED) {
-                updateFindNextFetchOffset(true);
-            }
+            // Do not update the next fetch offset as the batch has not 
completed the transition yet.
         }
         return Optional.empty();
     }
@@ -1641,7 +1631,7 @@ public class SharePartition {
 
                 InFlightState updateResult =  
offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, 
DeliveryCountOps.INCREASE,
                     maxDeliveryCount, memberId);
-                if (updateResult == null) {
+                if (updateResult == null || updateResult.state() != 
RecordState.ACQUIRED) {
                     log.trace("Unable to acquire records for the offset: {} in 
batch: {}"
                             + " for the share partition: {}-{}", 
offsetState.getKey(), inFlightBatch,
                         groupId, topicIdPartition);
@@ -1941,12 +1931,7 @@ public class SharePartition {
                 updatedStates.add(updateResult);
                 stateBatches.add(new PersisterStateBatch(offsetState.getKey(), 
offsetState.getKey(),
                     updateResult.state().id(), (short) 
updateResult.deliveryCount()));
-                // If the maxDeliveryCount limit has been exceeded, the record 
will be transitioned to ARCHIVED state.
-                // This should not change the next fetch offset because the 
record is not available for acquisition
-                if (recordState == RecordState.AVAILABLE
-                    && updateResult.state() != RecordState.ARCHIVED) {
-                    updateFindNextFetchOffset(true);
-                }
+                // Do not update the nextFetchOffset as the offset has not 
completed the transition yet.
             }
         } finally {
             lock.writeLock().unlock();
@@ -1996,13 +1981,7 @@ public class SharePartition {
             stateBatches.add(
                 new PersisterStateBatch(inFlightBatch.firstOffset(), 
inFlightBatch.lastOffset(),
                     updateResult.state().id(), (short) 
updateResult.deliveryCount()));
-
-            // If the maxDeliveryCount limit has been exceeded, the record 
will be transitioned to ARCHIVED state.
-            // This should not change the nextFetchOffset because the record 
is not available for acquisition
-            if (recordState == RecordState.AVAILABLE
-                && updateResult.state() != RecordState.ARCHIVED) {
-                updateFindNextFetchOffset(true);
-            }
+            // Do not update the next fetch offset as the batch has not 
completed the transition yet.
         } finally {
             lock.writeLock().unlock();
         }

Reply via email to