This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bba99521848 KAFKA-20326: WindowStoreMaterializerTests needs to get
updated (#21825)
bba99521848 is described below
commit bba9952184895988a59a341fa077670ff2c53145
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Mar 20 03:47:12 2026 +0000
KAFKA-20326: WindowStoreMaterializerTests needs to get updated (#21825)
The test `shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled` is
incorrect – a check was removed to make it pass for now, and the correct
assertion needs to be added back once the root cause is fixed.
ref: https://github.com/apache/kafka/pull/21580/changes#r2921529513
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/streams/state/internals/TimeOrderedCachingWindowStore.java | 3 +++
.../state/internals/TimestampedToHeadersWindowStoreAdapter.java | 2 +-
.../state/internals/TimestampedWindowStoreWithHeadersBuilder.java | 3 +++
.../kafka/streams/processor/internals/WindowStoreMaterializerTest.java | 1 -
4 files changed, 7 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 4644f166cc1..aea172ea9cc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -104,6 +104,9 @@ public class TimeOrderedCachingWindowStore
if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
return (RocksDBTimeOrderedWindowStore<?>) wrapped;
}
+ if (wrapped instanceof TimestampedToHeadersWindowStoreAdapter) {
+ return getWrappedStore(((TimestampedToHeadersWindowStoreAdapter)
wrapped).store);
+ }
if (wrapped instanceof WrappedStateStore) {
return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>)
wrapped).wrapped());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
index 738787b9c21..2b054532a6b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
@@ -58,7 +58,7 @@ import static
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFo
* </ul>
*/
public class TimestampedToHeadersWindowStoreAdapter implements
WindowStore<Bytes, byte[]> {
- private final WindowStore<Bytes, byte[]> store;
+ final WindowStore<Bytes, byte[]> store;
public TimestampedToHeadersWindowStoreAdapter(final WindowStore<Bytes,
byte[]> store) {
if (!store.persistent()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index 3714643d43b..4dac948e3f8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -115,6 +115,9 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
return true;
}
+ if (stateStore instanceof TimestampedToHeadersWindowStoreAdapter) {
+ return
isTimeOrderedStore(((TimestampedToHeadersWindowStoreAdapter) stateStore).store);
+ }
if (stateStore instanceof WrappedStateStore) {
return isTimeOrderedStore(((WrappedStateStore<?, ?, ?>)
stateStore).wrapped());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
index b118b0b2c31..1abe4beb6af 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WindowStoreMaterializerTest.java
@@ -228,7 +228,6 @@ public class WindowStoreMaterializerTest {
@Test
public void shouldCreateHeadersStoreWithOnWindowCloseAndCachingEnabled() {
-
doReturn("headers").when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
emitStrategy = EmitStrategy.onWindowClose();
final MaterializedInternal<String, String, WindowStore<Bytes, byte[]>>
materialized =