This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 94b6886b12b KAFKA-13499: Avoid restoring outdated records (#22115)
94b6886b12b is described below
commit 94b6886b12bf050ee4075584293355923e0223ac
Author: gabriellefu <[email protected]>
AuthorDate: Mon May 4 13:14:31 2026 -0400
KAFKA-13499: Avoid restoring outdated records (#22115)
1. Expose the store's retention period through StateStoreMetadata.
2. In prepareChangelogs(), instead of always seeking to the beginning when
no checkpoint exists, seek windowed changelog partitions to a computed
timestamp to avoid restoring outdated records.
3. Change from [the previous PR](https://github.com/apache/kafka/pull/21901):
Instead of the wall clock, use the timestamp of the latest record in the
changelog as the current time, and seek to the timestamp
latest_changelog_record_timestamp - retention_period.
Reviewers: TengYao Chi <[email protected]>, Bill Bejeck
<[email protected]>
---
.../processor/internals/ProcessorStateManager.java | 22 ++++
.../processor/internals/StoreChangelogReader.java | 115 ++++++++++++++++--
...stractDualSchemaRocksDBSegmentedBytesStore.java | 7 +-
.../AbstractRocksDBSegmentedBytesStore.java | 7 +-
.../state/internals/InMemorySessionStore.java | 7 +-
.../state/internals/InMemoryWindowStore.java | 7 +-
.../state/internals/WithRetentionPeriod.java | 21 ++++
.../internals/StoreChangelogReaderTest.java | 132 +++++++++++++++++++++
8 files changed, 304 insertions(+), 14 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index ab0a9806c45..f25d5268751 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -36,6 +36,8 @@ import
org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
@@ -103,6 +105,8 @@ public class ProcessorStateManager implements StateManager {
// corrupted state store should not be included in checkpointing
private boolean corrupted;
+ private final long retentionPeriod;
+
private StateStoreMetadata(final StateStore stateStore,
final CommitCallback commitCallback) {
@@ -113,6 +117,7 @@ public class ProcessorStateManager implements StateManager {
this.changelogPartition = null;
this.corrupted = false;
this.offset = null;
+ this.retentionPeriod = -1L;
}
private StateStoreMetadata(final StateStore stateStore,
@@ -130,12 +135,24 @@ public class ProcessorStateManager implements
StateManager {
this.commitCallback = commitCallback;
this.recordConverter = recordConverter;
this.offset = null;
+ this.retentionPeriod = extractRetentionPeriod(stateStore);
}
private void setOffset(final Long offset) {
this.offset = offset;
}
+ private static long extractRetentionPeriod(final StateStore
stateStore) {
+ StateStore current = stateStore;
+ while (current instanceof WrappedStateStore) {
+ current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
+ }
+ if (current instanceof WithRetentionPeriod) {
+ return ((WithRetentionPeriod) current).retentionPeriod();
+ }
+ return -1L;
+ }
+
// the offset is exposed to the changelog reader to determine if
restoration is completed
Long offset() {
return this.offset;
@@ -145,6 +162,11 @@ public class ProcessorStateManager implements StateManager
{
return this.endOffset;
}
+ // the retentionPeriod is exposed to the changelog reader for window
restoration
+ long retentionPeriod() {
+ return retentionPeriod;
+ }
+
public void setEndOffset(final Long endOffset) {
this.endOffset = endOffset;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index db5b4156550..ebd8d6b97cd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -969,7 +970,8 @@ public class StoreChangelogReader implements
ChangelogReader {
private void prepareChangelogs(final Map<TaskId, Task> tasks,
final Set<ChangelogMetadata>
newPartitionsToRestore) {
// separate those who do not have the current offset loaded from
checkpoint
- final Set<TopicPartition> newPartitionsWithoutStartOffset = new
HashSet<>();
+ final Set<TopicPartition> newSeekToBeginningPartitions = new
HashSet<>();
+ final Map<TopicPartition, Long> newWindowedPartitionsRetention = new
HashMap<>();
for (final ChangelogMetadata changelogMetadata :
newPartitionsToRestore) {
final StateStoreMetadata storeMetadata =
changelogMetadata.storeMetadata;
@@ -986,18 +988,18 @@ public class StoreChangelogReader implements
ChangelogReader {
log.debug("Start restoring changelog partition {} from current
offset {} to end offset {}.",
partition, currentOffset, recordEndOffset(endOffset));
} else {
- log.debug("Start restoring changelog partition {} from the
beginning offset to end offset {} " +
- "since we cannot find current offset.", partition,
recordEndOffset(endOffset));
-
- newPartitionsWithoutStartOffset.add(partition);
+ final long retentionPeriod = storeMetadata.retentionPeriod();
+ if (retentionPeriod > 0 && retentionPeriod != Long.MAX_VALUE) {
+ newWindowedPartitionsRetention.put(partition,
retentionPeriod);
+ } else {
+ log.debug("Start restoring changelog partition {} from the
beginning offset to end offset {} " +
+ "since we cannot find current offset.", partition,
recordEndOffset(endOffset));
+ newSeekToBeginningPartitions.add(partition);
+ }
}
}
- // optimization: batch all seek-to-beginning offsets in a single
request
- // seek is not a blocking call so there's nothing to
capture
- if (!newPartitionsWithoutStartOffset.isEmpty()) {
- restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
- }
+ seekNewPartitions(newWindowedPartitionsRetention,
newSeekToBeginningPartitions);
for (final ChangelogMetadata changelogMetadata :
newPartitionsToRestore) {
final StateStoreMetadata storeMetadata =
changelogMetadata.storeMetadata;
@@ -1039,6 +1041,99 @@ public class StoreChangelogReader implements
ChangelogReader {
}
}
+ private void seekNewPartitions(final Map<TopicPartition, Long>
windowedPartitionsRetention,
+ final Set<TopicPartition>
seekToBeginningPartitions) {
+ // Seek non-windowed partitions to beginning.
+ if (!seekToBeginningPartitions.isEmpty()) {
+ restoreConsumer.seekToBeginning(seekToBeginningPartitions);
+ }
+
+ // Try to optimize windowed partitions by seeking past expired data.
+ if (!windowedPartitionsRetention.isEmpty()) {
+ final Set<TopicPartition> allAssigned =
restoreConsumer.assignment();
+ final Set<TopicPartition> previouslyPaused = new
HashSet<>(restoreConsumer.paused());
+
+ try {
+ restoreConsumer.pause(allAssigned);
+ restoreConsumer.resume(windowedPartitionsRetention.keySet());
+
+ final Map<TopicPartition, Long> endOffsets =
+
restoreConsumer.endOffsets(windowedPartitionsRetention.keySet());
+
+ for (final TopicPartition partition :
windowedPartitionsRetention.keySet()) {
+ final Long endOffset = endOffsets.get(partition);
+ if (endOffset != null && endOffset > 0) {
+ restoreConsumer.seek(partition, endOffset - 1);
+ } else {
+
restoreConsumer.seekToBeginning(Collections.singleton(partition));
+ seekToBeginningPartitions.add(partition);
+ }
+ }
+
windowedPartitionsRetention.keySet().removeAll(seekToBeginningPartitions);
+
+ final ConsumerRecords<byte[], byte[]> polledRecords =
restoreConsumer.poll(pollTime);
+
+ seekByRetentionFromPolledRecords(polledRecords,
windowedPartitionsRetention, seekToBeginningPartitions);
+ } catch (final TimeoutException e) {
+ log.debug("Could not seek by timestamp for changelog
partitions {}, falling back to seek-to-beginning",
+ windowedPartitionsRetention.keySet(), e);
+
seekToBeginningPartitions.addAll(windowedPartitionsRetention.keySet());
+ } catch (final KafkaException e) {
+ log.warn("Failed to seek by timestamp for changelog partitions
{}, falling back to seek-to-beginning",
+ windowedPartitionsRetention.keySet(), e);
+
seekToBeginningPartitions.addAll(windowedPartitionsRetention.keySet());
+ } finally {
+ restoreConsumer.pause(allAssigned);
+ final Set<TopicPartition> toResume = new
HashSet<>(allAssigned);
+ toResume.removeAll(previouslyPaused);
+ if (!toResume.isEmpty()) {
+ restoreConsumer.resume(toResume);
+ }
+ }
+ }
+
+ // Seek any windowed partitions that failed during the optimization
back to the beginning.
+ // Their position was moved by seek+poll above.
+ if (!seekToBeginningPartitions.isEmpty()) {
+ restoreConsumer.seekToBeginning(seekToBeginningPartitions);
+ }
+ }
+
+ private void seekByRetentionFromPolledRecords(final
ConsumerRecords<byte[], byte[]> polledRecords,
+ final Map<TopicPartition,
Long> windowedPartitionsRetention,
+ final Set<TopicPartition>
seekToBeginningPartitions) {
+ final Map<TopicPartition, Long> seekTimestamps = new HashMap<>();
+ for (final Map.Entry<TopicPartition, Long> entry :
windowedPartitionsRetention.entrySet()) {
+ final TopicPartition partition = entry.getKey();
+ final long retentionPeriod = entry.getValue();
+ final List<ConsumerRecord<byte[], byte[]>> records =
polledRecords.records(partition);
+ if (!records.isEmpty()) {
+ final long latestTimestamp = records.get(0).timestamp();
+ final long seekTimestamp = latestTimestamp - retentionPeriod;
+ if (seekTimestamp > 0) {
+ seekTimestamps.put(partition, seekTimestamp);
+ log.debug("Start restoring windowed changelog partition {}
from stream-time-based timestamp {} " +
+ "(maxStreamTime={}, retention={}).", partition,
seekTimestamp, latestTimestamp, retentionPeriod);
+ continue;
+ }
+ }
+ log.debug("Start restoring changelog partition {} from the
beginning.", partition);
+ seekToBeginningPartitions.add(partition);
+ }
+
+ if (!seekTimestamps.isEmpty()) {
+ final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
+ restoreConsumer.offsetsForTimes(seekTimestamps);
+ offsetsByTimestamp.forEach((partition, offsetAndTimestamp) -> {
+ if (offsetAndTimestamp != null) {
+ restoreConsumer.seek(partition,
offsetAndTimestamp.offset());
+ } else {
+ seekToBeginningPartitions.add(partition);
+ }
+ });
+ }
+ }
+
@Override
public void unregister(final Collection<TopicPartition> revokedChangelogs)
{
unregister(revokedChangelogs,
StandbyUpdateListener.SuspendReason.MIGRATED);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index f353e56f7df..9dbb4210aa7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@ import java.util.Optional;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
-public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends
Segment> implements SegmentedBytesStore {
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends
Segment> implements SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
private final String name;
@@ -74,6 +74,11 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
this.retentionPeriod = retentionPeriod;
}
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public KeyValueIterator<Bytes, byte[]> all() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index c6b57411067..c11e276a19e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@ import java.util.Map;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
-public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements
SegmentedBytesStore {
+public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements
SegmentedBytesStore, WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
private final String name;
@@ -73,6 +73,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
this.segments = segments;
}
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
final long from,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 9d3936d0eb0..dd3f083aa10 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import static
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
-public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
+public class InMemorySessionStore implements SessionStore<Bytes, byte[]>,
WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(InMemorySessionStore.class);
@@ -91,6 +91,11 @@ public class InMemorySessionStore implements
SessionStore<Bytes, byte[]> {
this.position = Position.emptyPosition();
}
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public String name() {
return name;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 8d2228db5c3..6f5b1095f47 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -59,7 +59,7 @@ import static
org.apache.kafka.streams.state.internals.WindowKeySchema.extractSt
import static
org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
-public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>,
WithRetentionPeriod {
private static final Logger LOG =
LoggerFactory.getLogger(InMemoryWindowStore.class);
private static final int SEQNUM_SIZE = 4;
@@ -95,6 +95,11 @@ public class InMemoryWindowStore implements
WindowStore<Bytes, byte[]> {
this.position = Position.emptyPosition();
}
+ @Override
+ public long retentionPeriod() {
+ return retentionPeriod;
+ }
+
@Override
public String name() {
return name;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
new file mode 100644
index 00000000000..44dee28eafa
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public interface WithRetentionPeriod {
+ long retentionPeriod();
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index f32ed66b71d..ad81af11494 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,11 +27,14 @@ import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.internals.LogContext;
@@ -59,7 +62,9 @@ import org.mockito.quality.Strictness;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1430,6 +1435,133 @@ public class StoreChangelogReaderTest {
}
}
+ @Test
+ public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
+ final long retentionMs = Duration.ofHours(2).toMillis();
+ final long offsetForTimestamp = 42L;
+ final long latestRecordTimestamp = 10_000_000L;
+ final long endOffset = 100L;
+
+ final MockConsumer<byte[], byte[]> timestampConsumer = new
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+ @Override
+ public synchronized Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+ final Map<TopicPartition, OffsetAndTimestamp> result = new
HashMap<>();
+ timestampsToSearch.forEach((key, value) -> result.put(key, new
OffsetAndTimestamp(offsetForTimestamp, value)));
+ return result;
+ }
+ };
+
+ final StateStoreMetadata windowStoreMetadata =
mock(StateStoreMetadata.class);
+ final ProcessorStateManager windowStateManager =
mock(ProcessorStateManager.class);
+ final StateStore windowStore = mock(StateStore.class);
+ when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+ when(windowStoreMetadata.store()).thenReturn(windowStore);
+ when(windowStoreMetadata.offset()).thenReturn(null);
+ when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+ when(windowStore.name()).thenReturn(storeName);
+
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+ when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+ final TaskId taskId = new TaskId(0, 0);
+ when(windowStateManager.taskId()).thenReturn(taskId);
+
+ timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp,
0L));
+ timestampConsumer.updateEndOffsets(Collections.singletonMap(tp,
endOffset));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, endOffset));
+
+ // schedule adding the record during poll, after the partition is
assigned
+ timestampConsumer.schedulePollTask(() ->
timestampConsumer.addRecord(new ConsumerRecord<>(
+ tp.topic(), tp.partition(), endOffset - 1,
+ latestRecordTimestamp, TimestampType.CREATE_TIME,
+ 0, 0, new byte[0], new byte[0],
+ new RecordHeaders(), Optional.empty())));
+
+ final StoreChangelogReader reader =
+ new StoreChangelogReader(time, config, logContext, adminClient,
timestampConsumer, callback, standbyListener);
+
+ reader.register(tp, windowStateManager);
+ reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(offsetForTimestamp, timestampConsumer.position(tp), "The
consumer should be seeked to the offset returned by offsetsForTimes, not to the
beginning");
+ }
+
+ @Test
+ public void shouldSeekToBeginningWhenBrokerReturnsNullForOffsetsForTimes()
{
+ final long retentionMs = Duration.ofHours(2).toMillis();
+ final long latestRecordTimestamp = 10_000_000L;
+ final long endOffset = 100L;
+
+ final MockConsumer<byte[], byte[]> timestampConsumer = new
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+ @Override
+ public synchronized Map<TopicPartition, OffsetAndTimestamp>
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+ final Map<TopicPartition, OffsetAndTimestamp> result = new
HashMap<>();
+ timestampsToSearch.forEach((key, value) -> result.put(key,
null));
+ return result;
+ }
+ };
+
+ final StateStoreMetadata windowStoreMetadata =
mock(StateStoreMetadata.class);
+ final ProcessorStateManager windowStateManager =
mock(ProcessorStateManager.class);
+ final StateStore windowStore = mock(StateStore.class);
+ when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+ when(windowStoreMetadata.store()).thenReturn(windowStore);
+ when(windowStoreMetadata.offset()).thenReturn(null);
+ when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+ when(windowStore.name()).thenReturn(storeName);
+
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+ when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+ final TaskId taskId = new TaskId(0, 0);
+ when(windowStateManager.taskId()).thenReturn(taskId);
+
+ timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp,
0L));
+ timestampConsumer.updateEndOffsets(Collections.singletonMap(tp,
endOffset));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, endOffset));
+
+ // schedule adding the record during poll, after the partition is
assigned
+ timestampConsumer.schedulePollTask(() ->
timestampConsumer.addRecord(new ConsumerRecord<>(
+ tp.topic(), tp.partition(), endOffset - 1,
+ latestRecordTimestamp, TimestampType.CREATE_TIME,
+ 0, 0, new byte[0], new byte[0],
+ new RecordHeaders(), Optional.empty())));
+
+ final StoreChangelogReader reader =
+ new StoreChangelogReader(time, config, logContext, adminClient,
timestampConsumer, callback, standbyListener);
+
+ reader.register(tp, windowStateManager);
+ reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(0L, timestampConsumer.position(tp), "When broker returns
null, should fall back to seeking to the beginning");
+ }
+
+ @Test
+ public void shouldSeekToBeginningForNonWindowedStoreWithoutCheckpoint() {
+ final StateStoreMetadata kvStoreMetadata =
mock(StateStoreMetadata.class);
+ final ProcessorStateManager kvStateManager =
mock(ProcessorStateManager.class);
+ final StateStore kvStore = mock(StateStore.class);
+ when(kvStoreMetadata.changelogPartition()).thenReturn(tp);
+ when(kvStoreMetadata.store()).thenReturn(kvStore);
+ when(kvStoreMetadata.offset()).thenReturn(null);
+ when(kvStoreMetadata.retentionPeriod()).thenReturn(-1L);
+ when(kvStore.name()).thenReturn(storeName);
+ when(kvStateManager.storeMetadata(tp)).thenReturn(kvStoreMetadata);
+ when(kvStateManager.taskType()).thenReturn(ACTIVE);
+
+ final TaskId taskId = new TaskId(0, 0);
+ when(kvStateManager.taskId()).thenReturn(taskId);
+
+ consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+ final StoreChangelogReader reader =
+ new StoreChangelogReader(time, config, logContext, adminClient,
consumer, callback, standbyListener);
+
+ reader.register(tp, kvStateManager);
+ reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+ assertEquals(0L, consumer.position(tp), "Non-windowed store should
seek to beginning, not by timestamp");
+ }
+
private void assignPartition(final long messages,
final TopicPartition topicPartition) {
consumer.updatePartitions(