This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b89f34af14c616076571d690e755b1272de02522 Author: Matthias J. Sax <[email protected]> AuthorDate: Fri Mar 27 16:26:50 2026 -0700 MINOR: simplify window-byte-store-supplier (#21829) This PR removes an unnecessary "duplicated" constructor for RocksDbWindowBytesStoreSupplier, and unifies the caller stack to re-use more shared code. Also make internal contractors non-public for RocksDbIndexedTimeOrderedWindowBytesStoreSupplier. Reviewers: Lan Ding <[email protected]>, Alieh Saeedi <[email protected]>, TengYao Chi <[email protected]> --- .../org/apache/kafka/streams/state/Stores.java | 50 ++++------------ ...IndexedTimeOrderedWindowBytesStoreSupplier.java | 69 ++++++++++++++-------- .../internals/RocksDbWindowBytesStoreSupplier.java | 26 +++----- 3 files changed, 63 insertions(+), 82 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index db39656b83b..c541eca32a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -299,7 +299,7 @@ public final class Stores { final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException { - return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, false); + return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE); } /** @@ -331,7 +331,7 @@ public final class Stores { final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException { - return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, true); + return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE); } /** @@ -348,28 +348,14 @@ public final class Stores { final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException { - Objects.requireNonNull(name, "name cannot be null"); - final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); - final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); - final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); - final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix); - - final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L); - - return new RocksDbWindowBytesStoreSupplier( - name, - retentionMs, - defaultSegmentInterval, - windowSizeMs, - retainDuplicates, - RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS); + return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS); } private static WindowBytesStoreSupplier persistentWindowStore(final String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates, - final boolean timestampedStore) { + final RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) { Objects.requireNonNull(name, "name cannot be null"); final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); @@ -378,26 +364,13 @@ public final class Stores { final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L); - return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval, timestampedStore); - } - - private static WindowBytesStoreSupplier persistentWindowStore(final String name, - final long retentionPeriod, - final long windowSize, - final boolean retainDuplicates, - final long segmentInterval, - final boolean timestampedStore) { - Objects.requireNonNull(name, "name cannot be null"); - if (retentionPeriod < 0L) { + if (retentionMs < 0L) { throw new IllegalArgumentException("retentionPeriod cannot be negative"); } - if (windowSize < 0L) { + if (windowSizeMs < 0L) { throw new IllegalArgumentException("windowSize cannot be negative"); } - if (segmentInterval < 1L) { - throw new IllegalArgumentException("segmentInterval cannot be zero or negative"); - } - if (windowSize > retentionPeriod) { + if (windowSizeMs > retentionMs) { throw new IllegalArgumentException("The retention period of the window store " + name + " must be no smaller than its window size. Got size=[" + windowSize + "], retention=[" + retentionPeriod + "]"); @@ -405,11 +378,12 @@ public final class Stores { return new RocksDbWindowBytesStoreSupplier( name, - retentionPeriod, - segmentInterval, - windowSize, + retentionMs, + defaultSegmentInterval, + windowSizeMs, retainDuplicates, - timestampedStore); + storeType + ); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java index 968821eb5c4..4e372242f8d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java @@ -40,12 +40,14 @@ public class RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window private final boolean retainDuplicates; private final WindowStoreTypes windowStoreType; - public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(final String name, - final Duration retentionPeriod, - final Duration windowSize, - final boolean retainDuplicates, - final boolean hasIndex, - final boolean withHeaders) { + public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create( + final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates, + final boolean hasIndex, + final boolean withHeaders + ) { Objects.requireNonNull(name, "name cannot be null"); final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); @@ -60,28 +62,41 @@ public class RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window if (windowSizeMs < 0L) { throw new IllegalArgumentException("windowSize cannot be negative"); } - if (defaultSegmentInterval < 1L) { - throw new IllegalArgumentException("segmentInterval cannot be zero or negative"); - } if (windowSizeMs > retentionMs) { throw new IllegalArgumentException("The retention period of the window store " + name + " must be no smaller than its window size. Got size=[" + windowSizeMs + "], retention=[" + retentionMs + "]"); } - return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(name, retentionMs, - defaultSegmentInterval, windowSizeMs, retainDuplicates, hasIndex, withHeaders); + return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier( + name, + retentionMs, + defaultSegmentInterval, + windowSizeMs, + retainDuplicates, + hasIndex, + withHeaders + ); } - public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name, - final long retentionPeriod, - final long segmentInterval, - final long windowSize, - final boolean retainDuplicates, - final boolean withIndex, - final boolean withHeaders) { - this(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates, - determineStoreType(withIndex, withHeaders)); + // for testing only + RocksDbIndexedTimeOrderedWindowBytesStoreSupplier( + final String name, + final long retentionPeriod, + final long segmentInterval, + final long windowSize, + final boolean retainDuplicates, + final boolean withIndex, + final boolean withHeaders + ) { + this( + name, + retentionPeriod, + segmentInterval, + windowSize, + retainDuplicates, + determineStoreType(withIndex, withHeaders) + ); } private static WindowStoreTypes determineStoreType(final boolean withIndex, final boolean withHeaders) { @@ -94,12 +109,14 @@ public class RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window } } - public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name, - final long retentionPeriod, - final long segmentInterval, - final long windowSize, - final boolean retainDuplicates, - final WindowStoreTypes windowStoreType) { + private RocksDbIndexedTimeOrderedWindowBytesStoreSupplier( + final String name, + final long retentionPeriod, + final long segmentInterval, + final long windowSize, + final boolean retainDuplicates, + final WindowStoreTypes windowStoreType + ) { this.name = name; this.retentionPeriod = retentionPeriod; this.segmentInterval = segmentInterval; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java index 14213b659e5..0d958d82822 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java @@ -34,24 +34,14 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier private final boolean retainDuplicates; private final WindowStoreTypes windowStoreType; - public RocksDbWindowBytesStoreSupplier(final String name, - final long retentionPeriod, - final long segmentInterval, - final long windowSize, - final boolean retainDuplicates, - final boolean returnTimestampedStore) { - this(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates, - returnTimestampedStore - ? WindowStoreTypes.TIMESTAMPED_WINDOW_STORE - : WindowStoreTypes.DEFAULT_WINDOW_STORE); - } - - public RocksDbWindowBytesStoreSupplier(final String name, - final long retentionPeriod, - final long segmentInterval, - final long windowSize, - final boolean retainDuplicates, - final WindowStoreTypes windowStoreType) { + public RocksDbWindowBytesStoreSupplier( + final String name, + final long retentionPeriod, + final long segmentInterval, + final long windowSize, + final boolean retainDuplicates, + final WindowStoreTypes windowStoreType + ) { this.name = name; this.retentionPeriod = retentionPeriod; this.segmentInterval = segmentInterval;
