This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d53410d8fe KAFKA-7952: Switch stream store from RocksDB to in-memory (#21648)
7d53410d8fe is described below

commit 7d53410d8febf4e39a65cba75951a941fe83c337
Author: gabriellefu <[email protected]>
AuthorDate: Fri Mar 13 15:41:14 2026 -0400

    KAFKA-7952: Switch stream store from RocksDB to in-memory (#21648)
    
    Switch KTableAggregateTest to in-memory stores to reduce test runtime.
---
 .../kstream/internals/KTableAggregateTest.java     | 24 +++++++++++++++-------
 1 file changed, 17 insertions(+), 7 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 2ad5d36893b..f3f29572030 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -30,12 +30,14 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
@@ -44,6 +46,7 @@ import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockInitializer;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -70,10 +73,17 @@ public class KTableAggregateTest {
     private final MockApiProcessorSupplier<String, Object, Void, Void> 
supplier = new MockApiProcessorSupplier<>();
     private static final Properties CONFIG = mkProperties(mkMap(
         mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory("kafka-test").getAbsolutePath())));
+    
+    private StreamsBuilder createStreamBuilderInMemory() {
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+        props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
+                    
BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class.getName());
+        return new StreamsBuilder(new TopologyConfig(new 
StreamsConfig(props)));
+    }
 
     @Test
     public void testAggBasic() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = createStreamBuilderInMemory();
         final String topic1 = "topic1";
 
         final KTable<String, String> table1 = builder.table(topic1, consumed);
@@ -121,7 +131,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testAggRepartition() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = createStreamBuilderInMemory();
         final String topic1 = "topic1";
 
         final KTable<String, String> table1 = builder.table(topic1, consumed);
@@ -262,7 +272,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testCount() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = createStreamBuilderInMemory();
         final String input = "count-test-input";
 
         builder
@@ -277,7 +287,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testCountWithInternalStore() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = createStreamBuilderInMemory();
         final String input = "count-test-input";
 
         builder
@@ -331,7 +341,7 @@ public class KTableAggregateTest {
 
     @Test
     public void testRemoveOldBeforeAddNew() {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = createStreamBuilderInMemory();
         final String input = "count-test-input";
         final MockApiProcessorSupplier<String, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
 
@@ -377,7 +387,7 @@ public class KTableAggregateTest {
     }
 
     private void testUpgradeFromConfig(final Properties config, final 
List<KeyValueTimestamp<String, Long>> expected) {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = createStreamBuilderInMemory();
         final String input = "input-topic";
         final String output = "output-topic";
         final Serde<String> stringSerde = Serdes.String();
@@ -460,7 +470,7 @@ public class KTableAggregateTest {
     private void testKeyWithNoEquals(
             final KeyValueMapper<NoEqualsImpl, NoEqualsImpl, 
KeyValue<NoEqualsImpl, NoEqualsImpl>> keyValueMapper,
             final List<TestRecord<NoEqualsImpl, Long>> expected) {
-        final StreamsBuilder builder = new StreamsBuilder();
+        final StreamsBuilder builder = createStreamBuilderInMemory();
         final String input = "input-topic";
         final String output = "output-topic";
         final Serde<NoEqualsImpl> noEqualsImplSerde = new NoEqualsImplSerde();

Reply via email to