This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 914051f4c90 KAFKA-20194: Fix StoreBuilderWrapper for header-stores
(#21950)
914051f4c90 is described below
commit 914051f4c9060a83c2c9f11935608adf9f6f0029
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Apr 7 16:48:45 2026 -0700
KAFKA-20194: Fix StoreBuilderWrapper for header-stores (#21950)
StoreBuilderWrapper must recognize header-stores correctly. Otherwise,
we create changelog topics with incorrect topic configuration.
Also adding a fix to public API. The newly added methods to create a
ts-kv-headers-store and session-headers-store builder (on `Stores`) are
incorrect (cf. KIP-1271).
Also updated the corresponding internal class name for session-headers
store.
Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedi
<[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 16 +-
...Test.java => SessionsStoreWithHeadersTest.java} | 168 +++++++++----------
.../TimestampedKeyValueStoreWithHeadersTest.java | 10 +-
.../TimestampedWindowStoreWithHeadersTest.java | 10 +-
.../AbstractConfigurableStoreFactory.java | 8 +-
.../internals/KeyValueStoreMaterializer.java | 2 +-
.../internals/SessionStoreMaterializer.java | 2 +-
.../internals/SubscriptionStoreFactory.java | 2 +-
.../processor/internals/StoreBuilderWrapper.java | 18 +-
.../org/apache/kafka/streams/state/Stores.java | 185 ++++++++++++---------
...rs.java => SessionStoreWithHeadersBuilder.java} | 4 +-
...KStreamSessionWindowAggregateProcessorTest.java | 2 +-
.../ForeignTableJoinProcessorSupplierTests.java | 2 +-
.../SubscriptionReceiveProcessorSupplierTest.java | 2 +-
.../state/QueryableStoreTypesWithHeadersTest.java | 8 +-
.../internals/GlobalStateStoreProviderTest.java | 2 +-
...onStoreWithHeadersSerializerSideEffectTest.java | 2 +-
.../StreamThreadStateStoreProviderTest.java | 2 +-
...ueStoreWithHeadersSerializerSideEffectTest.java | 2 +-
.../kafka/streams/state/internals/UtilsTest.java | 13 +-
20 files changed, 242 insertions(+), 218 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 25fb33dd182..5ba3f95f47d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -169,7 +169,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
streamsBuilderForNewStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
Serdes.String(),
Serdes.String()))
@@ -222,7 +222,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
streamsBuilderForNewStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStore(STORE_NAME),
Serdes.String(),
Serdes.String()))
@@ -288,7 +288,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
streamsBuilderForNewStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
Serdes.String(),
Serdes.String()))
@@ -341,7 +341,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
streamsBuilderForNewStore.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentKeyValueStore(STORE_NAME),
Serdes.String(),
Serdes.String()))
@@ -1563,7 +1563,7 @@ public class HeadersStoreUpgradeIntegrationTest {
private void setupAndPopulateKeyValueStoreWithHeaders(final Properties
props) throws Exception {
final StreamsBuilder headersBuilder = new StreamsBuilder();
headersBuilder.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
Serdes.String(),
Serdes.String()))
@@ -1622,7 +1622,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder newBuilder = new StreamsBuilder();
final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
newBuilder.addStateStore(
- Stores.sessionStoreBuilderWithHeaders(
+ Stores.sessionStoreWithHeadersBuilder(
isPersistent ?
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)) :
Stores.inMemorySessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.String(),
@@ -1680,7 +1680,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder newBuilder = new StreamsBuilder();
final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
newBuilder.addStateStore(
- Stores.sessionStoreBuilderWithHeaders(
+ Stores.sessionStoreWithHeadersBuilder(
Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)), // non-headers supplier!
Serdes.String(),
Serdes.String()))
@@ -1941,7 +1941,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final StreamsBuilder headersBuilder = new StreamsBuilder();
final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
headersBuilder.addStateStore(
- Stores.sessionStoreBuilderWithHeaders(
+ Stores.sessionStoreWithHeadersBuilder(
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.String(),
Serdes.String()))
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java
similarity index 70%
copy from
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
copy to
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java
index 1ae592e8901..bf18dce66e5 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java
@@ -32,15 +32,15 @@ import
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.ValueTimestampHeaders;
-import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -65,10 +65,10 @@ import static
org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("integration")
-public class TimestampedWindowStoreWithHeadersTest {
+public class SessionsStoreWithHeadersTest {
- private static final String STORE_NAME = "headers-window-store";
- private static final long WINDOW_SIZE_MS = 100L;
+ private static final String STORE_NAME = "headers-session-store";
+ private static final long WINDOW_GAP_MS = 100L;
private static final long RETENTION_MS = 1000L;
private String inputStream;
@@ -102,7 +102,7 @@ public class TimestampedWindowStoreWithHeadersTest {
}
@BeforeEach
- public void beforeTest(final TestInfo testInfo) throws
InterruptedException {
+ public void beforeTest(final TestInfo testInfo) {
this.testInfo = testInfo;
final String uniqueTestName = safeUniqueTestName(testInfo);
inputStream = "input-stream-" + uniqueTestName;
@@ -126,14 +126,14 @@ public class TimestampedWindowStoreWithHeadersTest {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.addStateStore(
- Stores.timestampedWindowStoreWithHeadersBuilder(
-
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Stores.sessionStoreWithHeadersBuilder(
+ Stores.persistentSessionStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.Integer(),
Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .process(() -> new
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
.to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
final Properties props = props();
@@ -152,7 +152,7 @@ public class TimestampedWindowStoreWithHeadersTest {
KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3,
"c50"));
// Window 2: [baseTimestamp + WINDOW_SIZE_MS, baseTimestamp + 2 *
WINDOW_SIZE_MS)
- numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + WINDOW_SIZE_MS,
+ numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + WINDOW_GAP_MS,
EMPTY_HEADERS,
KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"),
KeyValue.pair(3, null));
@@ -172,14 +172,14 @@ public class TimestampedWindowStoreWithHeadersTest {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.addStateStore(
- Stores.timestampedWindowStoreWithHeadersBuilder(
-
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Stores.sessionStoreWithHeadersBuilder(
+ Stores.persistentSessionStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.Integer(),
Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
+ .process(() -> new
SessionStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
.to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
final Properties props = props();
@@ -199,7 +199,7 @@ public class TimestampedWindowStoreWithHeadersTest {
// verify changelog topic properties
final String changelogTopic =
props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME +
"-changelog";
final Properties changelogTopicConfig =
CLUSTER.getLogConfig(changelogTopic);
- assertEquals("compact",
changelogTopicConfig.getProperty("cleanup.policy"));
+ assertEquals("compact,delete",
changelogTopicConfig.getProperty("cleanup.policy"));
}
@Test
@@ -207,14 +207,14 @@ public class TimestampedWindowStoreWithHeadersTest {
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.addStateStore(
- Stores.timestampedWindowStoreWithHeadersBuilder(
-
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Stores.sessionStoreWithHeadersBuilder(
+ Stores.persistentSessionStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.Integer(),
Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .process(() -> new
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
.to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
final Properties props = props();
@@ -229,7 +229,7 @@ public class TimestampedWindowStoreWithHeadersTest {
initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + 50, HEADERS2,
KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3,
"c50"));
- initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + WINDOW_SIZE_MS, EMPTY_HEADERS,
+ initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + WINDOW_GAP_MS, EMPTY_HEADERS,
KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"),
KeyValue.pair(3, "c100"));
IntegrationTestUtils.waitUntilMinRecordsReceived(
@@ -248,14 +248,14 @@ public class TimestampedWindowStoreWithHeadersTest {
streamsBuilder = new StreamsBuilder();
streamsBuilder.addStateStore(
- Stores.timestampedWindowStoreWithHeadersBuilder(
-
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Stores.sessionStoreWithHeadersBuilder(
+ Stores.persistentSessionStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.Integer(),
Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .process(() -> new
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
.to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
@@ -263,7 +263,7 @@ public class TimestampedWindowStoreWithHeadersTest {
// produce additional records to verify restored store works correctly
final Headers finalHeaders = new RecordHeaders().add("final",
"true".getBytes());
- final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS,
finalHeaders,
+ final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_GAP_MS,
finalHeaders,
KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"),
KeyValue.pair(3, "c200"));
final List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -282,14 +282,14 @@ public class TimestampedWindowStoreWithHeadersTest {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.addStateStore(
- Stores.timestampedWindowStoreBuilder(
- Stores.persistentTimestampedWindowStore(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.Integer(),
Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .process(TimestampedWindowStoreContentCheckerProcessor::new,
STORE_NAME)
+ .process(SessionStoreContentCheckerProcessor::new, STORE_NAME)
.to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build());
@@ -307,7 +307,7 @@ public class TimestampedWindowStoreWithHeadersTest {
KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3,
null));
initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp + 50,
KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3,
"c50"));
- initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp + WINDOW_SIZE_MS,
+ initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp + WINDOW_GAP_MS,
KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"),
KeyValue.pair(3, null));
List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -330,14 +330,14 @@ public class TimestampedWindowStoreWithHeadersTest {
streamsBuilder
.addStateStore(
- Stores.timestampedWindowStoreWithHeadersBuilder(
-
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Stores.sessionStoreWithHeadersBuilder(
+ Stores.persistentSessionStoreWithHeaders(STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.Integer(),
Serdes.String()
)
)
.stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .process(() -> new
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .process(() -> new
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
.to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
@@ -345,7 +345,7 @@ public class TimestampedWindowStoreWithHeadersTest {
// produce additional records with headers to verify upgraded store
works
final Headers upgradedHeaders = new RecordHeaders().add("upgraded",
"true".getBytes());
- final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS,
upgradedHeaders,
+ final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_GAP_MS,
upgradedHeaders,
KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"),
KeyValue.pair(3, "c200"));
receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -375,9 +375,9 @@ public class TimestampedWindowStoreWithHeadersTest {
*/
@SuppressWarnings("varargs")
@SafeVarargs
- private final int produceDataToTopic(final String topic,
- final long timestamp,
- final KeyValue<Integer, String>...
keyValues) {
+ private int produceDataToTopic(final String topic,
+ final long timestamp,
+ final KeyValue<Integer, String>...
keyValues) {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
topic,
Arrays.asList(keyValues),
@@ -412,21 +412,21 @@ public class TimestampedWindowStoreWithHeadersTest {
}
/**
- * Processor for validating expected contents of a timestamped window
store with headers, and forwards
+ * Processor for validating expected contents of a session store with
headers, and forwards
* the number of failed checks downstream for consumption.
*/
- private static class
TimestampedWindowStoreWithHeadersContentCheckerProcessor implements
Processor<Integer, String, Integer, Integer> {
+ private static class SessionStoreWithHeadersContentCheckerProcessor
implements Processor<Integer, String, Integer, Integer> {
private ProcessorContext<Integer, Integer> context;
- private TimestampedWindowStoreWithHeaders<Integer, String> store;
+ private SessionStoreWithHeaders<Integer, String> store;
// whether the processor should write records to the store as they
arrive.
private final boolean writeToStore;
// in-memory copy of seen data, to validate for testing purposes.
- // Maps key -> windowStartTime -> ValueTimestampHeaders
- private final Map<Integer, Map<Long,
Optional<ValueTimestampHeaders<String>>>> data;
+ // Maps windowed-key -> AggregationWithHeaders
+ private final Map<Windowed<Integer>,
Optional<AggregationWithHeaders<String>>> data;
- TimestampedWindowStoreWithHeadersContentCheckerProcessor(final boolean
writeToStore) {
+ SessionStoreWithHeadersContentCheckerProcessor(final boolean
writeToStore) {
this.writeToStore = writeToStore;
this.data = new HashMap<>();
}
@@ -439,15 +439,12 @@ public class TimestampedWindowStoreWithHeadersTest {
@Override
public void process(final Record<Integer, String> record) {
- final long windowStartTime = record.timestamp() -
(record.timestamp() % WINDOW_SIZE_MS);
-
if (writeToStore) {
- final ValueTimestampHeaders<String> valueTimestampHeaders =
- ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers());
- store.put(record.key(), valueTimestampHeaders,
windowStartTime);
+ final AggregationWithHeaders<String> valueHeaders =
+ AggregationWithHeaders.make(record.value(),
record.headers());
+ store.put(new Windowed<>(record.key(), new
SessionWindow(record.timestamp(), record.timestamp())), valueHeaders);
- data.computeIfAbsent(record.key(), k -> new HashMap<>());
- data.get(record.key()).put(windowStartTime,
Optional.ofNullable(valueTimestampHeaders));
+ data.put(new Windowed<>(record.key(), new
SessionWindow(record.timestamp(), record.timestamp())),
Optional.ofNullable(valueHeaders));
}
//
@@ -461,43 +458,36 @@ public class TimestampedWindowStoreWithHeadersTest {
*/
private int checkStoreContents() {
int failedChecks = 0;
- for (final Map.Entry<Integer, Map<Long,
Optional<ValueTimestampHeaders<String>>>> keyEntry : data.entrySet()) {
- final Integer key = keyEntry.getKey();
-
- for (final Map.Entry<Long,
Optional<ValueTimestampHeaders<String>>> windowEntry :
keyEntry.getValue().entrySet()) {
- final Long windowStartTime = windowEntry.getKey();
- final ValueTimestampHeaders<String>
expectedValueTimestampHeaders =
- windowEntry.getValue().orElse(null);
-
- // validate fetch from store
- try (final
WindowStoreIterator<ValueTimestampHeaders<String>> iterator =
- store.fetch(key, windowStartTime,
windowStartTime)) {
- final ValueTimestampHeaders<String>
actualValueTimestampHeaders =
- iterator.hasNext() ? iterator.next().value : null;
- if (!Objects.equals(actualValueTimestampHeaders,
expectedValueTimestampHeaders)) {
- failedChecks++;
- }
- }
+ for (final Map.Entry<Windowed<Integer>,
Optional<AggregationWithHeaders<String>>> keyEntry : data.entrySet()) {
+ final Windowed<Integer> windowedKey = keyEntry.getKey();
+ final long sessionTime = windowedKey.window().start();
+
+ final AggregationWithHeaders<String> expectedValueTHeaders =
keyEntry.getValue().orElse(null);
+ final AggregationWithHeaders<String> actualValueHeaders =
store.fetchSession(windowedKey.key(), sessionTime, sessionTime);
+
+ if (!Objects.equals(actualValueHeaders,
expectedValueTHeaders)) {
+ failedChecks++;
}
}
+
return failedChecks;
}
}
/**
- * Processor for validating expected contents of a timestamped window
store (without headers).
- * Used for testing the upgrade path from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * Processor for validating expected contents of a session store (without
headers).
+ * Used for testing the upgrade path from SessionStore to
SessionStoreWithHeaders.
*/
- private static class TimestampedWindowStoreContentCheckerProcessor
implements Processor<Integer, String, Integer, Integer> {
+ private static class SessionStoreContentCheckerProcessor implements
Processor<Integer, String, Integer, Integer> {
private ProcessorContext<Integer, Integer> context;
- private TimestampedWindowStore<Integer, String> store;
+ private SessionStore<Integer, String> store;
// in-memory copy of seen data, to validate for testing purposes.
- // Maps key -> windowStartTime -> ValueAndTimestamp
- private final Map<Integer, Map<Long,
Optional<ValueAndTimestamp<String>>>> data;
+ // Maps windowed-key -> value
+ private final Map<Windowed<Integer>, Optional<String>> data;
- TimestampedWindowStoreContentCheckerProcessor() {
+ SessionStoreContentCheckerProcessor() {
this.data = new HashMap<>();
}
@@ -509,13 +499,10 @@ public class TimestampedWindowStoreWithHeadersTest {
@Override
public void process(final Record<Integer, String> record) {
- final long windowStartTime = record.timestamp() -
(record.timestamp() % WINDOW_SIZE_MS);
- final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(record.value(), record.timestamp());
- store.put(record.key(), valueAndTimestamp, windowStartTime);
+ store.put(new Windowed<>(record.key(), new
SessionWindow(record.timestamp(), record.timestamp())), record.value());
- data.computeIfAbsent(record.key(), k -> new HashMap<>());
- data.get(record.key()).put(windowStartTime,
Optional.ofNullable(valueAndTimestamp));
+ data.put(new Windowed<>(record.key(), new
SessionWindow(record.timestamp(), record.timestamp())),
Optional.ofNullable(record.value()));
final int failedChecks = checkStoreContents();
context.forward(record.withValue(failedChecks));
@@ -527,22 +514,15 @@ public class TimestampedWindowStoreWithHeadersTest {
*/
private int checkStoreContents() {
int failedChecks = 0;
- for (final Map.Entry<Integer, Map<Long,
Optional<ValueAndTimestamp<String>>>> keyEntry : data.entrySet()) {
- final Integer key = keyEntry.getKey();
-
- for (final Map.Entry<Long,
Optional<ValueAndTimestamp<String>>> windowEntry :
keyEntry.getValue().entrySet()) {
- final Long windowStartTime = windowEntry.getKey();
- final ValueAndTimestamp<String> expectedValueAndTimestamp
= windowEntry.getValue().orElse(null);
-
- // validate fetch from store
- try (final WindowStoreIterator<ValueAndTimestamp<String>>
iterator =
- store.fetch(key, windowStartTime,
windowStartTime)) {
- final ValueAndTimestamp<String>
actualValueAndTimestamp =
- iterator.hasNext() ? iterator.next().value : null;
- if (!Objects.equals(actualValueAndTimestamp,
expectedValueAndTimestamp)) {
- failedChecks++;
- }
- }
+ for (final Map.Entry<Windowed<Integer>, Optional<String>> keyEntry
: data.entrySet()) {
+ final Windowed<Integer> windowedKey = keyEntry.getKey();
+ final long sessionTime = windowedKey.window().start();
+
+ final String expectedValue = keyEntry.getValue().orElse(null);
+ final String actualValue =
store.fetchSession(windowedKey.key(), sessionTime, sessionTime);
+
+ if (!Objects.equals(actualValue, expectedValue)) {
+ failedChecks++;
}
}
return failedChecks;
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
index b7cbdb2d82d..bb110d86cdd 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
@@ -125,7 +125,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
streamsBuilder
.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
Serdes.Integer(),
Serdes.String()
@@ -173,7 +173,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
streamsBuilder
.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
Serdes.Integer(),
Serdes.String()
@@ -210,7 +210,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
streamsBuilder
.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
Serdes.Integer(),
Serdes.String()
@@ -264,7 +264,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
streamsBuilder
.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
Serdes.Integer(),
Serdes.String()
@@ -356,7 +356,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
streamsBuilder
.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
Serdes.Integer(),
Serdes.String()
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
index 1ae592e8901..012bc773df6 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
@@ -102,7 +102,7 @@ public class TimestampedWindowStoreWithHeadersTest {
}
@BeforeEach
- public void beforeTest(final TestInfo testInfo) throws
InterruptedException {
+ public void beforeTest(final TestInfo testInfo) {
this.testInfo = testInfo;
final String uniqueTestName = safeUniqueTestName(testInfo);
inputStream = "input-stream-" + uniqueTestName;
@@ -199,7 +199,7 @@ public class TimestampedWindowStoreWithHeadersTest {
// verify changelog topic properties
final String changelogTopic =
props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME +
"-changelog";
final Properties changelogTopicConfig =
CLUSTER.getLogConfig(changelogTopic);
- assertEquals("compact",
changelogTopicConfig.getProperty("cleanup.policy"));
+ assertEquals("compact,delete",
changelogTopicConfig.getProperty("cleanup.policy"));
}
@Test
@@ -375,9 +375,9 @@ public class TimestampedWindowStoreWithHeadersTest {
*/
@SuppressWarnings("varargs")
@SafeVarargs
- private final int produceDataToTopic(final String topic,
- final long timestamp,
- final KeyValue<Integer, String>...
keyValues) {
+ private int produceDataToTopic(final String topic,
+ final long timestamp,
+ final KeyValue<Integer, String>...
keyValues) {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
topic,
Arrays.asList(keyValues),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
index 99c8c5a2bc6..6ca5142994d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
@@ -27,13 +27,13 @@ import java.util.Set;
public abstract class AbstractConfigurableStoreFactory implements StoreFactory
{
private final Set<String> connectedProcessorNames = new HashSet<>();
private DslStoreSuppliers dslStoreSuppliers;
- private final DslStoreFormat defaultStoreDefaultFormat;
+ private final DslStoreFormat defaultDslStoreFormat;
private DslStoreFormat dslStoreFormatOverwrite;
public AbstractConfigurableStoreFactory(final DslStoreSuppliers
initialStoreSuppliers,
- final DslStoreFormat
defaultStoreDefaultFormat) {
+ final DslStoreFormat
defaultDslStoreFormat) {
this.dslStoreSuppliers = initialStoreSuppliers;
- this.defaultStoreDefaultFormat = defaultStoreDefaultFormat;
+ this.defaultDslStoreFormat = defaultDslStoreFormat;
}
@Override
@@ -57,7 +57,7 @@ public abstract class AbstractConfigurableStoreFactory
implements StoreFactory {
}
public DslStoreFormat dslStoreFormat() {
- return dslStoreFormatOverwrite == null ? defaultStoreDefaultFormat :
dslStoreFormatOverwrite;
+ return dslStoreFormatOverwrite == null ? defaultDslStoreFormat :
dslStoreFormatOverwrite;
}
protected DslStoreSuppliers dslStoreSuppliers() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 30b3b885c10..3e23c409df0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -57,7 +57,7 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
materialized.keySerde(),
materialized.valueSerde());
} else if (supplier instanceof HeadersBytesStoreSupplier) {
- builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ builder = Stores.timestampedKeyValueStoreWithHeadersBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index d1157086382..31fa0a58c9f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -70,7 +70,7 @@ public class SessionStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
final StoreBuilder<?> builder;
if (supplier instanceof HeadersBytesStoreSupplier) {
- builder = Stores.sessionStoreBuilderWithHeaders(
+ builder = Stores.sessionStoreWithHeadersBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde()
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index 2aed2f74d34..94b59fd0c11 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -47,7 +47,7 @@ public class SubscriptionStoreFactory<K> extends
AbstractConfigurableStoreFactor
@Override
public StoreBuilder<?> builder() {
StoreBuilder<?> builder;
- builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ builder = Stores.timestampedKeyValueStoreWithHeadersBuilder(
dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name,
dslStoreFormat())),
new Serdes.BytesSerde(),
subscriptionWrapperSerde
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
index be915a8dfb6..9581f312702 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
+import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
+import
org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder;
import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -69,11 +71,14 @@ public class StoreBuilderWrapper implements StoreFactory {
return ((WindowStoreBuilder<?, ?>) builder).retentionPeriod();
} else if (builder instanceof TimestampedWindowStoreBuilder) {
return ((TimestampedWindowStoreBuilder<?, ?>)
builder).retentionPeriod();
+ } else if (builder instanceof
TimestampedWindowStoreWithHeadersBuilder) {
+ return ((TimestampedWindowStoreWithHeadersBuilder<?, ?>)
builder).retentionPeriod();
} else if (builder instanceof SessionStoreBuilder) {
return ((SessionStoreBuilder<?, ?>) builder).retentionPeriod();
+ } else if (builder instanceof SessionStoreWithHeadersBuilder) {
+ return ((SessionStoreWithHeadersBuilder<?, ?>)
builder).retentionPeriod();
} else {
- throw new IllegalStateException(
- "retentionPeriod is not supported when not a window
store");
+ throw new IllegalStateException("retentionPeriod is not supported
when not a window store");
}
}
@@ -82,8 +87,7 @@ public class StoreBuilderWrapper implements StoreFactory {
if (builder instanceof VersionedKeyValueStoreBuilder) {
return ((VersionedKeyValueStoreBuilder<?, ?>)
builder).historyRetention();
} else {
- throw new IllegalStateException(
- "historyRetention is not supported when not a versioned
store");
+ throw new IllegalStateException("historyRetention is not supported
when not a versioned store");
}
}
@@ -105,8 +109,10 @@ public class StoreBuilderWrapper implements StoreFactory {
@Override
public boolean isWindowStore() {
return builder instanceof WindowStoreBuilder
- || builder instanceof TimestampedWindowStoreBuilder
- || builder instanceof SessionStoreBuilder;
+ || builder instanceof TimestampedWindowStoreBuilder
+ || builder instanceof TimestampedWindowStoreWithHeadersBuilder
+ || builder instanceof SessionStoreBuilder
+ || builder instanceof SessionStoreWithHeadersBuilder;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 14c37ed93b4..d567d525dc3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -31,7 +31,7 @@ import
org.apache.kafka.streams.state.internals.RocksDbSessionHeadersBytesStoreS
import
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
-import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders;
+import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder;
import
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
import
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@@ -125,7 +125,7 @@ public final class Stores {
* Create a persistent {@link KeyValueBytesStoreSupplier} that stores
headers along with timestamps.
* <p>
* This store supplier can be passed into a
- * {@link
#timestampedKeyValueStoreBuilderWithHeaders(KeyValueBytesStoreSupplier, Serde,
Serde)}
+ * {@link
#timestampedKeyValueStoreWithHeadersBuilder(KeyValueBytesStoreSupplier, Serde,
Serde)}
* to build a {@link TimestampedKeyValueStoreWithHeaders}.
* <p>
* The store will persist key-value pairs along with record timestamps and
headers,
@@ -168,8 +168,10 @@ public final class Stores {
* @return an instance of {@link VersionedBytesStoreSupplier}
* @throws IllegalArgumentException if {@code historyRetention} can't be
represented as {@code long milliseconds}
*/
- public static VersionedBytesStoreSupplier
persistentVersionedKeyValueStore(final String name,
-
final Duration historyRetention) {
+ public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(
+ final String name,
+ final Duration historyRetention
+ ) {
Objects.requireNonNull(name, "name cannot be null");
final String hrMsgPrefix =
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
final long historyRetentionMs =
validateMillisecondDuration(historyRetention, hrMsgPrefix);
@@ -206,9 +208,11 @@ public final class Stores {
* @return an instance of {@link VersionedBytesStoreSupplier}
* @throws IllegalArgumentException if {@code historyRetention} or {@code
segmentInterval} can't be represented as {@code long milliseconds}
*/
- public static VersionedBytesStoreSupplier
persistentVersionedKeyValueStore(final String name,
-
final Duration historyRetention,
-
final Duration segmentInterval) {
+ public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(
+ final String name,
+ final Duration historyRetention,
+ final Duration segmentInterval
+ ) {
Objects.requireNonNull(name, "name cannot be null");
final String hrMsgPrefix =
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
final long historyRetentionMs =
validateMillisecondDuration(historyRetention, hrMsgPrefix);
@@ -297,10 +301,12 @@ public final class Stores {
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code
windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller
than {@code windowSize}
*/
- public static WindowBytesStoreSupplier persistentWindowStore(final String
name,
- final
Duration retentionPeriod,
- final
Duration windowSize,
- final boolean
retainDuplicates) throws IllegalArgumentException {
+ public static WindowBytesStoreSupplier persistentWindowStore(
+ final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates
+ ) throws IllegalArgumentException {
return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE);
}
@@ -329,10 +335,12 @@ public final class Stores {
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code
windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller
than {@code windowSize}
*/
- public static WindowBytesStoreSupplier
persistentTimestampedWindowStore(final String name,
-
final Duration retentionPeriod,
-
final Duration windowSize,
-
final boolean retainDuplicates) throws IllegalArgumentException {
+ public static WindowBytesStoreSupplier persistentTimestampedWindowStore(
+ final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates
+ ) throws IllegalArgumentException {
return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE);
}
@@ -346,18 +354,22 @@ public final class Stores {
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller
than {@code windowSize}
*/
- public static WindowBytesStoreSupplier
persistentTimestampedWindowStoreWithHeaders(final String name,
-
final Duration retentionPeriod,
-
final Duration windowSize,
-
final boolean retainDuplicates) throws IllegalArgumentException {
+ public static WindowBytesStoreSupplier
persistentTimestampedWindowStoreWithHeaders(
+ final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates
+ ) throws IllegalArgumentException {
return persistentWindowStore(name, retentionPeriod, windowSize,
retainDuplicates,
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
}
- private static WindowBytesStoreSupplier persistentWindowStore(final String
name,
- final
Duration retentionPeriod,
- final
Duration windowSize,
- final
boolean retainDuplicates,
- final
RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) {
+ private static WindowBytesStoreSupplier persistentWindowStore(
+ final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates,
+ final RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType
+ ) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = validateMillisecondDuration(retentionPeriod,
rpMsgPrefix);
@@ -406,10 +418,12 @@ public final class Stores {
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code
windowSize} can't be represented as {@code long milliseconds}
* @throws IllegalArgumentException if {@code retentionPeriod} is smaller
than {@code windowSize}
*/
- public static WindowBytesStoreSupplier inMemoryWindowStore(final String
name,
- final Duration
retentionPeriod,
- final Duration
windowSize,
- final boolean
retainDuplicates) throws IllegalArgumentException {
+ public static WindowBytesStoreSupplier inMemoryWindowStore(
+ final String name,
+ final Duration retentionPeriod,
+ final Duration windowSize,
+ final boolean retainDuplicates
+ ) throws IllegalArgumentException {
Objects.requireNonNull(name, "name cannot be null");
final String repartitionPeriodErrorMessagePrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
@@ -447,8 +461,10 @@ public final class Stores {
* contain the inactivity gap of the session and
the entire grace period.)
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
- public static SessionBytesStoreSupplier persistentSessionStore(final
String name,
- final
Duration retentionPeriod) {
+ public static SessionBytesStoreSupplier persistentSessionStore(
+ final String name,
+ final Duration retentionPeriod
+ ) {
return persistentSessionStore(name, retentionPeriod, false);
}
@@ -466,14 +482,18 @@ public final class Stores {
* contain the inactivity gap of the session and
the entire grace period.)
* @return an instance of a {@link SessionBytesStoreSupplier}
*/
- public static SessionBytesStoreSupplier
persistentSessionStoreWithHeaders(final String name,
-
final Duration retentionPeriod) {
+ public static SessionBytesStoreSupplier persistentSessionStoreWithHeaders(
+ final String name,
+ final Duration retentionPeriod
+ ) {
return persistentSessionStore(name, retentionPeriod, true);
}
- private static SessionBytesStoreSupplier persistentSessionStore(final
String name,
- final
Duration retentionPeriod,
- final
boolean withHeaders) {
+ private static SessionBytesStoreSupplier persistentSessionStore(
+ final String name,
+ final Duration retentionPeriod,
+ final boolean withHeaders
+ ) {
Objects.requireNonNull(name, "name cannot be null");
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionPeriodMs =
validateMillisecondDuration(retentionPeriod, msgPrefix);
@@ -521,9 +541,11 @@ public final class Stores {
* @param <V> value type
* @return an instance of a {@link StoreBuilder} that can build a {@link
KeyValueStore}
*/
- public static <K, V> StoreBuilder<KeyValueStore<K, V>>
keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<KeyValueStore<K, V>>
keyValueStoreBuilder(
+ final KeyValueBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde,
Time.SYSTEM);
}
@@ -543,13 +565,35 @@ public final class Stores {
* @param <V> value type
* @return an instance of a {@link StoreBuilder} that can build a {@link
KeyValueStore}
*/
- public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>>
timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>>
timestampedKeyValueStoreBuilder(
+ final KeyValueBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link
TimestampedKeyValueStoreWithHeaders}.
+ *
+ * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be
{@code null})
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes is
{@code null} for put operations,
+ * it is treated as delete
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of {@link StoreBuilder} that can build a {@link
TimestampedKeyValueStoreWithHeaders}
+ */
+ public static <K, V> StoreBuilder<TimestampedKeyValueStoreWithHeaders<K,
V>> timestampedKeyValueStoreWithHeadersBuilder(
+ final KeyValueBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
+ return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier,
keySerde, valueSerde, Time.SYSTEM);
+ }
+
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link
VersionedKeyValueStore}.
*
@@ -561,9 +605,11 @@ public final class Stores {
* @param <V> value type
* @return an instance of a {@link StoreBuilder} that can build a {@link
VersionedKeyValueStore}
*/
- public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>>
versionedKeyValueStoreBuilder(final VersionedBytesStoreSupplier supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>>
versionedKeyValueStoreBuilder(
+ final VersionedBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new VersionedKeyValueStoreBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
@@ -582,9 +628,11 @@ public final class Stores {
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link
WindowStore}
*/
- public static <K, V> StoreBuilder<WindowStore<K, V>>
windowStoreBuilder(final WindowBytesStoreSupplier supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(
+ final WindowBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new WindowStoreBuilder<>(supplier, keySerde, valueSerde,
Time.SYSTEM);
}
@@ -604,9 +652,11 @@ public final class Stores {
* @param <V> value type
* @return an instance of {@link StoreBuilder} that can build a {@link
TimestampedWindowStore}
*/
- public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>>
timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>>
timestampedWindowStoreBuilder(
+ final WindowBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new TimestampedWindowStoreBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
@@ -624,7 +674,8 @@ public final class Stores {
public static <K, V> StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>>
timestampedWindowStoreWithHeadersBuilder(
final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new TimestampedWindowStoreWithHeadersBuilder<>(supplier,
keySerde, valueSerde, Time.SYSTEM);
}
@@ -640,9 +691,11 @@ public final class Stores {
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link
SessionStore}
*/
- public static <K, V> StoreBuilder<SessionStore<K, V>>
sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
+ public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(
+ final SessionBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new SessionStoreBuilder<>(supplier, keySerde, valueSerde,
Time.SYSTEM);
}
@@ -658,29 +711,13 @@ public final class Stores {
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link
SessionStoreWithHeaders}
*/
- public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>>
sessionStoreBuilderWithHeaders(
+ public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>>
sessionStoreWithHeadersBuilder(
final SessionBytesStoreSupplier supplier,
final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ final Serde<V> valueSerde
+ ) {
Objects.requireNonNull(supplier, "supplier cannot be null");
- return new SessionStoreBuilderWithHeaders<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
+ return new SessionStoreWithHeadersBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
- /**
- * Creates a {@link StoreBuilder} that can be used to build a {@link
TimestampedKeyValueStoreWithHeaders}.
- *
- * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be
{@code null})
- * @param keySerde the key serde to use
- * @param valueSerde the value serde to use; if the serialized bytes is
{@code null} for put operations,
- * it is treated as delete
- * @param <K> key type
- * @param <V> value type
- * @return an instance of {@link StoreBuilder} than can build a {@link
KeyValueStore}
- */
- public static <K, V> StoreBuilder<TimestampedKeyValueStoreWithHeaders<K,
V>> timestampedKeyValueStoreBuilderWithHeaders(final KeyValueBytesStoreSupplier
supplier,
-
final Serde<K> keySerde,
-
final Serde<V> valueSerde) {
- Objects.requireNonNull(supplier, "supplier cannot be null");
- return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier,
keySerde, valueSerde, Time.SYSTEM);
- }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java
similarity index 98%
rename from
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java
rename to
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java
index d09886b3e08..c6224976fa9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java
@@ -37,12 +37,12 @@ import java.util.Objects;
* {@link AggregationWithHeaders} as the value wrapper and wires up the
* header-aware store stack (change-logging, caching, metering).
*/
-public class SessionStoreBuilderWithHeaders<K, V>
+public class SessionStoreWithHeadersBuilder<K, V>
extends AbstractStoreBuilder<K, AggregationWithHeaders<V>,
SessionStoreWithHeaders<K, V>> {
private final SessionBytesStoreSupplier storeSupplier;
- public SessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier
storeSupplier,
+ public SessionStoreWithHeadersBuilder(final SessionBytesStoreSupplier
storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Time time) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 5c18ef5662c..a43b5689106 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -154,7 +154,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3));
final StoreBuilder<SessionStoreWithHeaders<String, Long>> storeBuilder
=
- Stores.sessionStoreBuilderWithHeaders(supplier, Serdes.String(),
Serdes.Long())
+ Stores.sessionStoreWithHeadersBuilder(supplier, Serdes.String(),
Serdes.Long())
.withLoggingDisabled();
if (enableCaching && emitStrategy.type() !=
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index f06b59a544c..c3ba367a305 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -210,7 +210,7 @@ public class ForeignTableJoinProcessorSupplierTests {
private StoreBuilder<TimestampedKeyValueStoreWithHeaders<Bytes,
SubscriptionWrapper<String>>> storeBuilder() {
final Serde<SubscriptionWrapper<String>> subscriptionWrapperSerde =
new SubscriptionWrapperSerde<>(
PK_SERDE_TOPIC_SUPPLIER, Serdes.String());
- return Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ return Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStore(
"Store"
),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index f3dd326adcd..7a7fb74cd38 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -519,7 +519,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
private StoreBuilder<TimestampedKeyValueStoreWithHeaders<Bytes,
SubscriptionWrapper<String>>> storeBuilder() {
final Serde<SubscriptionWrapper<String>> subscriptionWrapperSerde =
new SubscriptionWrapperSerde<>(
PK_SERDE_TOPIC_SUPPLIER, Serdes.String());
- return Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ return Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.persistentTimestampedKeyValueStoreWithHeaders(
"Store"
),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
index e361daae09d..e59050d912f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
@@ -30,7 +30,7 @@ public class QueryableStoreTypesWithHeadersTest {
@Test
public void
shouldAcceptTimestampedKeyValueStoreWithHeadersForTimestampedKeyValueStoreType()
{
final TimestampedKeyValueStoreWithHeaders<String, String> store =
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.inMemoryKeyValueStore("test-store"),
Serdes.String(),
Serdes.String())
@@ -45,7 +45,7 @@ public class QueryableStoreTypesWithHeadersTest {
@Test
public void
shouldAcceptTimestampedKeyValueStoreWithHeadersForKeyValueStoreType() {
final TimestampedKeyValueStoreWithHeaders<String, String> store =
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.inMemoryKeyValueStore("test-store"),
Serdes.String(),
Serdes.String())
@@ -60,7 +60,7 @@ public class QueryableStoreTypesWithHeadersTest {
@Test
public void
shouldNotAcceptTimestampedKeyValueStoreWithHeadersForWindowStoreType() {
final TimestampedKeyValueStoreWithHeaders<String, String> store =
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.inMemoryKeyValueStore("test-store"),
Serdes.String(),
Serdes.String())
@@ -200,7 +200,7 @@ public class QueryableStoreTypesWithHeadersTest {
@Test
public void shouldAcceptSessionStoreWithHeadersForSessionStoreType() {
final SessionStoreWithHeaders<String, String> store =
- Stores.sessionStoreBuilderWithHeaders(
+ Stores.sessionStoreWithHeadersBuilder(
Stores.inMemorySessionStore(
"test-session-store",
Duration.ofMillis(100)),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index cd8692137dd..f96f8e96d99 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -111,7 +111,7 @@ public class GlobalStateStoreProviderTest {
Serdes.String()).build());
stores.put(
"ts-kv-store-with-headers",
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.inMemoryKeyValueStore("ts-kv-store-with-headers"),
Serdes.String(),
Serdes.String()).build());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
index b5eda458c78..500fb9a7cfe 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
@@ -135,7 +135,7 @@ public class
SessionStoreWithHeadersSerializerSideEffectTest {
// Create a session store with headers using our custom serializer
builder.addStateStore(
- Stores.sessionStoreBuilderWithHeaders(
+ Stores.sessionStoreWithHeadersBuilder(
Stores.inMemorySessionStore(
STORE_NAME,
Duration.ofMillis(10000L)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 180cd121283..0bd44815997 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -155,7 +155,7 @@ public class StreamThreadStateStoreProviderTest {
Serdes.String()),
"the-processor");
topology.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.inMemoryKeyValueStore("timestamped-kv-store-with-headers"),
Serdes.String(),
Serdes.String()),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
index 918e3e37d7a..2fe5671fea0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
@@ -132,7 +132,7 @@ public class
TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest {
// Create a timestamped key-value store with headers using our custom
serializer
builder.addStateStore(
- Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
Stores.inMemoryKeyValueStore(STORE_NAME),
new HeaderAddingSerde(), // Custom key serializer that adds
headers
Serdes.String()
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
index 51732bd8283..1d08e898996 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.internals.ByteUtils;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -55,7 +56,7 @@ public class UtilsTest {
// Header size's varint encoding cannot exceed 5 bytes (see @{link
ByteUtils#readVarint(ByteBuffer)})
private static final int MAX_VARINT_SIZE = 5;
private static final int OVERFLOW_HEADERS_SIZE = (1 + MAX_VARINT_SIZE) +
HEADERS.length + StateSerdes.TIMESTAMP_SIZE + VALUE.length;
- // 1 byte header size, 0 byte empty headers, and timetsamp
+ // 1 byte header size, 0 byte empty headers, and timestamp
private static final int MIN_SIZE = 1 + 0 + StateSerdes.TIMESTAMP_SIZE;
@Test
@@ -132,7 +133,7 @@ public class UtilsTest {
final Headers headers = new RecordHeaders().add("key1",
"value1".getBytes(StandardCharsets.UTF_8));
final ValueTimestampHeaders<String> input =
ValueTimestampHeaders.make(VALUE_STR, TIMESTAMP, headers);
try (
- final ValueTimestampHeadersSerializer<String> serializer = new
ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+ final ValueTimestampHeadersSerializer<String> serializer = new
ValueTimestampHeadersSerializer<>(new StringSerializer());
final ValueAndTimestampSerde<String> stringSerde = new
ValueAndTimestampSerde<>(Serdes.String())
) {
final byte[] inputBytes = serializer.serialize(TOPIC, input);
@@ -157,9 +158,9 @@ public class UtilsTest {
@ParameterizedTest
@ValueSource(bytes = { 0x10, 0x11 })
- public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final int
invalidSize) {
+ public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final byte
invalidSize) {
final byte[] invalid = new byte[MIN_SIZE];
- invalid[0] = (byte) invalidSize; // header size
+ invalid[0] = invalidSize; // header size
assertFalse(hasEmptyHeaders(invalid));
}
@@ -186,12 +187,12 @@ public class UtilsTest {
}
private static byte[] timestampedValueWithEmptyHeaders(final byte[] value)
{
- // header size: 1 byte, emtpy headers: 0 byte, timestamp: 8 bytes,
plain value length
+ // header size: 1 byte, empty headers: 0 byte, timestamp: 8 bytes,
plain value length
final byte[] res = new byte[1 + 0 + StateSerdes.TIMESTAMP_SIZE +
value.length];
final ByteBuffer buf = ByteBuffer.wrap(res);
buf.put((byte) 0x00); // header size
buf.putLong(TIMESTAMP);
- buf.put(VALUE);
+ buf.put(value);
return res;
}