This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aa97adcf368 MINOR: Cleaning DSL ops and tests (#21792)
aa97adcf368 is described below

commit aa97adcf368c2136d3e74015afc55d6ad26ef499
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Mar 17 23:45:05 2026 +0100

    MINOR: Cleaning DSL ops and tests (#21792)
    
    This PR
    - cleans up tests (replaces null headers with empty headers),
    - removes unused classes and unnecessary identifiers,
    - and removes an unnecessary variable defined in the
    KeyValueStoreWrapper class.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../state/internals/KeyValueStoreWrapper.java      |  18 +-
 .../kafka/streams/KeyValueTimestampHeaders.java    |  75 ---------
 .../kstream/internals/KTableFilterTest.java        |  21 +--
 .../kstream/internals/KTableMapValuesTest.java     |  45 ++---
 .../kstream/internals/KTableReduceTest.java        |   7 +-
 .../kstream/internals/KTableSourceTest.java        |  21 +--
 .../internals/KTableTransformValuesTest.java       |   4 +-
 ...mestampedCacheFlushListenerWithHeadersTest.java |   7 +-
 .../ForeignTableJoinProcessorSupplierTests.java    |   3 +-
 .../ResponseJoinProcessorSupplierTest.java         |   3 +-
 .../SubscriptionJoinProcessorSupplierTest.java     |  15 +-
 .../SubscriptionReceiveProcessorSupplierTest.java  |   9 +-
 .../state/internals/KeyValueStoreWrapperTest.java  |   3 +-
 .../internals/TimeOrderedKeyValueBufferTest.java   |  12 +-
 .../kafka/test/GenericInMemoryKeyValueStore.java   |  29 ++--
 .../GenericInMemoryTimestampedKeyValueStore.java   | 184 ---------------------
 ...nMemoryTimestampedKeyValueStoreWithHeaders.java |  30 ++--
 .../apache/kafka/streams/TopologyTestDriver.java   |   2 +-
 18 files changed, 117 insertions(+), 371 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
index 5d21e622ae3..dc9fb21046b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -56,13 +56,10 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
     // to simplify implementation for methods which do not depend on store 
type.
     private StateStore store;
 
-    @SuppressWarnings("unchecked")
     public KeyValueStoreWrapper(final ProcessorContext<?, ?> context, final 
String storeName) {
-        final StateStore rawStore = context.getStateStore(storeName);
-
         // Try headers-aware timestamped store
         try {
-            headersStore = (TimestampedKeyValueStoreWithHeaders<K, V>) 
rawStore;
+            headersStore = context.getStateStore(storeName);
             store = headersStore;
             return;
         } catch (final ClassCastException e) {
@@ -71,12 +68,13 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
 
         // Try versioned store
         try {
-            versionedStore = (VersionedKeyValueStore<K, V>) rawStore;
+            versionedStore = context.getStateStore(storeName);
             store = versionedStore;
         } catch (final ClassCastException e) {
-            final String storeType = rawStore == null ? "null" : 
rawStore.getClass().getName();
+            store = context.getStateStore(storeName);
+            final String storeType = store == null ? "null" : 
store.getClass().getName();
             throw new InvalidStateStoreException("KTable source state store 
must implement either "
-                + "TimestampedKeyValueStore, 
TimestampedKeyValueStoreWithHeaders, or VersionedKeyValueStore. Got: " + 
storeType);
+                + "TimestampedKeyValueStoreWithHeaders, or 
VersionedKeyValueStore. Got: " + storeType);
         }
     }
 
@@ -88,9 +86,9 @@ public class KeyValueStoreWrapper<K, V> implements StateStore 
{
             final VersionedRecord<V> versionedRecord = versionedStore.get(key);
             return versionedRecord == null
                 ? null
-                : ValueTimestampHeaders.make(versionedRecord.value(), 
versionedRecord.timestamp(), null);
+                : ValueTimestampHeaders.make(versionedRecord.value(), 
versionedRecord.timestamp(), new RecordHeaders());
         }
-        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped, headers, or versioned store");
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either headers or versioned store");
     }
 
     public ValueTimestampHeaders<V> get(final K key, final long asOfTimestamp) 
{
@@ -114,7 +112,7 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
         if (versionedStore != null) {
             return versionedStore.put(key, value, timestamp);
         }
-        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped, headers, or versioned store");
+        throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either headers or versioned store");
     }
 
     public StateStore store() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java 
b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java
deleted file mode 100644
index 5f75d825b15..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestampHeaders.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.header.Headers;
-
-import java.util.Objects;
-
-/**
- * A key-value pair with timestamp and headers, used for testing KTable with 
headers support.
- */
-public class KeyValueTimestampHeaders<K, V> {
-    private final K key;
-    private final V value;
-    private final long timestamp;
-    private final Headers headers;
-
-    public KeyValueTimestampHeaders(final K key, final V value, final long 
timestamp, final Headers headers) {
-        this.key = key;
-        this.value = value;
-        this.timestamp = timestamp;
-        this.headers = headers;
-    }
-
-    public K key() {
-        return key;
-    }
-
-    public V value() {
-        return value;
-    }
-
-    public long timestamp() {
-        return timestamp;
-    }
-
-    public Headers headers() {
-        return headers;
-    }
-
-    @Override
-    public String toString() {
-        return "KeyValueTimestampHeaders{key=" + key + ", value=" + value + ", 
timestamp=" + timestamp + ", headers=" + headers + '}';
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        final KeyValueTimestampHeaders<?, ?> that = 
(KeyValueTimestampHeaders<?, ?>) o;
-        return timestamp == that.timestamp
-                && Objects.equals(key, that.key)
-                && Objects.equals(value, that.value)
-                && Objects.equals(headers, that.headers);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(key, value, timestamp, headers);
-    }
-}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index f87c3bbe87c..45adf9951f6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -167,30 +168,30 @@ public class KTableFilterTest {
             assertNull(getter2.get("B"));
             assertNull(getter2.get("C"));
 
-            assertEquals(ValueTimestampHeaders.make(1, 5L, null), 
getter3.get("A"));
-            assertEquals(ValueTimestampHeaders.make(1, 10L, null), 
getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 15L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(1, 5L, new 
RecordHeaders()), getter3.get("A"));
+            assertEquals(ValueTimestampHeaders.make(1, 10L, new 
RecordHeaders()), getter3.get("B"));
+            assertEquals(ValueTimestampHeaders.make(1, 15L, new 
RecordHeaders()), getter3.get("C"));
 
             inputTopic.pipeInput("A", 2, 10L);
             inputTopic.pipeInput("B", 2, 5L);
 
-            assertEquals(ValueTimestampHeaders.make(2, 10L, null), 
getter2.get("A"));
-            assertEquals(ValueTimestampHeaders.make(2, 5L, null), 
getter2.get("B"));
+            assertEquals(ValueTimestampHeaders.make(2, 10L, new 
RecordHeaders()), getter2.get("A"));
+            assertEquals(ValueTimestampHeaders.make(2, 5L, new 
RecordHeaders()), getter2.get("B"));
             assertNull(getter2.get("C"));
 
             assertNull(getter3.get("A"));
             assertNull(getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 15L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(1, 15L, new 
RecordHeaders()), getter3.get("C"));
 
             inputTopic.pipeInput("A", 3, 15L);
 
             assertNull(getter2.get("A"));
-            assertEquals(ValueTimestampHeaders.make(2, 5L, null), 
getter2.get("B"));
+            assertEquals(ValueTimestampHeaders.make(2, 5L, new 
RecordHeaders()), getter2.get("B"));
             assertNull(getter2.get("C"));
 
-            assertEquals(ValueTimestampHeaders.make(3, 15L, null), 
getter3.get("A"));
+            assertEquals(ValueTimestampHeaders.make(3, 15L, new 
RecordHeaders()), getter3.get("A"));
             assertNull(getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 15L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(1, 15L, new 
RecordHeaders()), getter3.get("C"));
 
             inputTopic.pipeInput("A", null, 10L);
             inputTopic.pipeInput("B", null, 20L);
@@ -201,7 +202,7 @@ public class KTableFilterTest {
 
             assertNull(getter3.get("A"));
             assertNull(getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 15L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(1, 15L, new 
RecordHeaders()), getter3.get("C"));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 00c9d372c8e..6ca6a8320d7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -130,44 +131,44 @@ public class KTableMapValuesTest {
             inputTopic1.pipeInput("B", "01", 10L);
             inputTopic1.pipeInput("C", "01", 30L);
 
-            assertEquals(ValueTimestampHeaders.make(1, 50L, null), 
getter2.get("A"));
-            assertEquals(ValueTimestampHeaders.make(1, 10L, null), 
getter2.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 30L, null), 
getter2.get("C"));
+            assertEquals(ValueTimestampHeaders.make(1, 50L, new 
RecordHeaders()), getter2.get("A"));
+            assertEquals(ValueTimestampHeaders.make(1, 10L, new 
RecordHeaders()), getter2.get("B"));
+            assertEquals(ValueTimestampHeaders.make(1, 30L, new 
RecordHeaders()), getter2.get("C"));
 
-            assertEquals(ValueTimestampHeaders.make(-1, 50L, null), 
getter3.get("A"));
-            assertEquals(ValueTimestampHeaders.make(-1, 10L, null), 
getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(-1, 30L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(-1, 50L, new 
RecordHeaders()), getter3.get("A"));
+            assertEquals(ValueTimestampHeaders.make(-1, 10L, new 
RecordHeaders()), getter3.get("B"));
+            assertEquals(ValueTimestampHeaders.make(-1, 30L, new 
RecordHeaders()), getter3.get("C"));
 
             inputTopic1.pipeInput("A", "02", 25L);
             inputTopic1.pipeInput("B", "02", 20L);
 
-            assertEquals(ValueTimestampHeaders.make(2, 25L, null), 
getter2.get("A"));
-            assertEquals(ValueTimestampHeaders.make(2, 20L, null), 
getter2.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 30L, null), 
getter2.get("C"));
+            assertEquals(ValueTimestampHeaders.make(2, 25L, new 
RecordHeaders()), getter2.get("A"));
+            assertEquals(ValueTimestampHeaders.make(2, 20L, new 
RecordHeaders()), getter2.get("B"));
+            assertEquals(ValueTimestampHeaders.make(1, 30L, new 
RecordHeaders()), getter2.get("C"));
 
-            assertEquals(ValueTimestampHeaders.make(-2, 25L, null), 
getter3.get("A"));
-            assertEquals(ValueTimestampHeaders.make(-2, 20L, null), 
getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(-1, 30L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(-2, 25L, new 
RecordHeaders()), getter3.get("A"));
+            assertEquals(ValueTimestampHeaders.make(-2, 20L, new 
RecordHeaders()), getter3.get("B"));
+            assertEquals(ValueTimestampHeaders.make(-1, 30L, new 
RecordHeaders()), getter3.get("C"));
 
             inputTopic1.pipeInput("A", "03", 35L);
 
-            assertEquals(ValueTimestampHeaders.make(3, 35L, null), 
getter2.get("A"));
-            assertEquals(ValueTimestampHeaders.make(2, 20L, null), 
getter2.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 30L, null), 
getter2.get("C"));
+            assertEquals(ValueTimestampHeaders.make(3, 35L, new 
RecordHeaders()), getter2.get("A"));
+            assertEquals(ValueTimestampHeaders.make(2, 20L, new 
RecordHeaders()), getter2.get("B"));
+            assertEquals(ValueTimestampHeaders.make(1, 30L, new 
RecordHeaders()), getter2.get("C"));
 
-            assertEquals(ValueTimestampHeaders.make(-3, 35L, null), 
getter3.get("A"));
-            assertEquals(ValueTimestampHeaders.make(-2, 20L, null), 
getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(-1, 30L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(-3, 35L, new 
RecordHeaders()), getter3.get("A"));
+            assertEquals(ValueTimestampHeaders.make(-2, 20L, new 
RecordHeaders()), getter3.get("B"));
+            assertEquals(ValueTimestampHeaders.make(-1, 30L, new 
RecordHeaders()), getter3.get("C"));
 
             inputTopic1.pipeInput("A", (String) null, 1L);
 
             assertNull(getter2.get("A"));
-            assertEquals(ValueTimestampHeaders.make(2, 20L, null), 
getter2.get("B"));
-            assertEquals(ValueTimestampHeaders.make(1, 30L, null), 
getter2.get("C"));
+            assertEquals(ValueTimestampHeaders.make(2, 20L, new 
RecordHeaders()), getter2.get("B"));
+            assertEquals(ValueTimestampHeaders.make(1, 30L, new 
RecordHeaders()), getter2.get("C"));
 
             assertNull(getter3.get("A"));
-            assertEquals(ValueTimestampHeaders.make(-2, 20L, null), 
getter3.get("B"));
-            assertEquals(ValueTimestampHeaders.make(-1, 30L, null), 
getter3.get("C"));
+            assertEquals(ValueTimestampHeaders.make(-2, 20L, new 
RecordHeaders()), getter3.get("B"));
+            assertEquals(ValueTimestampHeaders.make(-1, 30L, new 
RecordHeaders()), getter3.get("C"));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
index ff39dddabc1..308dba4943e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableReduceTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
@@ -56,11 +57,11 @@ public class KTableReduceTest {
         context.setCurrentNode(new ProcessorNode<>("reduce", reduceProcessor, 
singleton("myStore")));
 
         reduceProcessor.process(new Record<>("A", new Change<>(singleton("a"), 
null), 10L));
-        assertEquals(ValueTimestampHeaders.make(singleton("a"), 10L, null), 
myStore.get("A"));
+        assertEquals(ValueTimestampHeaders.make(singleton("a"), 10L, new 
RecordHeaders()), myStore.get("A"));
         reduceProcessor.process(new Record<>("A", new Change<>(singleton("b"), 
singleton("a")), 15L));
-        assertEquals(ValueTimestampHeaders.make(singleton("b"), 15L, null), 
myStore.get("A"));
+        assertEquals(ValueTimestampHeaders.make(singleton("b"), 15L, new 
RecordHeaders()), myStore.get("A"));
         reduceProcessor.process(new Record<>("A", new Change<>(null, 
singleton("b")), 12L));
-        assertEquals(ValueTimestampHeaders.make(emptySet(), 15L, null), 
myStore.get("A"));
+        assertEquals(ValueTimestampHeaders.make(emptySet(), 15L, new 
RecordHeaders()), myStore.get("A"));
     }
 
     private Set<String> differenceNotNullArgs(final Set<String> left, final 
Set<String> right) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index e383729a79c..4680bcad4c7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -221,29 +222,29 @@ public class KTableSourceTest {
             inputTopic1.pipeInput("B", "01", 20L);
             inputTopic1.pipeInput("C", "01", 15L);
 
-            assertEquals(ValueTimestampHeaders.make("01", 10L, null), 
getter1.get("A"));
-            assertEquals(ValueTimestampHeaders.make("01", 20L, null), 
getter1.get("B"));
-            assertEquals(ValueTimestampHeaders.make("01", 15L, null), 
getter1.get("C"));
+            assertEquals(ValueTimestampHeaders.make("01", 10L, new 
RecordHeaders()), getter1.get("A"));
+            assertEquals(ValueTimestampHeaders.make("01", 20L, new 
RecordHeaders()), getter1.get("B"));
+            assertEquals(ValueTimestampHeaders.make("01", 15L, new 
RecordHeaders()), getter1.get("C"));
 
             inputTopic1.pipeInput("A", "02", 30L);
             inputTopic1.pipeInput("B", "02", 5L);
 
-            assertEquals(ValueTimestampHeaders.make("02", 30L, null), 
getter1.get("A"));
-            assertEquals(ValueTimestampHeaders.make("02", 5L, null), 
getter1.get("B"));
-            assertEquals(ValueTimestampHeaders.make("01", 15L, null), 
getter1.get("C"));
+            assertEquals(ValueTimestampHeaders.make("02", 30L, new 
RecordHeaders()), getter1.get("A"));
+            assertEquals(ValueTimestampHeaders.make("02", 5L, new 
RecordHeaders()), getter1.get("B"));
+            assertEquals(ValueTimestampHeaders.make("01", 15L, new 
RecordHeaders()), getter1.get("C"));
 
             inputTopic1.pipeInput("A", "03", 29L);
 
-            assertEquals(ValueTimestampHeaders.make("03", 29L, null), 
getter1.get("A"));
-            assertEquals(ValueTimestampHeaders.make("02", 5L, null), 
getter1.get("B"));
-            assertEquals(ValueTimestampHeaders.make("01", 15L, null), 
getter1.get("C"));
+            assertEquals(ValueTimestampHeaders.make("03", 29L, new 
RecordHeaders()), getter1.get("A"));
+            assertEquals(ValueTimestampHeaders.make("02", 5L, new 
RecordHeaders()), getter1.get("B"));
+            assertEquals(ValueTimestampHeaders.make("01", 15L, new 
RecordHeaders()), getter1.get("C"));
 
             inputTopic1.pipeInput("A", null, 50L);
             inputTopic1.pipeInput("B", null, 3L);
 
             assertNull(getter1.get("A"));
             assertNull(getter1.get("B"));
-            assertEquals(ValueTimestampHeaders.make("01", 15L, null), 
getter1.get("C"));
+            assertEquals(ValueTimestampHeaders.make("01", 15L, new 
RecordHeaders()), getter1.get("C"));
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index d9b07fdbdf3..cd8767a2064 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -210,7 +210,7 @@ public class KTableTransformValuesTest {
 
         when(parent.valueGetterSupplier()).thenReturn(parentGetterSupplier);
         when(parentGetterSupplier.get()).thenReturn(parentGetter);
-        
when(parentGetter.get("Key")).thenReturn(ValueTimestampHeaders.make("Value", 
73L, null));
+        
when(parentGetter.get("Key")).thenReturn(ValueTimestampHeaders.make("Value", 
73L, new RecordHeaders()));
         final ProcessorRecordContext recordContext = new 
ProcessorRecordContext(
             42L,
             23L,
@@ -242,7 +242,7 @@ public class KTableTransformValuesTest {
             new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
 
         when(context.getStateStore(QUERYABLE_NAME)).thenReturn(stateStore);
-        
when(stateStore.get("Key")).thenReturn(ValueTimestampHeaders.make("something", 
0L, null));
+        
when(stateStore.get("Key")).thenReturn(ValueTimestampHeaders.make("something", 
0L, new RecordHeaders()));
 
         final KTableValueGetter<String, String> getter = 
transformValues.view().get();
         getter.init(context);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
index 7d7c3da9287..a7ba164b1da 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListenerWithHeadersTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -49,8 +50,8 @@ public class TimestampedCacheFlushListenerWithHeadersTest {
             new Record<>(
                 "key",
                 new Change<>(
-                    ValueTimestampHeaders.make("newValue", 42L, null),
-                    ValueTimestampHeaders.make("oldValue", 21L, null)),
+                    ValueTimestampHeaders.make("newValue", 42L, new 
RecordHeaders()),
+                    ValueTimestampHeaders.make("oldValue", 21L, new 
RecordHeaders())),
                 73L));
 
         verify(context, times(2)).setCurrentNode(null);
@@ -69,7 +70,7 @@ public class TimestampedCacheFlushListenerWithHeadersTest {
         new TimestampedCacheFlushListenerWithHeaders<>(context).apply(
             new Record<>(
                 "key",
-                new Change<>(null, ValueTimestampHeaders.make("oldValue", 21L, 
null)),
+                new Change<>(null, ValueTimestampHeaders.make("oldValue", 21L, 
new RecordHeaders())),
                 73L));
 
         verify(context, times(2)).setCurrentNode(null);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index 19684cbb6fd..2d53b83a94f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -200,7 +201,7 @@ public class ForeignTableJoinProcessorSupplierTests {
             SubscriptionWrapper.VERSION_0,
             null
         );
-        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
+        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
         final Bytes key = COMBINED_KEY_SCHEMA.toBytes(fk, pk);
         stateStore.put(key, oldValue);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
index dcd492feebe..e422d27992d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
@@ -62,7 +63,7 @@ public class ResponseJoinProcessorSupplierTest {
 
                 @Override
                 public ValueTimestampHeaders<V> get(final K key) {
-                    return ValueTimestampHeaders.make(map.get(key), -1, null);
+                    return ValueTimestampHeaders.make(map.get(key), -1, new 
RecordHeaders());
                 }
 
                 @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
index 310e47d5e51..a632c71c9fe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplierTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
 import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
@@ -37,7 +38,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class SubscriptionJoinProcessorSupplierTest {
     final Map<String, ValueTimestampHeaders<String>> fks = 
Collections.singletonMap(
-        "fk1", ValueTimestampHeaders.make("foo", 1L, null)
+        "fk1", ValueTimestampHeaders.make("foo", 1L, new RecordHeaders())
     );
     final KTableValueGetterSupplier<String, String> valueGetterSupplier = 
valueGetterSupplier(fks);
     final Processor<CombinedKey<String, String>,
@@ -67,7 +68,7 @@ public class SubscriptionJoinProcessorSupplierTest {
         final Record<CombinedKey<String, String>, 
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
             new Record<>(
                 new CombinedKey<>("fk1", "pk1"),
-                new Change<>(ValueTimestampHeaders.make(newValue, 1L, null), 
null),
+                new Change<>(ValueTimestampHeaders.make(newValue, 1L, new 
RecordHeaders()), null),
                 1L
             );
         processor.process(record);
@@ -101,7 +102,7 @@ public class SubscriptionJoinProcessorSupplierTest {
         final Record<CombinedKey<String, String>, 
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
             new Record<>(
                 new CombinedKey<>("fk1", "pk1"),
-                new Change<>(ValueTimestampHeaders.make(newValue, 1L, null), 
null),
+                new Change<>(ValueTimestampHeaders.make(newValue, 1L, new 
RecordHeaders()), null),
                 1L
             );
         processor.process(record);
@@ -136,7 +137,7 @@ public class SubscriptionJoinProcessorSupplierTest {
         final Record<CombinedKey<String, String>, 
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
             new Record<>(
                 new CombinedKey<>("fk1", "pk1"),
-                new Change<>(ValueTimestampHeaders.make(newValue, 1L, null), 
null),
+                new Change<>(ValueTimestampHeaders.make(newValue, 1L, new 
RecordHeaders()), null),
                 1L
         );
         processor.process(record);
@@ -171,7 +172,7 @@ public class SubscriptionJoinProcessorSupplierTest {
         final Record<CombinedKey<String, String>, 
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
             new Record<>(
                 new CombinedKey<>("fk1", "pk1"),
-                new Change<>(ValueTimestampHeaders.make(newValue, 1L, null), 
null),
+                new Change<>(ValueTimestampHeaders.make(newValue, 1L, new 
RecordHeaders()), null),
                 1L
             );
         processor.process(record);
@@ -205,7 +206,7 @@ public class SubscriptionJoinProcessorSupplierTest {
         Record<CombinedKey<String, String>, 
Change<ValueTimestampHeaders<SubscriptionWrapper<String>>>> record =
             new Record<>(
                 new CombinedKey<>("fk1", "pk1"),
-                new Change<>(ValueTimestampHeaders.make(newValue, 1L, null), 
null),
+                new Change<>(ValueTimestampHeaders.make(newValue, 1L, new 
RecordHeaders()), null),
                 1L
             );
         processor.process(record);
@@ -226,7 +227,7 @@ public class SubscriptionJoinProcessorSupplierTest {
 
         record = new Record<>(
                 new CombinedKey<>("fk9000", "pk1"),
-                new Change<>(ValueTimestampHeaders.make(newValue, 1L, null), 
null),
+                new Change<>(ValueTimestampHeaders.make(newValue, 1L, new 
RecordHeaders()), null),
                 1L
             );
         processor.process(record);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index 61729383315..ddfcadfc261 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -107,7 +108,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
             SubscriptionWrapper.VERSION_0,
             null
         );
-        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
+        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
         final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
         stateStore.put(key, oldValue);
@@ -158,7 +159,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
             SubscriptionWrapper.VERSION_1,
             1
         );
-        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
+        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
         final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
         stateStore.put(key, oldValue);
@@ -210,7 +211,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
             SubscriptionWrapper.VERSION_0,
             null
         );
-        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
+        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
         final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
         stateStore.put(key, oldValue);
@@ -262,7 +263,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
             SubscriptionWrapper.VERSION_1,
             1
         );
-        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, null);
+        final ValueTimestampHeaders<SubscriptionWrapper<String>> oldValue = 
ValueTimestampHeaders.make(oldWrapper, 0, new RecordHeaders());
 
         final Bytes key = COMBINED_KEY_SCHEMA.toBytes(FK, PK1);
         stateStore.put(key, oldValue);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
index d0546ca7d15..d5309922467 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapperTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -55,7 +56,7 @@ public class KeyValueStoreWrapperTest {
     private static final String STORE_NAME = "kvStore";
     private static final String KEY = "k";
     private static final ValueTimestampHeaders<String> VALUE_TIMESTAMP_HEADERS
-        = ValueTimestampHeaders.make("v", 8L, null);
+        = ValueTimestampHeaders.make("v", 8L, new RecordHeaders());
 
     @Mock
     private TimestampedKeyValueStoreWithHeaders<String, String> headersStore;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 29c05296d2c..3efbfdf5b53 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -306,7 +306,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         context.setRecordContext(recordContext);
         buffer.put(1L, new Record<>("A", new Change<>("new-value", 
"old-value"), 0L), recordContext);
         buffer.put(1L, new Record<>("B", new Change<>("new-value", null), 0L), 
recordContext);
-        assertThat(buffer.priorValueForBuffered("A"), 
is(Maybe.defined(ValueTimestampHeaders.make("old-value", -1, null))));
+        assertThat(buffer.priorValueForBuffered("A"), 
is(Maybe.defined(ValueTimestampHeaders.make("old-value", -1, new 
RecordHeaders()))));
         assertThat(buffer.priorValueForBuffered("B"), is(Maybe.defined(null)));
     }
 
@@ -475,7 +475,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
-        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new 
RecordHeaders()))));
 
         // flush the buffer into a list in buffer order so we can make 
assertions about the contents.
 
@@ -598,7 +598,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
-        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new 
RecordHeaders()))));
 
         // flush the buffer into a list in buffer order so we can make 
assertions about the contents.
 
@@ -722,7 +722,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
-        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new 
RecordHeaders()))));
 
         // flush the buffer into a list in buffer order so we can make 
assertions about the contents.
 
@@ -848,7 +848,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
-        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new 
RecordHeaders()))));
 
         // flush the buffer into a list in buffer order so we can make 
assertions about the contents.
 
@@ -971,7 +971,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
 
         assertThat(buffer.priorValueForBuffered("todelete"), 
is(Maybe.undefined()));
         assertThat(buffer.priorValueForBuffered("asdf"), 
is(Maybe.defined(null)));
-        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, null))));
+        assertThat(buffer.priorValueForBuffered("zxcv"), 
is(Maybe.defined(ValueTimestampHeaders.make("previous", -1, new 
RecordHeaders()))));
 
         // flush the buffer into a list in buffer order so we can make 
assertions about the contents.
 
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
index 203aae869ba..e6589e015ca 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryKeyValueStore.java
@@ -51,12 +51,12 @@ public class GenericInMemoryKeyValueStore<K extends 
Comparable, V>
         super(null);
         this.name = name;
 
-        this.map = new TreeMap<>();
+        map = new TreeMap<>();
     }
 
     @Override
     public String name() {
-        return this.name;
+        return name;
     }
 
     @Override
@@ -65,7 +65,7 @@ public class GenericInMemoryKeyValueStore<K extends 
Comparable, V>
             stateStoreContext.register(root, null);
         }
 
-        this.open = true;
+        open = true;
     }
 
     @Override
@@ -81,21 +81,21 @@ public class GenericInMemoryKeyValueStore<K extends 
Comparable, V>
 
     @Override
     public boolean isOpen() {
-        return this.open;
+        return open;
     }
 
     @Override
     public synchronized V get(final K key) {
-        return this.map.get(key);
+        return map.get(key);
     }
 
     @Override
     public synchronized void put(final K key,
         final V value) {
         if (value == null) {
-            this.map.remove(key);
+            map.remove(key);
         } else {
-            this.map.put(key, value);
+            map.put(key, value);
         }
     }
 
@@ -118,26 +118,25 @@ public class GenericInMemoryKeyValueStore<K extends 
Comparable, V>
 
     @Override
     public synchronized V delete(final K key) {
-        return this.map.remove(key);
+        return map.remove(key);
     }
 
     @Override
     public synchronized KeyValueIterator<K, V> range(final K from,
         final K to) {
-        return new DelegatingPeekingKeyValueIterator<>(
-            name,
-            new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, 
to, true).entrySet().iterator()));
+        final TreeMap<K, V> copy = new TreeMap<>(map.subMap(from, true, to, 
true));
+        return new DelegatingPeekingKeyValueIterator<>(name, new 
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
     }
 
     @Override
     public synchronized KeyValueIterator<K, V> all() {
-        final TreeMap<K, V> copy = new TreeMap<>(this.map);
+        final TreeMap<K, V> copy = new TreeMap<>(map);
         return new DelegatingPeekingKeyValueIterator<>(name, new 
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
     }
 
     @Override
     public long approximateNumEntries() {
-        return this.map.size();
+        return map.size();
     }
 
     @Override
@@ -147,8 +146,8 @@ public class GenericInMemoryKeyValueStore<K extends 
Comparable, V>
 
     @Override
     public void close() {
-        this.map.clear();
-        this.open = false;
+        map.clear();
+        open = false;
     }
 
     private static class GenericInMemoryKeyValueIterator<K, V> implements 
KeyValueIterator<K, V> {
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
deleted file mode 100644
index 59880319583..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStore.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.internals.CacheFlushListener;
-import 
org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator;
-import org.apache.kafka.streams.state.internals.WrappedStateStore;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * This class is a generic version of the in-memory key-value store that is 
useful for testing when you
- *  need a basic KeyValueStore for arbitrary types and don't have/want to 
write a serde
- */
-@SuppressWarnings("deprecation")
-public class GenericInMemoryTimestampedKeyValueStore<K extends Comparable, V>
-    extends WrappedStateStore<StateStore, K, ValueAndTimestamp<V>>
-    implements TimestampedKeyValueStore<K, V> {
-
-    private final String name;
-    private final NavigableMap<K, ValueAndTimestamp<V>> map;
-    private volatile boolean open = false;
-
-    public GenericInMemoryTimestampedKeyValueStore(final String name) {
-        // it's not really a `WrappedStateStore` so we pass `null`
-        // however, we need to implement `WrappedStateStore` to make the store 
usable
-        super(null);
-        this.name = name;
-
-        this.map = new TreeMap<>();
-    }
-
-    @Override
-    public String name() {
-        return this.name;
-    }
-
-    @Override
-    public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
-        if (root != null) {
-            stateStoreContext.register(root, null);
-        }
-
-        this.open = true;
-    }
-
-    @Override
-    public boolean setFlushListener(final CacheFlushListener<K, 
ValueAndTimestamp<V>> listener,
-                                    final boolean sendOldValues) {
-        return false;
-    }
-
-    @Override
-    public boolean persistent() {
-        return false;
-    }
-
-    @Override
-    public boolean isOpen() {
-        return this.open;
-    }
-
-    @Override
-    public synchronized ValueAndTimestamp<V> get(final K key) {
-        return this.map.get(key);
-    }
-
-    @Override
-    public synchronized void put(final K key,
-                                 final ValueAndTimestamp<V> value) {
-        if (value == null) {
-            this.map.remove(key);
-        } else {
-            this.map.put(key, value);
-        }
-    }
-
-    @Override
-    public synchronized ValueAndTimestamp<V> putIfAbsent(final K key,
-                                                         final 
ValueAndTimestamp<V> value) {
-        final ValueAndTimestamp<V> originalValue = get(key);
-        if (originalValue == null) {
-            put(key, value);
-        }
-        return originalValue;
-    }
-
-    @Override
-    public synchronized void putAll(final List<KeyValue<K, 
ValueAndTimestamp<V>>> entries) {
-        for (final KeyValue<K, ValueAndTimestamp<V>> entry : entries) {
-            put(entry.key, entry.value);
-        }
-    }
-
-    @Override
-    public synchronized ValueAndTimestamp<V> delete(final K key) {
-        return this.map.remove(key);
-    }
-
-    @Override
-    public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> range(final 
K from,
-        final K to) {
-        return new DelegatingPeekingKeyValueIterator<>(
-            name,
-            new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, 
to, true).entrySet().iterator()));
-    }
-
-    @Override
-    public synchronized KeyValueIterator<K, ValueAndTimestamp<V>> all() {
-        final TreeMap<K, ValueAndTimestamp<V>> copy = new TreeMap<>(this.map);
-        return new DelegatingPeekingKeyValueIterator<>(name, new 
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
-    }
-
-    @Override
-    public long approximateNumEntries() {
-        return this.map.size();
-    }
-
-    @Override
-    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
-        // do-nothing since it is in-memory
-    }
-
-    @Override
-    public void close() {
-        this.map.clear();
-        this.open = false;
-    }
-
-    private static class GenericInMemoryKeyValueIterator<K, V> implements 
KeyValueIterator<K, ValueAndTimestamp<V>> {
-        private final Iterator<Entry<K, ValueAndTimestamp<V>>> iter;
-
-        private GenericInMemoryKeyValueIterator(final Iterator<Map.Entry<K, 
ValueAndTimestamp<V>>> iter) {
-            this.iter = iter;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public KeyValue<K, ValueAndTimestamp<V>> next() {
-            final Map.Entry<K, ValueAndTimestamp<V>> entry = iter.next();
-            return new KeyValue<>(entry.getKey(), entry.getValue());
-        }
-
-        @Override
-        public void close() {
-            // do nothing
-        }
-
-        @Override
-        public K peekNextKey() {
-            throw new UnsupportedOperationException("peekNextKey() not 
supported in " + getClass().getName());
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
index d264f647ead..220fad8ae0d 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/GenericInMemoryTimestampedKeyValueStoreWithHeaders.java
@@ -38,7 +38,6 @@ import java.util.TreeMap;
  * This class is a generic version of the in-memory key-value store that is 
useful for testing when you
  *  need a basic KeyValueStore for arbitrary types and don't have/want to 
write a serde
  */
-@SuppressWarnings("deprecation")
 public class GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends 
Comparable, V>
     extends WrappedStateStore<StateStore, K, ValueTimestampHeaders<V>>
     implements TimestampedKeyValueStoreWithHeaders<K, V> {
@@ -53,12 +52,12 @@ public class 
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
         super(null);
         this.name = name;
 
-        this.map = new TreeMap<>();
+        map = new TreeMap<>();
     }
 
     @Override
     public String name() {
-        return this.name;
+        return name;
     }
 
     @Override
@@ -67,7 +66,7 @@ public class 
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
             stateStoreContext.register(root, null);
         }
 
-        this.open = true;
+        open = true;
     }
 
     @Override
@@ -83,21 +82,21 @@ public class 
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
 
     @Override
     public boolean isOpen() {
-        return this.open;
+        return open;
     }
 
     @Override
     public synchronized ValueTimestampHeaders<V> get(final K key) {
-        return this.map.get(key);
+        return map.get(key);
     }
 
     @Override
     public synchronized void put(final K key,
                                  final ValueTimestampHeaders<V> value) {
         if (value == null) {
-            this.map.remove(key);
+            map.remove(key);
         } else {
-            this.map.put(key, value);
+            map.put(key, value);
         }
     }
 
@@ -120,26 +119,25 @@ public class 
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
 
     @Override
     public synchronized ValueTimestampHeaders<V> delete(final K key) {
-        return this.map.remove(key);
+        return map.remove(key);
     }
 
     @Override
     public synchronized KeyValueIterator<K, ValueTimestampHeaders<V>> 
range(final K from,
                                                                             
final K to) {
-        return new DelegatingPeekingKeyValueIterator<>(
-            name,
-            new GenericInMemoryKeyValueIterator<>(this.map.subMap(from, true, 
to, true).entrySet().iterator()));
+        final TreeMap<K, ValueTimestampHeaders<V>> copy = new 
TreeMap<>(map.subMap(from, true, to, true));
+        return new DelegatingPeekingKeyValueIterator<>(name, new 
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
     }
 
     @Override
     public synchronized KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
-        final TreeMap<K, ValueTimestampHeaders<V>> copy = new 
TreeMap<>(this.map);
+        final TreeMap<K, ValueTimestampHeaders<V>> copy = new TreeMap<>(map);
         return new DelegatingPeekingKeyValueIterator<>(name, new 
GenericInMemoryKeyValueIterator<>(copy.entrySet().iterator()));
     }
 
     @Override
     public long approximateNumEntries() {
-        return this.map.size();
+        return map.size();
     }
 
     @Override
@@ -149,8 +147,8 @@ public class 
GenericInMemoryTimestampedKeyValueStoreWithHeaders<K extends Compar
 
     @Override
     public void close() {
-        this.map.clear();
-        this.open = false;
+        map.clear();
+        open = false;
     }
 
     private static class GenericInMemoryKeyValueIterator<K, V> implements 
KeyValueIterator<K, ValueTimestampHeaders<V>> {
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 9f927f5893a..ae56c798776 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1581,7 +1581,7 @@ public class TopologyTestDriver implements Closeable {
         public void put(final K key,
                         final V value,
                         final long windowStartTimestamp) {
-            inner.put(key, ValueTimestampHeaders.make(value, 
ConsumerRecord.NO_TIMESTAMP, null), windowStartTimestamp);
+            inner.put(key, ValueTimestampHeaders.make(value, 
ConsumerRecord.NO_TIMESTAMP, new RecordHeaders()), windowStartTimestamp);
         }
 
         @Override

Reply via email to