This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new e4f94633ac6 KAFKA-20279: Refactor dslStoreFormat resolution into AbstractConfigurableStoreFactory (#21804)
e4f94633ac6 is described below

commit e4f94633ac6f89f2abff4587ef73c5c14f318627
Author: zoo-code <[email protected]>
AuthorDate: Sat Mar 28 08:15:28 2026 +0900

    KAFKA-20279: Refactor dslStoreFormat resolution into AbstractConfigurableStoreFactory (#21804)
    
    Move the control flow that picks between the default store format and
    an overwriting format into the base class, so that subclasses no longer
    resolve the format themselves.
    
    Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
    <[email protected]>, Alieh Saeedi <[email protected]>
---
 .../kstream/internals/AbstractConfigurableStoreFactory.java  | 12 +++++++-----
 .../streams/kstream/internals/KeyValueStoreMaterializer.java |  5 ++---
 .../streams/kstream/internals/MaterializedStoreFactory.java  |  6 ++++--
 .../kstream/internals/OuterStreamJoinStoreFactory.java       |  5 ++---
 .../streams/kstream/internals/SessionStoreMaterializer.java  |  5 ++---
 .../kstream/internals/SlidingWindowStoreMaterializer.java    |  5 ++---
 .../streams/kstream/internals/StreamJoinedStoreFactory.java  |  5 ++---
 .../streams/kstream/internals/SubscriptionStoreFactory.java  |  5 ++---
 .../streams/kstream/internals/WindowStoreMaterializer.java   |  5 ++---
 9 files changed, 25 insertions(+), 28 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
index 6dca8141945..99c8c5a2bc6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
@@ -27,10 +27,13 @@ import java.util.Set;
 public abstract class AbstractConfigurableStoreFactory implements StoreFactory 
{
     private final Set<String> connectedProcessorNames = new HashSet<>();
     private DslStoreSuppliers dslStoreSuppliers;
-    private DslStoreFormat dslStoreFormat;
+    private final DslStoreFormat defaultStoreDefaultFormat;
+    private DslStoreFormat dslStoreFormatOverwrite;
 
-    public AbstractConfigurableStoreFactory(final DslStoreSuppliers 
initialStoreSuppliers) {
+    public AbstractConfigurableStoreFactory(final DslStoreSuppliers 
initialStoreSuppliers,
+                                            final DslStoreFormat 
defaultStoreDefaultFormat) {
         this.dslStoreSuppliers = initialStoreSuppliers;
+        this.defaultStoreDefaultFormat = defaultStoreDefaultFormat;
     }
 
     @Override
@@ -44,9 +47,8 @@ public abstract class AbstractConfigurableStoreFactory 
implements StoreFactory {
         }
         final String dslStoreFormatValue = 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
         if 
(dslStoreFormatValue.equalsIgnoreCase(StreamsConfig.DSL_STORE_FORMAT_HEADERS)) {
-            dslStoreFormat = DslStoreFormat.HEADERS;
+            dslStoreFormatOverwrite = DslStoreFormat.HEADERS;
         }
-        // else dslStoreFormat remains null and the lower layers decide 
between PLAIN and TIMESTAMPED
     }
 
     @Override
@@ -55,7 +57,7 @@ public abstract class AbstractConfigurableStoreFactory 
implements StoreFactory {
     }
 
     public DslStoreFormat dslStoreFormat() {
-        return dslStoreFormat;
+        return dslStoreFormatOverwrite == null ? defaultStoreDefaultFormat : 
dslStoreFormatOverwrite;
     }
 
     protected DslStoreSuppliers dslStoreSuppliers() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 69553956d10..2a80f21a6ee 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -40,14 +40,13 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
     public KeyValueStoreMaterializer(
             final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materialized
     ) {
-        super(materialized);
+        super(materialized, DslStoreFormat.TIMESTAMPED);
     }
 
     @Override
     public StoreBuilder<?> builder() {
-        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
         final KeyValueBytesStoreSupplier supplier = 
materialized.storeSupplier() == null
-                ? dslStoreSuppliers().keyValueStore(new 
DslKeyValueParams(materialized.storeName(), storeFormat))
+                ? dslStoreSuppliers().keyValueStore(new 
DslKeyValueParams(materialized.storeName(), dslStoreFormat()))
                 : (KeyValueBytesStoreSupplier) materialized.storeSupplier();
 
         final StoreBuilder<?> builder;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
index 83cb6606790..b5dfb19a3c3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.StoreSupplier;
@@ -30,8 +31,9 @@ import java.util.Map;
 public abstract class MaterializedStoreFactory<K, V, S extends StateStore> 
extends AbstractConfigurableStoreFactory {
     protected final MaterializedInternal<K, V, S> materialized;
 
-    public MaterializedStoreFactory(final MaterializedInternal<K, V, S> 
materialized) {
-        super(materialized.dslStoreSuppliers().orElse(null));
+    public MaterializedStoreFactory(final MaterializedInternal<K, V, S> 
materialized,
+                                    final DslStoreFormat defaultStoreFormat) {
+        super(materialized.dslStoreSuppliers().orElse(null), 
defaultStoreFormat);
         this.materialized = materialized;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index e7194ff89ea..e1d7a2295ba 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -60,7 +60,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends 
AbstractConfigurable
             final JoinWindows windows,
             final Type type
     ) {
-        super(streamJoined.dslStoreSuppliers());
+        super(streamJoined.dslStoreSuppliers(), DslStoreFormat.PLAIN);
 
         // we store this one manually instead of relying on 
super#dslStoreSuppliers()
         // so that we can differentiate between one that was explicitly passed 
in and
@@ -96,8 +96,7 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends 
AbstractConfigurable
         final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde 
= new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
         final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new 
LeftOrRightValueSerde<>(streamJoined.valueSerde(), 
streamJoined.otherValueSerde());
 
-        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
-        final DslKeyValueParams dslKeyValueParams = new 
DslKeyValueParams(name, storeFormat);
+        final DslKeyValueParams dslKeyValueParams = new 
DslKeyValueParams(name, dslStoreFormat());
         final KeyValueBytesStoreSupplier supplier;
 
         if (passedInDslStoreSuppliers != null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index e89c46141d5..e3974e937c9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -41,7 +41,7 @@ public class SessionStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
             final SessionWindows sessionWindows,
             final EmitStrategy emitStrategy
     ) {
-        super(materialized);
+        super(materialized, DslStoreFormat.PLAIN);
         this.materialized = materialized;
         this.sessionWindows = sessionWindows;
         this.emitStrategy = emitStrategy;
@@ -60,13 +60,12 @@ public class SessionStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
 
     @Override
     public  StoreBuilder<SessionStoreWithHeaders<K, V>> builder() {
-        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.PLAIN : dslStoreFormat();
         final SessionBytesStoreSupplier supplier = 
materialized.storeSupplier() == null
                 ? dslStoreSuppliers().sessionStore(new DslSessionParams(
                         materialized.storeName(),
                         Duration.ofMillis(retentionPeriod),
                         emitStrategy,
-                        storeFormat))
+                        dslStoreFormat()))
                 : (SessionBytesStoreSupplier) materialized.storeSupplier();
 
         final StoreBuilder<SessionStoreWithHeaders<K, V>> builder = 
Stores.sessionStoreBuilderWithHeaders(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
index b217accea2d..2fa87ed9616 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -40,7 +40,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends 
MaterializedStoreFacto
             final SlidingWindows windows,
             final EmitStrategy emitStrategy
     ) {
-        super(materialized);
+        super(materialized, DslStoreFormat.TIMESTAMPED);
         this.windows = windows;
         this.emitStrategy = emitStrategy;
 
@@ -59,7 +59,6 @@ public class SlidingWindowStoreMaterializer<K, V> extends 
MaterializedStoreFacto
 
     @Override
     public StoreBuilder<?> builder() {
-        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
         final WindowBytesStoreSupplier supplier = materialized.storeSupplier() 
== null
             ? dslStoreSuppliers().windowStore(new DslWindowParams(
             materialized.storeName(),
@@ -68,7 +67,7 @@ public class SlidingWindowStoreMaterializer<K, V> extends 
MaterializedStoreFacto
             false,
             emitStrategy,
             true,
-            storeFormat
+            dslStoreFormat()
         ))
             : (WindowBytesStoreSupplier) materialized.storeSupplier();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
index 0c21b30350a..430d34d455a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java
@@ -54,7 +54,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends 
AbstractConfigurableSto
             final StreamJoinedInternal<K, V1, V2> joinedInternal,
             final Type type
     ) {
-        super(joinedInternal.dslStoreSuppliers());
+        super(joinedInternal.dslStoreSuppliers(), DslStoreFormat.PLAIN);
         this.name = name + "-store";
         this.joinedInternal = joinedInternal;
         this.windows = windows;
@@ -82,7 +82,6 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends 
AbstractConfigurableSto
 
     @Override
     public StoreBuilder<?> builder() {
-        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.PLAIN : dslStoreFormat();
         final WindowBytesStoreSupplier supplier = storeSupplier == null
                 ? dslStoreSuppliers().windowStore(new DslWindowParams(
                         this.name,
@@ -91,7 +90,7 @@ public class StreamJoinedStoreFactory<K, V1, V2> extends 
AbstractConfigurableSto
                         true,
                         EmitStrategy.onWindowUpdate(),
                         false,
-                        storeFormat
+                        dslStoreFormat()
                 ))
                 : storeSupplier;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index d68e6389c31..2aed2f74d34 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -39,7 +39,7 @@ public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactor
         final String name,
         final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde
     ) {
-        super(null);
+        super(null, DslStoreFormat.TIMESTAMPED);
         this.name = name;
         this.subscriptionWrapperSerde = subscriptionWrapperSerde;
     }
@@ -47,9 +47,8 @@ public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactor
     @Override
     public StoreBuilder<?> builder() {
         StoreBuilder<?> builder;
-        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
         builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
-            dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, 
storeFormat)),
+            dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, 
dslStoreFormat())),
             new Serdes.BytesSerde(),
             subscriptionWrapperSerde
         );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
index 5150a1ad4fa..c15c4e8c804 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -40,7 +40,7 @@ public class WindowStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V
             final Windows<?> windows,
             final EmitStrategy emitStrategy
     ) {
-        super(materialized);
+        super(materialized, DslStoreFormat.TIMESTAMPED);
         this.windows = windows;
         this.emitStrategy = emitStrategy;
 
@@ -57,7 +57,6 @@ public class WindowStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V
 
     @Override
     public StoreBuilder<?> builder() {
-        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.TIMESTAMPED : dslStoreFormat();
         final WindowBytesStoreSupplier supplier = materialized.storeSupplier() 
== null
             ? dslStoreSuppliers().windowStore(new DslWindowParams(
             materialized.storeName(),
@@ -66,7 +65,7 @@ public class WindowStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V
             false,
             emitStrategy,
             false,
-            storeFormat
+            dslStoreFormat()
         ))
             : (WindowBytesStoreSupplier) materialized.storeSupplier();
 

Reply via email to