This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new e4f94633ac6 KAFKA-20279: Refactor dslStoreFormat resolution into
AbstractConfigurableStoreFactory (#21804)
e4f94633ac6 is described below
commit e4f94633ac6f89f2abff4587ef73c5c14f318627
Author: zoo-code <[email protected]>
AuthorDate: Sat Mar 28 08:15:28 2026 +0900
KAFKA-20279: Refactor dslStoreFormat resolution into
AbstractConfigurableStoreFactory (#21804)
Move the logic that chooses between the default store format and an
overriding format into the base class.
Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
<[email protected]>, Alieh Saeedi <[email protected]>
---
.../kstream/internals/AbstractConfigurableStoreFactory.java | 12 +++++++-----
.../streams/kstream/internals/KeyValueStoreMaterializer.java | 5 ++---
.../streams/kstream/internals/MaterializedStoreFactory.java | 6 ++++--
.../kstream/internals/OuterStreamJoinStoreFactory.java | 5 ++---
.../streams/kstream/internals/SessionStoreMaterializer.java | 5 ++---
.../kstream/internals/SlidingWindowStoreMaterializer.java | 5 ++---
.../streams/kstream/internals/StreamJoinedStoreFactory.java | 5 ++---
.../streams/kstream/internals/SubscriptionStoreFactory.java | 5 ++---
.../streams/kstream/internals/WindowStoreMaterializer.java | 5 ++---
9 files changed, 25 insertions(+), 28 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
index 6dca8141945..99c8c5a2bc6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
@@ -27,10 +27,13 @@ import java.util.Set;
public abstract class AbstractConfigurableStoreFactory implements StoreFactory
{
private final Set<String> connectedProcessorNames = new HashSet<>();
private DslStoreSuppliers dslStoreSuppliers;
- private DslStoreFormat dslStoreFormat;
+ private final DslStoreFormat defaultStoreDefaultFormat;
+ private DslStoreFormat dslStoreFormatOverwrite;
- public AbstractConfigurableStoreFactory(final DslStoreSuppliers
initialStoreSuppliers) {
+ public AbstractConfigurableStoreFactory(final DslStoreSuppliers
initialStoreSuppliers,
+ final DslStoreFormat
defaultStoreDefaultFormat) {
this.dslStoreSuppliers = initialStoreSuppliers;
+ this.defaultStoreDefaultFormat = defaultStoreDefaultFormat;
}
@Override
@@ -44,9 +47,8 @@ public abstract class AbstractConfigurableStoreFactory
implements StoreFactory {
}
final String dslStoreFormatValue =
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
if
(dslStoreFormatValue.equalsIgnoreCase(StreamsConfig.DSL_STORE_FORMAT_HEADERS)) {
- dslStoreFormat = DslStoreFormat.HEADERS;
+ dslStoreFormatOverwrite = DslStoreFormat.HEADERS;
}
- // else dslStoreFormat remains null and the lower layers decide
between PLAIN and TIMESTAMPED
}
@Override
@@ -55,7 +57,7 @@ public abstract class AbstractConfigurableStoreFactory
implements StoreFactory {
}
public DslStoreFormat dslStoreFormat() {
- return dslStoreFormat;
+ return dslStoreFormatOverwrite == null ? defaultStoreDefaultFormat :
dslStoreFormatOverwrite;
}
protected DslStoreSuppliers dslStoreSuppliers() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 69553956d10..2a80f21a6ee 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -40,14 +40,13 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
public KeyValueStoreMaterializer(
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materialized
) {
- super(materialized);
+ super(materialized, DslStoreFormat.TIMESTAMPED);
}
@Override
public StoreBuilder<?> builder() {
- final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
final KeyValueBytesStoreSupplier supplier =
materialized.storeSupplier() == null
- ? dslStoreSuppliers().keyValueStore(new
DslKeyValueParams(materialized.storeName(), storeFormat))
+ ? dslStoreSuppliers().keyValueStore(new
DslKeyValueParams(materialized.storeName(), dslStoreFormat()))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<?> builder;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
index 83cb6606790..b5dfb19a3c3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.StoreSupplier;
@@ -30,8 +31,9 @@ import java.util.Map;
public abstract class MaterializedStoreFactory<K, V, S extends StateStore>
extends AbstractConfigurableStoreFactory {
protected final MaterializedInternal<K, V, S> materialized;
- public MaterializedStoreFactory(final MaterializedInternal<K, V, S>
materialized) {
- super(materialized.dslStoreSuppliers().orElse(null));
+ public MaterializedStoreFactory(final MaterializedInternal<K, V, S>
materialized,
+ final DslStoreFormat defaultStoreFormat) {
+ super(materialized.dslStoreSuppliers().orElse(null),
defaultStoreFormat);
this.materialized = materialized;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index e7194ff89ea..e1d7a2295ba 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -60,7 +60,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends
AbstractConfigurable
final JoinWindows windows,
final Type type
) {
- super(streamJoined.dslStoreSuppliers());
+ super(streamJoined.dslStoreSuppliers(), DslStoreFormat.PLAIN);
// we store this one manually instead of relying on
super#dslStoreSuppliers()
// so that we can differentiate between one that was explicitly passed
in and
@@ -96,8 +96,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends
AbstractConfigurable
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde
= new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new
LeftOrRightValueSerde<>(streamJoined.valueSerde(),
streamJoined.otherValueSerde());
- final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
- final DslKeyValueParams dslKeyValueParams = new
DslKeyValueParams(name, storeFormat);
+ final DslKeyValueParams dslKeyValueParams = new
DslKeyValueParams(name, dslStoreFormat());
final KeyValueBytesStoreSupplier supplier;
if (passedInDslStoreSuppliers != null) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index e89c46141d5..e3974e937c9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -41,7 +41,7 @@ public class SessionStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
final SessionWindows sessionWindows,
final EmitStrategy emitStrategy
) {
- super(materialized);
+ super(materialized, DslStoreFormat.PLAIN);
this.materialized = materialized;
this.sessionWindows = sessionWindows;
this.emitStrategy = emitStrategy;
@@ -60,13 +60,12 @@ public class SessionStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
@Override
public StoreBuilder<SessionStoreWithHeaders<K, V>> builder() {
- final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.PLAIN : dslStoreFormat();
final SessionBytesStoreSupplier supplier =
materialized.storeSupplier() == null
? dslStoreSuppliers().sessionStore(new DslSessionParams(
materialized.storeName(),
Duration.ofMillis(retentionPeriod),
emitStrategy,
- storeFormat))
+ dslStoreFormat()))
: (SessionBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<SessionStoreWithHeaders<K, V>> builder =
Stores.sessionStoreBuilderWithHeaders(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
index b217accea2d..2fa87ed9616 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -40,7 +40,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends
MaterializedStoreFacto
final SlidingWindows windows,
final EmitStrategy emitStrategy
) {
- super(materialized);
+ super(materialized, DslStoreFormat.TIMESTAMPED);
this.windows = windows;
this.emitStrategy = emitStrategy;
@@ -59,7 +59,6 @@ public class SlidingWindowStoreMaterializer<K, V> extends
MaterializedStoreFacto
@Override
public StoreBuilder<?> builder() {
- final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
final WindowBytesStoreSupplier supplier = materialized.storeSupplier()
== null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
@@ -68,7 +67,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends
MaterializedStoreFacto
false,
emitStrategy,
true,
- storeFormat
+ dslStoreFormat()
))
: (WindowBytesStoreSupplier) materialized.storeSupplier();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
index 0c21b30350a..430d34d455a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
@@ -54,7 +54,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends
AbstractConfigurableSto
final StreamJoinedInternal<K, V1, V2> joinedInternal,
final Type type
) {
- super(joinedInternal.dslStoreSuppliers());
+ super(joinedInternal.dslStoreSuppliers(), DslStoreFormat.PLAIN);
this.name = name + "-store";
this.joinedInternal = joinedInternal;
this.windows = windows;
@@ -82,7 +82,6 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends
AbstractConfigurableSto
@Override
public StoreBuilder<?> builder() {
- final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.PLAIN : dslStoreFormat();
final WindowBytesStoreSupplier supplier = storeSupplier == null
? dslStoreSuppliers().windowStore(new DslWindowParams(
this.name,
@@ -91,7 +90,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends
AbstractConfigurableSto
true,
EmitStrategy.onWindowUpdate(),
false,
- storeFormat
+ dslStoreFormat()
))
: storeSupplier;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index d68e6389c31..2aed2f74d34 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -39,7 +39,7 @@ public class SubscriptionStoreFactory<K> extends
AbstractConfigurableStoreFactor
final String name,
final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde
) {
- super(null);
+ super(null, DslStoreFormat.TIMESTAMPED);
this.name = name;
this.subscriptionWrapperSerde = subscriptionWrapperSerde;
}
@@ -47,9 +47,8 @@ public class SubscriptionStoreFactory<K> extends
AbstractConfigurableStoreFactor
@Override
public StoreBuilder<?> builder() {
StoreBuilder<?> builder;
- final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
- dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name,
storeFormat)),
+ dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name,
dslStoreFormat())),
new Serdes.BytesSerde(),
subscriptionWrapperSerde
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
index 5150a1ad4fa..c15c4e8c804 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -40,7 +40,7 @@ public class WindowStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V
final Windows<?> windows,
final EmitStrategy emitStrategy
) {
- super(materialized);
+ super(materialized, DslStoreFormat.TIMESTAMPED);
this.windows = windows;
this.emitStrategy = emitStrategy;
@@ -57,7 +57,6 @@ public class WindowStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V
@Override
public StoreBuilder<?> builder() {
- final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
final WindowBytesStoreSupplier supplier = materialized.storeSupplier()
== null
? dslStoreSuppliers().windowStore(new DslWindowParams(
materialized.storeName(),
@@ -66,7 +65,7 @@ public class WindowStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V
false,
emitStrategy,
false,
- storeFormat
+ dslStoreFormat()
))
: (WindowBytesStoreSupplier) materialized.storeSupplier();