This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 1b2b38c4484 KAFKA-20326: WindowStoreMaterializerTests needs to get 
updated (#21825)
1b2b38c4484 is described below

commit 1b2b38c4484e4560362e8cebbf6d32a0e5f4356c
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Mar 20 03:47:12 2026 +0000

    KAFKA-20326: WindowStoreMaterializerTests needs to get updated (#21825)
    
    `shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled` is
    incorrect – we removed a check to make it pass for now, but need to add
    back the correct assertion after the root cause was fixed.
    
    ref: https://github.com/apache/kafka/pull/21580/changes#r2921529513
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../kafka/streams/state/internals/TimeOrderedCachingWindowStore.java   | 3 +++
 .../state/internals/TimestampedToHeadersWindowStoreAdapter.java        | 2 +-
 .../state/internals/TimestampedWindowStoreWithHeadersBuilder.java      | 3 +++
 .../kafka/streams/processor/internals/WindowStoreMaterializerTest.java | 1 -
 4 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 4644f166cc1..aea172ea9cc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -104,6 +104,9 @@ public class TimeOrderedCachingWindowStore
         if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
             return (RocksDBTimeOrderedWindowStore<?>) wrapped;
         }
+        if (wrapped instanceof TimestampedToHeadersWindowStoreAdapter) {
+            return getWrappedStore(((TimestampedToHeadersWindowStoreAdapter) 
wrapped).store);
+        }
         if (wrapped instanceof WrappedStateStore) {
             return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>) 
wrapped).wrapped());
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
index 738787b9c21..2b054532a6b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
@@ -58,7 +58,7 @@ import static 
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFo
  * </ul>
  */
 public class TimestampedToHeadersWindowStoreAdapter implements 
WindowStore<Bytes, byte[]> {
-    private final WindowStore<Bytes, byte[]> store;
+    final WindowStore<Bytes, byte[]> store;
 
     public TimestampedToHeadersWindowStoreAdapter(final WindowStore<Bytes, 
byte[]> store) {
         if (!store.persistent()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index 3714643d43b..4dac948e3f8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -115,6 +115,9 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
         if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
             return true;
         }
+        if (stateStore instanceof TimestampedToHeadersWindowStoreAdapter) {
+            return 
isTimeOrderedStore(((TimestampedToHeadersWindowStoreAdapter) stateStore).store);
+        }
         if (stateStore instanceof WrappedStateStore) {
             return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>) 
stateStore).wrapped());
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index b118b0b2c31..1abe4beb6af 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -228,7 +228,6 @@ public class WindowStoreMaterializerTest {
 
     @Test
     public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
-        
doReturn("headers").when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
         emitStrategy = EmitStrategy.onWindowClose();
 
         final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>> 
materialized =

Reply via email to