This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6f8b7ba26a KAFKA-20663: Return committed offset from newest segment first (#22491)
f6f8b7ba26a is described below

commit f6f8b7ba26ac77363a6e6559efdbc161d216f204
Author: Nick Telford <[email protected]>
AuthorDate: Sat Jun 6 18:46:50 2026 +0100

    KAFKA-20663: Return committed offset from newest segment first (#22491)
    
    ## Summary
    
    `AbstractSegments.committedOffset()` iterates the segment `TreeMap` in
    ascending key order (oldest segment first) and returns the first
    non-null offset it finds. Because inactive segments (past their time
    window) stop receiving data writes, their 16MB data CF write buffer
    never fills and never triggers an atomic flush. Their offset CF is
    updated by every 30-second `maybeCheckpoint()` call, but those writes
    sit in the memtable and are lost on an unclean JVM exit. The SST for an
    inactive segment therefore holds the offset from its last data-driven
    flush, which can be many hours old.
    
    When restoration reads the committed offset on restart, the current code
    returns the stale offset from the oldest segment rather than the current
    offset from the active (most recently flushed) segment. This stale
    offset may fall outside the changelog topic's retention window, causing
    `OffsetOutOfRangeException`.
    
    The fix iterates `segments.descendingMap().values()` so the newest
    segment — whose SST is kept current by active data writes and atomic
    flushes — is checked first. This is the same pattern already used in
    `allSegments(forward=false)`.
    
    ## Test plan
    
    - Existing `AbstractRocksDBSegmentedBytesStoreTest` /
    `RocksDBSegmentedBytesStoreTest` suites pass.
    - Add a test simulating stale SST on an older segment and asserting
    `committedOffset()` returns the newer segment's value.
    
    Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck
     <[email protected]>, Eduwer Camacaro <[email protected]>
---
 .../java/org/apache/kafka/streams/state/internals/AbstractSegments.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
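
Note on the iteration-order change described in the Summary: the following is a minimal, self-contained sketch, not the Kafka Streams implementation. `FakeSegment`, `oldestFirst`, `newestFirst`, `sampleSegments`, and the offset values are hypothetical names and numbers chosen for illustration; only `TreeMap.values()` and `TreeMap.descendingMap()` are real JDK calls. It assumes segments are keyed by an ascending segment id, as the Summary states, and shows how the two iteration orders pick different offsets when an older segment holds a stale value.

```java
import java.util.TreeMap;

public class CommittedOffsetOrderSketch {

    // Hypothetical stand-in for a segment: it only carries the offset that
    // its on-disk state would report after a restart (null if none).
    static final class FakeSegment {
        final Long committedOffset;

        FakeSegment(final Long committedOffset) {
            this.committedOffset = committedOffset;
        }
    }

    // Old behaviour: ascending key order, so the oldest segment wins.
    static Long oldestFirst(final TreeMap<Long, FakeSegment> segments) {
        for (final FakeSegment segment : segments.values()) {
            if (segment.committedOffset != null) {
                return segment.committedOffset;
            }
        }
        return null;
    }

    // New behaviour: descending key order, so the newest segment wins.
    static Long newestFirst(final TreeMap<Long, FakeSegment> segments) {
        for (final FakeSegment segment : segments.descendingMap().values()) {
            if (segment.committedOffset != null) {
                return segment.committedOffset;
            }
        }
        return null;
    }

    // Illustrative data: segment 0 is inactive and holds a stale offset,
    // segment 2 is the active segment with a current offset.
    static TreeMap<Long, FakeSegment> sampleSegments() {
        final TreeMap<Long, FakeSegment> segments = new TreeMap<>();
        segments.put(0L, new FakeSegment(100L));
        segments.put(1L, new FakeSegment(null));
        segments.put(2L, new FakeSegment(5000L));
        return segments;
    }

    public static void main(final String[] args) {
        final TreeMap<Long, FakeSegment> segments = sampleSegments();
        System.out.println("oldest first: " + oldestFirst(segments));
        System.out.println("newest first: " + newestFirst(segments));
    }
}
```

Both loops still fall back to the next segment when one reports `null`, so reversing the order changes only which non-null offset is preferred, not whether one is found.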

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index f386b24c18f..a46291175e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -192,7 +192,7 @@ public abstract class AbstractSegments<S extends Segment> implements Segments<S>
 
     @Override
     public Long committedOffset(final TopicPartition partition) {
-        for (final S segment : segments.values()) {
+        for (final S segment : segments.descendingMap().values()) {
             final Long offset = segment.committedOffset(partition);
             if (offset != null) {
                 return offset;
