This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aa97adcf368 MINOR: Cleaning DSL ops and tests (#21792)
aa97adcf368 is described below
commit aa97adcf368c2136d3e74015afc55d6ad26ef499
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Mar 17 23:45:05 2026 +0100
MINOR: Cleaning DSL ops and tests (#21792)
This PR
- cleans up tests (replaces null headers with empty headers),
- removes unused classes and unnecessary identifiers,
- and removes an unnecessary variable defined in the KeyValueStoreWrapper
class.
Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../state/internals/KeyValueStoreWrapper.java | 18 +-
.../kafka/streams/KeyValueTimestampHeaders.java | 75 ---------
.../kstream/internals/KTableFilterTest.java | 21 +--
.../kstream/internals/KTableMapValuesTest.java | 45 ++---
.../kstream/internals/KTableReduceTest.java | 7 +-
.../kstream/internals/KTableSourceTest.java | 21 +--
.../internals/KTableTransformValuesTest.java | 4 +-
...mestampedCacheFlushListenerWithHeadersTest.java | 7 +-
.../ForeignTableJoinProcessorSupplierTests.java | 3 +-
.../ResponseJoinProcessorSupplierTest.java | 3 +-
.../SubscriptionJoinProcessorSupplierTest.java | 15 +-
.../SubscriptionReceiveProcessorSupplierTest.java | 9 +-
.../state/internals/KeyValueStoreWrapperTest.java | 3 +-
.../internals/TimeOrderedKeyValueBufferTest.java | 12 +-
.../kafka/test/GenericInMemoryKeyValueStore.java | 29 ++--
.../GenericInMemoryTimestampedKeyValueStore.java | 184 ---------------------
...nMemoryTimestampedKeyValueStoreWithHeaders.java | 30 ++--
.../apache/kafka/streams/TopologyTestDriver.java | 2 +-
18 files changed, 117 insertions(+), 371 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
index 5d21e622ae3..dc9fb21046b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -56,13 +56,10 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
// to simplify implementation for methods which do not depend on store
type.
private StateStore store;
- @SuppressWarnings("unchecked")
public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final
String storeName) {
- final StateStore rawStore = context.getStateStore(storeName);
-
// Try headers-aware timestamped store
try {
- headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>)
rawStore;
+ headersStore = context.getStateStore(storeName);
store = headersStore;
return;
} catch (final ClassCastException e) {
@@ -71,12 +68,13 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
// Try versioned store
try {
- versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
+ versionedStore = context.getStateStore(storeName);
store = versionedStore;
} catch (final ClassCastException e) {
- final String storeType = rawStore == null ? "null" :
rawStore.getClass().getName();
+ store = context.getStateStore(storeName);
+ final String storeType = store == null ? "null" :
store.getClass().getName();
throw new InvalidStateStoreException("KTable source state store
must implement either "
- + "TimestampedKeyValueStore,
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " +
storeType);
+ + "TimestampedKeyValueStoreWithHeaders, or
VersionedKeyValueStore. Got: " + storeType);
}
}
@@ -88,9 +86,9 @@ public class KeyValueStoreWrapper<K, V> implements StateStore
{
final VersionedRecord<V> versionedRecord = versionedStore.get(key);
return versionedRecord == null
? null
- : ValueTimestampHeaders.make(versionedRecord.value(),
versionedRecord.timestamp(), null);
+ : ValueTimestampHeaders.make(versionedRecord.value(),
versionedRecord.timestamp(), new RecordHeaders());
}
- throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped, headers, or versioned store");
+ throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either headers or versioned store");
}
public ValueTimestampHeaders<V> get(final K key, final long asOfTimestamp)
{
@@ -114,7 +112,7 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
if (versionedStore != null) {
return versionedStore.put(key, value, timestamp);
}
- throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped, headers, or versioned store");
+ throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either headers or versioned store");
}
public StateStore store() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java
b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java
deleted file mode 100644
index 5f75d825b15..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.header.Headers;
-
-import java.util.Objects;
-
-/**
- * A key-value pair with timestamp and headers, used for testing KTable with
headers support.
- */
-public class KeyValueTimestampHeaders<K, V> {
- private final K key;
- private final V value;
- private final long timestamp;
- private final Headers headers;
-
- public KeyValueTimestampHeaders(final K key, final V value, final long
timestamp, final Headers headers) {
- this.key = key;
- this.value = value;
- this.timestamp = timestamp;
- this.headers = headers;
- }
-
- public K key() {
- return key;
- }
-
- public V value() {
- return value;
- }
-
- public long timestamp() {
- return timestamp;
- }
-
- public Headers headers() {
- return headers;
- }
-
- @Override
- public String toString() {
- return "KeyValueTimestampHeaders{key=" + key + ", value=" + value + ",
timestamp=" + timestamp + ", headers=" + headers + '}';
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- final KeyValueTimestampHeaders<?, ?> that =
(KeyValueTimestampHeaders<?, ?>) o;
- return timestamp == that.timestamp
- && Objects.equals(key, that.key)
- && Objects.equals(value, that.value)
- && Objects.equals(headers, that.headers);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(key, value, timestamp, headers);
- }
-}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index f87c3bbe87c..45adf9951f6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -167,30 +168,30 @@ public class KTableFilterTest {
assertNull(getter2.get("B"));
assertNull(getter2.get("C"));
- assertEquals(ValueTimestampHeaders.make(1, 5L, null),
getter3.get("A"));
- assertEquals(ValueTimestampHeaders.make(1, 10L, null),
getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 15L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(1, 5L, new
RecordHeaders()), getter3.get("A"));
+ assertEquals(ValueTimestampHeaders.make(1, 10L, new
RecordHeaders()), getter3.get("B"));
+ assertEquals(ValueTimestampHeaders.make(1, 15L, new
RecordHeaders()), getter3.get("C"));
inputTopic.pipeInput("A", 2, 10L);
inputTopic.pipeInput("B", 2, 5L);
- assertEquals(ValueTimestampHeaders.make(2, 10L, null),
getter2.get("A"));
- assertEquals(ValueTimestampHeaders.make(2, 5L, null),
getter2.get("B"));
+ assertEquals(ValueTimestampHeaders.make(2, 10L, new
RecordHeaders()), getter2.get("A"));
+ assertEquals(ValueTimestampHeaders.make(2, 5L, new
RecordHeaders()), getter2.get("B"));
assertNull(getter2.get("C"));
assertNull(getter3.get("A"));
assertNull(getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 15L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(1, 15L, new
RecordHeaders()), getter3.get("C"));
inputTopic.pipeInput("A", 3, 15L);
assertNull(getter2.get("A"));
- assertEquals(ValueTimestampHeaders.make(2, 5L, null),
getter2.get("B"));
+ assertEquals(ValueTimestampHeaders.make(2, 5L, new
RecordHeaders()), getter2.get("B"));
assertNull(getter2.get("C"));
- assertEquals(ValueTimestampHeaders.make(3, 15L, null),
getter3.get("A"));
+ assertEquals(ValueTimestampHeaders.make(3, 15L, new
RecordHeaders()), getter3.get("A"));
assertNull(getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 15L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(1, 15L, new
RecordHeaders()), getter3.get("C"));
inputTopic.pipeInput("A", null, 10L);
inputTopic.pipeInput("B", null, 20L);
@@ -201,7 +202,7 @@ public class KTableFilterTest {
assertNull(getter3.get("A"));
assertNull(getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 15L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(1, 15L, new
RecordHeaders()), getter3.get("C"));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 00c9d372c8e..6ca6a8320d7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
@@ -130,44 +131,44 @@ public class KTableMapValuesTest {
inputTopic1.pipeInput("B", "01", 10L);
inputTopic1.pipeInput("C", "01", 30L);
- assertEquals(ValueTimestampHeaders.make(1, 50L, null),
getter2.get("A"));
- assertEquals(ValueTimestampHeaders.make(1, 10L, null),
getter2.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 30L, null),
getter2.get("C"));
+ assertEquals(ValueTimestampHeaders.make(1, 50L, new
RecordHeaders()), getter2.get("A"));
+ assertEquals(ValueTimestampHeaders.make(1, 10L, new
RecordHeaders()), getter2.get("B"));
+ assertEquals(ValueTimestampHeaders.make(1, 30L, new
RecordHeaders()), getter2.get("C"));
- assertEquals(ValueTimestampHeaders.make(-1, 50L, null),
getter3.get("A"));
- assertEquals(ValueTimestampHeaders.make(-1, 10L, null),
getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(-1, 30L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(-1, 50L, new
RecordHeaders()), getter3.get("A"));
+ assertEquals(ValueTimestampHeaders.make(-1, 10L, new
RecordHeaders()), getter3.get("B"));
+ assertEquals(ValueTimestampHeaders.make(-1, 30L, new
RecordHeaders()), getter3.get("C"));
inputTopic1.pipeInput("A", "02", 25L);
inputTopic1.pipeInput("B", "02", 20L);
- assertEquals(ValueTimestampHeaders.make(2, 25L, null),
getter2.get("A"));
- assertEquals(ValueTimestampHeaders.make(2, 20L, null),
getter2.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 30L, null),
getter2.get("C"));
+ assertEquals(ValueTimestampHeaders.make(2, 25L, new
RecordHeaders()), getter2.get("A"));
+ assertEquals(ValueTimestampHeaders.make(2, 20L, new
RecordHeaders()), getter2.get("B"));
+ assertEquals(ValueTimestampHeaders.make(1, 30L, new
RecordHeaders()), getter2.get("C"));
- assertEquals(ValueTimestampHeaders.make(-2, 25L, null),
getter3.get("A"));
- assertEquals(ValueTimestampHeaders.make(-2, 20L, null),
getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(-1, 30L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(-2, 25L, new
RecordHeaders()), getter3.get("A"));
+ assertEquals(ValueTimestampHeaders.make(-2, 20L, new
RecordHeaders()), getter3.get("B"));
+ assertEquals(ValueTimestampHeaders.make(-1, 30L, new
RecordHeaders()), getter3.get("C"));
inputTopic1.pipeInput("A", "03", 35L);
- assertEquals(ValueTimestampHeaders.make(3, 35L, null),
getter2.get("A"));
- assertEquals(ValueTimestampHeaders.make(2, 20L, null),
getter2.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 30L, null),
getter2.get("C"));
+ assertEquals(ValueTimestampHeaders.make(3, 35L, new
RecordHeaders()), getter2.get("A"));
+ assertEquals(ValueTimestampHeaders.make(2, 20L, new
RecordHeaders()), getter2.get("B"));
+ assertEquals(ValueTimestampHeaders.make(1, 30L, new
RecordHeaders()), getter2.get("C"));
- assertEquals(ValueTimestampHeaders.make(-3, 35L, null),
getter3.get("A"));
- assertEquals(ValueTimestampHeaders.make(-2, 20L, null),
getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(-1, 30L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(-3, 35L, new
RecordHeaders()), getter3.get("A"));
+ assertEquals(ValueTimestampHeaders.make(-2, 20L, new
RecordHeaders()), getter3.get("B"));
+ assertEquals(ValueTimestampHeaders.make(-1, 30L, new
RecordHeaders()), getter3.get("C"));
inputTopic1.pipeInput("A", (String) null, 1L);
assertNull(getter2.get("A"));
- assertEquals(ValueTimestampHeaders.make(2, 20L, null),
getter2.get("B"));
- assertEquals(ValueTimestampHeaders.make(1, 30L, null),
getter2.get("C"));
+ assertEquals(ValueTimestampHeaders.make(2, 20L, new
RecordHeaders()), getter2.get("B"));
+ assertEquals(ValueTimestampHeaders.make(1, 30L, new
RecordHeaders()), getter2.get("C"));
assertNull(getter3.get("A"));
- assertEquals(ValueTimestampHeaders.make(-2, 20L, null),
getter3.get("B"));
- assertEquals(ValueTimestampHeaders.make(-1, 30L, null),
getter3.get("C"));
+ assertEquals(ValueTimestampHeaders.make(-2, 20L, new
RecordHeaders()), getter3.get("B"));
+ assertEquals(ValueTimestampHeaders.make(-1, 30L, new
RecordHeaders()), getter3.get("C"));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index ff39dddabc1..308dba4943e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
@@ -56,11 +57,11 @@ public class KTableReduceTest {
context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor,
singleton("myStore")));
reduceProcessor.process(new Record<>("A", new Change<>(singleton("a"),
null), 10L));
- assertEquals(ValueTimestampHeaders.make(singleton("a"), 10L, null),
myStore.get("A"));
+ assertEquals(ValueTimestampHeaders.make(singleton("a"), 10L, new
RecordHeaders()), myStore.get("A"));
reduceProcessor.process(new Record<>("A", new Change<>(singleton("b"),
singleton("a")), 15L));
- assertEquals(ValueTimestampHeaders.make(singleton("b"), 15L, null),
myStore.get("A"));
+ assertEquals(ValueTimestampHeaders.make(singleton("b"), 15L, new
RecordHeaders()), myStore.get("A"));
reduceProcessor.process(new Record<>("A", new Change<>(null,
singleton("b")), 12L));
- assertEquals(ValueTimestampHeaders.make(emptySet(), 15L, null),
myStore.get("A"));
+ assertEquals(ValueTimestampHeaders.make(emptySet(), 15L, new
RecordHeaders()), myStore.get("A"));
}
private Set<String> differenceNotNullArgs(final Set<String> left, final
Set<String> right) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index e383729a79c..4680bcad4c7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -221,29 +222,29 @@ public class KTableSourceTest {
inputTopic1.pipeInput("B", "01", 20L);
inputTopic1.pipeInput("C", "01", 15L);
- assertEquals(ValueTimestampHeaders.make("01", 10L, null),
getter1.get("A"));
- assertEquals(ValueTimestampHeaders.make("01", 20L, null),
getter1.get("B"));
- assertEquals(ValueTimestampHeaders.make("01", 15L, null),
getter1.get("C"));
+ assertEquals(ValueTimestampHeaders.make("01", 10L, new
RecordHeaders()), getter1.get("A"));
+ assertEquals(ValueTimestampHeaders.make("01", 20L, new
RecordHeaders()), getter1.get("B"));
+ assertEquals(ValueTimestampHeaders.make("01", 15L, new
RecordHeaders()), getter1.get("C"));
inputTopic1.pipeInput("A", "02", 30L);
inputTopic1.pipeInput("B", "02", 5L);
- assertEquals(ValueTimestampHeaders.make("02", 30L, null),
getter1.get("A"));
- assertEquals(ValueTimestampHeaders.make("02", 5L, null),
getter1.get("B"));
- assertEquals(ValueTimestampHeaders.make("01", 15L, null),
getter1.get("C"));
+ assertEquals(ValueTimestampHeaders.make("02", 30L, new
RecordHeaders()), getter1.get("A"));
+ assertEquals(ValueTimestampHeaders.make("02", 5L, new
RecordHeaders()), getter1.get("B"));
+ assertEquals(ValueTimestampHeaders.make("01", 15L, new
RecordHeaders()), getter1.get("C"));
inputTopic1.pipeInput("A", "03", 29L);
- assertEquals(ValueTimestampHeaders.make("03", 29L, null),
getter1.get("A"));
- assertEquals(ValueTimestampHeaders.make("02", 5L, null),
getter1.get("B"));
- assertEquals(ValueTimestampHeaders.make("01", 15L, null),
getter1.get("C"));
+ assertEquals(ValueTimestampHeaders.make("03", 29L, new
RecordHeaders()), getter1.get("A"));
+ assertEquals(ValueTimestampHeaders.make("02", 5L, new
RecordHeaders()), getter1.get("B"));
+ assertEquals(ValueTimestampHeaders.make("01", 15L, new
RecordHeaders()), getter1.get("C"));
inputTopic1.pipeInput("A", null, 50L);
inputTopic1.pipeInput("B", null, 3L);
assertNull(getter1.get("A"));
assertNull(getter1.get("B"));
- assertEquals(ValueTimestampHeaders.make("01", 15L, null),
getter1.get("C"));
+ assertEquals(ValueTimestampHeaders.make("01", 15L, new
RecordHeaders()), getter1.get("C"));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index d9b07fdbdf3..cd8767a2064 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -210,7 +210,7 @@ public class KTableTransformValuesTest {
when(parent.valueGetterSupplier()).thenReturn(parentGetterSupplier);
when(parentGetterSupplier.get()).thenReturn(parentGetter);
-
when(parentGetter.get("Key")).thenReturn(ValueTimestampHeaders.make("Value",
73L, null));
+
when(parentGetter.get("Key")).thenReturn(ValueTimestampHeaders.make("Value",
73L, new RecordHeaders()));
final ProcessorRecordContext recordContext = new
ProcessorRecordContext(
42L,
23L,
@@ -242,7 +242,7 @@ public class KTableTransformValuesTest {
new KTableTransformValues<>(parent, new
ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
when(context.getStateStore(QUERYABLE_NAME)).thenReturn(stateStore);
-
when(stateStore.get("Key")).thenReturn(ValueTimestampHeaders.make("something",
0L, null));
+
when(stateStore.get("Key")).thenReturn(ValueTimestampHeaders.make("something",
0L, new RecordHeaders()));
final KTableValueGetter<String, String> getter =
transformValues.view().get();
getter.init(context);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
index 7d7c3da9287..a7ba164b1da 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -49,8 +50,8 @@ public class TimestampedCacheFlushListenerWithHeadersTest {
new Record<>(
"key",
new Change<>(
- ValueTimestampHeaders.make("newValue", 42L, null),
- ValueTimestampHeaders.make("oldValue", 21L, null)),
+ ValueTimestampHeaders.make("newValue", 42L, new
RecordHeaders()),
+ ValueTimestampHeaders.make("oldValue", 21L, new
RecordHeaders())),
73L));
verify(context, times(2)).setCurrentNode(null);
@@ -69,7 +70,7 @@ public class TimestampedCacheFlushListenerWithHeadersTest {
new TimestampedCacheFlushListenerWithHeaders<>(context).apply(
new Record<>(
"key",
- new Change<>(null, ValueTimestampHeaders.make("oldValue", 21L,
null)),
+ new Change<>(null, ValueTimestampHeaders.make("oldValue", 21L,
new RecordHeaders())),
73L));
verify(context, times(2)).setCurrentNode(null);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index 19684cbb6fd..2d53b83a94f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -200,7 +201,7 @@ public class ForeignTableJoinProcessorSupplierTests {
SubscriptionWrapper.VERSION_0,
null
);
- final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
+ final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
final Bytes key = COMBINED_KEY_SCHEMA.toBytes(fk, pk);
stateStore.put(key, oldValue);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
index dcd492feebe..e422d27992d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
@@ -62,7 +63,7 @@ public class ResponseJoinProcessorSupplierTest {
@Override
public ValueTimestampHeaders<V> get(final K key) {
- return ValueTimestampHeaders.make(map.get(key), -1, null);
+ return ValueTimestampHeaders.make(map.get(key), -1, new
RecordHeaders());
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
index 310e47d5e51..a632c71c9fe 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
@@ -37,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class SubscriptionJoinProcessorSupplierTest {
final Map<String, ValueTimestampHeaders<String>> fks =
Collections.singletonMap(
- "fk1", ValueTimestampHeaders.make("foo", 1L, null)
+ "fk1", ValueTimestampHeaders.make("foo", 1L, new RecordHeaders())
);
final KTableValueGetterSupplier<String, String> valueGetterSupplier =
valueGetterSupplier(fks);
final Processor<CombinedKey<String, String>,
@@ -67,7 +68,7 @@ public class SubscriptionJoinProcessorSupplierTest {
final Record<CombinedKey<String, String>,
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
new Record<>(
new CombinedKey<>("fk1", "pk1"),
- new Change<>(ValueTimestampHeaders.make(newValue, 1L, null),
null),
+ new Change<>(ValueTimestampHeaders.make(newValue, 1L, new
RecordHeaders()), null),
1L
);
processor.process(record);
@@ -101,7 +102,7 @@ public class SubscriptionJoinProcessorSupplierTest {
final Record<CombinedKey<String, String>,
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
new Record<>(
new CombinedKey<>("fk1", "pk1"),
- new Change<>(ValueTimestampHeaders.make(newValue, 1L, null),
null),
+ new Change<>(ValueTimestampHeaders.make(newValue, 1L, new
RecordHeaders()), null),
1L
);
processor.process(record);
@@ -136,7 +137,7 @@ public class SubscriptionJoinProcessorSupplierTest {
final Record<CombinedKey<String, String>,
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
new Record<>(
new CombinedKey<>("fk1", "pk1"),
- new Change<>(ValueTimestampHeaders.make(newValue, 1L, null),
null),
+ new Change<>(ValueTimestampHeaders.make(newValue, 1L, new
RecordHeaders()), null),
1L
);
processor.process(record);
@@ -171,7 +172,7 @@ public class SubscriptionJoinProcessorSupplierTest {
final Record<CombinedKey<String, String>,
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
new Record<>(
new CombinedKey<>("fk1", "pk1"),
- new Change<>(ValueTimestampHeaders.make(newValue, 1L, null),
null),
+ new Change<>(ValueTimestampHeaders.make(newValue, 1L, new
RecordHeaders()), null),
1L
);
processor.process(record);
@@ -205,7 +206,7 @@ public class SubscriptionJoinProcessorSupplierTest {
Record<CombinedKey<String, String>,
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
new Record<>(
new CombinedKey<>("fk1", "pk1"),
- new Change<>(ValueTimestampHeaders.make(newValue, 1L, null),
null),
+ new Change<>(ValueTimestampHeaders.make(newValue, 1L, new
RecordHeaders()), null),
1L
);
processor.process(record);
@@ -226,7 +227,7 @@ public class SubscriptionJoinProcessorSupplierTest {
record = new Record<>(
new CombinedKey<>("fk9000", "pk1"),
- new Change<>(ValueTimestampHeaders.make(newValue, 1L, null),
null),
+ new Change<>(ValueTimestampHeaders.make(newValue, 1L, new
RecordHeaders()), null),
1L
);
processor.process(record);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index 61729383315..ddfcadfc261 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -107,7 +108,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
SubscriptionWrapper.VERSION_0,
null
);
- final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
+ final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
stateStore.put(key, oldValue);
@@ -158,7 +159,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
SubscriptionWrapper.VERSION_1,
1
);
- final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
+ final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
stateStore.put(key, oldValue);
@@ -210,7 +211,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
SubscriptionWrapper.VERSION_0,
null
);
- final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
+ final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
stateStore.put(key, oldValue);
@@ -262,7 +263,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
SubscriptionWrapper.VERSION_1,
1
);
- final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, null);
+ final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue =
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
stateStore.put(key, oldValue);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
index d0546ca7d15..d5309922467 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -55,7 +56,7 @@ public class KeyValueStoreWrapperTest {
private static final String STORE_NAME = "kvStore";
private static final String KEY = "k";
private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS
- = ValueTimestampHeaders.make("v", 8L, null);
+ = ValueTimestampHeaders.make("v", 8L, new RecordHeaders());
@Mock
private TimestampedKeyValueStoreWithHeaders<String, String> headersStore;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 29c05296d2c..3efbfdf5b53 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -306,7 +306,7 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
context.setRecordContext(recordContext);
buffer.put(1L, new Record<>("A", new Change<>("new-value",
"old-value"), 0L), recordContext);
buffer.put(1L, new Record<>("B", new Change<>("new-value", null), 0L),
recordContext);
- assertThat(buffer.priorValueForBuffered("A"), is(Maybe.defined(ValueTimestampHeaders.make("old-value", -1, null))));
+ assertThat(buffer.priorValueForBuffered("A"), is(Maybe.defined(ValueTimestampHeaders.make("old-value", -1, new RecordHeaders()))));
assertThat(buffer.priorValueForBuffered("B"), is(Maybe.defined(null)));
}
@@ -475,7 +475,7 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
assertThat(buffer.priorValueForBuffered("todelete"),
is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"),
is(Maybe.defined(null)));
- assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+ assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new RecordHeaders()))));
// flush the buffer into a list in buffer order so we can make
assertions about the contents.
@@ -598,7 +598,7 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
assertThat(buffer.priorValueForBuffered("todelete"),
is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"),
is(Maybe.defined(null)));
- assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+ assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new RecordHeaders()))));
// flush the buffer into a list in buffer order so we can make
assertions about the contents.
@@ -722,7 +722,7 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
assertThat(buffer.priorValueForBuffered("todelete"),
is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"),
is(Maybe.defined(null)));
- assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+ assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new RecordHeaders()))));
// flush the buffer into a list in buffer order so we can make
assertions about the contents.
@@ -848,7 +848,7 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
assertThat(buffer.priorValueForBuffered("todelete"),
is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"),
is(Maybe.defined(null)));
- assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+ assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new RecordHeaders()))));
// flush the buffer into a list in buffer order so we can make
assertions about the contents.
@@ -971,7 +971,7 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
assertThat(buffer.priorValueForBuffered("todelete"),
is(Maybe.undefined()));
assertThat(buffer.priorValueForBuffered("asdf"),
is(Maybe.defined(null)));
- assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+ assertThat(buffer.priorValueForBuffered("zxcv"), is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new RecordHeaders()))));
// flush the buffer into a list in buffer order so we can make
assertions about the contents.
diff --git
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index 203aae869ba..e6589e015ca 100644
---
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
+++
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -51,12 +51,12 @@ public class GenericInMemoryKeyValueStore<K extends
Comparable, V>
super(null);
this.name = name;
- this.map = new TreeMap<>();
+ map = new TreeMap<>();
}
@Override
public String name() {
- return this.name;
+ return name;
}
@Override
@@ -65,7 +65,7 @@ public class GenericInMemoryKeyValueStore<K extends
Comparable, V>
stateStoreContext.register(root, null);
}
- this.open = true;
+ open = true;
}
@Override
@@ -81,21 +81,21 @@ public class GenericInMemoryKeyValueStore<K extends
Comparable, V>
@Override
public boolean isOpen() {
- return this.open;
+ return open;
}
@Override
public synchronized V get(final K key) {
- return this.map.get(key);
+ return map.get(key);
}
@Override
public synchronized void put(final K key,
final V value) {
if (value == null) {
- this.map.remove(key);
+ map.remove(key);
} else {
- this.map.put(key, value);
+ map.put(key, value);
}
}
@@ -118,26 +118,25 @@ public class GenericInMemoryKeyValueStore<K extends
Comparable, V>
@Override
public synchronized V delete(final K key) {
- return this.map.remove(key);
+ return map.remove(key);
}
@Override
public synchronized KeyValueIterator<K, V> range(final K from,
final K to) {
- return new DelegatingPeekingKeyValueIterator<>(
- name,
- new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+ final TreeMap<K, V> copy = new TreeMap<>(map.subMap(from, true, to, true));
+ return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
}
@Override
public synchronized KeyValueIterator<K, V> all() {
- final TreeMap<K, V> copy = new TreeMap<>(this.map);
+ final TreeMap<K, V> copy = new TreeMap<>(map);
return new DelegatingPeekingKeyValueIterator<>(name, new
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
}
@Override
public long approximateNumEntries() {
- return this.map.size();
+ return map.size();
}
@Override
@@ -147,8 +146,8 @@ public class GenericInMemoryKeyValueStore<K extends
Comparable, V>
@Override
public void close() {
- this.map.clear();
- this.open = false;
+ map.clear();
+ open = false;
}
private static class GenericInMemoryKeyValueIterator<K, V> implements
KeyValueIterator<K, V> {
diff --git
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
deleted file mode 100644
index 59880319583..00000000000
---
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.internals.CacheFlushListener;
-import
org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * This class is a generic version of the in-memory key-value store that is
useful for testing when you
- * need a basic KeyValueStore for arbitrary types and don't have/want to
write a serde
- */
-@SuppressWarnings("deprecation")
-public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
- extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>>
- implements TimestampedKeyValueStore<K, V> {
-
- private final String name;
- private final NavigableMap<K, ValueAndTimestamp<V>> map;
- private volatile boolean open = false;
-
- public GenericInMemoryTimestampedKeyValueStore(final String name) {
- // it's not really a `WrappedStateStore` so we pass `null`
- // however, we need to implement `WrappedStateStore` to make the store
usable
- super(null);
- this.name = name;
-
- this.map = new TreeMap<>();
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
- if (root != null) {
- stateStoreContext.register(root, null);
- }
-
- this.open = true;
- }
-
- @Override
- public boolean setFlushListener(final CacheFlushListener<K,
ValueAndTimestamp<V>> listener,
- final boolean sendOldValues) {
- return false;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public boolean isOpen() {
- return this.open;
- }
-
- @Override
- public synchronized ValueAndTimestamp<V> get(final K key) {
- return this.map.get(key);
- }
-
- @Override
- public synchronized void put(final K key,
- final ValueAndTimestamp<V> value) {
- if (value == null) {
- this.map.remove(key);
- } else {
- this.map.put(key, value);
- }
- }
-
- @Override
- public synchronized ValueAndTimestamp<V> putIfAbsent(final K key,
- final
ValueAndTimestamp<V> value) {
- final ValueAndTimestamp<V> originalValue = get(key);
- if (originalValue == null) {
- put(key, value);
- }
- return originalValue;
- }
-
- @Override
- public synchronized void putAll(final List<KeyValue<K,
ValueAndTimestamp<V>>> entries) {
- for (final KeyValue<K, ValueAndTimestamp<V>> entry : entries) {
- put(entry.key, entry.value);
- }
- }
-
- @Override
- public synchronized ValueAndTimestamp<V> delete(final K key) {
- return this.map.remove(key);
- }
-
- @Override
- public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> range(final
K from,
- final K to) {
- return new DelegatingPeekingKeyValueIterator<>(
- name,
- new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true,
to, true).entrySet().iterator()));
- }
-
- @Override
- public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> all() {
- final TreeMap<K, ValueAndTimestamp<V>> copy = new TreeMap<>(this.map);
- return new DelegatingPeekingKeyValueIterator<>(name, new
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
- }
-
- @Override
- public long approximateNumEntries() {
- return this.map.size();
- }
-
- @Override
- public void commit(final Map<TopicPartition, Long> changelogOffsets) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- this.map.clear();
- this.open = false;
- }
-
- private static class GenericInMemoryKeyValueIterator<K, V> implements
KeyValueIterator<K, ValueAndTimestamp<V>> {
- private final Iterator<Entry<K, ValueAndTimestamp<V>>> iter;
-
- private GenericInMemoryKeyValueIterator(final Iterator<Map.Entry<K,
ValueAndTimestamp<V>>> iter) {
- this.iter = iter;
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public KeyValue<K, ValueAndTimestamp<V>> next() {
- final Map.Entry<K, ValueAndTimestamp<V>> entry = iter.next();
- return new KeyValue<>(entry.getKey(), entry.getValue());
- }
-
- @Override
- public void close() {
- // do nothing
- }
-
- @Override
- public K peekNextKey() {
- throw new UnsupportedOperationException("peekNextKey() not
supported in " + getClass().getName());
- }
- }
-}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
index d264f647ead..220fad8ae0d 100644
---
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
@@ -38,7 +38,6 @@ import java.util.TreeMap;
* This class is a generic version of the in-memory key-value store that is
useful for testing when you
* need a basic KeyValueStore for arbitrary types and don't have/want to
write a serde
*/
-@SuppressWarnings("deprecation")
public class GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends
Comparable, V>
extends WrappedStateStore<StateStore, K, ValueTimestampHeaders<V>>
implements TimestampedKeyValueStoreWithHeaders<K, V> {
@@ -53,12 +52,12 @@ public class
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
super(null);
this.name = name;
- this.map = new TreeMap<>();
+ map = new TreeMap<>();
}
@Override
public String name() {
- return this.name;
+ return name;
}
@Override
@@ -67,7 +66,7 @@ public class
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
stateStoreContext.register(root, null);
}
- this.open = true;
+ open = true;
}
@Override
@@ -83,21 +82,21 @@ public class
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
@Override
public boolean isOpen() {
- return this.open;
+ return open;
}
@Override
public synchronized ValueTimestampHeaders<V> get(final K key) {
- return this.map.get(key);
+ return map.get(key);
}
@Override
public synchronized void put(final K key,
final ValueTimestampHeaders<V> value) {
if (value == null) {
- this.map.remove(key);
+ map.remove(key);
} else {
- this.map.put(key, value);
+ map.put(key, value);
}
}
@@ -120,26 +119,25 @@ public class
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
@Override
public synchronized ValueTimestampHeaders<V> delete(final K key) {
- return this.map.remove(key);
+ return map.remove(key);
}
@Override
public synchronized KeyValueIterator<K, ValueTimestampHeaders<V>>
range(final K from,
final K to) {
- return new DelegatingPeekingKeyValueIterator<>(
- name,
- new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, to, true).entrySet().iterator()));
+ final TreeMap<K, ValueTimestampHeaders<V>> copy = new TreeMap<>(map.subMap(from, true, to, true));
+ return new DelegatingPeekingKeyValueIterator<>(name, new GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
}
@Override
public synchronized KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
- final TreeMap<K, ValueTimestampHeaders<V>> copy = new TreeMap<>(this.map);
+ final TreeMap<K, ValueTimestampHeaders<V>> copy = new TreeMap<>(map);
return new DelegatingPeekingKeyValueIterator<>(name, new
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
}
@Override
public long approximateNumEntries() {
- return this.map.size();
+ return map.size();
}
@Override
@@ -149,8 +147,8 @@ public class
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
@Override
public void close() {
- this.map.clear();
- this.open = false;
+ map.clear();
+ open = false;
}
private static class GenericInMemoryKeyValueIterator<K, V> implements
KeyValueIterator<K, ValueTimestampHeaders<V>> {
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 9f927f5893a..ae56c798776 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1581,7 +1581,7 @@ public class TopologyTestDriver implements Closeable {
public void put(final K key,
final V value,
final long windowStartTimestamp) {
- inner.put(key, ValueTimestampHeaders.make(value, ConsumerRecord.NO_TIMESTAMP, null), windowStartTimestamp);
+ inner.put(key, ValueTimestampHeaders.make(value, ConsumerRecord.NO_TIMESTAMP, new RecordHeaders()), windowStartTimestamp);
}
@Override