This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 579a4e0e2db KAFKA-20219: Headers store with caching enabled should reject queries (#21941)
579a4e0e2db is described below

commit 579a4e0e2db47f087b459bdc14268c443628d538
Author: Zheguang Zhao <[email protected]>
AuthorDate: Wed Apr 8 17:00:39 2026 +1000

    KAFKA-20219: Headers store with caching enabled should reject queries (#21941)
    
    For headers-store with caching enabled, we want to keep IQv2 disabled
    until we implement IQv2 across the board. Thus, we need to introduce
    CachingKeyValueStoreWithHeaders and just forward any query to the
    underlying store, instead of re-using the existing query implementation
    on CachingKeyValueStore.
    
    Reviewers: Matthias J. Sax <[email protected]>
    
    ---------
    
    Co-authored-by: Matthias J. Sax <[email protected]>
---
 .../internals/CachingKeyValueStoreWithHeaders.java | 43 +++++++++++
 ...TimestampedKeyValueStoreBuilderWithHeaders.java |  2 +-
 ...stampedKeyValueStoreBuilderWithHeadersTest.java | 90 ++++++++++++----------
 3 files changed, 92 insertions(+), 43 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreWithHeaders.java
new file mode 100644
index 00000000000..47c9c10c45c
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreWithHeaders.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+/**
+ * A caching key-value store with headers is a caching key-value store that only forwards the query to the
+ * wrapped store.
+ */
+public class CachingKeyValueStoreWithHeaders extends CachingKeyValueStore {
+
+    CachingKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> 
underlying) {
+        super(underlying, CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS);
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        return wrapped().query(query, positionBound, config);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index 2401aded711..2c979d9c6b6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -90,7 +90,7 @@ public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
         if (!enableCaching) {
             return inner;
         }
-        return new CachingKeyValueStore(inner, 
CachingKeyValueStore.CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS);
+        return new CachingKeyValueStoreWithHeaders(inner);
     }
 
     private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final 
KeyValueStore<Bytes, byte[]> inner) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
index 23e3c7e35d4..3c150da4f3d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -44,12 +44,15 @@ import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
 import java.io.File;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Properties;
 
@@ -59,6 +62,9 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -506,33 +512,56 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
         }
     }
 
-    @Test
-    public void shouldReturnUnknownQueryTypeForKeyQueryOnHeadersStore() {
+    private static ThreadCache mockCacheHit() {
+        final ThreadCache cache = mock(ThreadCache.class);
+        final LRUCacheEntry entry = mock(LRUCacheEntry.class);
+        final byte[] entryValue = 
"mockEntryValue".getBytes(StandardCharsets.UTF_8);
+        lenient().when(entry.value()).thenReturn(entryValue);
+        lenient().when(cache.get(any(String.class), 
any(Bytes.class))).thenReturn(entry);
+        return cache;
+    }
+
+    private TimestampedKeyValueStoreWithHeaders<String, String> 
headersStoreMaybeWithCache(final boolean cachingEnabled) {
         when(supplier.name()).thenReturn("test-store");
         when(supplier.metricsScope()).thenReturn("metricScope");
         when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
 
-        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
-            supplier,
+        final File dir = TestUtils.tempDirectory();
+        final ThreadCache cache = mockCacheHit();
+        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
+            dir,
             Serdes.String(),
             Serdes.String(),
-            new MockTime()
+            null,
+            cache
         );
 
-        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
-            .withLoggingDisabled()
-            .withCachingDisabled()
-            .build();
-
-        final File dir = TestUtils.tempDirectory();
-        final Properties props = StreamsTestUtils.getStreamsConfig();
-        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
-            dir,
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+            supplier,
             Serdes.String(),
             Serdes.String(),
-            new StreamsConfig(props)
+            new MockTime()
         );
+        
+        final TimestampedKeyValueStoreWithHeaders<String, String> store;
+        if (cachingEnabled) {
+            store = builder.withLoggingDisabled()
+                .withCachingEnabled()
+                .build();
+        } else {
+            store = builder.withLoggingDisabled()
+                .withCachingDisabled()
+                .build();
+        }
+
         store.init(context, store);
+        return store;
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldReturnUnknownQueryTypeForKeyQueryOnHeadersStore(final 
boolean cachingEnabled) {
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
headersStoreMaybeWithCache(cachingEnabled);
 
         try {
             final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new 
Bytes("test-key".getBytes()));
@@ -555,33 +584,10 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
         }
     }
 
-    @Test
-    public void shouldReturnUnknownQueryTypeForRangeQueryOnHeadersStore() {
-        when(supplier.name()).thenReturn("test-store");
-        when(supplier.metricsScope()).thenReturn("metricScope");
-        when(supplier.get()).thenReturn(new 
RocksDBTimestampedStoreWithHeaders("test-store", "metrics-scope"));
-
-        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
-            supplier,
-            Serdes.String(),
-            Serdes.String(),
-            new MockTime()
-        );
-
-        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
-            .withLoggingDisabled()
-            .withCachingDisabled()
-            .build();
-
-        final File dir = TestUtils.tempDirectory();
-        final Properties props = StreamsTestUtils.getStreamsConfig();
-        final InternalMockProcessorContext<String, String> context = new 
InternalMockProcessorContext<>(
-            dir,
-            Serdes.String(),
-            Serdes.String(),
-            new StreamsConfig(props)
-        );
-        store.init(context, store);
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldReturnUnknownQueryTypeForRangeQueryOnHeadersStore(final 
boolean cachingEnabled) {
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
headersStoreMaybeWithCache(cachingEnabled);
 
         try {
             final RangeQuery<Bytes, byte[]> query = RangeQuery.withRange(

Reply via email to