This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 930f4eab437 KAFKA-20194: Ensure backward compatibility for Windowed 
Store (#22004)
930f4eab437 is described below

commit 930f4eab43700a81cf248ec9c6ef2dae47ef6e94
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 8 22:31:06 2026 -0700

    KAFKA-20194: Ensure backward compatibility for Windowed Store (#22004)
    
    WindowHeaders-supplier must implement HeadersBytesStoreSupplier.
    
    Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedi
     <[email protected]>, TengYao Chi <[email protected]>
---
 .../org/apache/kafka/streams/state/Stores.java     | 30 +++++++----
 .../internals/RocksDbWindowBytesStoreSupplier.java | 20 ++------
 .../RocksDbWindowHeadersBytesStoreSupplier.java    | 58 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 24 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index dfa2a2e2bd0..675cf06d45c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier
 import 
org.apache.kafka.streams.state.internals.RocksDbSessionHeadersBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.RocksDbWindowHeadersBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder;
 import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
@@ -44,6 +45,7 @@ import java.util.Objects;
 
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS;
 
 /**
  * Factory for creating state stores in Kafka Streams.
@@ -360,7 +362,7 @@ public final class Stores {
         final Duration windowSize,
         final boolean retainDuplicates
     ) throws IllegalArgumentException {
-        return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
+        return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
     }
 
     private static WindowBytesStoreSupplier persistentWindowStore(
@@ -390,14 +392,24 @@ public final class Stores {
                 + windowSize + "], retention=[" + retentionPeriod + "]");
         }
 
-        return new RocksDbWindowBytesStoreSupplier(
-            name,
-            retentionMs,
-            defaultSegmentInterval,
-            windowSizeMs,
-            retainDuplicates,
-            storeType
-        );
+        if (storeType == TIMESTAMPED_WINDOW_STORE_WITH_HEADERS) {
+            return new RocksDbWindowHeadersBytesStoreSupplier(
+                name,
+                retentionMs,
+                defaultSegmentInterval,
+                windowSizeMs,
+                retainDuplicates
+            );
+        } else {
+            return new RocksDbWindowBytesStoreSupplier(
+                name,
+                retentionMs,
+                defaultSegmentInterval,
+                windowSizeMs,
+                retainDuplicates,
+                storeType
+            );
+        }
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 0d958d82822..6a5ec29145b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -27,11 +27,11 @@ public class RocksDbWindowBytesStoreSupplier implements 
WindowBytesStoreSupplier
         TIMESTAMPED_WINDOW_STORE_WITH_HEADERS
     }
 
-    private final String name;
-    private final long retentionPeriod;
-    private final long segmentInterval;
-    private final long windowSize;
-    private final boolean retainDuplicates;
+    protected final String name;
+    protected final long retentionPeriod;
+    protected final long segmentInterval;
+    protected final long windowSize;
+    protected final boolean retainDuplicates;
     private final WindowStoreTypes windowStoreType;
 
     public RocksDbWindowBytesStoreSupplier(
@@ -78,16 +78,6 @@ public class RocksDbWindowBytesStoreSupplier implements 
WindowBytesStoreSupplier
                         new WindowKeySchema()),
                     retainDuplicates,
                     windowSize);
-            case TIMESTAMPED_WINDOW_STORE_WITH_HEADERS:
-                return new RocksDBTimestampedWindowStoreWithHeaders(
-                    new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
-                        name,
-                        metricsScope(),
-                        retentionPeriod,
-                        segmentInterval,
-                        new WindowKeySchema()),
-                    retainDuplicates,
-                    windowSize);
             default:
                 throw new IllegalArgumentException("invalid window store type: 
" + windowStoreType);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowHeadersBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowHeadersBytesStoreSupplier.java
new file mode 100644
index 00000000000..87384bcdc9a
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowHeadersBytesStoreSupplier.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+
+import static 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS;
+
+public class RocksDbWindowHeadersBytesStoreSupplier
+    extends RocksDbWindowBytesStoreSupplier
+    implements HeadersBytesStoreSupplier {
+
+    public RocksDbWindowHeadersBytesStoreSupplier(
+        final String name,
+        final long retentionPeriod,
+        final long segmentInterval,
+        final long windowSize,
+        final boolean retainDuplicates
+    ) {
+        super(
+            name,
+            retentionPeriod,
+            segmentInterval,
+            windowSize,
+            retainDuplicates,
+            TIMESTAMPED_WINDOW_STORE_WITH_HEADERS
+        );
+    }
+
+    @Override
+    public WindowStore<Bytes, byte[]> get() {
+        return new RocksDBTimestampedWindowStoreWithHeaders(
+            new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+                name,
+                metricsScope(),
+                retentionPeriod,
+                segmentInterval,
+                new WindowKeySchema()),
+            retainDuplicates,
+            windowSize);
+    }
+}

Reply via email to