This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6ebc93ccc0f MINOR: simplify window-byte-store-supplier (#21829)
6ebc93ccc0f is described below

commit 6ebc93ccc0f200452508e3927ccb6eb3b6ecac24
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Mar 27 16:26:50 2026 -0700

    MINOR: simplify window-byte-store-supplier (#21829)
    
    This PR removes an unnecessary "duplicated" constructor for
    RocksDbWindowBytesStoreSupplier, and unifies the caller stack to re-use
    more shared code.
    
    Also make internal constructors non-public for
    RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.
    
    Reviewers: Lan Ding <[email protected]>, Alieh Saeedi
     <[email protected]>, TengYao Chi <[email protected]>
---
 .../org/apache/kafka/streams/state/Stores.java     | 50 ++++------------
 ...IndexedTimeOrderedWindowBytesStoreSupplier.java | 69 ++++++++++++++--------
 .../internals/RocksDbWindowBytesStoreSupplier.java | 26 +++-----
 3 files changed, 63 insertions(+), 82 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index db39656b83b..c541eca32a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -299,7 +299,7 @@ public final class Stores {
                                                                  final 
Duration retentionPeriod,
                                                                  final 
Duration windowSize,
                                                                  final boolean 
retainDuplicates) throws IllegalArgumentException {
-        return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, false);
+        return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE);
     }
 
     /**
@@ -331,7 +331,7 @@ public final class Stores {
                                                                             
final Duration retentionPeriod,
                                                                             
final Duration windowSize,
                                                                             
final boolean retainDuplicates) throws IllegalArgumentException {
-        return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, true);
+        return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE);
     }
 
     /**
@@ -348,28 +348,14 @@ public final class Stores {
                                                                                
        final Duration retentionPeriod,
                                                                                
        final Duration windowSize,
                                                                                
        final boolean retainDuplicates) throws IllegalArgumentException {
-        Objects.requireNonNull(name, "name cannot be null");
-        final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
-        final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
-        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, 
"windowSize");
-        final long windowSizeMs = validateMillisecondDuration(windowSize, 
wsMsgPrefix);
-
-        final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
-
-        return new RocksDbWindowBytesStoreSupplier(
-            name,
-            retentionMs,
-            defaultSegmentInterval,
-            windowSizeMs,
-            retainDuplicates,
-            
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
+        return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
     }
 
     private static WindowBytesStoreSupplier persistentWindowStore(final String 
name,
                                                                   final 
Duration retentionPeriod,
                                                                   final 
Duration windowSize,
                                                                   final 
boolean retainDuplicates,
-                                                                  final 
boolean timestampedStore) {
+                                                                  final 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) {
         Objects.requireNonNull(name, "name cannot be null");
         final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
         final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
@@ -378,26 +364,13 @@ public final class Stores {
 
         final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
 
-        return persistentWindowStore(name, retentionMs, windowSizeMs, 
retainDuplicates, defaultSegmentInterval, timestampedStore);
-    }
-
-    private static WindowBytesStoreSupplier persistentWindowStore(final String 
name,
-                                                                  final long 
retentionPeriod,
-                                                                  final long 
windowSize,
-                                                                  final 
boolean retainDuplicates,
-                                                                  final long 
segmentInterval,
-                                                                  final 
boolean timestampedStore) {
-        Objects.requireNonNull(name, "name cannot be null");
-        if (retentionPeriod < 0L) {
+        if (retentionMs < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
         }
-        if (windowSize < 0L) {
+        if (windowSizeMs < 0L) {
             throw new IllegalArgumentException("windowSize cannot be 
negative");
         }
-        if (segmentInterval < 1L) {
-            throw new IllegalArgumentException("segmentInterval cannot be zero 
or negative");
-        }
-        if (windowSize > retentionPeriod) {
+        if (windowSizeMs > retentionMs) {
             throw new IllegalArgumentException("The retention period of the 
window store "
                 + name + " must be no smaller than its window size. Got size=["
                 + windowSize + "], retention=[" + retentionPeriod + "]");
@@ -405,11 +378,12 @@ public final class Stores {
 
         return new RocksDbWindowBytesStoreSupplier(
             name,
-            retentionPeriod,
-            segmentInterval,
-            windowSize,
+            retentionMs,
+            defaultSegmentInterval,
+            windowSizeMs,
             retainDuplicates,
-            timestampedStore);
+            storeType
+        );
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index 968821eb5c4..4e372242f8d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -40,12 +40,14 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
     private final boolean retainDuplicates;
     private final WindowStoreTypes windowStoreType;
 
-    public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier 
create(final String name,
-                                                                           
final Duration retentionPeriod,
-                                                                           
final Duration windowSize,
-                                                                           
final boolean retainDuplicates,
-                                                                           
final boolean hasIndex,
-                                                                           
final boolean withHeaders) {
+    public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(
+        final String name,
+        final Duration retentionPeriod,
+        final Duration windowSize,
+        final boolean retainDuplicates,
+        final boolean hasIndex,
+        final boolean withHeaders
+    ) {
         Objects.requireNonNull(name, "name cannot be null");
         final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
         final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
@@ -60,28 +62,41 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
         if (windowSizeMs < 0L) {
             throw new IllegalArgumentException("windowSize cannot be 
negative");
         }
-        if (defaultSegmentInterval < 1L) {
-            throw new IllegalArgumentException("segmentInterval cannot be zero 
or negative");
-        }
         if (windowSizeMs > retentionMs) {
             throw new IllegalArgumentException("The retention period of the 
window store "
                 + name + " must be no smaller than its window size. Got size=["
                 + windowSizeMs + "], retention=[" + retentionMs + "]");
         }
 
-        return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(name, 
retentionMs,
-            defaultSegmentInterval, windowSizeMs, retainDuplicates, hasIndex, 
withHeaders);
+        return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+            name,
+            retentionMs,
+            defaultSegmentInterval,
+            windowSizeMs,
+            retainDuplicates,
+            hasIndex,
+            withHeaders
+        );
     }
 
-    public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name,
-                                           final long retentionPeriod,
-                                           final long segmentInterval,
-                                           final long windowSize,
-                                           final boolean retainDuplicates,
-                                           final boolean withIndex,
-                                           final boolean withHeaders) {
-        this(name, retentionPeriod, segmentInterval, windowSize, 
retainDuplicates,
-            determineStoreType(withIndex, withHeaders));
+    // for testing only
+    RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+        final String name,
+        final long retentionPeriod,
+        final long segmentInterval,
+        final long windowSize,
+        final boolean retainDuplicates,
+        final boolean withIndex,
+        final boolean withHeaders
+    ) {
+        this(
+            name,
+            retentionPeriod,
+            segmentInterval,
+            windowSize,
+            retainDuplicates,
+            determineStoreType(withIndex, withHeaders)
+        );
     }
 
     private static WindowStoreTypes determineStoreType(final boolean 
withIndex, final boolean withHeaders) {
@@ -94,12 +109,14 @@ public class 
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
         }
     }
 
-    public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name,
-                                           final long retentionPeriod,
-                                           final long segmentInterval,
-                                           final long windowSize,
-                                           final boolean retainDuplicates,
-                                           final WindowStoreTypes 
windowStoreType) {
+    private RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+        final String name,
+        final long retentionPeriod,
+        final long segmentInterval,
+        final long windowSize,
+        final boolean retainDuplicates,
+        final WindowStoreTypes windowStoreType
+    ) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.segmentInterval = segmentInterval;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 14213b659e5..0d958d82822 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -34,24 +34,14 @@ public class RocksDbWindowBytesStoreSupplier implements 
WindowBytesStoreSupplier
     private final boolean retainDuplicates;
     private final WindowStoreTypes windowStoreType;
 
-    public RocksDbWindowBytesStoreSupplier(final String name,
-                                           final long retentionPeriod,
-                                           final long segmentInterval,
-                                           final long windowSize,
-                                           final boolean retainDuplicates,
-                                           final boolean 
returnTimestampedStore) {
-        this(name, retentionPeriod, segmentInterval, windowSize, 
retainDuplicates,
-            returnTimestampedStore
-                ? WindowStoreTypes.TIMESTAMPED_WINDOW_STORE
-                : WindowStoreTypes.DEFAULT_WINDOW_STORE);
-    }
-
-    public RocksDbWindowBytesStoreSupplier(final String name,
-                                           final long retentionPeriod,
-                                           final long segmentInterval,
-                                           final long windowSize,
-                                           final boolean retainDuplicates,
-                                           final WindowStoreTypes 
windowStoreType) {
+    public RocksDbWindowBytesStoreSupplier(
+        final String name,
+        final long retentionPeriod,
+        final long segmentInterval,
+        final long windowSize,
+        final boolean retainDuplicates,
+        final WindowStoreTypes windowStoreType
+    ) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.segmentInterval = segmentInterval;

Reply via email to