This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6ebc93ccc0f MINOR: simplify window-byte-store-supplier (#21829)
6ebc93ccc0f is described below
commit 6ebc93ccc0f200452508e3927ccb6eb3b6ecac24
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Mar 27 16:26:50 2026 -0700
MINOR: simplify window-byte-store-supplier (#21829)
This PR removes an unnecessary "duplicated" constructor for
RocksDbWindowBytesStoreSupplier, and unifies the call stack to re-use
more shared code.
Also make internal constructors non-public for
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.
Reviewers: Lan Ding <[email protected]>, Alieh Saeedi
<[email protected]>, TengYao Chi <[email protected]>
---
.../org/apache/kafka/streams/state/Stores.java | 50 ++++------------
...IndexedTimeOrderedWindowBytesStoreSupplier.java | 69 ++++++++++++++--------
.../internals/RocksDbWindowBytesStoreSupplier.java | 26 +++-----
3 files changed, 63 insertions(+), 82 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index db39656b83b..c541eca32a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -299,7 +299,7 @@ public final class Stores {
final
Duration retentionPeriod,
final
Duration windowSize,
final boolean
retainDuplicates) throws IllegalArgumentException {
- return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates, false);
+ return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE);
}
/**
@@ -331,7 +331,7 @@ public final class Stores {
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
- return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates, true);
+ return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE);
}
/**
@@ -348,28 +348,14 @@ public final class Stores {
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
- Objects.requireNonNull(name, "name cannot be null");
- final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
- final long retentionMs = validateMillisecondDuration(retentionPeriod,
rpMsgPrefix);
- final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize,
"windowSize");
- final long windowSizeMs = validateMillisecondDuration(windowSize,
wsMsgPrefix);
-
- final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
-
- return new RocksDbWindowBytesStoreSupplier(
- name,
- retentionMs,
- defaultSegmentInterval,
- windowSizeMs,
- retainDuplicates,
-
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
+ return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
}
private static WindowBytesStoreSupplier persistentWindowStore(final String
name,
final
Duration retentionPeriod,
final
Duration windowSize,
final
boolean retainDuplicates,
- final
boolean timestampedStore) {
+ final
RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod,
rpMsgPrefix);
@@ -378,26 +364,13 @@ public final class Stores {
final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
- return persistentWindowStore(name, retentionMs, windowSizeMs,
retainDuplicates, defaultSegmentInterval, timestampedStore);
- }
-
- private static WindowBytesStoreSupplier persistentWindowStore(final String
name,
- final long
retentionPeriod,
- final long
windowSize,
- final
boolean retainDuplicates,
- final long
segmentInterval,
- final
boolean timestampedStore) {
- Objects.requireNonNull(name, "name cannot be null");
- if (retentionPeriod < 0L) {
+ if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be
negative");
}
- if (windowSize < 0L) {
+ if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be
negative");
}
- if (segmentInterval < 1L) {
- throw new IllegalArgumentException("segmentInterval cannot be zero
or negative");
- }
- if (windowSize > retentionPeriod) {
+ if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the
window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSize + "], retention=[" + retentionPeriod + "]");
@@ -405,11 +378,12 @@ public final class Stores {
return new RocksDbWindowBytesStoreSupplier(
name,
- retentionPeriod,
- segmentInterval,
- windowSize,
+ retentionMs,
+ defaultSegmentInterval,
+ windowSizeMs,
retainDuplicates,
- timestampedStore);
+ storeType
+ );
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
index 968821eb5c4..4e372242f8d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.java
@@ -40,12 +40,14 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
private final boolean retainDuplicates;
private final WindowStoreTypes windowStoreType;
- public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier
create(final String name,
-
final Duration retentionPeriod,
-
final Duration windowSize,
-
final boolean retainDuplicates,
-
final boolean hasIndex,
-
final boolean withHeaders) {
+ public static RocksDbIndexedTimeOrderedWindowBytesStoreSupplier create(
+ final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates,
+ final boolean hasIndex,
+ final boolean withHeaders
+ ) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod,
rpMsgPrefix);
@@ -60,28 +62,41 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be
negative");
}
- if (defaultSegmentInterval < 1L) {
- throw new IllegalArgumentException("segmentInterval cannot be zero
or negative");
- }
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the
window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSizeMs + "], retention=[" + retentionMs + "]");
}
- return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(name,
retentionMs,
- defaultSegmentInterval, windowSizeMs, retainDuplicates, hasIndex,
withHeaders);
+ return new RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+ name,
+ retentionMs,
+ defaultSegmentInterval,
+ windowSizeMs,
+ retainDuplicates,
+ hasIndex,
+ withHeaders
+ );
}
- public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name,
- final long retentionPeriod,
- final long segmentInterval,
- final long windowSize,
- final boolean retainDuplicates,
- final boolean withIndex,
- final boolean withHeaders) {
- this(name, retentionPeriod, segmentInterval, windowSize,
retainDuplicates,
- determineStoreType(withIndex, withHeaders));
+ // for testing only
+ RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+ final String name,
+ final long retentionPeriod,
+ final long segmentInterval,
+ final long windowSize,
+ final boolean retainDuplicates,
+ final boolean withIndex,
+ final boolean withHeaders
+ ) {
+ this(
+ name,
+ retentionPeriod,
+ segmentInterval,
+ windowSize,
+ retainDuplicates,
+ determineStoreType(withIndex, withHeaders)
+ );
}
private static WindowStoreTypes determineStoreType(final boolean
withIndex, final boolean withHeaders) {
@@ -94,12 +109,14 @@ public class
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier implements Window
}
}
- public RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(final String name,
- final long retentionPeriod,
- final long segmentInterval,
- final long windowSize,
- final boolean retainDuplicates,
- final WindowStoreTypes
windowStoreType) {
+ private RocksDbIndexedTimeOrderedWindowBytesStoreSupplier(
+ final String name,
+ final long retentionPeriod,
+ final long segmentInterval,
+ final long windowSize,
+ final boolean retainDuplicates,
+ final WindowStoreTypes windowStoreType
+ ) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.segmentInterval = segmentInterval;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 14213b659e5..0d958d82822 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -34,24 +34,14 @@ public class RocksDbWindowBytesStoreSupplier implements
WindowBytesStoreSupplier
private final boolean retainDuplicates;
private final WindowStoreTypes windowStoreType;
- public RocksDbWindowBytesStoreSupplier(final String name,
- final long retentionPeriod,
- final long segmentInterval,
- final long windowSize,
- final boolean retainDuplicates,
- final boolean
returnTimestampedStore) {
- this(name, retentionPeriod, segmentInterval, windowSize,
retainDuplicates,
- returnTimestampedStore
- ? WindowStoreTypes.TIMESTAMPED_WINDOW_STORE
- : WindowStoreTypes.DEFAULT_WINDOW_STORE);
- }
-
- public RocksDbWindowBytesStoreSupplier(final String name,
- final long retentionPeriod,
- final long segmentInterval,
- final long windowSize,
- final boolean retainDuplicates,
- final WindowStoreTypes
windowStoreType) {
+ public RocksDbWindowBytesStoreSupplier(
+ final String name,
+ final long retentionPeriod,
+ final long segmentInterval,
+ final long windowSize,
+ final boolean retainDuplicates,
+ final WindowStoreTypes windowStoreType
+ ) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.segmentInterval = segmentInterval;