This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 34ea9dc64e7 KAFKA-20203: setFlushListener should pass record.headers()
instead of empty headers (#21802)
34ea9dc64e7 is described below
commit 34ea9dc64e7c417539d1f13178e43ff96ab2583a
Author: ChickenchickenLove <[email protected]>
AuthorDate: Sat Mar 28 06:34:53 2026 +0900
KAFKA-20203: setFlushListener should pass record.headers() instead of empty
headers (#21802)
Make `setFlushListener` pass `record.headers()` to the value deserializer
instead of empty headers. This was already done correctly for the key-value
and session store cases; this PR closes the gap for the window store case.
Reviewers: Matthias J. Sax <[email protected]>
---
.../state/internals/MeteredWindowStore.java | 4 +-
.../state/internals/MeteredKeyValueStoreTest.java | 65 ++++++++++++++++++++++
.../state/internals/MeteredSessionStoreTest.java | 64 +++++++++++++++++++++
.../state/internals/MeteredWindowStoreTest.java | 47 ++++++++++++++++
4 files changed, 178 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 53ec42339bf..25d98162bd8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -204,8 +204,8 @@ public class MeteredWindowStore<K, V>
record -> listener.apply(
record.withKey(WindowKeySchema.fromStoreKey(record.key(),
windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
.withValue(new Change<>(
- record.value().newValue != null ?
serdes.valueFrom(record.value().newValue, new RecordHeaders()) : null,
- record.value().oldValue != null ?
serdes.valueFrom(record.value().oldValue, new RecordHeaders()) : null,
+ record.value().newValue != null ?
serdes.valueFrom(record.value().newValue, record.headers()) : null,
+ record.value().oldValue != null ?
serdes.valueFrom(record.value().oldValue, record.headers()) : null,
record.value().isLatest
))
),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 42fee48f61d..a50d833edff 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -30,23 +32,33 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.KeyValueIteratorStub;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -61,14 +73,17 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -350,6 +365,56 @@ public class MeteredKeyValueStoreTest {
assertTrue(metered.setFlushListener(null, false));
}
+ @Test
+ public void
shouldPassRecordHeadersToValueDeserializerWhenFlushListenerIsSet() {
+ final String headerKey = "flush";
+ final Deserializer<String> valueDeserializer =
mock(Deserializer.class);
+ final Serde<String> valueSerde =
Serdes.serdeFrom(Serdes.String().serializer(), valueDeserializer);
+ when(valueDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn(VALUE);
+
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(new Metrics(), "test", new MockTime());
+ final InternalMockProcessorContext<?, ?> processorContext = new
InternalMockProcessorContext<>(
+ TestUtils.tempDirectory(),
+ Serdes.String(),
+ Serdes.String(),
+ streamsMetrics,
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ MockRecordCollector::new,
+ new ThreadCache(new LogContext("testCache "), 1024L,
streamsMetrics),
+ Time.SYSTEM
+ );
+
+ final InMemoryKeyValueStore innerStore = new
InMemoryKeyValueStore(STORE_NAME);
+ final CachingKeyValueStore cachingStore = new CachingKeyValueStore(
+ innerStore,
+ CachingKeyValueStore.CacheType.KEY_VALUE_STORE
+ );
+ final MeteredKeyValueStore<String, String> meteredStore = new
MeteredKeyValueStore<>(
+ cachingStore,
+ STORE_TYPE,
+ new MockTime(),
+ Serdes.String(),
+ valueSerde
+ );
+ meteredStore.init(processorContext, meteredStore);
+ assertTrue(meteredStore.setFlushListener(record -> { }, false));
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add(headerKey, "new".getBytes(StandardCharsets.UTF_8));
+
+ processorContext.setRecordContext(new ProcessorRecordContext(0L, 0L,
0, "topic", headers));
+ meteredStore.put(KEY, VALUE);
+ meteredStore.commit(Map.of());
+
+ final ArgumentCaptor<Headers> headersCaptor =
ArgumentCaptor.forClass(Headers.class);
+ verify(valueDeserializer).deserialize(anyString(),
headersCaptor.capture(), any(byte[].class));
+
+ final Header capturedLastHeader =
headersCaptor.getValue().lastHeader(headerKey);
+ assertNotNull(capturedLastHeader);
+ assertThat(new String(capturedLastHeader.value(),
StandardCharsets.UTF_8), equalTo("new"));
+ }
+
@Test
public void shouldNotThrowNullPointerExceptionIfGetReturnsNull() {
setUp();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index f3ca4cf3786..78b49786f75 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -18,6 +18,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -30,26 +32,35 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.KeyValueIteratorStub;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -68,10 +79,12 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -626,6 +639,57 @@ public class MeteredSessionStoreTest {
assertTrue(store.setFlushListener(null, false));
}
+ @Test
+ public void
shouldPassRecordHeadersToValueDeserializerWhenFlushListenerIsSet() {
+ final String headerKey = "flush";
+ final Deserializer<String> valueDeserializer =
mock(Deserializer.class);
+ final Serde<String> valueSerde =
Serdes.serdeFrom(Serdes.String().serializer(), valueDeserializer);
+ when(valueDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn(VALUE);
+
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(new Metrics(), "test", new MockTime());
+ final InternalMockProcessorContext<?, ?> processorContext = new
InternalMockProcessorContext<>(
+ TestUtils.tempDirectory(),
+ Serdes.String(),
+ Serdes.String(),
+ streamsMetrics,
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ MockRecordCollector::new,
+ new ThreadCache(new LogContext("testCache "), 1024L,
streamsMetrics),
+ Time.SYSTEM
+ );
+
+ final InMemorySessionStore inner = new InMemorySessionStore(
+ "flush-listener-inner",
+ RETENTION_PERIOD,
+ STORE_TYPE
+ );
+ final CachingSessionStore cachingStore = new
CachingSessionStore(inner, 100L);
+ final MeteredSessionStore<String, String> metered = new
MeteredSessionStore<>(
+ cachingStore,
+ STORE_TYPE,
+ Serdes.String(),
+ valueSerde,
+ new MockTime()
+ );
+ metered.init(processorContext, metered);
+ assertTrue(metered.setFlushListener(record -> { }, false));
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add(headerKey, "new".getBytes(StandardCharsets.UTF_8));
+
+ processorContext.setRecordContext(new
ProcessorRecordContext(END_TIMESTAMP, 0L, 0, "topic", headers));
+ metered.put(WINDOWED_KEY, VALUE);
+ metered.commit(Map.of());
+
+ final ArgumentCaptor<Headers> headersCaptor =
ArgumentCaptor.forClass(Headers.class);
+ verify(valueDeserializer).deserialize(anyString(),
headersCaptor.capture(), any(byte[].class));
+
+ final Header capturedLastHeader =
headersCaptor.getValue().lastHeader(headerKey);
+ assertThat(capturedLastHeader, not(nullValue()));
+ assertThat(new String(capturedLastHeader.value(),
StandardCharsets.UTF_8), equalTo("new"));
+ }
+
@Test
public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
setUpWithoutContext();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 6b60bacb6bc..35685a96276 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -17,6 +17,8 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
@@ -35,6 +37,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -47,10 +50,12 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.nio.charset.StandardCharsets;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
@@ -71,10 +76,12 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -389,6 +396,46 @@ public class MeteredWindowStoreTest {
assertTrue(metered.setFlushListener(null, false));
}
+ @Test
+ public void
shouldPassRecordHeadersToValueDeserializerWhenFlushListenerIsSet() {
+ final String headerKey = "flush";
+ final Deserializer<String> valueDeserializer =
mock(Deserializer.class);
+ final Serde<String> valueSerde =
Serdes.serdeFrom(Serdes.String().serializer(), valueDeserializer);
+ when(valueDeserializer.deserialize(anyString(), any(Headers.class),
any(byte[].class))).thenReturn("value");
+
+ final InMemoryWindowStore inner = new InMemoryWindowStore(
+ "flush-listener-inner",
+ RETENTION_PERIOD,
+ WINDOW_SIZE_MS,
+ false,
+ STORE_TYPE
+ );
+ final CachingWindowStore cachingStore = new CachingWindowStore(inner,
WINDOW_SIZE_MS, 100L);
+ final MeteredWindowStore<String, String> metered = new
MeteredWindowStore<>(
+ cachingStore,
+ WINDOW_SIZE_MS,
+ STORE_TYPE,
+ new MockTime(),
+ Serdes.String(),
+ valueSerde
+ );
+ metered.init(context, metered);
+ assertTrue(metered.setFlushListener(record -> { }, false));
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add(headerKey, "new".getBytes(StandardCharsets.UTF_8));
+
+ context.setRecordContext(new ProcessorRecordContext(TIMESTAMP, 0L, 0,
"topic", headers));
+ metered.put(KEY, "value", TIMESTAMP);
+
+ final ArgumentCaptor<Headers> headersCaptor =
ArgumentCaptor.forClass(Headers.class);
+ verify(valueDeserializer).deserialize(anyString(),
headersCaptor.capture(), any(byte[].class));
+
+ final Header capturedLastHeader =
headersCaptor.getValue().lastHeader(headerKey);
+ assertThat(capturedLastHeader, not(nullValue()));
+ assertThat(new String(capturedLastHeader.value(),
StandardCharsets.UTF_8), equalTo("new"));
+ }
+
@Test
public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
assertFalse(store.setFlushListener(null, false));