This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e9eb75fc784 KAFKA-20194: Ensure backward compatibility (#21904)
e9eb75fc784 is described below
commit e9eb75fc784c256c98ede4a8f051abcb71c95ea2
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 1 14:37:21 2026 -0700
KAFKA-20194: Ensure backward compatibility (#21904)
By default, the DSL should expose it's state-stores as ts-stores, as
long as header format is not enabled; otherwise, it would be a backward
incompatible change.
This PR ensures that the builders are creating the correct state stores,
depending on the format, and we insert an adaptor to allow the DSL
Processors to work only against headers-store interface.
Reviewers: Bill Bejeck <[email protected]>, TengYao Chi
<[email protected]>
---
.../integration/PapiDslIntegrationTest.java | 296 +++++++++++++++++++++
.../integration/QueryableStateIntegrationTest.java | 34 +--
.../kstream/internals/KStreamAggregate.java | 2 +-
.../streams/kstream/internals/KStreamReduce.java | 2 +-
.../streams/kstream/internals/KTableAggregate.java | 2 +-
.../streams/kstream/internals/KTableFilter.java | 2 +-
.../kstream/internals/KTableKTableJoinMerger.java | 2 +-
.../streams/kstream/internals/KTableMapValues.java | 2 +-
.../streams/kstream/internals/KTableReduce.java | 2 +-
.../streams/kstream/internals/KTableSource.java | 2 +-
.../kstream/internals/KTableTransformValues.java | 2 +-
.../internals/KeyValueStoreMaterializer.java | 14 +-
.../internals/TimestampedCacheFlushListener.java | 62 +++++
.../streams/state/HeadersBytesStoreSupplier.java | 24 ++
.../org/apache/kafka/streams/state/Stores.java | 7 +-
.../state/internals/KeyValueStoreWrapper.java | 13 +
...HeaderStoreToKeyValueTimestampStoreAdapter.java | 225 ++++++++++++++++
.../RocksDBKeyValueBytesStoreSupplier.java | 12 +-
... RocksDBKeyValueHeadersBytesStoreSupplier.java} | 23 +-
.../internals/KeyValueStoreMaterializerTest.java | 84 +++---
20 files changed, 712 insertions(+), 100 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
new file mode 100644
index 00000000000..891f1d527fd
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/PapiDslIntegrationTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class PapiDslIntegrationTest {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ private void verify(final KTable<String, String> table) {
+ table.toStream()
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final TimestampedKeyValueStore<String, String> store =
context().getStateStore("table-store");
+
+ try (final KeyValueIterator<String,
ValueAndTimestamp<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String, ValueAndTimestamp<String>>
row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
+
+ @Test
+ public void processorShouldAccessSourceKTableStoreAsTimestampedStore() {
+ verify(builder.table("input-topic", Materialized.<String, String,
KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())));
+ }
+
+ @Test
+ public void processorShouldAccessFilteredKTableStoreAsTimestampedStore() {
+ verify(builder
+ .table("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .filter((k, v) -> true, Materialized.<String, String,
KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+ );
+ }
+
+ @Test
+ public void processorShouldAccessMappedKTableStoreAsTimestampedStore() {
+ verify(builder
+ .table("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .mapValues(v -> v, Materialized.<String, String,
KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessTransformedKTableStoreAsTimestampedStore() {
+ verify(builder
+ .table("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .transformValues(() -> new ValueTransformerWithKey<String, String,
String>() {
+ @Override
+ public void init(final ProcessorContext context) { }
+
+ @Override
+ public String transform(final String readOnlyKey, final String
value) {
+ return value;
+ }
+
+ @Override
+ public void close() { }
+ }, Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+ );
+ }
+
+ @Test
+ public void processorShouldAccessReducedKTableStoreAsTimestampedStore() {
+ verify(builder
+ .table("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupBy((KeyValueMapper<String, String, KeyValue<String,
String>>) KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
+ .reduce(
+ (value, aggregate) -> value,
+ (value, aggregate) -> aggregate,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void processorShouldAccessAggregatedKTableStoreAsTimestampedStore()
{
+ verify(builder
+ .table("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupBy((KeyValueMapper<String, String, KeyValue<String,
String>>) KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+ (key, value, aggregate) -> aggregate,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ private void verifyJoin(final KTable<String, String> table) {
+ table.toStream()
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final TimestampedKeyValueStore<String, String> store =
context().getStateStore("table-store");
+
+ try (final KeyValueIterator<String,
ValueAndTimestamp<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String, ValueAndTimestamp<String>>
row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> leftInputTopic =
testDriver.createInputTopic("left-input-topic", new StringSerializer(), new
StringSerializer());
+ final TestInputTopic<String, String> rightInputTopic =
testDriver.createInputTopic("right-input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ leftInputTopic.pipeInput("key1", "left");
+ rightInputTopic.pipeInput("key1", "right");
+
+ assertEquals(KeyValue.pair("key1", "left-right"),
outputTopic.readKeyValue());
+ }
+ }
+
+ @Test
+ public void processorShouldAccessJoinedKTableStoreAsTimestampedStore() {
+ verifyJoin(builder
+ .table("left-input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .join(
+ builder.table("right-input-topic",
Consumed.with(Serdes.String(), Serdes.String())),
+ (left, right) -> left + "-" + right,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void processorShouldAccessFKJoinedKTableStoreAsTimestampedStore() {
+ verifyJoin(builder
+ .table("left-input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .join(
+ builder.table("right-input-topic",
Consumed.with(Serdes.String(), Serdes.String())),
+ (key, value) -> key,
+ (left, right) -> left + "-" + right,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamReducedKTableStoreAsTimestampedStore() {
+ verify(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .reduce(
+ (value, aggregate) -> value,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void
processorShouldAccessKStreamAggregatedKTableStoreAsTimestampedStore() {
+ verify(builder
+ .stream("input-topic", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .aggregate(
+ () -> "",
+ (key, value, aggregate) -> value,
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ )
+ );
+ }
+
+ @Test
+ public void processorShouldAccessKTableStoreAsHeadersStoreViaConfig() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.table("input-topic", Materialized.<String, String,
KeyValueStore<Bytes,
byte[]>>as("table-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+ .toStream()
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final TimestampedKeyValueStoreWithHeaders<String, String>
store = context().getStateStore("table-store");
+
+ try (final KeyValueIterator<String,
ValueTimestampHeaders<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String,
ValueTimestampHeaders<String>> row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+
+ final Properties props = new Properties();
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG,
StreamsConfig.DSL_STORE_FORMAT_HEADERS);
+
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
+
+ @Test
+ public void processorShouldAccessKTableStoreAsHeadersStoreViaSupplier() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>>
materialized =
Materialized.as(Stores.persistentTimestampedKeyValueStoreWithHeaders("table-store"));
+ builder.table("input-topic",
materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
+ .toStream()
+ .process(() -> new ContextualProcessor<String, String, String,
String>() {
+ @Override
+ public void process(final Record<String, String> record) {
+ final TimestampedKeyValueStoreWithHeaders<String, String>
store = context().getStateStore("table-store");
+
+ try (final KeyValueIterator<String,
ValueTimestampHeaders<String>> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String,
ValueTimestampHeaders<String>> row = it.next();
+ context().forward(new Record<>(row.key,
row.value.value(), row.value.timestamp()));
+ }
+ }
+ }
+ }, "table-store")
+ .to("output-topic", Produced.with(Serdes.String(),
Serdes.String()));
+
+ try (final TopologyTestDriver testDriver = new
TopologyTestDriver(builder.build())) {
+ final TestInputTopic<String, String> inputTopic =
testDriver.createInputTopic("input-topic", new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
testDriver.createOutputTopic("output-topic", new StringDeserializer(), new
StringDeserializer());
+
+ inputTopic.pipeInput("key1", "value1");
+
+ assertEquals(KeyValue.pair("key1", "value1"),
outputTopic.readKeyValue());
+ }
+ }
+}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 9859e3ed24f..ae0e6226ba2 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -220,8 +220,8 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
100L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
10000);
@@ -442,7 +442,7 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void shouldRejectNonExistentStoreName(final TestInfo testInfo)
throws InterruptedException {
+ public void shouldRejectNonExistentStoreName(final TestInfo testInfo) {
final String uniqueTestName = safeUniqueTestName(testInfo);
final String input = uniqueTestName + "-input";
final String storeName = uniqueTestName + "-input-table";
@@ -480,7 +480,7 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws
InterruptedException {
+ public void shouldRejectWronglyTypedStore(final TestInfo testInfo) {
final String uniqueTestName = safeUniqueTestName(testInfo);
final String input = uniqueTestName + "-input";
final String storeName = uniqueTestName + "-input-table";
@@ -521,7 +521,7 @@ public class QueryableStateIntegrationTest {
"Cannot get state store " + storeName + " because the
queryable store type" +
" [class
org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreType]" +
" does not accept the actual store type" +
- " [class
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders]."
+ " [class
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
)
);
}
@@ -734,8 +734,8 @@ public class QueryableStateIntegrationTest {
@Test
public void shouldBeAbleToQueryFilterState() throws Exception {
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Long().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.LongSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, Long>> batch1 = new HashSet<>(
@@ -800,8 +800,8 @@ public class QueryableStateIntegrationTest {
@Test
public void shouldBeAbleToQueryMapValuesState() throws Exception {
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, String>> batch1 = new HashSet<>(
@@ -852,8 +852,8 @@ public class QueryableStateIntegrationTest {
@Test
public void shouldBeAbleToQueryKeysWithGivenPrefix() throws Exception {
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, String>> batch1 = new HashSet<>(
@@ -897,7 +897,7 @@ public class QueryableStateIntegrationTest {
IntegrationTestUtils.getStore("queryMapValues", kafkaStreams,
keyValueStore());
int index = 0;
- try (final KeyValueIterator<String, Long> range =
myMapStore.prefixScan("go", Serdes.String().serializer())) {
+ try (final KeyValueIterator<String, Long> range =
myMapStore.prefixScan("go", new StringSerializer())) {
while (range.hasNext()) {
assertEquals(expectedPrefixScanResult.get(index++),
range.next());
}
@@ -906,8 +906,8 @@ public class QueryableStateIntegrationTest {
@Test
public void shouldBeAbleToQueryMapValuesAfterFilterState() throws
Exception {
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
final Set<KeyValue<String, String>> batch1 = new HashSet<>(
@@ -1220,7 +1220,6 @@ public class QueryableStateIntegrationTest {
private void waitUntilAtLeastNumRecordProcessed(final String topic,
final int numRecs) throws
Exception {
- final long timeout = DEFAULT_TIMEOUT_MS;
final Properties config = new Properties();
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"queryable-state-consumer");
@@ -1231,7 +1230,8 @@ public class QueryableStateIntegrationTest {
config,
topic,
numRecs,
- timeout);
+ DEFAULT_TIMEOUT_MS
+ );
}
private Set<KeyValue<String, Long>> fetch(final
ReadOnlyWindowStore<String, Long> store,
@@ -1250,7 +1250,7 @@ public class QueryableStateIntegrationTest {
/**
* A class that periodically produces records in a separate thread
*/
- private class ProducerRunnable implements Runnable {
+ private static class ProducerRunnable implements Runnable {
private final String topic;
private final List<String> inputValues;
private final int numIterations;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 166c41f1d62..4cc4736cd21 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -95,7 +95,7 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 0f240fde857..5a6a17a74c9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -92,7 +92,7 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index eaea79a21ce..e6e0cf91298 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -88,7 +88,7 @@ public class KTableAggregate<KIn, VIn, VAgg> implements
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index f9ed84f66b1..f1a48e2620f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -128,7 +128,7 @@ public class KTableFilter<KIn, VIn> implements
KTableProcessorSupplier<KIn, VIn,
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
index 7008620aa07..7fe6798e503 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
@@ -126,7 +126,7 @@ public class KTableKTableJoinMerger<K, V> implements
KTableProcessorSupplier<K,
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 2c6f7abb01e..e71dd84cf7c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -139,7 +139,7 @@ class KTableMapValues<KIn, VIn, VOut> implements
KTableProcessorSupplier<KIn, VI
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 04d2f65fbe6..8969a6bc0bc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -84,7 +84,7 @@ public class KTableReduce<K, V> implements
KTableProcessorSupplier<K, V, K, V> {
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 0263aa76356..9bb54befeb4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -113,7 +113,7 @@ public class KTableSource<KIn, VIn> implements
ProcessorSupplier<KIn, VIn, KIn,
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 3d92c5e5b08..75b8289132d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -109,7 +109,7 @@ class KTableTransformValues<K, V, VOut> implements
KTableProcessorSupplier<K, V,
tupleForwarder = new TimestampedTupleForwarder<>(
store.store(),
context,
- new TimestampedCacheFlushListenerWithHeaders<>(context),
+ store.isHeadersStore() ? new
TimestampedCacheFlushListenerWithHeaders<>(context) : new
TimestampedCacheFlushListener<>(context),
sendOldValues);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 2a80f21a6ee..b5f8cdfbe14 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -55,11 +56,16 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
(VersionedBytesStoreSupplier) supplier,
materialized.keySerde(),
materialized.valueSerde());
- } else {
+ } else if (supplier instanceof HeadersBytesStoreSupplier) {
builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde());
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
+ } else {
+ builder = Stores.timestampedKeyValueStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
}
if (materialized.loggingEnabled()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
new file mode 100644
index 00000000000..1cef42dbff2
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+class TimestampedCacheFlushListener<KOut, VOut> implements
CacheFlushListener<KOut, ValueAndTimestamp<VOut>> {
+
+ private final InternalProcessorContext<KOut, Change<VOut>> context;
+
+ @SuppressWarnings("rawtypes")
+ private final ProcessorNode myNode;
+
+ TimestampedCacheFlushListener(final ProcessorContext<KOut, Change<VOut>>
context) {
+ this.context = (InternalProcessorContext<KOut, Change<VOut>>) context;
+ myNode = this.context.currentNode();
+ }
+
+ @Override
+ public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>>
record) {
+ @SuppressWarnings("rawtypes") final ProcessorNode prev =
context.currentNode();
+ context.setCurrentNode(myNode);
+ try {
+ final VOut newValue = getValueOrNull(record.value().newValue);
+ final VOut oldValue = getValueOrNull(record.value().oldValue);
+ final long timestamp = record.value().newValue != null ?
record.value().newValue.timestamp() : record.timestamp();
+ final Headers headers = record.headers() != null ?
record.headers() : new RecordHeaders();
+
+ context.forward(
+ record
+ .withValue(new Change<>(newValue, oldValue,
record.value().isLatest))
+ .withTimestamp(timestamp)
+ .withHeaders(headers)
+ );
+ } finally {
+ context.setCurrentNode(prev);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
new file mode 100644
index 00000000000..f27c1711b0a
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStoreSupplier.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+/**
+ * A store supplier that can be used to create one or more "header" stores,
+ * specifically, {@link HeadersBytesStore} instances.
+ */
+public interface HeadersBytesStoreSupplier extends KeyValueBytesStoreSupplier {
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index c541eca32a0..48f982f5e67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.MemoryNavigableLRUCache;
import
org.apache.kafka.streams.state.internals.RocksDBKeyValueBytesStoreSupplier;
+import
org.apache.kafka.streams.state.internals.RocksDBKeyValueHeadersBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
@@ -97,7 +98,7 @@ public final class Stores {
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final
String name) {
Objects.requireNonNull(name, "name cannot be null");
- return new RocksDBKeyValueBytesStoreSupplier(name, false, false);
+ return new RocksDBKeyValueBytesStoreSupplier(name, false);
}
/**
@@ -116,7 +117,7 @@ public final class Stores {
*/
public static KeyValueBytesStoreSupplier
persistentTimestampedKeyValueStore(final String name) {
Objects.requireNonNull(name, "name cannot be null");
- return new RocksDBKeyValueBytesStoreSupplier(name, true, false);
+ return new RocksDBKeyValueBytesStoreSupplier(name, true);
}
/**
@@ -142,7 +143,7 @@ public final class Stores {
*/
public static KeyValueBytesStoreSupplier
persistentTimestampedKeyValueStoreWithHeaders(final String name) {
Objects.requireNonNull(name, "name cannot be null");
- return new RocksDBKeyValueBytesStoreSupplier(name, true, true);
+ return new RocksDBKeyValueHeadersBytesStoreSupplier(name);
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
index dc9fb21046b..62d939562ca 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -57,6 +57,15 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
private StateStore store;
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final
String storeName) {
+ // Try timestamped store
+ try {
+ headersStore = new
KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter<>(context.getStateStore(storeName));
+ store = headersStore;
+ return;
+ } catch (final ClassCastException e) {
+ // not timestamped store, try headers
+ }
+
// Try headers-aware timestamped store
try {
headersStore = context.getStateStore(storeName);
@@ -119,6 +128,10 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
return store;
}
+ public boolean isHeadersStore() {
+ return !(headersStore instanceof
KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter);
+ }
+
public boolean isVersionedStore() {
return versionedStore != null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter.java
new file mode 100644
index 00000000000..a85f96877ac
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Adaptor store for the Kafka Streams DSL to bridge between "headers" store
and "ts-store".
+ *
+ * <p> With KIP-1285 we did rewrite the DLS Processor code to work against
"header store" interface to allow users
+ * to plugin "header stores", but by default the underlying store is still a
"ts-store". To avoid "if-then-else"
+ * code across the entire DSL Processor code base, we use this adaptor to wrap
a "ts-store" and make it look like
+ * a "header store".
+ *
+ * <p> On any write operation, provided {@link
org.apache.kafka.common.header.Headers} will just be dropped,
+ * and {@link ValueTimestampHeaders} type is translated into {@link
ValueAndTimestamp} type. Similarly for
+ * any read operation, the underlying {@link ValueAndTimestamp} type is
translated into a {@link ValueTimestampHeaders}
+ * type with an empty {@link org.apache.kafka.common.header.Headers} object.
+ */
+public class KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter<K,
V>
+ extends WrappedStateStore<TimestampedKeyValueStore<K, V>, K, V>
+ implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+ public KeyValueTimestampedHeaderStoreToKeyValueTimestampStoreAdapter(final
TimestampedKeyValueStore<K, V> timestampedKeyValueStore) {
+ super(timestampedKeyValueStore);
+ }
+
+ @Override
+ public String name() {
+ return wrapped().name();
+ }
+
+ @Override
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
+ wrapped().init(stateStoreContext, root);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void flush() {
+ wrapped().flush();
+ }
+
+ @Override
+ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+ wrapped().commit(changelogOffsets);
+ }
+
+ @Override
+ public Long committedOffset(final TopicPartition partition) {
+ return wrapped().committedOffset(partition);
+ }
+
+ @Override
+ public void close() {
+ wrapped().close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return wrapped().persistent();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return wrapped().managesOffsets();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return wrapped().isOpen();
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query, final PositionBound
positionBound, final QueryConfig config) {
+ return wrapped().query(query, positionBound, config);
+ }
+
+ @Override
+ public Position getPosition() {
+ return wrapped().getPosition();
+ }
+
+ @Override
+ public void put(final K key, final ValueTimestampHeaders<V> value) {
+ wrapped().put(
+ key,
+ value == null ? null : ValueAndTimestamp.make(value.value(),
value.timestamp())
+ );
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> putIfAbsent(final K key, final
ValueTimestampHeaders<V> value) {
+ final ValueAndTimestamp<V> oldValueAndTimestamp =
wrapped().putIfAbsent(
+ key,
+ value == null ? null : ValueAndTimestamp.make(value.value(),
value.timestamp())
+ );
+
+ return oldValueAndTimestamp == null
+ ? null
+ : ValueTimestampHeaders.make(oldValueAndTimestamp.value(),
oldValueAndTimestamp.timestamp(), new RecordHeaders());
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<K, ValueTimestampHeaders<V>>>
entries) {
+ wrapped().putAll(
+ entries.stream().map(keyValuePair -> KeyValue.pair(
+ keyValuePair.key,
+ ValueAndTimestamp.make(keyValuePair.value.value(),
keyValuePair.value.timestamp()))
+ )
+ .collect(Collectors.toList())
+ );
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> delete(final K key) {
+ final ValueAndTimestamp<V> oldValueAndTimestamp =
wrapped().delete(key);
+
+ return oldValueAndTimestamp == null
+ ? null
+ : ValueTimestampHeaders.make(oldValueAndTimestamp.value(),
oldValueAndTimestamp.timestamp(), new RecordHeaders());
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> get(final K key) {
+ final ValueAndTimestamp<V> valueAndTimestamp = wrapped().get(key);
+
+ return valueAndTimestamp == null
+ ? null
+ : ValueTimestampHeaders.make(valueAndTimestamp.value(),
valueAndTimestamp.timestamp(), new RecordHeaders());
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
final K to) {
+ return new KeyValueIteratorAdapter(wrapped().range(from, to));
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K
from, final K to) {
+ return new KeyValueIteratorAdapter(wrapped().reverseRange(from, to));
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
+ return new KeyValueIteratorAdapter(wrapped().all());
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
+ return new KeyValueIteratorAdapter(wrapped().reverseAll());
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K,
ValueTimestampHeaders<V>> prefixScan(final P prefix, final PS
prefixKeySerializer) {
+ return new KeyValueIteratorAdapter(wrapped().prefixScan(prefix,
prefixKeySerializer));
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return wrapped().approximateNumEntries();
+ }
+
+ private final class KeyValueIteratorAdapter implements KeyValueIterator<K,
ValueTimestampHeaders<V>> {
+ private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
+
+ private KeyValueIteratorAdapter(final KeyValueIterator<K,
ValueAndTimestamp<V>> innerIterator) {
+ this.innerIterator = innerIterator;
+ }
+
+ @Override
+ public void close() {
+ innerIterator.close();
+ }
+
+ @Override
+ public K peekNextKey() {
+ return innerIterator.peekNextKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return innerIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, ValueTimestampHeaders<V>> next() {
+ final KeyValue<K, ValueAndTimestamp<V>> next =
innerIterator.next();
+ return KeyValue.pair(next.key,
ValueTimestampHeaders.make(next.value.value(), next.value.timestamp(), new
RecordHeaders()));
+ }
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
index a2d3426c6c7..400074f7154 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
@@ -24,18 +24,11 @@ public class RocksDBKeyValueBytesStoreSupplier implements
KeyValueBytesStoreSupp
private final String name;
private final boolean returnTimestampedStore;
- private final boolean returnHeadersStore;
public RocksDBKeyValueBytesStoreSupplier(final String name,
- final boolean
returnTimestampedStore,
- final boolean returnHeadersStore)
{
+ final boolean
returnTimestampedStore) {
this.name = name;
this.returnTimestampedStore = returnTimestampedStore;
- this.returnHeadersStore = returnHeadersStore;
- if (returnHeadersStore && !returnTimestampedStore) {
- throw new IllegalStateException(
- "RocksDBKeyValueBytesStoreSupplier cannot return a headers
store without also returning a timestamped store!");
- }
}
@Override
@@ -45,9 +38,6 @@ public class RocksDBKeyValueBytesStoreSupplier implements
KeyValueBytesStoreSupp
@Override
public KeyValueStore<Bytes, byte[]> get() {
- if (returnHeadersStore && returnTimestampedStore) {
- return new RocksDBTimestampedStoreWithHeaders(name,
metricsScope());
- }
if (returnTimestampedStore) {
return new RocksDBTimestampedStore(name, metricsScope());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueHeadersBytesStoreSupplier.java
similarity index 54%
copy from
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
copy to
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueHeadersBytesStoreSupplier.java
index a2d3426c6c7..87608668e39 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueHeadersBytesStoreSupplier.java
@@ -17,25 +17,16 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
-public class RocksDBKeyValueBytesStoreSupplier implements
KeyValueBytesStoreSupplier {
+public class RocksDBKeyValueHeadersBytesStoreSupplier implements
KeyValueBytesStoreSupplier, HeadersBytesStoreSupplier {
private final String name;
- private final boolean returnTimestampedStore;
- private final boolean returnHeadersStore;
- public RocksDBKeyValueBytesStoreSupplier(final String name,
- final boolean
returnTimestampedStore,
- final boolean returnHeadersStore)
{
+ public RocksDBKeyValueHeadersBytesStoreSupplier(final String name) {
this.name = name;
- this.returnTimestampedStore = returnTimestampedStore;
- this.returnHeadersStore = returnHeadersStore;
- if (returnHeadersStore && !returnTimestampedStore) {
- throw new IllegalStateException(
- "RocksDBKeyValueBytesStoreSupplier cannot return a headers
store without also returning a timestamped store!");
- }
}
@Override
@@ -45,13 +36,7 @@ public class RocksDBKeyValueBytesStoreSupplier implements
KeyValueBytesStoreSupp
@Override
public KeyValueStore<Bytes, byte[]> get() {
- if (returnHeadersStore && returnTimestampedStore) {
- return new RocksDBTimestampedStoreWithHeaders(name,
metricsScope());
- }
- if (returnTimestampedStore) {
- return new RocksDBTimestampedStore(name, metricsScope());
- }
- return new RocksDBStore(name, metricsScope());
+ return new RocksDBTimestampedStoreWithHeaders(name, metricsScope());
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 59a207b988b..6e6e35e7512 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -26,17 +26,20 @@ import
org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.apache.kafka.streams.state.DslStoreSuppliers;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.HeadersBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
import
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
+import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders;
import
org.apache.kafka.streams.state.internals.ChangeLoggingVersionedKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
@@ -69,7 +72,7 @@ public class KeyValueStoreMaterializerTest {
@Mock
private InternalNameProvider nameProvider;
@Mock
- private KeyValueBytesStoreSupplier keyValueStoreSupplier;
+ private HeadersBytesStoreSupplier headersStoreSupplier;
@Mock
private VersionedBytesStoreSupplier versionedStoreSupplier;
private final KeyValueStore<Bytes, byte[]> innerKeyValueStore = new
InMemoryKeyValueStore(STORE_NAME);
@@ -99,74 +102,73 @@ public class KeyValueStoreMaterializerTest {
when(versionedStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
}
- private void mockKeyValueStoreSupplier() {
- when(keyValueStoreSupplier.get()).thenReturn(innerKeyValueStore);
- when(keyValueStoreSupplier.name()).thenReturn(STORE_NAME);
- when(keyValueStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ private void mockHeadersStoreSupplier() {
+ when(headersStoreSupplier.get()).thenReturn(innerKeyValueStore);
+ when(headersStoreSupplier.name()).thenReturn(STORE_NAME);
+ when(headersStoreSupplier.metricsScope()).thenReturn(METRICS_SCOPE);
}
@Test
- public void
shouldCreateHeadersBuilderWithCachingAndLoggingEnabledByDefault() {
+ public void
shouldCreateTimestampedBuilderWithCachingAndLoggingEnabledByDefault() {
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
- final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedKeyValueStore<String, String> store =
getTimestampedStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
final StateStore logging = caching.wrapped();
- assertThat(store,
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
+ assertThat(store, instanceOf(MeteredTimestampedKeyValueStore.class));
assertThat(caching, instanceOf(CachingKeyValueStore.class));
- assertThat(logging,
instanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class));
+ assertThat(logging,
instanceOf(ChangeLoggingTimestampedKeyValueBytesStore.class));
}
@Test
- public void shouldCreateDefaultHeadersBuilderWithCachingDisabled() {
+ public void shouldCreateTimestampedBuilderWithCachingDisabled() {
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized = new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("store").withCachingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedKeyValueStore<String, String> store =
getTimestampedStore(materialized);
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ assertThat(logging,
instanceOf(ChangeLoggingTimestampedKeyValueBytesStore.class));
}
@Test
- public void shouldCreateDefaultHeadersBuilderWithLoggingDisabled() {
+ public void shouldCreateTimestampedBuilderWithLoggingDisabled() {
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized = new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
);
- final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedKeyValueStore<String, String> store =
getTimestampedStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertThat(caching, instanceOf(CachingKeyValueStore.class));
assertThat(caching.wrapped(),
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
}
@Test
- public void
shouldCreateDefaultHeadersBuilderWithCachingAndLoggingDisabled() {
+ public void shouldCreateTimestampedBuilderWithCachingAndLoggingDisabled() {
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized = new MaterializedInternal<>(
Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("store").withCachingDisabled().withLoggingDisabled(), nameProvider,
STORE_PREFIX
);
- final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
+ final TimestampedKeyValueStore<String, String> store =
getTimestampedStore(materialized);
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(wrapped, not(instanceOf(CachingKeyValueStore.class)));
assertThat(wrapped,
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
}
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingEnabledByDefault()
{
- mockKeyValueStoreSupplier();
-
+ mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.as(keyValueStoreSupplier),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.as(headersStoreSupplier),
nameProvider, STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
final StateStore logging = caching.wrapped();
assertThat(innerKeyValueStore.name(), equalTo(store.name()));
assertThat(store,
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
@@ -176,26 +178,26 @@ public class KeyValueStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingDisabled() {
- mockKeyValueStoreSupplier();
+ mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(keyValueStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String,
String>as(headersStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertThat(innerKeyValueStore.name(), equalTo(store.name()));
assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class));
}
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndLoggingDisabled() {
- mockKeyValueStoreSupplier();
+ mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(keyValueStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String,
String>as(headersStoreSupplier).withLoggingDisabled(), nameProvider,
STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> caching = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
assertThat(innerKeyValueStore.name(), equalTo(store.name()));
assertThat(caching, instanceOf(CachingKeyValueStore.class));
assertThat(caching.wrapped(),
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
@@ -203,13 +205,13 @@ public class KeyValueStoreMaterializerTest {
@Test
public void
shouldCreateHeadersStoreWithProvidedSupplierAndCachingAndLoggingDisabled() {
- mockKeyValueStoreSupplier();
+ mockHeadersStoreSupplier();
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
- new MaterializedInternal<>(Materialized.<String,
String>as(keyValueStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
+ new MaterializedInternal<>(Materialized.<String,
String>as(headersStoreSupplier).withCachingDisabled().withLoggingDisabled(),
nameProvider, STORE_PREFIX);
final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersStore(materialized);
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ final StateStore wrapped = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(innerKeyValueStore.name(), equalTo(store.name()));
assertThat(wrapped, not(instanceOf(CachingKeyValueStore.class)));
assertThat(wrapped,
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
@@ -223,7 +225,7 @@ public class KeyValueStoreMaterializerTest {
final VersionedKeyValueStore<String, String> store =
getVersionedStore(materialized);
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
final StateStore inner = logging.wrapped();
assertThat(innerVersionedStore.name(), equalTo(store.name()));
assertThat(store, instanceOf(MeteredVersionedKeyValueStore.class));
@@ -239,7 +241,7 @@ public class KeyValueStoreMaterializerTest {
final VersionedKeyValueStore<String, String> store =
getVersionedStore(materialized);
- final StateStore inner = ((WrappedStateStore) store).wrapped();
+ final StateStore inner = ((WrappedStateStore<?, ?, ?>)
store).wrapped();
assertThat(innerVersionedStore.name(), equalTo(store.name()));
assertThat(store, instanceOf(MeteredVersionedKeyValueStore.class));
assertThat(innerVersionedStore, equalTo(inner));
@@ -253,7 +255,7 @@ public class KeyValueStoreMaterializerTest {
final VersionedKeyValueStore<String, String> store =
getVersionedStore(materialized);
- final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore<?, ?, ?> logging = (WrappedStateStore<?, ?,
?>) ((WrappedStateStore<?, ?, ?>) store).wrapped();
final StateStore inner = logging.wrapped();
assertThat(innerVersionedStore.name(), equalTo(store.name()));
assertThat(store, instanceOf(MeteredVersionedKeyValueStore.class));
@@ -261,6 +263,14 @@ public class KeyValueStoreMaterializerTest {
assertThat(innerVersionedStore, equalTo(inner));
}
+ @SuppressWarnings("unchecked")
+ private TimestampedKeyValueStore<String, String> getTimestampedStore(
+ final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized) {
+ final KeyValueStoreMaterializer<String, String> materializer = new
KeyValueStoreMaterializer<>(materialized);
+ materializer.configure(streamsConfig);
+ return (TimestampedKeyValueStore<String, String>)
materializer.builder().build();
+ }
+
@SuppressWarnings("unchecked")
private TimestampedKeyValueStoreWithHeaders<String, String>
getHeadersStore(
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized) {