This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 5b7eafbbe6e Revert "KAFKA-13499: Avoid restoring outdated records 
(#21901)"
5b7eafbbe6e is described below

commit 5b7eafbbe6e892dc65098d9aba6ffed031ee1c5c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 8 22:38:08 2026 -0700

    Revert "KAFKA-13499: Avoid restoring outdated records (#21901)"
    
    This reverts commit 739d9c82562d93aa3d49da3853b4402e37224da0.
---
 .../processor/internals/ProcessorStateManager.java |  22 ----
 .../processor/internals/StoreChangelogReader.java  |  47 ++-------
 ...stractDualSchemaRocksDBSegmentedBytesStore.java |   7 +-
 .../AbstractRocksDBSegmentedBytesStore.java        |   7 +-
 .../state/internals/InMemorySessionStore.java      |   9 +-
 .../state/internals/InMemoryWindowStore.java       |   7 +-
 .../state/internals/WithRetentionPeriod.java       |  21 ----
 .../internals/StoreChangelogReaderTest.java        | 112 ---------------------
 8 files changed, 14 insertions(+), 218 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 9c715752058..7b3483f8e68 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -36,8 +36,6 @@ import 
org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
-import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import org.slf4j.Logger;
 
@@ -105,8 +103,6 @@ public class ProcessorStateManager implements StateManager {
         // corrupted state store should not be included in checkpointing
         private boolean corrupted;
 
-        private final long retentionPeriod;
-
 
         private StateStoreMetadata(final StateStore stateStore,
                                    final CommitCallback commitCallback) {
@@ -117,7 +113,6 @@ public class ProcessorStateManager implements StateManager {
             this.changelogPartition = null;
             this.corrupted = false;
             this.offset = null;
-            this.retentionPeriod = -1L;
         }
 
         private StateStoreMetadata(final StateStore stateStore,
@@ -135,24 +130,12 @@ public class ProcessorStateManager implements 
StateManager {
             this.commitCallback = commitCallback;
             this.recordConverter = recordConverter;
             this.offset = null;
-            this.retentionPeriod = extractRetentionPeriod(stateStore);
         }
 
         private void setOffset(final Long offset) {
             this.offset = offset;
         }
 
-        private static long extractRetentionPeriod(final StateStore 
stateStore) {
-            StateStore current = stateStore;
-            while (current instanceof WrappedStateStore) {
-                current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
-            }
-            if (current instanceof WithRetentionPeriod) {
-                return ((WithRetentionPeriod) current).retentionPeriod();
-            }
-            return -1L;
-        }
-
         // the offset is exposed to the changelog reader to determine if 
restoration is completed
         Long offset() {
             return this.offset;
@@ -162,11 +145,6 @@ public class ProcessorStateManager implements StateManager 
{
             return this.endOffset;
         }
 
-        // the retentionPeriod is exposed to the changelog reader for window 
restoration
-        long retentionPeriod() {
-            return retentionPeriod;
-        }
-
         public void setEndOffset(final Long endOffset) {
             this.endOffset = endOffset;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index aa7098400a1..eab7da800d8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -971,7 +970,6 @@ public class StoreChangelogReader implements 
ChangelogReader {
                                    final Set<ChangelogMetadata> 
newPartitionsToRestore) {
         // separate those who do not have the current offset loaded from 
checkpoint
         final Set<TopicPartition> newPartitionsWithoutStartOffset = new 
HashSet<>();
-        final Map<TopicPartition, Long> newPartitionsWithTimestampSeek = new 
HashMap<>();
 
         for (final ChangelogMetadata changelogMetadata : 
newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = 
changelogMetadata.storeMetadata;
@@ -988,22 +986,18 @@ public class StoreChangelogReader implements 
ChangelogReader {
                 log.debug("Start restoring changelog partition {} from current 
offset {} to end offset {}.",
                     partition, currentOffset, recordEndOffset(endOffset));
             } else {
-                final long retentionPeriod = storeMetadata.retentionPeriod();
-                final long seekTimestamp = retentionPeriod > 0 && 
retentionPeriod != Long.MAX_VALUE
-                    ? time.milliseconds() - retentionPeriod : -1L;
-                if (seekTimestamp > 0) {
-                    newPartitionsWithTimestampSeek.put(partition, 
seekTimestamp);
-                    log.debug("Start restoring windowed changelog partition {} 
from timestamp {} to end offset {}.",
-                        partition, seekTimestamp, recordEndOffset(endOffset));
-                } else {
-                    log.debug("Start restoring changelog partition {} from the 
beginning offset to end offset {} " +
-                        "since we cannot find current offset.", partition, 
recordEndOffset(endOffset));
-                    newPartitionsWithoutStartOffset.add(partition);
-                }
+                log.debug("Start restoring changelog partition {} from the 
beginning offset to end offset {} " +
+                    "since we cannot find current offset.", partition, 
recordEndOffset(endOffset));
+
+                newPartitionsWithoutStartOffset.add(partition);
             }
         }
 
-        seekToTimestampOrBeginning(newPartitionsWithTimestampSeek, 
newPartitionsWithoutStartOffset);
+        // optimization: batch all seek-to-beginning offsets in a single 
request
+        //               seek is not a blocking call so there's nothing to 
capture
+        if (!newPartitionsWithoutStartOffset.isEmpty()) {
+            restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
+        }
 
         for (final ChangelogMetadata changelogMetadata : 
newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = 
changelogMetadata.storeMetadata;
@@ -1045,29 +1039,6 @@ public class StoreChangelogReader implements 
ChangelogReader {
         }
     }
 
-    private void seekToTimestampOrBeginning(final Map<TopicPartition, Long> 
partitionsWithTimestampSeek,
-                                            final Set<TopicPartition> 
partitionsWithoutStartOffset) {
-        // optimization: seek windowed stores by timestamp to skip expired data
-        if (!partitionsWithTimestampSeek.isEmpty()) {
-            final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
-                restoreConsumer.offsetsForTimes(partitionsWithTimestampSeek);
-            offsetsByTimestamp.forEach((key, value) -> {
-                if (value != null) {
-                    restoreConsumer.seek(key, value.offset());
-                } else {
-                    // no offset found for the timestamp, fall back to seeking 
to the beginning
-                    partitionsWithoutStartOffset.add(key);
-                }
-            });
-        }
-
-        // optimization: batch all seek-to-beginning offsets in a single 
request
-        //               seek is not a blocking call so there's nothing to 
capture
-        if (!partitionsWithoutStartOffset.isEmpty()) {
-            restoreConsumer.seekToBeginning(partitionsWithoutStartOffset);
-        }
-    }
-
     @Override
     public void unregister(final Collection<TopicPartition> revokedChangelogs) 
{
         unregister(revokedChangelogs, 
StandbyUpdateListener.SuspendReason.MIGRATED);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 9dbb4210aa7..f353e56f7df 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@ import java.util.Optional;
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 
-public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends 
Segment> implements SegmentedBytesStore, WithRetentionPeriod {
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends 
Segment> implements SegmentedBytesStore {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
 
     private final String name;
@@ -74,11 +74,6 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
         this.retentionPeriod = retentionPeriod;
     }
 
-    @Override
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index c11e276a19e..c6b57411067 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@ import java.util.Map;
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 
-public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements 
SegmentedBytesStore, WithRetentionPeriod {
+public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements 
SegmentedBytesStore {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
 
     private final String name;
@@ -73,11 +73,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
         this.segments = segments;
     }
 
-    @Override
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                                  final long from,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 78b161a90f5..9d3936d0eb0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 
-public class InMemorySessionStore implements SessionStore<Bytes, byte[]>, 
WithRetentionPeriod {
+public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(InMemorySessionStore.class);
 
@@ -91,11 +91,6 @@ public class InMemorySessionStore implements 
SessionStore<Bytes, byte[]>, WithRe
         this.position = Position.emptyPosition();
     }
 
-    @Override
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-
     @Override
     public String name() {
         return name;
@@ -597,4 +592,4 @@ public class InMemorySessionStore implements 
SessionStore<Bytes, byte[]>, WithRe
         }
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 6f5b1095f47..8d2228db5c3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -59,7 +59,7 @@ import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.extractSt
 import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
 
 
-public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>, 
WithRetentionPeriod {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryWindowStore.class);
     private static final int SEQNUM_SIZE = 4;
@@ -95,11 +95,6 @@ public class InMemoryWindowStore implements 
WindowStore<Bytes, byte[]>, WithRete
         this.position = Position.emptyPosition();
     }
 
-    @Override
-    public long retentionPeriod() {
-        return retentionPeriod;
-    }
-
     @Override
     public String name() {
         return name;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
deleted file mode 100644
index 0ffa460e8b1..00000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-public interface WithRetentionPeriod {
-    long retentionPeriod();
-}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e24b5c5c3e8..72954175f5b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -60,7 +59,6 @@ import org.mockito.quality.Strictness;
 
 import java.time.Duration;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -1432,116 +1430,6 @@ public class StoreChangelogReaderTest {
         }
     }
 
-    @Test
-    public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
-        final long retentionMs = Duration.ofHours(2).toMillis();
-        final long offsetForTimestamp = 42L;
-
-        // Use a MockConsumer subclass that supports offsetsForTimes
-        final MockConsumer<byte[], byte[]> timestampConsumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
-            @Override
-            public synchronized Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
-                final Map<TopicPartition, OffsetAndTimestamp> result = new 
HashMap<>();
-                timestampsToSearch.forEach((key, value) -> result.put(key, new 
OffsetAndTimestamp(offsetForTimestamp, value)));
-                return result;
-            }
-        };
-
-        // Set up mocks - storeMetadata returns null offset (no checkpoint) 
and positive retentionPeriod
-        final StateStoreMetadata windowStoreMetadata = 
mock(StateStoreMetadata.class);
-        final ProcessorStateManager windowStateManager = 
mock(ProcessorStateManager.class);
-        final StateStore windowStore = mock(StateStore.class);
-        when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
-        when(windowStoreMetadata.store()).thenReturn(windowStore);
-        when(windowStoreMetadata.offset()).thenReturn(null);
-        when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
-        when(windowStore.name()).thenReturn(storeName);
-        
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
-        when(windowStateManager.taskType()).thenReturn(ACTIVE);
-
-        final TaskId taskId = new TaskId(0, 0);
-        when(windowStateManager.taskId()).thenReturn(taskId);
-
-        timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 
0L));
-        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
-
-        final StoreChangelogReader reader =
-            new StoreChangelogReader(time, config, logContext, adminClient, 
timestampConsumer, callback, standbyListener);
-
-        reader.register(tp, windowStateManager);
-        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
-
-        assertEquals(offsetForTimestamp, timestampConsumer.position(tp), "The 
consumer should be seeked to the offset returned by offsetsForTimes, not to the 
beginning");
-    }
-
-    @Test
-    public void shouldSeekToBeginningWhenBrokerReturnsNullForOffsetsForTimes() 
{
-        final long retentionMs = Duration.ofHours(2).toMillis();
-
-        // Use a MockConsumer subclass that returns null for offsetsForTimes
-        final MockConsumer<byte[], byte[]> timestampConsumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
-            @Override
-            public synchronized Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
-                final Map<TopicPartition, OffsetAndTimestamp> result = new 
HashMap<>();
-                timestampsToSearch.forEach((key, value) -> result.put(key, 
null));
-                return result;
-            }
-        };
-
-        final StateStoreMetadata windowStoreMetadata = 
mock(StateStoreMetadata.class);
-        final ProcessorStateManager windowStateManager = 
mock(ProcessorStateManager.class);
-        final StateStore windowStore = mock(StateStore.class);
-        when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
-        when(windowStoreMetadata.store()).thenReturn(windowStore);
-        when(windowStoreMetadata.offset()).thenReturn(null);
-        when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
-        when(windowStore.name()).thenReturn(storeName);
-        
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
-        when(windowStateManager.taskType()).thenReturn(ACTIVE);
-
-        final TaskId taskId = new TaskId(0, 0);
-        when(windowStateManager.taskId()).thenReturn(taskId);
-
-        timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 
0L));
-        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
-
-        final StoreChangelogReader reader =
-            new StoreChangelogReader(time, config, logContext, adminClient, 
timestampConsumer, callback, standbyListener);
-
-        reader.register(tp, windowStateManager);
-        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
-
-        assertEquals(0L, timestampConsumer.position(tp), "When broker returns 
null, should fall back to seeking to the beginning");
-    }
-
-    @Test
-    public void shouldSeekToBeginningForNonWindowedStoreWithoutCheckpoint() {
-        final StateStoreMetadata kvStoreMetadata = 
mock(StateStoreMetadata.class);
-        final ProcessorStateManager kvStateManager = 
mock(ProcessorStateManager.class);
-        final StateStore kvStore = mock(StateStore.class);
-        when(kvStoreMetadata.changelogPartition()).thenReturn(tp);
-        when(kvStoreMetadata.store()).thenReturn(kvStore);
-        when(kvStoreMetadata.offset()).thenReturn(null);
-        when(kvStoreMetadata.retentionPeriod()).thenReturn(-1L);
-        when(kvStore.name()).thenReturn(storeName);
-        when(kvStateManager.storeMetadata(tp)).thenReturn(kvStoreMetadata);
-        when(kvStateManager.taskType()).thenReturn(ACTIVE);
-
-        final TaskId taskId = new TaskId(0, 0);
-        when(kvStateManager.taskId()).thenReturn(taskId);
-
-        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
-        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
-
-        final StoreChangelogReader reader =
-            new StoreChangelogReader(time, config, logContext, adminClient, 
consumer, callback, standbyListener);
-
-        reader.register(tp, kvStateManager);
-        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
-
-        assertEquals(0L, consumer.position(tp), "Non-windowed store should 
seek to beginning, not by timestamp");
-    }
-
     private void assignPartition(final long messages,
                                  final TopicPartition topicPartition) {
         consumer.updatePartitions(

Reply via email to