This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 5b7eafbbe6e Revert "KAFKA-13499: Avoid restoring outdated records (#21901)"
5b7eafbbe6e is described below
commit 5b7eafbbe6e892dc65098d9aba6ffed031ee1c5c
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 8 22:38:08 2026 -0700
Revert "KAFKA-13499: Avoid restoring outdated records (#21901)"
This reverts commit 739d9c82562d93aa3d49da3853b4402e37224da0.
---
.../processor/internals/ProcessorStateManager.java | 22 ----
.../processor/internals/StoreChangelogReader.java | 47 ++-------
...stractDualSchemaRocksDBSegmentedBytesStore.java | 7 +-
.../AbstractRocksDBSegmentedBytesStore.java | 7 +-
.../state/internals/InMemorySessionStore.java | 9 +-
.../state/internals/InMemoryWindowStore.java | 7 +-
.../state/internals/WithRetentionPeriod.java | 21 ----
.../internals/StoreChangelogReaderTest.java | 112 ---------------------
8 files changed, 14 insertions(+), 218 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 9c715752058..7b3483f8e68 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -36,8 +36,6 @@ import
org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
-import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
@@ -105,8 +103,6 @@ public class ProcessorStateManager implements StateManager {
// corrupted state store should not be included in checkpointing
private boolean corrupted;
- private final long retentionPeriod;
-
private StateStoreMetadata(final StateStore stateStore,
final CommitCallback commitCallback) {
@@ -117,7 +113,6 @@ public class ProcessorStateManager implements StateManager {
this.changelogPartition = null;
this.corrupted = false;
this.offset = null;
- this.retentionPeriod = -1L;
}
private StateStoreMetadata(final StateStore stateStore,
@@ -135,24 +130,12 @@ public class ProcessorStateManager implements
StateManager {
this.commitCallback = commitCallback;
this.recordConverter = recordConverter;
this.offset = null;
- this.retentionPeriod = extractRetentionPeriod(stateStore);
}
private void setOffset(final Long offset) {
this.offset = offset;
}
- private static long extractRetentionPeriod(final StateStore
stateStore) {
- StateStore current = stateStore;
- while (current instanceof WrappedStateStore) {
- current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
- }
- if (current instanceof WithRetentionPeriod) {
- return ((WithRetentionPeriod) current).retentionPeriod();
- }
- return -1L;
- }
-
// the offset is exposed to the changelog reader to determine if
restoration is completed
Long offset() {
return this.offset;
@@ -162,11 +145,6 @@ public class ProcessorStateManager implements StateManager
{
return this.endOffset;
}
- // the retentionPeriod is exposed to the changelog reader for window
restoration
- long retentionPeriod() {
- return retentionPeriod;
- }
-
public void setEndOffset(final Long endOffset) {
this.endOffset = endOffset;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index aa7098400a1..eab7da800d8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -971,7 +970,6 @@ public class StoreChangelogReader implements
ChangelogReader {
final Set<ChangelogMetadata>
newPartitionsToRestore) {
// separate those who do not have the current offset loaded from
checkpoint
final Set<TopicPartition> newPartitionsWithoutStartOffset = new
HashSet<>();
- final Map<TopicPartition, Long> newPartitionsWithTimestampSeek = new
HashMap<>();
for (final ChangelogMetadata changelogMetadata :
newPartitionsToRestore) {
final StateStoreMetadata storeMetadata =
changelogMetadata.storeMetadata;
@@ -988,22 +986,18 @@ public class StoreChangelogReader implements
ChangelogReader {
log.debug("Start restoring changelog partition {} from current
offset {} to end offset {}.",
partition, currentOffset, recordEndOffset(endOffset));
} else {
- final long retentionPeriod = storeMetadata.retentionPeriod();
- final long seekTimestamp = retentionPeriod > 0 &&
retentionPeriod != Long.MAX_VALUE
- ? time.milliseconds() - retentionPeriod : -1L;
- if (seekTimestamp > 0) {
- newPartitionsWithTimestampSeek.put(partition,
seekTimestamp);
- log.debug("Start restoring windowed changelog partition {}
from timestamp {} to end offset {}.",
- partition, seekTimestamp, recordEndOffset(endOffset));
- } else {
- log.debug("Start restoring changelog partition {} from the
beginning offset to end offset {} " +
- "since we cannot find current offset.", partition,
recordEndOffset(endOffset));
- newPartitionsWithoutStartOffset.add(partition);
- }
+ log.debug("Start restoring changelog partition {} from the
beginning offset to end offset {} " +
+ "since we cannot find current offset.", partition,
recordEndOffset(endOffset));
+
+ newPartitionsWithoutStartOffset.add(partition);
}
}
- seekToTimestampOrBeginning(newPartitionsWithTimestampSeek,
newPartitionsWithoutStartOffset);
+ // optimization: batch all seek-to-beginning offsets in a single
request
+ // seek is not a blocking call so there's nothing to
capture
+ if (!newPartitionsWithoutStartOffset.isEmpty()) {
+ restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
+ }
for (final ChangelogMetadata changelogMetadata :
newPartitionsToRestore) {
final StateStoreMetadata storeMetadata =
changelogMetadata.storeMetadata;
@@ -1045,29 +1039,6 @@ public class StoreChangelogReader implements
ChangelogReader {
}
}
- private void seekToTimestampOrBeginning(final Map<TopicPartition, Long>
partitionsWithTimestampSeek,
- final Set<TopicPartition>
partitionsWithoutStartOffset) {
- // optimization: seek windowed stores by timestamp to skip expired data
- if (!partitionsWithTimestampSeek.isEmpty()) {
- final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
- restoreConsumer.offsetsForTimes(partitionsWithTimestampSeek);
- offsetsByTimestamp.forEach((key, value) -> {
- if (value != null) {
- restoreConsumer.seek(key, value.offset());
- } else {
- // no offset found for the timestamp, fall back to seeking
to the beginning
- partitionsWithoutStartOffset.add(key);
- }
- });
- }
-
- // optimization: batch all seek-to-beginning offsets in a single
request
- // seek is not a blocking call so there's nothing to
capture
- if (!partitionsWithoutStartOffset.isEmpty()) {
- restoreConsumer.seekToBeginning(partitionsWithoutStartOffset);
- }
- }
-
@Override
public void unregister(final Collection<TopicPartition> revokedChangelogs)
{
unregister(revokedChangelogs,
StandbyUpdateListener.SuspendReason.MIGRATED);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index 9dbb4210aa7..f353e56f7df 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@ import java.util.Optional;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
-public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends
Segment> implements SegmentedBytesStore, WithRetentionPeriod {
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends
Segment> implements SegmentedBytesStore {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
private final String name;
@@ -74,11 +74,6 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
this.retentionPeriod = retentionPeriod;
}
- @Override
- public long retentionPeriod() {
- return retentionPeriod;
- }
-
@Override
public KeyValueIterator<Bytes, byte[]> all() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index c11e276a19e..c6b57411067 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@ import java.util.Map;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
-public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements
SegmentedBytesStore, WithRetentionPeriod {
+public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements
SegmentedBytesStore {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
private final String name;
@@ -73,11 +73,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
this.segments = segments;
}
- @Override
- public long retentionPeriod() {
- return retentionPeriod;
- }
-
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 78b161a90f5..9d3936d0eb0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
-public class InMemorySessionStore implements SessionStore<Bytes, byte[]>,
WithRetentionPeriod {
+public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(InMemorySessionStore.class);
@@ -91,11 +91,6 @@ public class InMemorySessionStore implements
SessionStore<Bytes, byte[]>, WithRe
this.position = Position.emptyPosition();
}
- @Override
- public long retentionPeriod() {
- return retentionPeriod;
- }
-
@Override
public String name() {
return name;
@@ -597,4 +592,4 @@ public class InMemorySessionStore implements
SessionStore<Bytes, byte[]>, WithRe
}
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 6f5b1095f47..8d2228db5c3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -59,7 +59,7 @@ import static
org.apache.kafka.streams.state.internals.WindowKeySchema.extractSt
import static
org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
-public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>,
WithRetentionPeriod {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryWindowStore.class);
private static final int SEQNUM_SIZE = 4;
@@ -95,11 +95,6 @@ public class InMemoryWindowStore implements
WindowStore<Bytes, byte[]>, WithRete
this.position = Position.emptyPosition();
}
- @Override
- public long retentionPeriod() {
- return retentionPeriod;
- }
-
@Override
public String name() {
return name;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
deleted file mode 100644
index 0ffa460e8b1..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-public interface WithRetentionPeriod {
- long retentionPeriod();
-}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e24b5c5c3e8..72954175f5b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
@@ -60,7 +59,6 @@ import org.mockito.quality.Strictness;
import java.time.Duration;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -1432,116 +1430,6 @@ public class StoreChangelogReaderTest {
}
}
- @Test
- public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
- final long retentionMs = Duration.ofHours(2).toMillis();
- final long offsetForTimestamp = 42L;
-
- // Use a MockConsumer subclass that supports offsetsForTimes
- final MockConsumer<byte[], byte[]> timestampConsumer = new
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
- @Override
- public synchronized Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
- final Map<TopicPartition, OffsetAndTimestamp> result = new
HashMap<>();
- timestampsToSearch.forEach((key, value) -> result.put(key, new
OffsetAndTimestamp(offsetForTimestamp, value)));
- return result;
- }
- };
-
- // Set up mocks - storeMetadata returns null offset (no checkpoint)
and positive retentionPeriod
- final StateStoreMetadata windowStoreMetadata =
mock(StateStoreMetadata.class);
- final ProcessorStateManager windowStateManager =
mock(ProcessorStateManager.class);
- final StateStore windowStore = mock(StateStore.class);
- when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
- when(windowStoreMetadata.store()).thenReturn(windowStore);
- when(windowStoreMetadata.offset()).thenReturn(null);
- when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
- when(windowStore.name()).thenReturn(storeName);
-
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
- when(windowStateManager.taskType()).thenReturn(ACTIVE);
-
- final TaskId taskId = new TaskId(0, 0);
- when(windowStateManager.taskId()).thenReturn(taskId);
-
- timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp,
0L));
- adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
-
- final StoreChangelogReader reader =
- new StoreChangelogReader(time, config, logContext, adminClient,
timestampConsumer, callback, standbyListener);
-
- reader.register(tp, windowStateManager);
- reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
-
- assertEquals(offsetForTimestamp, timestampConsumer.position(tp), "The
consumer should be seeked to the offset returned by offsetsForTimes, not to the
beginning");
- }
-
- @Test
- public void shouldSeekToBeginningWhenBrokerReturnsNullForOffsetsForTimes()
{
- final long retentionMs = Duration.ofHours(2).toMillis();
-
- // Use a MockConsumer subclass that returns null for offsetsForTimes
- final MockConsumer<byte[], byte[]> timestampConsumer = new
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
- @Override
- public synchronized Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
- final Map<TopicPartition, OffsetAndTimestamp> result = new
HashMap<>();
- timestampsToSearch.forEach((key, value) -> result.put(key,
null));
- return result;
- }
- };
-
- final StateStoreMetadata windowStoreMetadata =
mock(StateStoreMetadata.class);
- final ProcessorStateManager windowStateManager =
mock(ProcessorStateManager.class);
- final StateStore windowStore = mock(StateStore.class);
- when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
- when(windowStoreMetadata.store()).thenReturn(windowStore);
- when(windowStoreMetadata.offset()).thenReturn(null);
- when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
- when(windowStore.name()).thenReturn(storeName);
-
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
- when(windowStateManager.taskType()).thenReturn(ACTIVE);
-
- final TaskId taskId = new TaskId(0, 0);
- when(windowStateManager.taskId()).thenReturn(taskId);
-
- timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp,
0L));
- adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
-
- final StoreChangelogReader reader =
- new StoreChangelogReader(time, config, logContext, adminClient,
timestampConsumer, callback, standbyListener);
-
- reader.register(tp, windowStateManager);
- reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
-
- assertEquals(0L, timestampConsumer.position(tp), "When broker returns
null, should fall back to seeking to the beginning");
- }
-
- @Test
- public void shouldSeekToBeginningForNonWindowedStoreWithoutCheckpoint() {
- final StateStoreMetadata kvStoreMetadata =
mock(StateStoreMetadata.class);
- final ProcessorStateManager kvStateManager =
mock(ProcessorStateManager.class);
- final StateStore kvStore = mock(StateStore.class);
- when(kvStoreMetadata.changelogPartition()).thenReturn(tp);
- when(kvStoreMetadata.store()).thenReturn(kvStore);
- when(kvStoreMetadata.offset()).thenReturn(null);
- when(kvStoreMetadata.retentionPeriod()).thenReturn(-1L);
- when(kvStore.name()).thenReturn(storeName);
- when(kvStateManager.storeMetadata(tp)).thenReturn(kvStoreMetadata);
- when(kvStateManager.taskType()).thenReturn(ACTIVE);
-
- final TaskId taskId = new TaskId(0, 0);
- when(kvStateManager.taskId()).thenReturn(taskId);
-
- consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
- adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
-
- final StoreChangelogReader reader =
- new StoreChangelogReader(time, config, logContext, adminClient,
consumer, callback, standbyListener);
-
- reader.register(tp, kvStateManager);
- reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
-
- assertEquals(0L, consumer.position(tp), "Non-windowed store should
seek to beginning, not by timestamp");
- }
-
private void assignPartition(final long messages,
final TopicPartition topicPartition) {
consumer.updatePartitions(