This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 32536a3a8bc KAFKA-20663: Return committed offset from newest segment first (#22491)
32536a3a8bc is described below
commit 32536a3a8bcf80627258f409d98ad79ff9da330c
Author: Nick Telford <[email protected]>
AuthorDate: Sat Jun 6 18:46:50 2026 +0100
KAFKA-20663: Return committed offset from newest segment first (#22491)
## Summary
`AbstractSegments.committedOffset()` iterates the segment `TreeMap` in
ascending key order (oldest segment first) and returns the first
non-null offset it finds. Because inactive segments (past their time
window) stop receiving data writes, the 16 MB write buffer of their data
column family (CF) never fills and never triggers an atomic flush. Their offset CF is
updated by every 30-second `maybeCheckpoint()` call, but those writes
sit in the memtable and are lost on an unclean JVM exit. The SST for an
inactive segment therefore holds the offset from its last data-driven
flush, which can be many hours old.
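The flush behaviour described above can be illustrated with a toy model. This is not RocksDB or Kafka Streams code: `FlushModel`, `ToySegment` and their methods are hypothetical. In the model a checkpoint only updates an in-memory value, a flush copies it to the persisted (SST) value, and an unclean exit drops anything not yet flushed. It assumes, as the summary states, that only data writes filling the write buffer trigger a flush.

```java
/** Hypothetical driver: only the active segment keeps flushing, then both segments "crash". */
class FlushModel {
    /** Returns the offsets each segment would read back after an unclean exit. */
    static long[] survivingOffsets() {
        ToySegment inactive = new ToySegment();
        ToySegment active = new ToySegment();

        // The old segment was last flushed while it was still receiving data.
        inactive.checkpoint(1_000L);
        inactive.flush();

        // Later checkpoints reach both segments, but only the active one is flushed
        // (its data writes keep filling the write buffer).
        for (long offset = 2_000L; offset <= 5_000L; offset += 1_000L) {
            inactive.checkpoint(offset);
            active.checkpoint(offset);
            active.flush();
        }

        inactive.crash();
        active.crash();
        return new long[] {inactive.sstOffset, active.sstOffset};
    }

    public static void main(String[] args) {
        long[] offsets = survivingOffsets();
        System.out.println("inactive segment restores " + offsets[0]);
        System.out.println("active segment restores " + offsets[1]);
    }
}

/** Toy stand-in for one segment's offset column family (hypothetical, not the RocksDB API). */
class ToySegment {
    Long memtableOffset; // latest checkpointed value, held only in memory
    Long sstOffset;      // value persisted by the most recent flush

    // Periodic checkpoint: only the memtable sees the new offset.
    void checkpoint(long offset) {
        memtableOffset = offset;
    }

    // Flush: persist the current memtable value.
    void flush() {
        sstOffset = memtableOffset;
    }

    // Unclean exit: unflushed memtable contents are lost; only the SST value remains.
    void crash() {
        memtableOffset = sstOffset;
    }
}
```

Under these assumptions the inactive segment comes back with offset 1000 even though checkpoints up to 5000 reached its memtable, while the active segment comes back with 5000.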
When restoration reads the committed offset on restart, the current code
returns the stale offset from the oldest segment rather than the current
offset from the active (most recently flushed) segment. This stale
offset may fall outside the changelog topic's retention window, causing
an `OffsetOutOfRangeException`.
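A simplified range check shows why the stale value fails. It assumes fetchable offsets lie between the partition's log start offset, which advances as retention deletes old log segments, and its log end offset. `RetentionCheck`, `validateRestoreOffset` and the numbers are hypothetical, and `IllegalStateException` stands in for Kafka's `OffsetOutOfRangeException`.

```java
/** Hypothetical illustration of why a stale committed offset breaks restoration. */
class RetentionCheck {
    /**
     * Simplified rule: offsets below logStartOffset were deleted by retention and offsets
     * above logEndOffset do not exist yet, so restoring from either is out of range.
     * IllegalStateException stands in for Kafka's OffsetOutOfRangeException.
     */
    static long validateRestoreOffset(long committedOffset, long logStartOffset, long logEndOffset) {
        if (committedOffset < logStartOffset || committedOffset > logEndOffset) {
            throw new IllegalStateException(
                "offset " + committedOffset + " outside [" + logStartOffset + ", " + logEndOffset + "]");
        }
        return committedOffset;
    }

    public static void main(String[] args) {
        long logStart = 3_000L; // retention has deleted everything before this offset
        long logEnd = 6_000L;

        // Offset from the newest segment: still inside the retained range.
        System.out.println(validateRestoreOffset(5_000L, logStart, logEnd));

        // Stale offset from the oldest segment: already deleted by retention.
        try {
            validateRestoreOffset(1_000L, logStart, logEnd);
        } catch (IllegalStateException e) {
            System.out.println("restore fails: " + e.getMessage());
        }
    }
}
```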
The fix iterates `segments.descendingMap().values()` so the newest
segment — whose SST is kept current by active data writes and atomic
flushes — is checked first. This is the same pattern already used in
`allSegments(forward=false)`.
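The change in lookup order can be reproduced with a plain `TreeMap`. In this sketch the map hypothetically holds segment id to committed offset, with `null` meaning no offset is stored for the partition; the loops mirror the one in `AbstractSegments.committedOffset()`, but `CommittedOffsetLookup` is not a Kafka Streams class.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch: segment id -> committed offset, null meaning "nothing stored". */
class CommittedOffsetLookup {
    // Before the fix: ascending key order, so the oldest segment with an offset wins.
    static Long oldestFirst(NavigableMap<Long, Long> offsetsBySegmentId) {
        for (Long offset : offsetsBySegmentId.values()) {
            if (offset != null) {
                return offset;
            }
        }
        return null;
    }

    // After the fix: descendingMap() walks from the highest (newest) segment id down.
    static Long newestFirst(NavigableMap<Long, Long> offsetsBySegmentId) {
        for (Long offset : offsetsBySegmentId.descendingMap().values()) {
            if (offset != null) {
                return offset;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> segments = new TreeMap<>();
        segments.put(10L, 1_000L); // oldest segment: stale offset from its last flush
        segments.put(11L, null);   // no offset stored for this partition; both loops skip it
        segments.put(12L, 5_000L); // newest segment: recently flushed offset

        System.out.println(oldestFirst(segments));
        System.out.println(newestFirst(segments));
    }
}
```

Here `oldestFirst` prints 1000 and `newestFirst` prints 5000. Because `descendingMap()` returns a reverse-order view rather than a copy, the newest-first loop changes only the traversal order and does not copy the entries.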
## Test plan
- Existing `AbstractRocksDBSegmentedBytesStoreTest` /
`RocksDBSegmentedBytesStoreTest` suites pass.
- Add a test simulating stale SST on an older segment and asserting
`committedOffset()` returns the newer segment's value.
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck <[email protected]>, Eduwer Camacaro <[email protected]>
---
.../java/org/apache/kafka/streams/state/internals/AbstractSegments.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index f386b24c18f..a46291175e2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -192,7 +192,7 @@ public abstract class AbstractSegments<S extends Segment> implements Segments<S>
     @Override
     public Long committedOffset(final TopicPartition partition) {
-        for (final S segment : segments.values()) {
+        for (final S segment : segments.descendingMap().values()) {
             final Long offset = segment.committedOffset(partition);
             if (offset != null) {
                 return offset;