This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2713477dbac KAFKA-20173: Propagate headers into serde 6/N (#21850)
2713477dbac is described below
commit 2713477dbaca2809d8c61d3586f8021b0b8de4de
Author: Uladzislau Blok <[email protected]>
AuthorDate: Thu Apr 2 00:13:00 2026 +0200
KAFKA-20173: Propagate headers into serde 6/N (#21850)
This PR ensures headers are propagated in the key-related methods of
`WindowKeySchema` and `PrefixedWindowKeySchemas`. Related tests are
updated accordingly.
Reviewers: Matthias J. Sax <[email protected]>
---
.../state/internals/PrefixedWindowKeySchemas.java | 14 ++++----
.../streams/state/internals/WindowKeySchema.java | 11 +++---
...ctDualSchemaRocksDBSegmentedBytesStoreTest.java | 6 ++--
.../AbstractRocksDBSegmentedBytesStoreTest.java | 2 +-
.../internals/AbstractWindowBytesStoreTest.java | 2 +-
.../state/internals/InMemoryWindowStoreTest.java | 6 ++--
...dSortedCacheWrappedWindowStoreIteratorTest.java | 41 ++++++++++++----------
...acheWrappedWindowStoreKeyValueIteratorTest.java | 6 ++--
.../state/internals/WindowKeySchemaTest.java | 37 +++++++++++--------
9 files changed, 70 insertions(+), 55 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
index d8ef4acedc7..9e2b05264f4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
@@ -31,8 +31,6 @@ import java.util.List;
import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
import static
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
public class PrefixedWindowKeySchemas {
private static final int PREFIX_SIZE = 1;
@@ -179,16 +177,18 @@ public class PrefixedWindowKeySchemas {
public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
final int seqnum,
+ final Headers headers,
final StateSerdes<K, ?>
serdes) {
- final byte[] serializedKey = serdes.rawKey(timeKey.key());
+ final byte[] serializedKey = serdes.rawKey(timeKey.key(), headers);
return toStoreKeyBinary(serializedKey, timeKey.window().start(),
seqnum);
}
public static <K> Bytes toStoreKeyBinary(final K key,
final long timestamp,
final int seqnum,
+ final Headers headers,
final StateSerdes<K, ?>
serdes) {
- final byte[] serializedKey = serdes.rawKey(key);
+ final byte[] serializedKey = serdes.rawKey(key, headers);
return toStoreKeyBinary(serializedKey, timestamp, seqnum);
}
@@ -312,8 +312,9 @@ public class PrefixedWindowKeySchemas {
public static <K> Bytes toStoreKeyBinary(final K key,
final long timestamp,
final int seqnum,
+ final Headers headers,
final StateSerdes<K, ?>
serdes) {
- final byte[] serializedKey = serdes.rawKey(key);
+ final byte[] serializedKey = serdes.rawKey(key, headers);
return toStoreKeyBinary(serializedKey, timestamp, seqnum);
}
@@ -324,8 +325,9 @@ public class PrefixedWindowKeySchemas {
public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
final int seqnum,
+ final Headers headers,
final StateSerdes<K, ?>
serdes) {
- final byte[] serializedKey = serdes.rawKey(timeKey.key());
+ final byte[] serializedKey = serdes.rawKey(timeKey.key(), headers);
return toStoreKeyBinary(serializedKey, timeKey.window().start(),
seqnum);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 2d9c8705cc4..d294f04b663 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -33,8 +33,6 @@ import java.util.List;
import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
private static final Logger LOG =
LoggerFactory.getLogger(WindowKeySchema.class);
@@ -168,8 +166,9 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
public static <K> Bytes toStoreKeyBinary(final K key,
final long timestamp,
final int seqnum,
+ final Headers headers,
final StateSerdes<K, ?> serdes) {
- final byte[] serializedKey = serdes.rawKey(key);
+ final byte[] serializedKey = serdes.rawKey(key, headers);
return toStoreKeyBinary(serializedKey, timestamp, seqnum);
}
@@ -181,8 +180,9 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
public static <K> Bytes toStoreKeyBinary(final Windowed<K> timeKey,
final int seqnum,
+ final Headers headers,
final StateSerdes<K, ?> serdes) {
- final byte[] serializedKey = serdes.rawKey(timeKey.key());
+ final byte[] serializedKey = serdes.rawKey(timeKey.key(), headers);
return toStoreKeyBinary(serializedKey, timeKey.window().start(),
seqnum);
}
@@ -205,10 +205,11 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
}
static <K> K extractStoreKey(final byte[] binaryKey,
+ final Headers headers,
final StateSerdes<K, ?> serdes) {
final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE -
SEQNUM_SIZE];
System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
- return serdes.keyFrom(bytes);
+ return serdes.keyFrom(bytes, headers);
}
static long extractStoreTimestamp(final byte[] binaryKey) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index 861ed349ed2..34405f2705c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -1694,9 +1694,9 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
final StateSerdes<String, Long> stateSerdes =
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
if (getBaseSchema() instanceof TimeFirstWindowKeySchema) {
if (changeLog) {
- return WindowKeySchema.toStoreKeyBinary(key, seq, stateSerdes);
+ return WindowKeySchema.toStoreKeyBinary(key, seq, new
RecordHeaders(), stateSerdes);
}
- return TimeFirstWindowKeySchema.toStoreKeyBinary(key, seq,
stateSerdes);
+ return TimeFirstWindowKeySchema.toStoreKeyBinary(key, seq, new
RecordHeaders(), stateSerdes);
} else if (getBaseSchema() instanceof TimeFirstSessionKeySchema) {
if (changeLog) {
return Bytes.wrap(SessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
@@ -1710,7 +1710,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest {
private Bytes serializeKeyForIndex(final Windowed<String> key) {
final StateSerdes<String, Long> stateSerdes =
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
if (getIndexSchema() instanceof KeyFirstWindowKeySchema) {
- return KeyFirstWindowKeySchema.toStoreKeyBinary(key, 0,
stateSerdes);
+ return KeyFirstWindowKeySchema.toStoreKeyBinary(key, 0, new
RecordHeaders(), stateSerdes);
} else if (getIndexSchema() instanceof KeyFirstSessionKeySchema) {
return Bytes.wrap(KeyFirstSessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
} else {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 201db4bebe3..a9d794c81c5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -928,7 +928,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
if (schema instanceof SessionKeySchema) {
return Bytes.wrap(SessionKeySchema.toBinary(key,
stateSerdes.keySerializer(), new RecordHeaders(), "dummy"));
} else if (schema instanceof WindowKeySchema) {
- return WindowKeySchema.toStoreKeyBinary(key, 0, stateSerdes);
+ return WindowKeySchema.toStoreKeyBinary(key, 0, new
RecordHeaders(), stateSerdes);
} else {
throw new IllegalStateException("Unrecognized serde schema");
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index 32c03005e1c..56cf10aea7b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -1163,7 +1163,7 @@ public abstract class AbstractWindowBytesStoreTest {
<K> K extractStoreKey(final byte[] binaryKey,
final StateSerdes<K, ?> serdes) {
- return WindowKeySchema.extractStoreKey(binaryKey, serdes);
+ return WindowKeySchema.extractStoreKey(binaryKey, new RecordHeaders(),
serdes);
}
private Map<Integer, Set<String>> entriesByKey(final List<KeyValue<byte[],
byte[]>> changeLog,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index cdc23218e73..b9461f75d05 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -143,10 +143,10 @@ public class InMemoryWindowStoreTest extends
AbstractWindowBytesStoreTest {
final List<KeyValue<byte[], byte[]>> restorableEntries = new
LinkedList<>();
restorableEntries
- .add(new KeyValue<>(toStoreKeyBinary(1, 0L, 0, serdes).get(),
serdes.rawValue("one")));
- restorableEntries.add(new KeyValue<>(toStoreKeyBinary(2, WINDOW_SIZE,
0, serdes).get(),
+ .add(new KeyValue<>(toStoreKeyBinary(1, 0L, 0, new
RecordHeaders(), serdes).get(), serdes.rawValue("one")));
+ restorableEntries.add(new KeyValue<>(toStoreKeyBinary(2, WINDOW_SIZE,
0, new RecordHeaders(), serdes).get(),
serdes.rawValue("two")));
- restorableEntries.add(new KeyValue<>(toStoreKeyBinary(3, 2 *
WINDOW_SIZE, 0, serdes).get(),
+ restorableEntries.add(new KeyValue<>(toStoreKeyBinary(3, 2 *
WINDOW_SIZE, 0, new RecordHeaders(), serdes).get(),
serdes.rawValue("three")));
context.restore(STORE_NAME, restorableEntries);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index d8e9fb7725b..0675aaea48d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -52,13 +54,14 @@ public class
MergedSortedCacheWrappedWindowStoreIteratorTest {
@FunctionalInterface
private interface StoreKeySerializer<K> {
- Bytes serialize(final K key, final long ts, final int seq, final
StateSerdes<K, ?> serdes);
+ Bytes serialize(final K key, final long ts, final int seq, final
Headers headers, final StateSerdes<K, ?> serdes);
}
private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new
ArrayList<>();
private final ThreadCache cache = new ThreadCache(new
LogContext("testCache "), 1000000L, new MockStreamsMetrics(new Metrics()));
private final String namespace = "0.0-one";
private final StateSerdes<String, String> stateSerdes = new
StateSerdes<>("foo", Serdes.String(), Serdes.String());
+ private final Headers headers = new RecordHeaders();
private Function<byte[], Long> tsExtractor;
private StoreKeySerializer<String> storeKeySerializer;
@@ -97,14 +100,14 @@ public class
MergedSortedCacheWrappedWindowStoreIteratorTest {
final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
windowStoreKvPairs.add(v1);
expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
- final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10,
0, stateSerdes);
+ final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10,
0, headers, stateSerdes);
final byte[] valBytes = String.valueOf(t + 10).getBytes();
expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes));
}
- final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
stateSerdes);
- final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
stateSerdes);
+ final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes);
+ final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
headers, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator = new
DelegatingPeekingKeyValueIterator<>("store", new
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
cache.range(
@@ -135,14 +138,14 @@ public class
MergedSortedCacheWrappedWindowStoreIteratorTest {
final KeyValue<Long, byte[]> v1 = KeyValue.pair(t, v1Bytes);
windowStoreKvPairs.add(v1);
expectedKvPairs.add(KeyValue.pair(t, v1Bytes));
- final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10,
0, stateSerdes);
+ final Bytes keyBytes = storeKeySerializer.serialize("a", t + 10,
0, headers, stateSerdes);
final byte[] valBytes = String.valueOf(t + 10).getBytes();
expectedKvPairs.add(KeyValue.pair(t + 10, valBytes));
cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(keyBytes), new LRUCacheEntry(valBytes));
}
- final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
stateSerdes);
- final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
stateSerdes);
+ final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes);
+ final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
headers, stateSerdes);
Collections.reverse(windowStoreKvPairs);
final KeyValueIterator<Long, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>("store", new
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
@@ -170,9 +173,9 @@ public class
MergedSortedCacheWrappedWindowStoreIteratorTest {
public void shouldPeekNextStoreKey(final SchemaType schemaType) {
setUp(schemaType);
windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
- cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0,
stateSerdes)), new LRUCacheEntry("b".getBytes()));
- final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
stateSerdes);
- final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
stateSerdes);
+ cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+ final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes);
+ final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
headers, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator = new
DelegatingPeekingKeyValueIterator<>("store", new
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
cache.range(
namespace, SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(fromBytes),
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(toBytes)
@@ -191,9 +194,9 @@ public class
MergedSortedCacheWrappedWindowStoreIteratorTest {
public void shouldPeekNextStoreKeyReverse(final SchemaType schemaType) {
setUp(schemaType);
windowStoreKvPairs.add(KeyValue.pair(10L, "a".getBytes()));
- cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0,
stateSerdes)), new LRUCacheEntry("b".getBytes()));
- final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
stateSerdes);
- final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
stateSerdes);
+ cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+ final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes);
+ final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
headers, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>("store", new
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
cache.reverseRange(
@@ -214,9 +217,9 @@ public class
MergedSortedCacheWrappedWindowStoreIteratorTest {
public void shouldPeekNextCacheKey(final SchemaType schemaType) {
setUp(schemaType);
windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
- cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L,
0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
- final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
stateSerdes);
- final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
stateSerdes);
+ cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L,
0, headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+ final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes);
+ final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
headers, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>("store", new
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
cache.range(
@@ -241,9 +244,9 @@ public class
MergedSortedCacheWrappedWindowStoreIteratorTest {
public void shouldPeekNextCacheKeyReverse(final SchemaType schemaType) {
setUp(schemaType);
windowStoreKvPairs.add(KeyValue.pair(0L, "a".getBytes()));
- cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L,
0, stateSerdes)), new LRUCacheEntry("b".getBytes()));
- final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
stateSerdes);
- final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
stateSerdes);
+ cache.put(namespace,
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize("a", 10L,
0, headers, stateSerdes)), new LRUCacheEntry("b".getBytes()));
+ final Bytes fromBytes = storeKeySerializer.serialize("a", 0, 0,
headers, stateSerdes);
+ final Bytes toBytes = storeKeySerializer.serialize("a", 100, 0,
headers, stateSerdes);
final KeyValueIterator<Long, byte[]> storeIterator =
new DelegatingPeekingKeyValueIterator<>("store", new
KeyValueIteratorStub<>(windowStoreKvPairs.iterator()));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
cache.reverseRange(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
index f758f3054e7..ab9fd57fd87 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
@@ -45,7 +47,7 @@ public class
MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
@FunctionalInterface
private interface StoreKeySerializer<K> {
- Bytes serialize(final Windowed<K> key, final int seq, final
StateSerdes<K, ?> serdes);
+ Bytes serialize(final Windowed<K> key, final int seq, final Headers
headers, final StateSerdes<K, ?> serdes);
}
private static final SegmentedCacheFunction SINGLE_SEGMENT_CACHE_FUNCTION
= new SegmentedCacheFunction(null, -1) {
@@ -100,7 +102,7 @@ public class
MergedSortedCacheWrappedWindowStoreKeyValueIteratorTest {
cacheKvs = Collections.singleton(
KeyValue.pair(
SINGLE_SEGMENT_CACHE_FUNCTION.cacheKey(storeKeySerializer.serialize(
- new Windowed<>(cacheKey, cacheWindow), 0, new
StateSerdes<>("dummy", Serdes.String(), Serdes.ByteArray()))
+ new Windowed<>(cacheKey, cacheWindow), 0, new
RecordHeaders(), new StateSerdes<>("dummy", Serdes.String(),
Serdes.ByteArray()))
),
new LRUCacheEntry(cacheKey.getBytes())
)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 597fe5a8288..54b6b0f710d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -89,13 +90,18 @@ public class WindowKeySchemaTest {
R apply(A a, B b, C c);
}
+ @FunctionalInterface
+ interface QuadFunction<A, B, C, D, R> {
+ R apply(A a, B b, C c, D d);
+ }
+
private static final Map<SchemaType, TriFunction<byte[], Long, Integer,
Bytes>> BYTES_TO_STORE_BINARY_MAP = mkMap(
mkEntry(SchemaType.WindowKeySchema, WindowKeySchema::toStoreKeyBinary),
mkEntry(SchemaType.PrefixedKeyFirstSchema,
KeyFirstWindowKeySchema::toStoreKeyBinary),
mkEntry(SchemaType.PrefixedTimeFirstSchema,
TimeFirstWindowKeySchema::toStoreKeyBinary)
);
- private static final Map<SchemaType, TriFunction<Windowed<String>,
Integer, StateSerdes<String, byte[]>, Bytes>> SERDE_TO_STORE_BINARY_MAP = mkMap(
+ private static final Map<SchemaType, QuadFunction<Windowed<String>,
Integer, Headers, StateSerdes<String, byte[]>, Bytes>>
SERDE_TO_STORE_BINARY_MAP = mkMap(
mkEntry(SchemaType.WindowKeySchema, WindowKeySchema::toStoreKeyBinary),
mkEntry(SchemaType.PrefixedKeyFirstSchema,
KeyFirstWindowKeySchema::toStoreKeyBinary),
mkEntry(SchemaType.PrefixedTimeFirstSchema,
TimeFirstWindowKeySchema::toStoreKeyBinary)
@@ -129,6 +135,7 @@ public class WindowKeySchemaTest {
private KeySchema keySchema;
private final Serde<Windowed<String>> keySerde = new
WindowedSerdes.TimeWindowedSerde<>(serde, endTime - startTime);
private final StateSerdes<String, byte[]> stateSerdes = new
StateSerdes<>("dummy", serde, Serdes.ByteArray());
+ private final Headers headers = new RecordHeaders();
public SchemaType schemaType;
private enum SchemaType {
@@ -170,7 +177,7 @@ public class WindowKeySchemaTest {
return EXTRACT_SEQ_MAP.get(schemaType);
}
- private TriFunction<Windowed<String>, Integer, StateSerdes<String,
byte[]>, Bytes> getSerdeToStoreKey() {
+ private QuadFunction<Windowed<String>, Integer, Headers,
StateSerdes<String, byte[]>, Bytes> getSerdeToStoreKey() {
return SERDE_TO_STORE_BINARY_MAP.get(schemaType);
}
@@ -476,18 +483,18 @@ public class WindowKeySchemaTest {
@ParameterizedTest
public void shouldConvertToBinaryAndBack(final SchemaType type) {
setup(type);
- final TriFunction<Windowed<String>, Integer, StateSerdes<String,
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
- final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
stateSerdes);
+ final QuadFunction<Windowed<String>, Integer, Headers,
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+ final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
headers, stateSerdes);
final Windowed<String> result;
if (schemaType == SchemaType.WindowKeySchema) {
result = WindowKeySchema.fromStoreKey(serialized.get(),
- endTime - startTime, stateSerdes.keyDeserializer(), new
RecordHeaders(), stateSerdes.topic());
+ endTime - startTime, stateSerdes.keyDeserializer(), headers,
stateSerdes.topic());
} else if (schemaType == SchemaType.PrefixedTimeFirstSchema) {
result = TimeFirstWindowKeySchema.fromStoreKey(serialized.get(),
- endTime - startTime, stateSerdes.keyDeserializer(), new
RecordHeaders(), stateSerdes.topic());
+ endTime - startTime, stateSerdes.keyDeserializer(), headers,
stateSerdes.topic());
} else {
result = KeyFirstWindowKeySchema.fromStoreKey(serialized.get(),
- endTime - startTime, stateSerdes.keyDeserializer(), new
RecordHeaders(), stateSerdes.topic());
+ endTime - startTime, stateSerdes.keyDeserializer(), headers,
stateSerdes.topic());
}
assertEquals(windowedKey, result);
}
@@ -496,8 +503,8 @@ public class WindowKeySchemaTest {
@ParameterizedTest
public void shouldExtractSequenceFromBinary(final SchemaType type) {
setup(type);
- final TriFunction<Windowed<String>, Integer, StateSerdes<String,
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
- final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
stateSerdes);
+ final QuadFunction<Windowed<String>, Integer, Headers,
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+ final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
headers, stateSerdes);
final Function<byte[], Integer> extractStoreSequence =
getExtractSeqFunc();
assertEquals(0, (int) extractStoreSequence.apply(serialized.get()));
}
@@ -506,8 +513,8 @@ public class WindowKeySchemaTest {
@ParameterizedTest
public void shouldExtractStartTimeFromBinary(final SchemaType type) {
setup(type);
- final TriFunction<Windowed<String>, Integer, StateSerdes<String,
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
- final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
stateSerdes);
+ final QuadFunction<Windowed<String>, Integer, Headers,
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+ final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
headers, stateSerdes);
final Function<byte[], Long> extractStoreTimestamp =
getExtractTimestampFunc();
assertEquals(startTime, (long)
extractStoreTimestamp.apply(serialized.get()));
}
@@ -516,8 +523,8 @@ public class WindowKeySchemaTest {
@ParameterizedTest
public void shouldExtractWindowFromBinary(final SchemaType type) {
setup(type);
- final TriFunction<Windowed<String>, Integer, StateSerdes<String,
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
- final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
stateSerdes);
+ final QuadFunction<Windowed<String>, Integer, Headers,
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+ final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
headers, stateSerdes);
final BiFunction<byte[], Long, Window> extractStoreWindow =
getExtractStoreWindow();
assertEquals(window, extractStoreWindow.apply(serialized.get(),
endTime - startTime));
}
@@ -526,8 +533,8 @@ public class WindowKeySchemaTest {
@ParameterizedTest
public void shouldExtractKeyBytesFromBinary(final SchemaType type) {
setup(type);
- final TriFunction<Windowed<String>, Integer, StateSerdes<String,
byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
- final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
stateSerdes);
+ final QuadFunction<Windowed<String>, Integer, Headers,
StateSerdes<String, byte[]>, Bytes> toStoreKeyBinary = getSerdeToStoreKey();
+ final Bytes serialized = toStoreKeyBinary.apply(windowedKey, 0,
headers, stateSerdes);
final Function<byte[], byte[]> extractStoreKeyBytes =
getExtractStorageKey();
assertArrayEquals(key.getBytes(),
extractStoreKeyBytes.apply(serialized.get()));
}