This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 739d9c82562 KAFKA-13499: Avoid restoring outdated records (#21901)
739d9c82562 is described below

commit 739d9c82562d93aa3d49da3853b4402e37224da0
Author: gabriellefu <[email protected]>
AuthorDate: Tue Mar 31 13:42:19 2026 -0400

    KAFKA-13499: Avoid restoring outdated records (#21901)
    
    1. Expose the retentionPeriod length to storeMetadata
    2. In prepareChangelogs(), switch it from always seektobeginning if
    checkpoint doesn't exist to seek to certain timestamp to avoid restoring
    outdated records.
    
    Reviewers: TengYao Chi <[email protected]> , Bill Bejeck
     <[email protected]>
---
 .../processor/internals/ProcessorStateManager.java |  22 ++++
 .../processor/internals/StoreChangelogReader.java  |  47 +++++++--
 ...stractDualSchemaRocksDBSegmentedBytesStore.java |   7 +-
 .../AbstractRocksDBSegmentedBytesStore.java        |   7 +-
 .../state/internals/InMemorySessionStore.java      |   9 +-
 .../state/internals/InMemoryWindowStore.java       |   7 +-
 .../state/internals/WithRetentionPeriod.java       |  21 ++++
 .../internals/StoreChangelogReaderTest.java        | 112 +++++++++++++++++++++
 8 files changed, 218 insertions(+), 14 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 7b3483f8e68..9c715752058 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -36,6 +36,8 @@ import 
org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.LegacyCheckpointingStateStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+import org.apache.kafka.streams.state.internals.WithRetentionPeriod;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import org.slf4j.Logger;
 
@@ -103,6 +105,8 @@ public class ProcessorStateManager implements StateManager {
         // corrupted state store should not be included in checkpointing
         private boolean corrupted;
 
+        private final long retentionPeriod;
+
 
         private StateStoreMetadata(final StateStore stateStore,
                                    final CommitCallback commitCallback) {
@@ -113,6 +117,7 @@ public class ProcessorStateManager implements StateManager {
             this.changelogPartition = null;
             this.corrupted = false;
             this.offset = null;
+            this.retentionPeriod = -1L;
         }
 
         private StateStoreMetadata(final StateStore stateStore,
@@ -130,12 +135,24 @@ public class ProcessorStateManager implements 
StateManager {
             this.commitCallback = commitCallback;
             this.recordConverter = recordConverter;
             this.offset = null;
+            this.retentionPeriod = extractRetentionPeriod(stateStore);
         }
 
         private void setOffset(final Long offset) {
             this.offset = offset;
         }
 
+        private static long extractRetentionPeriod(final StateStore 
stateStore) {
+            StateStore current = stateStore;
+            while (current instanceof WrappedStateStore) {
+                current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
+            }
+            if (current instanceof WithRetentionPeriod) {
+                return ((WithRetentionPeriod) current).retentionPeriod();
+            }
+            return -1L;
+        }
+
         // the offset is exposed to the changelog reader to determine if 
restoration is completed
         Long offset() {
             return this.offset;
@@ -145,6 +162,11 @@ public class ProcessorStateManager implements StateManager 
{
             return this.endOffset;
         }
 
+        // the retentionPeriod is exposed to the changelog reader for window 
restoration
+        long retentionPeriod() {
+            return retentionPeriod;
+        }
+
         public void setEndOffset(final Long endOffset) {
             this.endOffset = endOffset;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index eab7da800d8..aa7098400a1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -970,6 +971,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
                                    final Set<ChangelogMetadata> 
newPartitionsToRestore) {
         // separate those who do not have the current offset loaded from 
checkpoint
         final Set<TopicPartition> newPartitionsWithoutStartOffset = new 
HashSet<>();
+        final Map<TopicPartition, Long> newPartitionsWithTimestampSeek = new 
HashMap<>();
 
         for (final ChangelogMetadata changelogMetadata : 
newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = 
changelogMetadata.storeMetadata;
@@ -986,18 +988,22 @@ public class StoreChangelogReader implements 
ChangelogReader {
                 log.debug("Start restoring changelog partition {} from current 
offset {} to end offset {}.",
                     partition, currentOffset, recordEndOffset(endOffset));
             } else {
-                log.debug("Start restoring changelog partition {} from the 
beginning offset to end offset {} " +
-                    "since we cannot find current offset.", partition, 
recordEndOffset(endOffset));
-
-                newPartitionsWithoutStartOffset.add(partition);
+                final long retentionPeriod = storeMetadata.retentionPeriod();
+                final long seekTimestamp = retentionPeriod > 0 && 
retentionPeriod != Long.MAX_VALUE
+                    ? time.milliseconds() - retentionPeriod : -1L;
+                if (seekTimestamp > 0) {
+                    newPartitionsWithTimestampSeek.put(partition, 
seekTimestamp);
+                    log.debug("Start restoring windowed changelog partition {} 
from timestamp {} to end offset {}.",
+                        partition, seekTimestamp, recordEndOffset(endOffset));
+                } else {
+                    log.debug("Start restoring changelog partition {} from the 
beginning offset to end offset {} " +
+                        "since we cannot find current offset.", partition, 
recordEndOffset(endOffset));
+                    newPartitionsWithoutStartOffset.add(partition);
+                }
             }
         }
 
-        // optimization: batch all seek-to-beginning offsets in a single 
request
-        //               seek is not a blocking call so there's nothing to 
capture
-        if (!newPartitionsWithoutStartOffset.isEmpty()) {
-            restoreConsumer.seekToBeginning(newPartitionsWithoutStartOffset);
-        }
+        seekToTimestampOrBeginning(newPartitionsWithTimestampSeek, 
newPartitionsWithoutStartOffset);
 
         for (final ChangelogMetadata changelogMetadata : 
newPartitionsToRestore) {
             final StateStoreMetadata storeMetadata = 
changelogMetadata.storeMetadata;
@@ -1039,6 +1045,29 @@ public class StoreChangelogReader implements 
ChangelogReader {
         }
     }
 
+    private void seekToTimestampOrBeginning(final Map<TopicPartition, Long> 
partitionsWithTimestampSeek,
+                                            final Set<TopicPartition> 
partitionsWithoutStartOffset) {
+        // optimization: seek windowed stores by timestamp to skip expired data
+        if (!partitionsWithTimestampSeek.isEmpty()) {
+            final Map<TopicPartition, OffsetAndTimestamp> offsetsByTimestamp =
+                restoreConsumer.offsetsForTimes(partitionsWithTimestampSeek);
+            offsetsByTimestamp.forEach((key, value) -> {
+                if (value != null) {
+                    restoreConsumer.seek(key, value.offset());
+                } else {
+                    // no offset found for the timestamp, fall back to seeking 
to the beginning
+                    partitionsWithoutStartOffset.add(key);
+                }
+            });
+        }
+
+        // optimization: batch all seek-to-beginning offsets in a single 
request
+        //               seek is not a blocking call so there's nothing to 
capture
+        if (!partitionsWithoutStartOffset.isEmpty()) {
+            restoreConsumer.seekToBeginning(partitionsWithoutStartOffset);
+        }
+    }
+
     @Override
     public void unregister(final Collection<TopicPartition> revokedChangelogs) 
{
         unregister(revokedChangelogs, 
StandbyUpdateListener.SuspendReason.MIGRATED);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index f353e56f7df..9dbb4210aa7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -46,7 +46,7 @@ import java.util.Optional;
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 
-public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends 
Segment> implements SegmentedBytesStore {
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends 
Segment> implements SegmentedBytesStore, WithRetentionPeriod {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
 
     private final String name;
@@ -74,6 +74,11 @@ public abstract class 
AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
         this.retentionPeriod = retentionPeriod;
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index c6b57411067..c11e276a19e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -48,7 +48,7 @@ import java.util.Map;
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 
-public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements 
SegmentedBytesStore {
+public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements 
SegmentedBytesStore, WithRetentionPeriod {
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
 
     private final String name;
@@ -73,6 +73,11 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
         this.segments = segments;
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key,
                                                  final long from,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index 8bd7185139a..e0cd49b562d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -54,7 +54,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
 
-public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
+public class InMemorySessionStore implements SessionStore<Bytes, byte[]>, 
WithRetentionPeriod {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(InMemorySessionStore.class);
 
@@ -89,6 +89,11 @@ public class InMemorySessionStore implements 
SessionStore<Bytes, byte[]> {
         this.position = Position.emptyPosition();
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public String name() {
         return name;
@@ -590,4 +595,4 @@ public class InMemorySessionStore implements 
SessionStore<Bytes, byte[]> {
         }
     }
 
-}
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 8d2228db5c3..6f5b1095f47 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -59,7 +59,7 @@ import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.extractSt
 import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
 
 
-public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]>, 
WithRetentionPeriod {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryWindowStore.class);
     private static final int SEQNUM_SIZE = 4;
@@ -95,6 +95,11 @@ public class InMemoryWindowStore implements 
WindowStore<Bytes, byte[]> {
         this.position = Position.emptyPosition();
     }
 
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
+
     @Override
     public String name() {
         return name;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
new file mode 100644
index 00000000000..0ffa460e8b1
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WithRetentionPeriod.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public interface WithRetentionPeriod {
+    long retentionPeriod();
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 72954175f5b..e24b5c5c3e8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
@@ -59,6 +60,7 @@ import org.mockito.quality.Strictness;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -1430,6 +1432,116 @@ public class StoreChangelogReaderTest {
         }
     }
 
+    @Test
+    public void shouldSeekByTimestampForWindowedStoreWithoutCheckpoint() {
+        final long retentionMs = Duration.ofHours(2).toMillis();
+        final long offsetForTimestamp = 42L;
+
+        // Use a MockConsumer subclass that supports offsetsForTimes
+        final MockConsumer<byte[], byte[]> timestampConsumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+            @Override
+            public synchronized Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+                final Map<TopicPartition, OffsetAndTimestamp> result = new 
HashMap<>();
+                timestampsToSearch.forEach((key, value) -> result.put(key, new 
OffsetAndTimestamp(offsetForTimestamp, value)));
+                return result;
+            }
+        };
+
+        // Set up mocks - storeMetadata returns null offset (no checkpoint) 
and positive retentionPeriod
+        final StateStoreMetadata windowStoreMetadata = 
mock(StateStoreMetadata.class);
+        final ProcessorStateManager windowStateManager = 
mock(ProcessorStateManager.class);
+        final StateStore windowStore = mock(StateStore.class);
+        when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+        when(windowStoreMetadata.store()).thenReturn(windowStore);
+        when(windowStoreMetadata.offset()).thenReturn(null);
+        when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+        when(windowStore.name()).thenReturn(storeName);
+        
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+        when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+        final TaskId taskId = new TaskId(0, 0);
+        when(windowStateManager.taskId()).thenReturn(taskId);
+
+        timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 
0L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+        final StoreChangelogReader reader =
+            new StoreChangelogReader(time, config, logContext, adminClient, 
timestampConsumer, callback, standbyListener);
+
+        reader.register(tp, windowStateManager);
+        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+        assertEquals(offsetForTimestamp, timestampConsumer.position(tp), "The 
consumer should be seeked to the offset returned by offsetsForTimes, not to the 
beginning");
+    }
+
+    @Test
+    public void shouldSeekToBeginningWhenBrokerReturnsNullForOffsetsForTimes() 
{
+        final long retentionMs = Duration.ofHours(2).toMillis();
+
+        // Use a MockConsumer subclass that returns null for offsetsForTimes
+        final MockConsumer<byte[], byte[]> timestampConsumer = new 
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+            @Override
+            public synchronized Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(final Map<TopicPartition, Long> timestampsToSearch) {
+                final Map<TopicPartition, OffsetAndTimestamp> result = new 
HashMap<>();
+                timestampsToSearch.forEach((key, value) -> result.put(key, 
null));
+                return result;
+            }
+        };
+
+        final StateStoreMetadata windowStoreMetadata = 
mock(StateStoreMetadata.class);
+        final ProcessorStateManager windowStateManager = 
mock(ProcessorStateManager.class);
+        final StateStore windowStore = mock(StateStore.class);
+        when(windowStoreMetadata.changelogPartition()).thenReturn(tp);
+        when(windowStoreMetadata.store()).thenReturn(windowStore);
+        when(windowStoreMetadata.offset()).thenReturn(null);
+        when(windowStoreMetadata.retentionPeriod()).thenReturn(retentionMs);
+        when(windowStore.name()).thenReturn(storeName);
+        
when(windowStateManager.storeMetadata(tp)).thenReturn(windowStoreMetadata);
+        when(windowStateManager.taskType()).thenReturn(ACTIVE);
+
+        final TaskId taskId = new TaskId(0, 0);
+        when(windowStateManager.taskId()).thenReturn(taskId);
+
+        timestampConsumer.updateBeginningOffsets(Collections.singletonMap(tp, 
0L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+        final StoreChangelogReader reader =
+            new StoreChangelogReader(time, config, logContext, adminClient, 
timestampConsumer, callback, standbyListener);
+
+        reader.register(tp, windowStateManager);
+        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+        assertEquals(0L, timestampConsumer.position(tp), "When broker returns 
null, should fall back to seeking to the beginning");
+    }
+
+    @Test
+    public void shouldSeekToBeginningForNonWindowedStoreWithoutCheckpoint() {
+        final StateStoreMetadata kvStoreMetadata = 
mock(StateStoreMetadata.class);
+        final ProcessorStateManager kvStateManager = 
mock(ProcessorStateManager.class);
+        final StateStore kvStore = mock(StateStore.class);
+        when(kvStoreMetadata.changelogPartition()).thenReturn(tp);
+        when(kvStoreMetadata.store()).thenReturn(kvStore);
+        when(kvStoreMetadata.offset()).thenReturn(null);
+        when(kvStoreMetadata.retentionPeriod()).thenReturn(-1L);
+        when(kvStore.name()).thenReturn(storeName);
+        when(kvStateManager.storeMetadata(tp)).thenReturn(kvStoreMetadata);
+        when(kvStateManager.taskType()).thenReturn(ACTIVE);
+
+        final TaskId taskId = new TaskId(0, 0);
+        when(kvStateManager.taskId()).thenReturn(taskId);
+
+        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+        final StoreChangelogReader reader =
+            new StoreChangelogReader(time, config, logContext, adminClient, 
consumer, callback, standbyListener);
+
+        reader.register(tp, kvStateManager);
+        reader.restore(Collections.singletonMap(taskId, mock(Task.class)));
+
+        assertEquals(0L, consumer.position(tp), "Non-windowed store should 
seek to beginning, not by timestamp");
+    }
+
     private void assignPartition(final long messages,
                                  final TopicPartition topicPartition) {
         consumer.updatePartitions(

Reply via email to