guozhangwang commented on code in PR #12030:
URL: https://github.com/apache/kafka/pull/12030#discussion_r850788768


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java:
##########
@@ -34,13 +34,17 @@ public class PrefixedWindowKeySchemas {
 
     private static final int PREFIX_SIZE = 1;
     private static final byte TIME_FIRST_PREFIX = 0;
-    private static final byte KEY_FIRST_PREFIX = 1;
+    public static final byte KEY_FIRST_PREFIX = 1;
     private static final int SEQNUM_SIZE = 4;
 
     private static byte extractPrefix(final byte[] binaryBytes) {
         return binaryBytes[0];
     }
 
+    public static boolean isTimeFirstSchemaKey(final byte[] binaryBytes) {

Review Comment:
   I could not find where this function is used — could you point me to it?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java:
##########
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.function.Function;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.RecordQueue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import 
org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator.StoreKeyToWindowKey;
+import 
org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator.WindowKeyToBytes;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
+import static 
org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
+import static 
org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
+
+class TimeOrderedCachingWindowStore
+    extends WrappedStateStore<WindowStore<Bytes, byte[]>, byte[], byte[]>
+    implements WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TimeOrderedCachingWindowStore.class);
+
+    private final long windowSize;
+    private final SegmentedCacheFunction baseKeyCacheFunction;
+    private final SegmentedCacheFunction indexKeyCacheFunction;
+    private final TimeFirstWindowKeySchema baseKeySchema = new 
TimeFirstWindowKeySchema();
+    private final KeyFirstWindowKeySchema indexKeySchema = new 
KeyFirstWindowKeySchema();
+
+    private String cacheName;
+    private boolean hasIndex;
+    private boolean sendOldValues;
+    private InternalProcessorContext<?, ?> context;
+    private StateSerdes<Bytes, byte[]> bytesSerdes;
+    private CacheFlushListener<byte[], byte[]> flushListener;
+
+    private final AtomicLong maxObservedTimestamp;
+
+    TimeOrderedCachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
+                       final long windowSize,
+                       final long segmentInterval) {
+        super(underlying);
+        this.windowSize = windowSize;
+        this.baseKeyCacheFunction = new SegmentedCacheFunction(baseKeySchema, 
segmentInterval);
+        this.indexKeyCacheFunction = new 
SegmentedCacheFunction(indexKeySchema, segmentInterval);
+        this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN);
+        enforceWrappedStore(underlying);
+    }
+
+    private void enforceWrappedStore(final WindowStore<Bytes, byte[]> 
underlying) {
+        final RocksDBTimeOrderedWindowStore timeOrderedWindowStore = 
getWrappedStore(underlying);
+        if (timeOrderedWindowStore == null) {
+            throw new IllegalArgumentException("TimeOrderedCachingWindowStore 
only supports RocksDBTimeOrderedWindowStore backed store");
+        }
+
+        hasIndex = timeOrderedWindowStore.hasIndex();
+    }
+
+    private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore 
wrapped) {
+        if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
+            return (RocksDBTimeOrderedWindowStore) wrapped;
+        }
+        if (wrapped instanceof WrappedStateStore) {
+            return getWrappedStore(((WrappedStateStore) wrapped).wrapped());
+        }
+        return null;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        initInternal(asInternalProcessorContext(context));
+        super.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        initInternal(asInternalProcessorContext(context));
+        super.init(context, root);
+    }
+
+    private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(
+            context.appConfigs(),
+            StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,

Review Comment:
   I think we'd need this config here for now.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java:
##########
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.function.Function;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.RecordQueue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import 
org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator.StoreKeyToWindowKey;
+import 
org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator.WindowKeyToBytes;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
+import static 
org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
+import static 
org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
+
+class TimeOrderedCachingWindowStore
+    extends WrappedStateStore<WindowStore<Bytes, byte[]>, byte[], byte[]>
+    implements WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TimeOrderedCachingWindowStore.class);
+
+    private final long windowSize;
+    private final SegmentedCacheFunction baseKeyCacheFunction;
+    private final SegmentedCacheFunction indexKeyCacheFunction;
+    private final TimeFirstWindowKeySchema baseKeySchema = new 
TimeFirstWindowKeySchema();
+    private final KeyFirstWindowKeySchema indexKeySchema = new 
KeyFirstWindowKeySchema();
+
+    private String cacheName;
+    private boolean hasIndex;
+    private boolean sendOldValues;
+    private InternalProcessorContext<?, ?> context;
+    private StateSerdes<Bytes, byte[]> bytesSerdes;
+    private CacheFlushListener<byte[], byte[]> flushListener;
+
+    private final AtomicLong maxObservedTimestamp;
+
+    TimeOrderedCachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
+                       final long windowSize,
+                       final long segmentInterval) {
+        super(underlying);
+        this.windowSize = windowSize;
+        this.baseKeyCacheFunction = new SegmentedCacheFunction(baseKeySchema, 
segmentInterval);
+        this.indexKeyCacheFunction = new 
SegmentedCacheFunction(indexKeySchema, segmentInterval);
+        this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN);
+        enforceWrappedStore(underlying);
+    }
+
+    private void enforceWrappedStore(final WindowStore<Bytes, byte[]> 
underlying) {
+        final RocksDBTimeOrderedWindowStore timeOrderedWindowStore = 
getWrappedStore(underlying);
+        if (timeOrderedWindowStore == null) {
+            throw new IllegalArgumentException("TimeOrderedCachingWindowStore 
only supports RocksDBTimeOrderedWindowStore backed store");
+        }
+
+        hasIndex = timeOrderedWindowStore.hasIndex();
+    }
+
+    private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore 
wrapped) {
+        if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
+            return (RocksDBTimeOrderedWindowStore) wrapped;
+        }
+        if (wrapped instanceof WrappedStateStore) {
+            return getWrappedStore(((WrappedStateStore) wrapped).wrapped());
+        }
+        return null;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        initInternal(asInternalProcessorContext(context));
+        super.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        initInternal(asInternalProcessorContext(context));
+        super.init(context, root);
+    }
+
+    private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(
+            context.appConfigs(),
+            StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,

Review Comment:
   This is introduced to allow a customized internal topic prefix (at the moment it 
would only be used by ksql's shared runtime) instead of the default "appID".



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java:
##########
@@ -34,13 +34,17 @@ public class PrefixedWindowKeySchemas {
 
     private static final int PREFIX_SIZE = 1;
     private static final byte TIME_FIRST_PREFIX = 0;
-    private static final byte KEY_FIRST_PREFIX = 1;
+    public static final byte KEY_FIRST_PREFIX = 1;
     private static final int SEQNUM_SIZE = 4;
 
     private static byte extractPrefix(final byte[] binaryBytes) {
         return binaryBytes[0];
     }
 
+    public static boolean isTimeFirstSchemaKey(final byte[] binaryBytes) {

Review Comment:
   EDIT: never mind, I found it now.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java:
##########
@@ -0,0 +1,694 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.function.Function;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
+import org.apache.kafka.streams.processor.internals.RecordQueue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import 
org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator.StoreKeyToWindowKey;
+import 
org.apache.kafka.streams.state.internals.MergedSortedCacheWindowStoreKeyValueIterator.WindowKeyToBytes;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.apache.kafka.streams.state.internals.SegmentedBytesStore.KeySchema;
+import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
+import static 
org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll;
+import static 
org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed;
+
+class TimeOrderedCachingWindowStore
+    extends WrappedStateStore<WindowStore<Bytes, byte[]>, byte[], byte[]>
+    implements WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TimeOrderedCachingWindowStore.class);
+
+    private final long windowSize;
+    private final SegmentedCacheFunction baseKeyCacheFunction;
+    private final SegmentedCacheFunction indexKeyCacheFunction;
+    private final TimeFirstWindowKeySchema baseKeySchema = new 
TimeFirstWindowKeySchema();
+    private final KeyFirstWindowKeySchema indexKeySchema = new 
KeyFirstWindowKeySchema();
+
+    private String cacheName;
+    private boolean hasIndex;
+    private boolean sendOldValues;
+    private InternalProcessorContext<?, ?> context;
+    private StateSerdes<Bytes, byte[]> bytesSerdes;
+    private CacheFlushListener<byte[], byte[]> flushListener;
+
+    private final AtomicLong maxObservedTimestamp;
+
+    TimeOrderedCachingWindowStore(final WindowStore<Bytes, byte[]> underlying,
+                                  final long windowSize,
+                                  final long segmentInterval) {
+        super(underlying);
+        this.windowSize = windowSize;
+        this.baseKeyCacheFunction = new SegmentedCacheFunction(baseKeySchema, 
segmentInterval);
+        this.indexKeyCacheFunction = new 
SegmentedCacheFunction(indexKeySchema, segmentInterval);
+        this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN);
+        enforceWrappedStore(underlying);
+    }
+
+    private void enforceWrappedStore(final WindowStore<Bytes, byte[]> 
underlying) {
+        final RocksDBTimeOrderedWindowStore timeOrderedWindowStore = 
getWrappedStore(underlying);
+        if (timeOrderedWindowStore == null) {
+            throw new IllegalArgumentException("TimeOrderedCachingWindowStore 
only supports RocksDBTimeOrderedWindowStore backed store");
+        }
+
+        hasIndex = timeOrderedWindowStore.hasIndex();
+    }
+
+    private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore 
wrapped) {
+        if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
+            return (RocksDBTimeOrderedWindowStore) wrapped;
+        }
+        if (wrapped instanceof WrappedStateStore) {
+            return getWrappedStore(((WrappedStateStore) wrapped).wrapped());
+        }
+        return null;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        initInternal(asInternalProcessorContext(context));
+        super.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        initInternal(asInternalProcessorContext(context));
+        super.init(context, root);
+    }
+
+    private void initInternal(final InternalProcessorContext<?, ?> context) {
+        final String prefix = StreamsConfig.InternalConfig.getString(
+            context.appConfigs(),
+            StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
+            context.applicationId()
+        );
+        this.context = context;
+        final String topic = ProcessorStateManager.storeChangelogTopic(prefix, 
name(),  context.taskId().topologyName());
+
+        bytesSerdes = new StateSerdes<>(
+            topic,
+            Serdes.Bytes(),
+            Serdes.ByteArray());
+        cacheName = context.taskId() + "-" + name();
+
+        context.registerCacheFlushListener(cacheName, entries -> {
+            for (final ThreadCache.DirtyEntry entry : entries) {
+                putAndMaybeForward(entry, context);
+            }
+        });
+    }
+
+    private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
+                                    final InternalProcessorContext<?, ?> 
context) {
+        final byte[] binaryWindowKey = 
baseKeyCacheFunction.key(entry.key()).get();
+        final boolean isBaseKey = 
PrefixedWindowKeySchemas.isTimeFirstSchemaKey(binaryWindowKey);
+
+        final Windowed<Bytes> windowedKeyBytes;
+        if (isBaseKey) {
+            windowedKeyBytes = 
TimeFirstWindowKeySchema.fromStoreBytesKey(binaryWindowKey, windowSize);
+        } else {
+            windowedKeyBytes = 
KeyFirstWindowKeySchema.fromStoreBytesKey(binaryWindowKey, windowSize);
+        }
+
+        final long windowStartTimestamp = windowedKeyBytes.window().start();
+        final Bytes binaryKey = windowedKeyBytes.key();
+
+        final DirtyEntry finalEntry;
+        if (!isBaseKey) {
+            final Bytes baseKey = 
indexKeyToBaseKey(Bytes.wrap(binaryWindowKey));
+            final Bytes cachedBaseKey = baseKeyCacheFunction.cacheKey(baseKey);
+            final LRUCacheEntry value = context.cache().get(cacheName, 
cachedBaseKey);
+            // Base key value is already evicted, which should be handled 
already
+            if (value == null) {
+                return;
+            }
+
+            finalEntry = new DirtyEntry(entry.key(), value.value(), value);
+        } else {
+            finalEntry = entry;
+        }
+
+        if (flushListener != null) {
+            final byte[] rawNewValue = finalEntry.newValue();
+            final byte[] rawOldValue = rawNewValue == null || sendOldValues ?
+                wrapped().fetch(binaryKey, windowStartTimestamp) : null;
+
+            // this is an optimization: if this key did not exist in 
underlying store and also not in the cache,
+            // we can skip flushing to downstream as well as writing to 
underlying store
+            if (rawNewValue != null || rawOldValue != null) {
+                // we need to get the old values if needed, and then put to 
store, and then flush
+                final ProcessorRecordContext current = context.recordContext();
+                try {
+                    context.setRecordContext(finalEntry.entry().context());
+                    wrapped().put(binaryKey, finalEntry.newValue(), 
windowStartTimestamp);
+
+                    // Only forward for base key to avoid forwarding multiple 
times for index
+                    if (isBaseKey) {
+                        flushListener.apply(
+                            new Record<>(
+                                WindowKeySchema.toStoreKeyBinary(binaryKey, 
windowStartTimestamp, 0)
+                                    .get(),
+                                new Change<>(rawNewValue, sendOldValues ? 
rawOldValue : null),
+                                finalEntry.entry().context().timestamp(),
+                                finalEntry.entry().context().headers()));
+                    }
+                } finally {
+                    context.setRecordContext(current);
+                }
+            }
+        } else {
+            final ProcessorRecordContext current = context.recordContext();
+            try {
+                context.setRecordContext(finalEntry.entry().context());
+                wrapped().put(binaryKey, finalEntry.newValue(), 
windowStartTimestamp);
+            } finally {
+                context.setRecordContext(current);
+            }
+        }
+    }
+
+    @Override
+    public boolean setFlushListener(final CacheFlushListener<byte[], byte[]> 
flushListener,
+                                    final boolean sendOldValues) {
+        this.flushListener = flushListener;
+        this.sendOldValues = sendOldValues;
+
+        return true;
+    }
+
+    private Bytes indexKeyToBaseKey(final Bytes indexKey) {
+        final byte[] key = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
+        final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
+        final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
+        return TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, 
seqnum);
+    }
+
+    @Override
+    public synchronized void put(final Bytes key,
+                                 final byte[] value,
+                                 final long windowStartTimestamp) {
+        // since this function may not access the underlying inner store, we 
need to validate
+        // if store is open outside as well.
+        validateStoreOpen();
+
+        final Bytes baseKeyBytes = 
TimeFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
+        final LRUCacheEntry entry =
+            new LRUCacheEntry(
+                value,
+                context.headers(),
+                true,
+                context.offset(),
+                context.timestamp(),
+                context.partition(),
+                context.topic());
+
+        // Put to index first so that base can be evicted later
+        if (hasIndex) {
+            // Important: put base key first to avoid the situation that if we 
put index first,
+            // it could be evicted when we are putting base key. In that case, 
base key is not yet
+            // in cache so we can't store key/value to store when index is 
evicted. Then if we fetch
+            // using index, we can't find it in either store or cache
+            context.cache().put(cacheName, 
baseKeyCacheFunction.cacheKey(baseKeyBytes), entry);
+            final LRUCacheEntry emptyEntry =
+                new LRUCacheEntry(
+                    new byte[0],
+                    new RecordHeaders(),
+                    true,
+                    context.offset(),
+                    context.timestamp(),
+                    context.partition(),
+                    "");
+            final Bytes indexKey = 
KeyFirstWindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, 0);
+            context.cache().put(cacheName, 
indexKeyCacheFunction.cacheKey(indexKey), emptyEntry);
+        } else {
+            context.cache().put(cacheName, 
baseKeyCacheFunction.cacheKey(baseKeyBytes), entry);
+        }
+        maxObservedTimestamp.set(Math.max(windowStartTimestamp, 
maxObservedTimestamp.get()));
+    }
+
+    @Override
+    public byte[] fetch(final Bytes key,
+                        final long timestamp) {
+        validateStoreOpen();
+        if (context.cache() == null) {
+            return wrapped().fetch(key, timestamp);
+        }
+
+        final Bytes baseBytesKey = 
TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, 0);
+        final Bytes cacheKey = baseKeyCacheFunction.cacheKey(baseBytesKey);
+
+        final LRUCacheEntry entry = context.cache().get(cacheName, cacheKey);
+        if (entry == null) {
+            return wrapped().fetch(key, timestamp);
+        } else {
+            return entry.value();
+        }
+    }
+
+    @Override
+    public synchronized WindowStoreIterator<byte[]> fetch(final Bytes key,
+                                                          final long timeFrom,
+                                                          final long timeTo) {
+        // since this function may not access the underlying inner store, we 
need to validate
+        // if store is open outside as well.
+        validateStoreOpen();
+
+        final WindowStoreIterator<byte[]> underlyingIterator = 
wrapped().fetch(key, timeFrom, timeTo);
+        if (context.cache() == null) {
+            return underlyingIterator;
+        }
+
+        return fetchInternal(underlyingIterator, key, timeFrom, timeTo, true);
+    }
+
+    @Override
+    public synchronized WindowStoreIterator<byte[]> backwardFetch(final Bytes 
key,
+                                                                  final long 
timeFrom,
+                                                                  final long 
timeTo) {
+        // since this function may not access the underlying inner store, we 
need to validate
+        // if store is open outside as well.
+        validateStoreOpen();
+
+        final WindowStoreIterator<byte[]> underlyingIterator = 
wrapped().backwardFetch(key, timeFrom, timeTo);
+        if (context.cache() == null) {
+            return underlyingIterator;
+        }
+
+        return fetchInternal(underlyingIterator, key, timeFrom, timeTo, false);
+    }
+
+    private WindowStoreIterator<byte[]> fetchInternal(final 
WindowStoreIterator<byte[]> underlyingIterator,
+                                                      final Bytes key,
+                                                      final long timeFrom,
+                                                      final long timeTo,
+                                                      final boolean forward) {
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = 
new CacheIteratorWrapper(
+            key, timeFrom, timeTo, forward, hasIndex);
+        final KeySchema keySchema = hasIndex ? indexKeySchema : baseKeySchema;
+        final SegmentedCacheFunction cacheFunction = hasIndex ? 
indexKeyCacheFunction : baseKeyCacheFunction;
+        final HasNextCondition hasNextCondition = 
keySchema.hasNextCondition(key, key, timeFrom, timeTo, forward);
+
+        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> 
filteredCacheIterator =
+            new FilteredCacheIterator(cacheIterator, hasNextCondition, 
cacheFunction);
+
+        final Function<byte[], Long> tsExtractor = hasIndex ? 
KeyFirstWindowKeySchema::extractStoreTimestamp
+            : TimeFirstWindowKeySchema::extractStoreTimestamp;
+        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, 
underlyingIterator, forward, tsExtractor);
+    }
+
    /**
     * Returns a merged (cache + underlying store) iterator over all windows for keys in
     * {@code [keyFrom, keyTo]} whose window start time falls in {@code [timeFrom, timeTo]}.
     * An invalid range ({@code keyFrom > keyTo}) logs a warning and yields an empty iterator
     * rather than throwing, matching the behavior of the other caching window stores.
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom,
                                                           final Bytes keyTo,
                                                           final long timeFrom,
                                                           final long timeTo) {
        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. " +
                "This may be due to range arguments set in the wrong order, " +
                "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
                "Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        // since this function may not access the underlying inner store, we need to validate
        // if store is open outside as well.
        validateStoreOpen();

        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
            wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
        // No cache configured: serve directly from the underlying store.
        if (context.cache() == null) {
            return underlyingIterator;
        }

        return fetchKeyRange(underlyingIterator, keyFrom, keyTo, timeFrom, timeTo, true);
    }
+
    /**
     * Backward (descending) variant of {@link #fetch(Bytes, Bytes, long, long)}: merges cache
     * and underlying store over {@code [keyFrom, keyTo]} x {@code [timeFrom, timeTo]} in
     * reverse order. Invalid ranges ({@code keyFrom > keyTo}) log and return empty.
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(final Bytes keyFrom,
                                                                   final Bytes keyTo,
                                                                   final long timeFrom,
                                                                   final long timeTo) {
        if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) {
            // NOTE(review): this warning omits the "range arguments set in the wrong order"
            // clause that the forward fetch logs — confirm whether the divergence is intended.
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
                + "This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " +
                "Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        // since this function may not access the underlying inner store, we need to validate
        // if store is open outside as well.
        validateStoreOpen();

        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator =
            wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
        // No cache configured: serve directly from the underlying store.
        if (context.cache() == null) {
            return underlyingIterator;
        }

        return fetchKeyRange(underlyingIterator, keyFrom, keyTo, timeFrom, timeTo, false);
    }
+
    /**
     * Builds the merged cache/store iterator for a key-range fetch.
     * <p>
     * When the store has a key-first index ({@code hasIndex}), the cache is iterated through
     * the index schema ({@code KeyFirstWindowKeySchema}) so that results come back in key
     * order; otherwise the base time-first schema is used. The schema choice also determines
     * the cache-key segmenting function and the store-key/window-key translation functions
     * handed to the merged iterator.
     *
     * @param underlyingIterator iterator over the persistent store, already positioned
     * @param forward            true for ascending iteration, false for descending
     */
    private KeyValueIterator<Windowed<Bytes>, byte[]> fetchKeyRange(final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator,
                                                                    final Bytes keyFrom,
                                                                    final Bytes keyTo,
                                                                    final long timeFrom,
                                                                    final long timeTo,
                                                                    final boolean forward) {
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = new CacheIteratorWrapper(
            keyFrom, keyTo, timeFrom, timeTo, forward, hasIndex);

        final KeySchema keySchema = hasIndex ? indexKeySchema : baseKeySchema;
        final HasNextCondition hasNextCondition = keySchema.hasNextCondition(keyFrom, keyTo, timeFrom, timeTo, forward);
        final SegmentedCacheFunction cacheFunction = hasIndex ? indexKeyCacheFunction : baseKeyCacheFunction;

        // Drop cache entries that fall outside the requested key/time range.
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction);
        final StoreKeyToWindowKey storeKeyToWindowKey = hasIndex ? KeyFirstWindowKeySchema::fromStoreKey : TimeFirstWindowKeySchema::fromStoreKey;
        final WindowKeyToBytes windowKeyToBytes = hasIndex ? KeyFirstWindowKeySchema::toStoreKeyBinary : TimeFirstWindowKeySchema::toStoreKeyBinary;

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator,
            underlyingIterator,
            bytesSerdes,
            windowSize,
            cacheFunction,
            forward,
            storeKeyToWindowKey,
            windowKeyToBytes
        );
    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long 
timeFrom,
+                                                              final long 
timeTo) {
+        validateStoreOpen();
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().fetchAll(timeFrom, timeTo);
+        return fetchAllInternal(underlyingIterator, timeFrom, timeTo, true);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final 
long timeFrom,
+                                                                      final 
long timeTo) {
+        validateStoreOpen();
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().backwardFetchAll(timeFrom, timeTo);
+        return fetchAllInternal(underlyingIterator, timeFrom, timeTo, false);
+    }
+
    /**
     * Builds the merged cache/store iterator for an all-keys fetch over a time range.
     * <p>
     * Unlike {@link #fetchKeyRange}, this always iterates the cache with the base
     * (time-first) schema and cache function — the index is not used here (the
     * {@code CacheIteratorWrapper} is created with {@code index = false}), since a
     * time-range scan maps directly onto the time-first key layout.
     *
     * @param forward true for ascending time order, false for descending
     */
    private KeyValueIterator<Windowed<Bytes>, byte[]> fetchAllInternal(final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator,
                                                                       final long timeFrom,
                                                                       final long timeTo,
                                                                       final boolean forward) {
        // null key bounds: no key restriction, only the time range filters entries.
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> cacheIterator = new CacheIteratorWrapper(
            null, null, timeFrom, timeTo, forward, false);
        final HasNextCondition hasNextCondition = baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, forward);
        final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator =
            new FilteredCacheIterator(cacheIterator, hasNextCondition, baseKeyCacheFunction);

        final StoreKeyToWindowKey storeKeyToWindowKey = TimeFirstWindowKeySchema::fromStoreKey;
        final WindowKeyToBytes windowKeyToBytes = TimeFirstWindowKeySchema::toStoreKeyBinary;

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator,
            underlyingIterator,
            bytesSerdes,
            windowSize,
            baseKeyCacheFunction,
            forward,
            storeKeyToWindowKey,
            windowKeyToBytes
        );
    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
+        validateStoreOpen();
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().all();
+        return fetchAllInternal(underlyingIterator, 0, Long.MAX_VALUE, true);
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
+        validateStoreOpen();
+
+        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = 
wrapped().backwardAll();
+        return fetchAllInternal(underlyingIterator, 0, Long.MAX_VALUE, false);
+    }
+
    /**
     * Flushes the cache namespace for this store (evicting dirty entries into the
     * wrapped store) and then flushes the wrapped store itself.
     */
    @Override
    public synchronized void flush() {
        context.cache().flush(cacheName);
        wrapped().flush();
    }
+
    /** Flushes only this store's cache namespace; the wrapped store is not flushed. */
    @Override
    public void flushCache() {
        context.cache().flush(cacheName);
    }
+
    /**
     * Closes the store: flushes and closes the cache namespace, then closes the wrapped
     * store. All three steps are attempted even if an earlier one throws; any collected
     * exceptions are rethrown together afterwards.
     */
    @Override
    public synchronized void close() {
        final LinkedList<RuntimeException> suppressed = executeAll(
            () -> context.cache().flush(cacheName),
            () -> context.cache().close(cacheName),
            wrapped()::close
        );
        if (!suppressed.isEmpty()) {
            throwSuppressed("Caught an exception while closing caching window store for store " + name(),
                suppressed);
        }
    }
+
+
+    private class CacheIteratorWrapper implements 
PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
+
+        private final long segmentInterval;
+        private final Bytes keyFrom;
+        private final Bytes keyTo;
+        private final long timeTo;
+        private final boolean forward;
+        private final boolean useIndex; // If we are iterating from index
+
+        private long lastSegmentId;
+        private long currentSegmentId;
+        private Bytes cacheKeyFrom;
+        private Bytes cacheKeyTo;
+        private LRUCacheEntry cachedBaseValue;
+        private final SegmentedCacheFunction cacheFunction;
+
+        private ThreadCache.MemoryLRUCacheBytesIterator current;
+
        /**
         * Single-key convenience constructor: delegates to the range constructor with
         * {@code key} as both the lower and upper key bound.
         */
        private CacheIteratorWrapper(final Bytes key,
                                     final long timeFrom,
                                     final long timeTo,
                                     final boolean forward,
                                     final boolean index) {
            this(key, key, timeFrom, timeTo, forward, index);
        }
+
        /**
         * Creates a segment-aware iterator over the thread cache.
         * <p>
         * The effective upper time bound is capped at {@code maxObservedTimestamp} — no cache
         * entry can exist beyond the highest timestamp written so far. Iteration proceeds one
         * cache segment at a time: the initial range covers only the first segment, and
         * subsequent segments are picked up as the iterator advances.
         *
         * @param index whether to iterate the index (key-first) cache entries instead of the
         *              base (time-first) ones; selects the matching cache function
         */
        private CacheIteratorWrapper(final Bytes keyFrom,
                                     final Bytes keyTo,
                                     final long timeFrom,
                                     final long timeTo,
                                     final boolean forward,
                                     final boolean index) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.timeTo = timeTo;
            this.forward = forward;
            this.useIndex = index;

            cacheFunction = index ? indexKeyCacheFunction : baseKeyCacheFunction;

            this.segmentInterval = cacheFunction.getSegmentInterval();

            if (forward) {
                // Ascending: start at timeFrom's segment, stop at the capped upper segment.
                this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
                this.currentSegmentId = cacheFunction.segmentId(timeFrom);

                setCacheKeyRange(timeFrom, currentSegmentLastTime());
                this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo);
            } else {
                // Descending: start at the capped upper segment, stop at timeFrom's segment.
                this.currentSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get()));
                this.lastSegmentId = cacheFunction.segmentId(timeFrom);

                setCacheKeyRange(currentSegmentBeginTime(), Math.min(timeTo, maxObservedTimestamp.get()));
                this.current = context.cache().reverseRange(cacheName, cacheKeyFrom, cacheKeyTo);
            }
        }
+
+        @Override
+        public boolean hasNext() {
+            if (current == null) {
+                return false;
+            }
+
+            if (useIndex) {
+                do {
+                    // If iterating from index, need to make sure base 
key/value exist in cache
+                    while (current.hasNext()) {
+                        final Bytes cacheIndexKey = current.peekNextKey();
+                        final Bytes indexKey = 
indexKeyCacheFunction.key(cacheIndexKey);
+                        final Bytes baseKey = indexKeyToBaseKey(indexKey);
+                        final Bytes cachedBaseKey = 
baseKeyCacheFunction.cacheKey(baseKey);
+                        cachedBaseValue = context.cache().get(cacheName, 
cachedBaseKey);
+                        if (cachedBaseValue != null) {
+                            return true;
+                        }
+                        current.next();

Review Comment:
   My understanding is that the base value would not be available only if a 
previous put failed on the base store but succeeded in the index, right? In 
that case, would the index store's size keep increasing over time?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java:
##########
@@ -0,0 +1,1245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Collection;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.KeyFirstWindowKeySchema;
+import 
org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas.TimeFirstWindowKeySchema;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static java.time.Duration.ofHours;
+import static java.time.Duration.ofMinutes;
+import static java.time.Instant.ofEpochMilli;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static 
org.apache.kafka.test.StreamsTestUtils.verifyAllWindowedKeyValues;
+import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
+import static org.apache.kafka.test.StreamsTestUtils.verifyWindowedKeyValue;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TimeOrderedCachingPersistentWindowStoreTest {
+
+    private static final int MAX_CACHE_SIZE_BYTES = 300;
+    private static final long DEFAULT_TIMESTAMP = 10L;
+    private static final Long WINDOW_SIZE = 10L;
+    private static final long SEGMENT_INTERVAL = 100L;
+    private final static String TOPIC = "topic";
+    private static final String CACHE_NAMESPACE = "0_0-store-name";
+
+    private InternalMockProcessorContext context;
+    private RocksDBTimeOrderedSegmentedBytesStore bytesStore;
+    private WindowStore<Bytes, byte[]> underlyingStore;
+    private TimeOrderedCachingWindowStore cachingStore;
+    private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
+    private ThreadCache cache;
+    private TimeFirstWindowKeySchema baseKeySchema;
+
+    @Parameter
+    public boolean hasIndex;
+
    /** Runs every test twice: with ({@code true}) and without ({@code false}) the key-first index. */
    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return asList(new Object[][] {
            {true},
            {false}
        });
    }
+
    /**
     * Builds the store stack under test: a RocksDB time-ordered segmented bytes store
     * (with or without index, per the test parameter), wrapped by a time-ordered window
     * store, wrapped by the caching store, backed by a small thread cache so evictions
     * happen during tests. A flush-listener stub records downstream forwards.
     */
    @Before
    public void setUp() {
        baseKeySchema = new TimeFirstWindowKeySchema();
        bytesStore = new RocksDBTimeOrderedSegmentedBytesStore("test", "metrics-scope", 100, SEGMENT_INTERVAL, hasIndex);
        underlyingStore = new RocksDBTimeOrderedWindowStore(bytesStore, false, WINDOW_SIZE);
        final TimeWindowedDeserializer<String> keyDeserializer = new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE);
        keyDeserializer.setIsChangelogTopic(true);
        cacheListener = new CacheFlushListenerStub<>(keyDeserializer, new StringDeserializer());
        cachingStore = new TimeOrderedCachingWindowStore(underlyingStore, WINDOW_SIZE, SEGMENT_INTERVAL);
        cachingStore.setFlushListener(cacheListener, false);
        cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
        context = new InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null, cache);
        context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0, 0, TOPIC, new RecordHeaders()));
        cachingStore.init((StateStoreContext) context, cachingStore);
    }
+
    /** Closes the caching store (and with it the underlying RocksDB store) after each test. */
    @After
    public void closeStore() {
        cachingStore.close();
    }
+
    /**
     * Verifies the deprecated {@code init(ProcessorContext, ...)} overload is delegated
     * to the inner store. The first mock round satisfies the constructor's
     * {@code hasIndex()} call; the mock is then reset to record the init expectation.
     */
    @SuppressWarnings("deprecation")
    @Test
    public void shouldDelegateDeprecatedInit() {
        final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
        EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
        EasyMock.replay(inner);
        final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);

        EasyMock.reset(inner);
        EasyMock.expect(inner.name()).andStubReturn("store");
        inner.init((ProcessorContext) context, outer);
        EasyMock.expectLastCall();
        EasyMock.replay(inner);
        outer.init((ProcessorContext) context, outer);
        EasyMock.verify(inner);
    }
+
    /**
     * Verifies the {@code init(StateStoreContext, ...)} overload is delegated to the
     * inner store (mirror of {@link #shouldDelegateDeprecatedInit()} for the
     * non-deprecated API).
     */
    @Test
    public void shouldDelegateInit() {
        final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
        EasyMock.expect(inner.hasIndex()).andReturn(hasIndex);
        EasyMock.replay(inner);
        final TimeOrderedCachingWindowStore outer = new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);

        EasyMock.reset(inner);
        EasyMock.expect(inner.name()).andStubReturn("store");
        inner.init((StateStoreContext) context, outer);
        EasyMock.expectLastCall();
        EasyMock.replay(inner);
        outer.init((StateStoreContext) context, outer);
        EasyMock.verify(inner);
    }
+
    /**
     * The caching store only accepts a {@code RocksDBTimeOrderedWindowStore} as its
     * wrapped store: any other window store type must be rejected with an
     * {@link IllegalArgumentException}, while the correct type constructs cleanly.
     */
    @Test
    public void shouldThrowIfWrongStore() {
        final RocksDBTimestampedWindowStore innerWrong = EasyMock.mock(RocksDBTimestampedWindowStore.class);
        final Exception e = assertThrows(IllegalArgumentException.class,
            () -> new TimeOrderedCachingWindowStore(innerWrong, WINDOW_SIZE, SEGMENT_INTERVAL));
        assertThat(e.getMessage(),
            containsString("TimeOrderedCachingWindowStore only supports RocksDBTimeOrderedWindowStore backed store"));

        final RocksDBTimeOrderedWindowStore inner = EasyMock.mock(RocksDBTimeOrderedWindowStore.class);
        // Nothing happens
        new TimeOrderedCachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL);
    }
+
+    @Test
+    public void shouldNotReturnDuplicatesInRanges() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final StoreBuilder<TimestampedWindowStore<String, String>> 
storeBuilder = Stores.timestampedWindowStoreBuilder(
+            RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                "store-name",
+                ofHours(1L),
+                ofMinutes(1),
+                false,
+                hasIndex
+            ), Serdes.String(), Serdes.String())
+            .withCachingEnabled();
+
+        builder.addStateStore(storeBuilder);
+
+        builder.stream(TOPIC,
+            Consumed.with(Serdes.String(), Serdes.String()))
+            .transform(() -> new Transformer<String, String, KeyValue<String, 
String>>() {
+                private WindowStore<String, ValueAndTimestamp<String>> store;
+                private int numRecordsProcessed;
+                private ProcessorContext context;
+
+                @SuppressWarnings("unchecked")
+                @Override
+                public void init(final ProcessorContext processorContext) {
+                    this.context = processorContext;
+                    this.store = processorContext.getStateStore("store-name");
+                    int count = 0;
+
+                    try (final KeyValueIterator<Windowed<String>, 
ValueAndTimestamp<String>> all = store.all()) {
+                        while (all.hasNext()) {
+                            count++;
+                            all.next();
+                        }
+                    }
+
+                    assertThat(count, equalTo(0));
+                }
+
+                @Override
+                public KeyValue<String, String> transform(final String key, 
final String value) {
+                    int count = 0;
+
+                    try (final KeyValueIterator<Windowed<String>, 
ValueAndTimestamp<String>> all = store.all()) {
+                        while (all.hasNext()) {
+                            count++;
+                            all.next();
+                        }
+                    }
+
+                    assertThat(count, equalTo(numRecordsProcessed));
+
+                    store.put(value, ValueAndTimestamp.make(value, 
context.timestamp()), context.timestamp());
+
+                    numRecordsProcessed++;
+
+                    return new KeyValue<>(key, value);
+                }
+
+                @Override
+                public void close() {
+                }
+            }, "store-name");
+
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 
1000L);
+
+        final Instant initialWallClockTime = Instant.ofEpochMilli(0L);
+        final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), streamsConfiguration, initialWallClockTime);
+
+        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(TOPIC,
+            Serdes.String().serializer(),
+            Serdes.String().serializer(),
+            initialWallClockTime,
+            Duration.ZERO);
+
+        for (int i = 0; i < 5; i++) {
+            inputTopic.pipeInput(UUID.randomUUID().toString(), 
UUID.randomUUID().toString());
+        }
+        driver.advanceWallClockTime(Duration.ofSeconds(10));
+        inputTopic.advanceTime(Duration.ofSeconds(10));
+        for (int i = 0; i < 5; i++) {
+            inputTopic.pipeInput(UUID.randomUUID().toString(), 
UUID.randomUUID().toString());
+        }
+        driver.advanceWallClockTime(Duration.ofSeconds(10));
+        inputTopic.advanceTime(Duration.ofSeconds(10));
+        for (int i = 0; i < 5; i++) {
+            inputTopic.pipeInput(UUID.randomUUID().toString(), 
UUID.randomUUID().toString());
+        }
+        driver.advanceWallClockTime(Duration.ofSeconds(10));
+        inputTopic.advanceTime(Duration.ofSeconds(10));
+        for (int i = 0; i < 5; i++) {
+            inputTopic.pipeInput(UUID.randomUUID().toString(), 
UUID.randomUUID().toString());
+        }
+
+        driver.close();
+    }
+
    /**
     * Single-key fetches after puts are served correctly: exact key/timestamp hits
     * return the value, misses (unknown key or wrong timestamp) return null, and the
     * range fetch yields exactly one entry per key. With the index enabled each put
     * creates two cache entries (base + index), hence the expected cache size of 4 vs 2.
     */
    @Test
    public void shouldPutFetchFromCache() {
        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);

        assertThat(cachingStore.fetch(bytesKey("a"), 10), equalTo(bytesValue("a")));
        assertThat(cachingStore.fetch(bytesKey("b"), 10), equalTo(bytesValue("b")));
        assertThat(cachingStore.fetch(bytesKey("c"), 10), equalTo(null));
        assertThat(cachingStore.fetch(bytesKey("a"), 0), equalTo(null));

        try (final WindowStoreIterator<byte[]> a = cachingStore.fetch(bytesKey("a"), ofEpochMilli(10), ofEpochMilli(10));
             final WindowStoreIterator<byte[]> b = cachingStore.fetch(bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10))) {
            verifyKeyValue(a.next(), DEFAULT_TIMESTAMP, "a");
            verifyKeyValue(b.next(), DEFAULT_TIMESTAMP, "b");
            assertFalse(a.hasNext());
            assertFalse(b.hasNext());
            final int expectedSize = hasIndex ? 4 : 2;
            assertEquals(expectedSize, cache.size());
        }
    }
+
    /** Position bookkeeping must behave the same whether or not a flush listener is set. */
    @Test
    public void shouldMatchPositionAfterPutWithFlushListener() {
        cachingStore.setFlushListener(record -> { }, false);
        shouldMatchPositionAfterPut();
    }
+
    /** Same position check as above, but with the flush listener removed. */
    @Test
    public void shouldMatchPositionAfterPutWithoutFlushListener() {
        cachingStore.setFlushListener(null, false);
        shouldMatchPositionAfterPut();
    }
+
    /**
     * Positions stay empty while records sit in the cache (the caching layer never
     * serves queries from the cache, so it keeps no position of its own) and are only
     * advanced — to the offset of the last written record — once {@code flush()} pushes
     * the cached records into the underlying store.
     */
    private void shouldMatchPositionAfterPut() {
        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
        cachingStore.put(bytesKey("key1"), bytesValue("value1"), DEFAULT_TIMESTAMP);
        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
        cachingStore.put(bytesKey("key2"), bytesValue("value2"), DEFAULT_TIMESTAMP);

        // Position should correspond to the last record's context, not the current context.
        context.setRecordContext(
            new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders())
        );

        // the caching window store doesn't maintain a separate
        // position because it never serves queries from the cache
        assertEquals(Position.emptyPosition(), cachingStore.getPosition());
        assertEquals(Position.emptyPosition(), underlyingStore.getPosition());

        cachingStore.flush();

        assertEquals(
            Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
            cachingStore.getPosition()
        );
        assertEquals(
            Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 2L))))),
            underlyingStore.getPosition()
        );
    }
+
    /**
     * Asserts one window-store iterator entry: the key is the expected window start
     * timestamp and the value matches the expected string's bytes.
     */
    private void verifyKeyValue(final KeyValue<Long, byte[]> next,
                                final long expectedKey,
                                final String expectedValue) {
        assertThat(next.key, equalTo(expectedKey));
        assertThat(next.value, equalTo(bytesValue(expectedValue)));
    }
+
+    private static byte[] bytesValue(final String value) {
+        return value.getBytes();
+    }
+
+    private static Bytes bytesKey(final String key) {
+        return Bytes.wrap(key.getBytes());
+    }
+
    /** Deserializes raw bytes back into a String via the standard String serde. */
    private String stringFrom(final byte[] from) {
        return Serdes.String().deserializer().deserialize("", from);
    }
+
    /**
     * A bounded key-range fetch served from the cache returns both keys, each windowed
     * at [put-timestamp, put-timestamp + window size), in key order. With the index
     * enabled each put creates two cache entries (base + index), hence 4 vs 2.
     */
    @Test
    public void shouldPutFetchRangeFromCache() {
        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);

        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) {
            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
                new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
            );

            final List<String> expectedValues = Arrays.asList("a", "b");

            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
            final int expectedSize = hasIndex ? 4 : 2;
            assertEquals(expectedSize, cache.size());
        }
    }
+
    /**
     * A null {@code keyFrom} means an open lower key bound: the fetch returns every key
     * up to and including "d" whose window start falls in the time range; "e" is
     * excluded by the key bound even though its timestamp qualifies.
     */
    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);

        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(null, bytesKey("d"), ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
                new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
            );

            final List<String> expectedValues = Arrays.asList("a", "b", "c", "d");

            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }
+
    /**
     * A null {@code keyTo} means an open upper key bound: the fetch returns every key
     * from "b" onward whose window start falls in the time range; "a" is excluded by
     * the lower key bound.
     */
    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyTo() {
        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 10L);
        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 20L);
        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 20L);

        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
                 cachingStore.fetch(bytesKey("b"), null, ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
                new Windowed<>(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP + 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP + 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
            );

            final List<String> expectedValues = Arrays.asList("b", "c", "d", "e");

            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
        }
    }
+
+    @Test
+    public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.fetch(null, null, 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("a"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("a", "b", "c", 
"d", "e");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetch(null, bytesKey("c"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("a"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("c", "b", "a");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetch(bytesKey("c"), null, 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("e", "d", "c");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP + 
10L);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP + 
20L);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP + 
20L);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetch(null, null, 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 20L))) {
+            final List<Windowed<Bytes>> expectedKeys = Arrays.asList(
+                new Windowed<>(bytesKey("e"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("d"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 20L, DEFAULT_TIMESTAMP + 20L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("c"), new TimeWindow(DEFAULT_TIMESTAMP 
+ 10L, DEFAULT_TIMESTAMP + 10L + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("b"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                new Windowed<>(bytesKey("a"), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE))
+            );
+
+            final List<String> expectedValues = Arrays.asList("e", "d", "c", 
"b", "a");
+
+            verifyAllWindowedKeyValues(iterator, expectedKeys, expectedValues);
+        }
+    }
+
+    @Test
+    public void shouldGetAllFromCache() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.all()) {
+            final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+            for (final String s : array) {
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(s), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                    s);
+            }
+            assertFalse(iterator.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldGetAllBackwardFromCache() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = 
cachingStore.backwardAll()) {
+            final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
+            for (final String s : array) {
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(s), new 
TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                    s);
+            }
+            assertFalse(iterator.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldFetchAllWithinTimestampRange() {
+        final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        for (int i = 0; i < array.length; i++) {
+            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
+        }
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.fetchAll(ofEpochMilli(0), ofEpochMilli(7))) {
+            for (int i = 0; i < array.length; i++) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator.hasNext());
+        }
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
+                 cachingStore.fetchAll(ofEpochMilli(2), ofEpochMilli(4))) {
+            for (int i = 2; i <= 4; i++) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator1.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator1.hasNext());
+        }
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
+                 cachingStore.fetchAll(ofEpochMilli(5), ofEpochMilli(7))) {
+            for (int i = 5; i <= 7; i++) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator2.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator2.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldFetchAllBackwardWithinTimestampRange() {
+        final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
+        for (int i = 0; i < array.length; i++) {
+            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
+        }
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
+                 cachingStore.backwardFetchAll(ofEpochMilli(0), 
ofEpochMilli(7))) {
+            for (int i = array.length - 1; i >= 0; i--) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator.hasNext());
+        }
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator1 =
+                 cachingStore.backwardFetchAll(ofEpochMilli(2), 
ofEpochMilli(4))) {
+            for (int i = 4; i >= 2; i--) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator1.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator1.hasNext());
+        }
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> iterator2 =
+                 cachingStore.backwardFetchAll(ofEpochMilli(5), 
ofEpochMilli(7))) {
+            for (int i = 7; i >= 5; i--) {
+                final String str = array[i];
+                verifyWindowedKeyValue(
+                    iterator2.next(),
+                    new Windowed<>(bytesKey(str), new TimeWindow(i, i + 
WINDOW_SIZE)),
+                    str);
+            }
+            assertFalse(iterator2.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
+        final int added = addItemsToCache();
+        // all dirty entries should have been flushed
+        try (final KeyValueIterator<Bytes, byte[]> iter = bytesStore.fetch(
+            Bytes.wrap("0".getBytes(StandardCharsets.UTF_8)),
+            DEFAULT_TIMESTAMP,
+            DEFAULT_TIMESTAMP)) {
+            final KeyValue<Bytes, byte[]> next = iter.next();
+            assertEquals(DEFAULT_TIMESTAMP, 
baseKeySchema.segmentTimestamp(next.key));
+            assertArrayEquals("0".getBytes(), next.value);
+            assertFalse(iter.hasNext());
+            assertEquals(added - 1, cache.size());
+        }
+    }
+
+    @Test
+    public void shouldForwardDirtyItemsWhenFlushCalled() {
+        final Windowed<String> windowedKey =
+            new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+    }
+
+    @Test
+    public void shouldSetFlushListener() {
+        assertTrue(cachingStore.setFlushListener(null, true));
+        assertTrue(cachingStore.setFlushListener(null, false));
+    }
+
+    @Test
+    public void shouldForwardOldValuesWhenEnabled() {
+        cachingStore.setFlushListener(cacheListener, true);
+        final Windowed<String> windowedKey =
+            new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
+        assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey).newValue);
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey));
+        cacheListener.forwarded.clear();
+    }
+
+    @Test
+    public void shouldForwardOldValuesWhenDisabled() {
+        final Windowed<String> windowedKey =
+            new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE));
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey).newValue);
+        assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
+        cacheListener.forwarded.clear();
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        assertNull(cacheListener.forwarded.get(windowedKey));
+        cacheListener.forwarded.clear();
+    }
+
+    @Test
+    public void shouldForwardDirtyItemToListenerWhenEvicted() {
+        final int numRecords = addItemsToCache();
+        assertEquals(numRecords, cacheListener.forwarded.size());
+    }
+
+    @Test
+    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() {
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.flush();
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.fetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "b");
+            assertFalse(fetch.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldIterateAcrossWindows() {
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
+
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.fetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            assertFalse(fetch.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldIterateBackwardAcrossWindows() {
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
+
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.backwardFetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            assertFalse(fetch.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldIterateCacheAndStore() {
+        final Bytes key = Bytes.wrap("1".getBytes());
+        bytesStore.put(TimeFirstWindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
+        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.fetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            assertFalse(fetch.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldIterateBackwardCacheAndStore() {
+        final Bytes key = Bytes.wrap("1".getBytes());
+        bytesStore.put(TimeFirstWindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
+        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
+        try (final WindowStoreIterator<byte[]> fetch =
+                 cachingStore.backwardFetch(bytesKey("1"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE, "b");
+            verifyKeyValue(fetch.next(), DEFAULT_TIMESTAMP, "a");
+            assertFalse(fetch.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldIterateCacheAndStoreKeyRange() {
+        final Bytes key = Bytes.wrap("1".getBytes());
+        bytesStore.put(TimeFirstWindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
+        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
+                 cachingStore.fetch(key, bytesKey("2"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                "a");
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + 
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
+                "b");
+            assertFalse(fetchRange.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldIterateBackwardCacheAndStoreKeyRange() {
+        final Bytes key = Bytes.wrap("1".getBytes());
+        bytesStore.put(TimeFirstWindowKeySchema.toStoreKeyBinary(key, 
DEFAULT_TIMESTAMP, 0), "a".getBytes());
+        cachingStore.put(key, bytesValue("b"), DEFAULT_TIMESTAMP + 
WINDOW_SIZE);
+
+        try (final KeyValueIterator<Windowed<Bytes>, byte[]> fetchRange =
+                 cachingStore.backwardFetch(key, bytesKey("2"), 
ofEpochMilli(DEFAULT_TIMESTAMP), ofEpochMilli(DEFAULT_TIMESTAMP + 
WINDOW_SIZE))) {
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP + 
WINDOW_SIZE, DEFAULT_TIMESTAMP + WINDOW_SIZE + WINDOW_SIZE)),
+                "b");
+            verifyWindowedKeyValue(
+                fetchRange.next(),
+                new Windowed<>(key, new TimeWindow(DEFAULT_TIMESTAMP, 
DEFAULT_TIMESTAMP + WINDOW_SIZE)),
+                "a");
+            assertFalse(fetchRange.hasNext());
+        }
+    }
+
+    @Test
+    public void shouldClearNamespaceCacheOnClose() {
+        cachingStore.put(bytesKey("a"), bytesValue("a"), 0L);
+        final int size = hasIndex ? 2 : 1;
+        assertEquals(size, cache.size());
+        cachingStore.close();
+        assertEquals(0, cache.size());
+    }
+
+    @Test
+    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
+        cachingStore.close();
+        assertThrows(InvalidStateStoreException.class, () -> 
cachingStore.fetch(bytesKey("a"), ofEpochMilli(0), ofEpochMilli(10)));
+    }
+
+    @Test
+    public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() {
+        cachingStore.close();
+        assertThrows(InvalidStateStoreException.class, () -> 
cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(0), 
ofEpochMilli(10)));
+    }
+
+    @Test
+    public void shouldThrowIfTryingToWriteToClosedCachingStore() {
+        cachingStore.close();
+        assertThrows(InvalidStateStoreException.class, () -> 
cachingStore.put(bytesKey("a"), bytesValue("a"), 0L));
+    }
+
+    @Test
+    public void shouldSkipNonExistBaseKeyInCache() {
+        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
+
+        final SegmentedCacheFunction baseCacheFunction = new 
SegmentedCacheFunction(new TimeFirstWindowKeySchema(), SEGMENT_INTERVAL);
+        final SegmentedCacheFunction indexCacheFunction = new 
SegmentedCacheFunction(new KeyFirstWindowKeySchema(), SEGMENT_INTERVAL);
+
+        final Bytes key = bytesKey("a");
+        final byte[] value = bytesValue("0001");
+        final Bytes cacheIndexKey = 
indexCacheFunction.cacheKey(KeyFirstWindowKeySchema.toStoreKeyBinary(key, 1, 
0));
+        final String cacheName = context.taskId() + "-test";
+
+        // Only put index to store
+        cache.put(cacheName,
+            cacheIndexKey,
+            new LRUCacheEntry(
+                new byte[0],
+                new RecordHeaders(),
+                true,
+                context.offset(),
+                context.timestamp(),
+                context.partition(),
+                "")
+        );
+
+        underlyingStore.put(key, value, 1);
+
+        if (hasIndex) {
+            verifyKeyValueList(
+                asList(
+                    windowedPair("a", "0001", 1),
+                    windowedPair("aa", "0002", 0)
+                ),
+                toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 
ofEpochMilli(0),
+                    ofEpochMilli(Long.MAX_VALUE)))
+            );
+        } else {
+            verifyKeyValueList(
+                asList(
+                    windowedPair("aa", "0002", 0),
+                    windowedPair("a", "0001", 1)
+                ),
+                toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 
ofEpochMilli(0),
+                    ofEpochMilli(Long.MAX_VALUE)))
+            );
+        }
+    }
+
+    @Test
+    public void shouldFetchAndIterateOverExactKeys() {
+        cachingStore.put(bytesKey("a"), bytesValue("0001"), 0);
+        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
+        cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
+        cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
+
+        final List<KeyValue<Long, byte[]>> expected = asList(
+            KeyValue.pair(0L, bytesValue("0001")),
+            KeyValue.pair(1L, bytesValue("0003")),
+            KeyValue.pair(SEGMENT_INTERVAL, bytesValue("0005"))
+        );
+        final List<KeyValue<Long, byte[]>> actual =
+            toList(cachingStore.fetch(bytesKey("a"), ofEpochMilli(0), 
ofEpochMilli(Long.MAX_VALUE)));
+        verifyKeyValueList(expected, actual);
+    }
+
+    @Test
+    public void shouldBackwardFetchAndIterateOverExactKeys() {
+        cachingStore.put(bytesKey("a"), bytesValue("0001"), 0);
+        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
+        cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
+        cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
+
+        final List<KeyValue<Long, byte[]>> expected = asList(
+            KeyValue.pair(SEGMENT_INTERVAL, bytesValue("0005")),
+            KeyValue.pair(1L, bytesValue("0003")),
+            KeyValue.pair(0L, bytesValue("0001"))
+        );
+        final List<KeyValue<Long, byte[]>> actual =
+            toList(cachingStore.backwardFetch(bytesKey("a"), ofEpochMilli(0), 
ofEpochMilli(Long.MAX_VALUE)));
+        verifyKeyValueList(expected, actual);
+    }
+
+    @Test
+    public void shouldFetchAndIterateOverKeyRange() {
+        cachingStore.put(bytesKey("a"), bytesValue("0001"), 0);
+        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
+        cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
+        cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
+
+        verifyKeyValueList(
+            asList(
+                windowedPair("a", "0001", 0),
+                windowedPair("a", "0003", 1),
+                windowedPair("a", "0005", SEGMENT_INTERVAL)
+            ),
+            toList(cachingStore.fetch(bytesKey("a"), bytesKey("a"), 
ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)))
+        );
+
+        verifyKeyValueList(
+            asList(
+                windowedPair("aa", "0002", 0),
+                windowedPair("aa", "0004", 1)),
+            toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 
ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)))
+        );
+
+        if (hasIndex) {
+            verifyKeyValueList(
+                asList(
+                    windowedPair("a", "0001", 0),
+                    windowedPair("a", "0003", 1),
+                    windowedPair("aa", "0002", 0),
+                    windowedPair("aa", "0004", 1),
+                    windowedPair("a", "0005", SEGMENT_INTERVAL)
+                ),
+                toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 
ofEpochMilli(0),
+                    ofEpochMilli(Long.MAX_VALUE)))
+            );
+        } else {
+            verifyKeyValueList(
+                asList(
+                    windowedPair("a", "0001", 0),
+                    windowedPair("aa", "0002", 0),
+                    windowedPair("a", "0003", 1),
+                    windowedPair("aa", "0004", 1),
+                    windowedPair("a", "0005", SEGMENT_INTERVAL)
+                ),
+                toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 
ofEpochMilli(0),
+                    ofEpochMilli(Long.MAX_VALUE)))
+            );
+        }
+    }
+
+    @Test
+    public void shouldFetchAndIterateOverKeyBackwardRange() {
+        cachingStore.put(bytesKey("a"), bytesValue("0001"), 0);
+        cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0);
+        cachingStore.put(bytesKey("a"), bytesValue("0003"), 1);
+        cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
+        cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
+
+        verifyKeyValueList(
+            asList(
+                windowedPair("a", "0005", SEGMENT_INTERVAL),
+                windowedPair("a", "0003", 1),
+                windowedPair("a", "0001", 0)
+            ),
+            toList(cachingStore.backwardFetch(bytesKey("a"), bytesKey("a"), 
ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)))
+        );
+
+        verifyKeyValueList(
+            asList(
+                windowedPair("aa", "0004", 1),
+                windowedPair("aa", "0002", 0)),
+            toList(cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"), 
ofEpochMilli(0), ofEpochMilli(Long.MAX_VALUE)))
+        );
+
+        if (!hasIndex) {
+            verifyKeyValueList(
+                // Ordered by timestamp if has no index
+                asList(
+                    windowedPair("a", "0005", SEGMENT_INTERVAL),
+                    windowedPair("aa", "0004", 1),
+                    windowedPair("a", "0003", 1),
+                    windowedPair("aa", "0002", 0),
+                    windowedPair("a", "0001", 0)
+                ),
+                toList(cachingStore.backwardFetch(bytesKey("a"), 
bytesKey("aa"), ofEpochMilli(0),
+                    ofEpochMilli(Long.MAX_VALUE)))
+            );
+        } else {
+            verifyKeyValueList(
+                asList(
+                    windowedPair("a", "0005", SEGMENT_INTERVAL), // First 
because in larger segments

Review Comment:
   nit: we usually put the comment on its own line above the code, rather than trailing at the end of the code line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to