This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7d53410d8fe KAFKA-7952: Switch stream store from RocksDB to in-memory
(#21648)
7d53410d8fe is described below
commit 7d53410d8febf4e39a65cba75951a941fe83c337
Author: gabriellefu <[email protected]>
AuthorDate: Fri Mar 13 15:41:14 2026 -0400
KAFKA-7952: Switch stream store from RocksDB to in-memory (#21648)
Switch KTableAggregateTest to in-memory stores to reduce test runtime.
---
.../kstream/internals/KTableAggregateTest.java | 24 +++++++++++++++-------
1 file changed, 17 insertions(+), 7 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 2ad5d36893b..f3f29572030 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -30,12 +30,14 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
@@ -44,6 +46,7 @@ import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -70,10 +73,17 @@ public class KTableAggregateTest {
private final MockApiProcessorSupplier<String, Object, Void, Void>
supplier = new MockApiProcessorSupplier<>();
private static final Properties CONFIG = mkProperties(mkMap(
mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory("kafka-test").getAbsolutePath())));
+
+ private StreamsBuilder createStreamBuilderInMemory() {
+ final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+ props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
+
BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class.getName());
+ return new StreamsBuilder(new TopologyConfig(new
StreamsConfig(props)));
+ }
@Test
public void testAggBasic() {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = createStreamBuilderInMemory();
final String topic1 = "topic1";
final KTable<String, String> table1 = builder.table(topic1, consumed);
@@ -121,7 +131,7 @@ public class KTableAggregateTest {
@Test
public void testAggRepartition() {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = createStreamBuilderInMemory();
final String topic1 = "topic1";
final KTable<String, String> table1 = builder.table(topic1, consumed);
@@ -262,7 +272,7 @@ public class KTableAggregateTest {
@Test
public void testCount() {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = createStreamBuilderInMemory();
final String input = "count-test-input";
builder
@@ -277,7 +287,7 @@ public class KTableAggregateTest {
@Test
public void testCountWithInternalStore() {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = createStreamBuilderInMemory();
final String input = "count-test-input";
builder
@@ -331,7 +341,7 @@ public class KTableAggregateTest {
@Test
public void testRemoveOldBeforeAddNew() {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = createStreamBuilderInMemory();
final String input = "count-test-input";
final MockApiProcessorSupplier<String, String, Void, Void> supplier =
new MockApiProcessorSupplier<>();
@@ -377,7 +387,7 @@ public class KTableAggregateTest {
}
private void testUpgradeFromConfig(final Properties config, final
List<KeyValueTimestamp<String, Long>> expected) {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = createStreamBuilderInMemory();
final String input = "input-topic";
final String output = "output-topic";
final Serde<String> stringSerde = Serdes.String();
@@ -460,7 +470,7 @@ public class KTableAggregateTest {
private void testKeyWithNoEquals(
final KeyValueMapper<NoEqualsImpl, NoEqualsImpl,
KeyValue<NoEqualsImpl, NoEqualsImpl>> keyValueMapper,
final List<TestRecord<NoEqualsImpl, Long>> expected) {
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = createStreamBuilderInMemory();
final String input = "input-topic";
final String output = "output-topic";
final Serde<NoEqualsImpl> noEqualsImplSerde = new NoEqualsImplSerde();