This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 739d9c82562 KAFKA-13499: Avoid restoring outdated records (#21901)
739d9c82562 is described below
commit 739d9c82562d93aa3d49da3853b4402e37224da0
Author: gabriellefu <[email protected]>
AuthorDate: Tue Mar 31 13:42:19 2026 -0400
KAFKA-13499: Avoid restoring outdated records (#21901)
1. Expose the store's retention period through StateStoreMetadata.
2. In prepareChangelogs(), when no checkpoint exists, seek changelogs of
stores that have a retention period to (now - retention period) instead of
always calling seekToBeginning(), to avoid restoring outdated records.
Reviewers: TengYao Chi <[email protected]> , Bill Bejeck
<[email protected]>
---
.../processor/internals/ProcessorStateManager.java | 22 ++++
.../processor/internals/StoreChangelogReader.java | 47 +++++++--
...stractDualSchemaRocksDBSegmentedBytesStore.java | 7 +-
.../AbstractRocksDBSegmentedBytesStore.java | 7 +-
.../state/internals/InMemorySessionStore.java | 9 +-
.../state/internals/InMemoryWindowStore.java | 7 +-
.../state/internals/WithRetentionPeriod.java | 21 ++++
.../internals/StoreChangelogReaderTest.java | 112 +++++++++++++++++++++
8 files changed, 218 insertions(+), 14 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 7b3483f8e68..9c715752058 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -36,6 +36,8 @@ import
org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
@@ -103,6 +105,8 @@ public class ProcessorStateManager implements StateManager {
// corrupted state store should not be included in checkpointing
private boolean corrupted;
+ private final long retentionPeriod;
+
private StateStoreMetadata(final StateStore stateStore,
final CommitCallback commitCallback) {
@@ -113,6 +117,7 @@ public class ProcessorStateManager implements StateManager {
this.changelogPartition = null;
this.corrupted = false;
this.offset = null;
+ this.retentionPeriod = -1L;
}
private StateStoreMetadata(final StateStore stateStore,
@@ -130,12 +135,24 @@ public class ProcessorStateManager implements
StateManager {
this.commitCallback = commitCallback;
this.recordConverter = recordConverter;
this.offset = null;
+ this.retentionPeriod = extractRetentionPeriod(stateStore);
}
private void setOffset(final Long offset) {
this.offset = offset;
}
+ /**
+ * Unwraps all {@link WrappedStateStore} layers around the given store and, if the
+ * innermost store implements {@link WithRetentionPeriod}, returns its retention period.
+ * <p>
+ * NOTE(review): only the innermost (fully unwrapped) store is consulted; a wrapper layer
+ * that itself implemented {@link WithRetentionPeriod} would be skipped by the loop below.
+ *
+ * @param stateStore the (possibly wrapped) store being registered
+ * @return the innermost store's retention period, or {@code -1L} if it does not
+ * implement {@link WithRetentionPeriod}
+ */
+ private static long extractRetentionPeriod(final StateStore
stateStore) {
+ StateStore current = stateStore;
+ // walk down the wrapper chain until reaching a store that is not a WrappedStateStore
+ while (current instanceof WrappedStateStore) {
+ current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
+ }
+ if (current instanceof WithRetentionPeriod) {
+ return ((WithRetentionPeriod) current).retentionPeriod();
+ }
+ // sentinel: no retention period known for this store
+ return -1L;
+ }
+
// the offset is exposed to the changelog reader to determine if
restoration is completed
Long offset() {
return this.offset;
@@ -145,6 +162,11 @@ public class ProcessorStateManager implements StateManager
{
return this.endOffset;
}
+ // the retentionPeriod is exposed to the changelog reader so that, when no checkpointed
+ // offset exists, the changelog can be restored from (now - retentionPeriod) instead of
+ // from the beginning; -1L means no retention period is known (the store does not
+ // implement WithRetentionPeriod, or it was registered without a changelog partition)
+ long retentionPeriod() {
+ return retentionPeriod;
+ }
+
public void setEndOffset(final Long endOffset) {
this.endOffset = endOffset;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index eab7da800d8..aa7098400a1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -970,6 +971,7 @@ public class StoreChangelogReader implements
ChangelogReader {
final Set<ChangelogMetadata>
newPartitionsToRestore) {
// separate those who do not have the current offset loaded from
checkpoint
final Set<TopicPartition> newPartitionsWithoutStartOffset = new
HashSet<>();
+ final Map<TopicPartition, Long> newPartitionsWithTimestampSeek = new
HashMap<>();
for (final ChangelogMetadata changelogMetadata :
newPartitionsToRestore) {
final StateStoreMetadata storeMetadata =
changelogMetadata.storeMetadata;
@@ -986,18 +988,22 @@ public class StoreChangelogReader implements
ChangelogReader {
log.debug("Start restoring changelog partition {} from current
offset {} to end offset {}.",
partition, currentOffset, recordEndOffset(endOffset));
} else {
- log.debug("Start restoring changelog partition {} from the
beginning offset to end offset {} " +
- "since we cannot find current offset.", partition,
recordEndOffset(endOffset));
-
- newPartitionsWithoutStartOffset.add(partition);
+ final long retentionPeriod = storeMetadata.retentionPeriod();
+ final long seekTimestamp = retentionPeriod > 0 &&
retentionPeriod != Long.MAX_VALUE
+ ? time.milliseconds() - retentionPeriod : -1L;
+ if (seekTimestamp > 0) {
+ newPartitionsWithTimestampSeek.put(partition,
seekTimestamp);
+ log.debug("Start restoring windowed changelog partition {}
from timestamp {} to end offset {}.",
+ partition, seekTimestamp, recordEndOffset(endOffset));
+ } else {
+ log.debug("Start restoring changelog partition {} from the
beginning offset to end offset {} " +
+ "since we cannot find current offset.", partition,
recordEndOffset(endOffset));
+ newPartitionsWithoutStartOffset.add(partition);
+ }
}
}
- // optimization: batch all seek-to-beginning offsets in a single
request
- // seek is not a blocking call so there's nothing to
capture
- if (!newPartitionsWithoutStartOffset.isEmpty()) {
- restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
- }
+ seekToTimestampOrBeginning(newPartitionsWithTimestampSeek,
newPartitionsWithoutStartOffset);
for (final ChangelogMetadata changelogMetadata :
newPartitionsToRestore) {
final StateStoreMetadata storeMetadata =
changelogMetadata.storeMetadata;
@@ -1039,6 +1045,29 @@ public class StoreChangelogReader implements
ChangelogReader {
}
}
+ /**
+ * Positions the restore consumer for newly registered changelog partitions that have no
+ * current (checkpointed) offset.
+ * <p>
+ * Partitions in {@code partitionsWithTimestampSeek} are resolved via
+ * {@code restoreConsumer.offsetsForTimes(...)} and seeked to the returned offset. Any partition
+ * for which a {@code null} entry is returned is added to {@code partitionsWithoutStartOffset},
+ * so this method mutates that set. All partitions in {@code partitionsWithoutStartOffset} are
+ * then seeked to the beginning in a single batched call.
+ * <p>
+ * NOTE(review): unlike {@code seek}/{@code seekToBeginning}, {@code Consumer#offsetsForTimes}
+ * is documented as a blocking call that may throw {@code TimeoutException}. Confirm the
+ * caller handles this.
+ * <p>
+ * NOTE(review): a partition absent from the returned map (rather than mapped to {@code null})
+ * would be seeked nowhere. The Consumer contract presumably always returns every requested
+ * partition; verify this for the consumer implementations in use.
+ * <p>
+ * NOTE(review): the caller computes the seek timestamps from wall-clock time
+ * ({@code time.milliseconds() - retentionPeriod}), whereas window-store retention is
+ * presumably enforced against observed stream time. Confirm that unexpired records are not
+ * skipped when stream time lags behind wall-clock time.
+ *
+ * @param partitionsWithTimestampSeek changelog partitions mapped to the timestamp to seek to
+ * @param partitionsWithoutStartOffset changelog partitions to seek to the beginning; partitions
+ * whose timestamp lookup returned {@code null} are added to it
+ */
+ private void seekToTimestampOrBeginning(final Map<TopicPartition, Long>
partitionsWithTimestampSeek,
+ final Set<TopicPartition>
partitionsWithoutStartOffset) {
+ // optimization: seek windowed stores by timestamp to skip expired data
+ if (!partitionsWithTimestampSeek.isEmpty()) {
+ final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
+ restoreConsumer.offsetsForTimes(partitionsWithTimestampSeek);
+ offsetsByTimestamp.forEach((key, value) -> {
+ if (value != null) {
+ restoreConsumer.seek(key, value.offset());
+ } else {
+ // no offset found for the timestamp, fall back to seeking
to the beginning
+ // (conservative: this restores the whole changelog for that partition)
+ partitionsWithoutStartOffset.add(key);
+ }
+ });
+ }
+
+ // optimization: batch all seek-to-beginning offsets in a single
request
+ // seek is not a blocking call so there's nothing to
capture
+ if (!partitionsWithoutStartOffset.isEmpty()) {
+ restoreConsumer.seekToBeginning(partitionsWithoutStartOffset);
+ }
+ }
+
@Override
public void unregister(final Collection<TopicPartition> revokedChangelogs)
{
unregister(revokedChangelogs,
StandbyUpdateListener.SuspendReason.MIGRATED);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index f353e56f7df..9dbb4210aa7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@ import java.util.Optional;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
-public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends
Segment> implements SegmentedBytesStore {
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends
Segment> implements SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
private final String name;
@@ -74,6 +74,11 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
this.retentionPeriod = retentionPeriod;
}
+ // exposes the retention period passed to this store's constructor via WithRetentionPeriod,
+ // so the changelog reader can skip expired changelog records when no checkpoint exists
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public KeyValueIterator<Bytes, byte[]> all() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index c6b57411067..c11e276a19e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@ import java.util.Map;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
-public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements
SegmentedBytesStore {
+public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements
SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
private final String name;
@@ -73,6 +73,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
this.segments = segments;
}
+ // exposes this store's retentionPeriod field via WithRetentionPeriod, so the changelog
+ // reader can skip expired changelog records when no checkpoint exists
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 8bd7185139a..e0cd49b562d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
-public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
+public class InMemorySessionStore implements SessionStore<Bytes, byte[]>,
WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(InMemorySessionStore.class);
@@ -89,6 +89,11 @@ public class InMemorySessionStore implements
SessionStore<Bytes, byte[]> {
this.position = Position.emptyPosition();
}
+ // exposes this store's retentionPeriod field via WithRetentionPeriod, so the changelog
+ // reader can skip expired changelog records when no checkpoint exists
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public String name() {
return name;
@@ -590,4 +595,4 @@ public class InMemorySessionStore implements
SessionStore<Bytes, byte[]> {
}
}
-}
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 8d2228db5c3..6f5b1095f47 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -59,7 +59,7 @@ import static
org.apache.kafka.streams.state.internals.WindowKeySchema.extractSt
import static
org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
-public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>,
WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryWindowStore.class);
private static final int SEQNUM_SIZE = 4;
@@ -95,6 +95,11 @@ public class InMemoryWindowStore implements
WindowStore<Bytes, byte[]> {
this.position = Position.emptyPosition();
}
+ // exposes this store's retentionPeriod field via WithRetentionPeriod, so the changelog
+ // reader can skip expired changelog records when no checkpoint exists
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public String name() {
return name;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
new file mode 100644
index 00000000000..0ffa460e8b1
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+/**
+ * Implemented by state stores (e.g. window and session stores) that have a retention period.
+ * <p>
+ * {@code ProcessorStateManager} checks for this interface on the innermost, fully unwrapped
+ * store. {@code StoreChangelogReader} uses the value when no checkpointed offset exists: if it
+ * is positive and not {@code Long.MAX_VALUE}, the changelog is seeked to
+ * {@code now - retentionPeriod} instead of to the beginning.
+ */
+public interface WithRetentionPeriod {
+ /**
+ * @return the store's retention period; {@code StoreChangelogReader} subtracts it from
+ * {@code time.milliseconds()}, i.e. treats it as milliseconds
+ */
+ long retentionPeriod();
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 72954175f5b..e24b5c5c3e8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
@@ -59,6 +60,7 @@ import org.mockito.quality.Strictness;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -1430,6 +1432,116 @@ public class StoreChangelogReaderTest {
}
}
+ /**
+ * Verifies that, for a store reporting a positive retention period and no checkpointed
+ * offset, the restore consumer is positioned at the offset returned by offsetsForTimes
+ * rather than at the beginning of the changelog.
+ * <p>
+ * NOTE(review): the stubbed offsetsForTimes returns the same offset for any requested
+ * timestamp, so this test does not verify that (now - retention) is the timestamp requested.
+ */
+ @Test
+ public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
+ final long retentionMs = Duration.ofHours(2).toMillis();
+ final long offsetForTimestamp = 42L;
+
+ // Use a MockConsumer subclass that supports offsetsForTimes
+ final MockConsumer<byte[], byte[]> timestampConsumer = new
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+ @Override
+ public synchronized Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+ final Map<TopicPartition, OffsetAndTimestamp> result = new
HashMap<>();
+ timestampsToSearch.forEach((key, value) -> result.put(key, new
OffsetAndTimestamp(offsetForTimestamp, value)));
+ return result;
+ }
+ };
+
+ // Set up mocks - storeMetadata returns null offset (no checkpoint)
and positive retentionPeriod
+ final StateStoreMetadata windowStoreMetadata =
mock(StateStoreMetadata.class);
+ final ProcessorStateManager windowStateManager =
mock(ProcessorStateManager.class);
+ final StateStore windowStore = mock(StateStore.class);
+ when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+ when(windowStoreMetadata.store()).thenReturn(windowStore);
+ when(windowStoreMetadata.offset()).thenReturn(null);
+ when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+ when(windowStore.name()).thenReturn(storeName);
+
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+ when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+ final TaskId taskId = new TaskId(0, 0);
+ when(windowStateManager.taskId()).thenReturn(taskId);
+
+ timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp,
0L));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+ final StoreChangelogReader reader =
+ new StoreChangelogReader(time, config, logContext, adminClient,
timestampConsumer, callback, standbyListener);
+
+ reader.register(tp, windowStateManager);
+ reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(offsetForTimestamp, timestampConsumer.position(tp), "The
consumer should be seeked to the offset returned by offsetsForTimes, not to the
beginning");
+ }
+
+ /**
+ * Verifies the fallback path: when offsetsForTimes returns a null entry for a partition
+ * (no offset found for the requested timestamp), the restore consumer is seeked to the
+ * beginning offset of the changelog (0 here) instead.
+ */
+ @Test
+ public void shouldSeekToBeginningWhenBrokerReturnsNullForOffsetsForTimes()
{
+ final long retentionMs = Duration.ofHours(2).toMillis();
+
+ // Use a MockConsumer subclass that returns null for offsetsForTimes
+ final MockConsumer<byte[], byte[]> timestampConsumer = new
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+ @Override
+ public synchronized Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+ final Map<TopicPartition, OffsetAndTimestamp> result = new
HashMap<>();
+ timestampsToSearch.forEach((key, value) -> result.put(key,
null));
+ return result;
+ }
+ };
+
+ // no checkpoint (null offset) and a positive retention period, so a timestamp seek is attempted first
+ final StateStoreMetadata windowStoreMetadata =
mock(StateStoreMetadata.class);
+ final ProcessorStateManager windowStateManager =
mock(ProcessorStateManager.class);
+ final StateStore windowStore = mock(StateStore.class);
+ when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+ when(windowStoreMetadata.store()).thenReturn(windowStore);
+ when(windowStoreMetadata.offset()).thenReturn(null);
+ when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+ when(windowStore.name()).thenReturn(storeName);
+
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+ when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+ final TaskId taskId = new TaskId(0, 0);
+ when(windowStateManager.taskId()).thenReturn(taskId);
+
+ timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp,
0L));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+ final StoreChangelogReader reader =
+ new StoreChangelogReader(time, config, logContext, adminClient,
timestampConsumer, callback, standbyListener);
+
+ reader.register(tp, windowStateManager);
+ reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(0L, timestampConsumer.position(tp), "When broker returns
null, should fall back to seeking to the beginning");
+ }
+
+ /**
+ * Verifies that a store without a retention period (retentionPeriod() == -1) and without a
+ * checkpointed offset is still restored from the beginning offset of the changelog (0 here),
+ * i.e. the timestamp-based seek is not used.
+ */
+ @Test
+ public void shouldSeekToBeginningForNonWindowedStoreWithoutCheckpoint() {
+ final StateStoreMetadata kvStoreMetadata =
mock(StateStoreMetadata.class);
+ final ProcessorStateManager kvStateManager =
mock(ProcessorStateManager.class);
+ final StateStore kvStore = mock(StateStore.class);
+ when(kvStoreMetadata.changelogPartition()).thenReturn(tp);
+ when(kvStoreMetadata.store()).thenReturn(kvStore);
+ when(kvStoreMetadata.offset()).thenReturn(null);
+ // -1L is the "no retention period" sentinel reported for non-windowed stores
+ when(kvStoreMetadata.retentionPeriod()).thenReturn(-1L);
+ when(kvStore.name()).thenReturn(storeName);
+ when(kvStateManager.storeMetadata(tp)).thenReturn(kvStoreMetadata);
+ when(kvStateManager.taskType()).thenReturn(ACTIVE);
+
+ final TaskId taskId = new TaskId(0, 0);
+ when(kvStateManager.taskId()).thenReturn(taskId);
+
+ consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+ final StoreChangelogReader reader =
+ new StoreChangelogReader(time, config, logContext, adminClient,
consumer, callback, standbyListener);
+
+ reader.register(tp, kvStateManager);
+ reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(0L, consumer.position(tp), "Non-windowed store should
seek to beginning, not by timestamp");
+ }
+
private void assignPartition(final long messages,
final TopicPartition topicPartition) {
consumer.updatePartitions(