This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3fd16d0c9b0 KAFKA-20203: setFlushListener should pass record.headers() 
instead of empty headers (#21802)
3fd16d0c9b0 is described below

commit 3fd16d0c9b055748be52d1eedb7bcad8d49dc8a7
Author: ChickenchickenLove <[email protected]>
AuthorDate: Sat Mar 28 06:34:53 2026 +0900

    KAFKA-20203: setFlushListener should pass record.headers() instead of empty 
headers (#21802)
    
    Make `setFlushListener` pass `record.headers()` instead of empty
    headers.  Already done correctly for kv- and session-store case. This PR
    closes the gap  for window-store case.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../state/internals/MeteredWindowStore.java        |  4 +-
 .../state/internals/MeteredKeyValueStoreTest.java  | 65 ++++++++++++++++++++++
 .../state/internals/MeteredSessionStoreTest.java   | 64 +++++++++++++++++++++
 .../state/internals/MeteredWindowStoreTest.java    | 47 ++++++++++++++++
 4 files changed, 178 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 5455f635a74..bbc451feafa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -205,8 +205,8 @@ public class MeteredWindowStore<K, V>
                 record -> listener.apply(
                     record.withKey(WindowKeySchema.fromStoreKey(record.key(), 
windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
                         .withValue(new Change<>(
-                            record.value().newValue != null ? 
serdes.valueFrom(record.value().newValue, new RecordHeaders()) : null,
-                            record.value().oldValue != null ? 
serdes.valueFrom(record.value().oldValue, new RecordHeaders()) : null,
+                            record.value().newValue != null ? 
serdes.valueFrom(record.value().newValue, record.headers()) : null,
+                            record.value().oldValue != null ? 
serdes.valueFrom(record.value().oldValue, record.headers()) : null,
                             record.value().isLatest
                         ))
                 ),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 42fee48f61d..a50d833edff 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -30,23 +32,33 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.KeyValueIteratorStub;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -61,14 +73,17 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -350,6 +365,56 @@ public class MeteredKeyValueStoreTest {
         assertTrue(metered.setFlushListener(null, false));
     }
 
+    @Test
+    public void 
shouldPassRecordHeadersToValueDeserializerWhenFlushListenerIsSet() {
+        final String headerKey = "flush";
+        final Deserializer<String> valueDeserializer = 
mock(Deserializer.class);
+        final Serde<String> valueSerde = 
Serdes.serdeFrom(Serdes.String().serializer(), valueDeserializer);
+        when(valueDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn(VALUE);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "test", new MockTime());
+        final InternalMockProcessorContext<?, ?> processorContext = new 
InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            MockRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 1024L, 
streamsMetrics),
+            Time.SYSTEM
+        );
+
+        final InMemoryKeyValueStore innerStore = new 
InMemoryKeyValueStore(STORE_NAME);
+        final CachingKeyValueStore cachingStore = new CachingKeyValueStore(
+            innerStore,
+            CachingKeyValueStore.CacheType.KEY_VALUE_STORE
+        );
+        final MeteredKeyValueStore<String, String> meteredStore = new 
MeteredKeyValueStore<>(
+            cachingStore,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            valueSerde
+        );
+        meteredStore.init(processorContext, meteredStore);
+        assertTrue(meteredStore.setFlushListener(record -> { }, false));
+
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add(headerKey, "new".getBytes(StandardCharsets.UTF_8));
+
+        processorContext.setRecordContext(new ProcessorRecordContext(0L, 0L, 
0, "topic", headers));
+        meteredStore.put(KEY, VALUE);
+        meteredStore.commit(Map.of());
+
+        final ArgumentCaptor<Headers> headersCaptor = 
ArgumentCaptor.forClass(Headers.class);
+        verify(valueDeserializer).deserialize(anyString(), 
headersCaptor.capture(), any(byte[].class));
+
+        final Header capturedLastHeader = 
headersCaptor.getValue().lastHeader(headerKey);
+        assertNotNull(capturedLastHeader);
+        assertThat(new String(capturedLastHeader.value(), 
StandardCharsets.UTF_8), equalTo("new"));
+    }
+
     @Test
     public void shouldNotThrowNullPointerExceptionIfGetReturnsNull() {
         setUp();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index f3ca4cf3786..78b49786f75 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -30,26 +32,35 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.KeyValueIteratorStub;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -68,10 +79,12 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -626,6 +639,57 @@ public class MeteredSessionStoreTest {
         assertTrue(store.setFlushListener(null, false));
     }
 
+    @Test
+    public void 
shouldPassRecordHeadersToValueDeserializerWhenFlushListenerIsSet() {
+        final String headerKey = "flush";
+        final Deserializer<String> valueDeserializer = 
mock(Deserializer.class);
+        final Serde<String> valueSerde = 
Serdes.serdeFrom(Serdes.String().serializer(), valueDeserializer);
+        when(valueDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn(VALUE);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(new Metrics(), "test", new MockTime());
+        final InternalMockProcessorContext<?, ?> processorContext = new 
InternalMockProcessorContext<>(
+            TestUtils.tempDirectory(),
+            Serdes.String(),
+            Serdes.String(),
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            MockRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 1024L, 
streamsMetrics),
+            Time.SYSTEM
+        );
+
+        final InMemorySessionStore inner = new InMemorySessionStore(
+            "flush-listener-inner",
+            RETENTION_PERIOD,
+            STORE_TYPE
+        );
+        final CachingSessionStore cachingStore = new 
CachingSessionStore(inner, 100L);
+        final MeteredSessionStore<String, String> metered = new 
MeteredSessionStore<>(
+            cachingStore,
+            STORE_TYPE,
+            Serdes.String(),
+            valueSerde,
+            new MockTime()
+        );
+        metered.init(processorContext, metered);
+        assertTrue(metered.setFlushListener(record -> { }, false));
+
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add(headerKey, "new".getBytes(StandardCharsets.UTF_8));
+
+        processorContext.setRecordContext(new 
ProcessorRecordContext(END_TIMESTAMP, 0L, 0, "topic", headers));
+        metered.put(WINDOWED_KEY, VALUE);
+        metered.commit(Map.of());
+
+        final ArgumentCaptor<Headers> headersCaptor = 
ArgumentCaptor.forClass(Headers.class);
+        verify(valueDeserializer).deserialize(anyString(), 
headersCaptor.capture(), any(byte[].class));
+
+        final Header capturedLastHeader = 
headersCaptor.getValue().lastHeader(headerKey);
+        assertThat(capturedLastHeader, not(nullValue()));
+        assertThat(new String(capturedLastHeader.value(), 
StandardCharsets.UTF_8), equalTo("new"));
+    }
+
     @Test
     public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
         setUpWithoutContext();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 6b60bacb6bc..35685a96276 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
@@ -35,6 +37,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -47,10 +50,12 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.nio.charset.StandardCharsets;
 import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
@@ -71,10 +76,12 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -389,6 +396,46 @@ public class MeteredWindowStoreTest {
         assertTrue(metered.setFlushListener(null, false));
     }
 
+    @Test
+    public void 
shouldPassRecordHeadersToValueDeserializerWhenFlushListenerIsSet() {
+        final String headerKey = "flush";
+        final Deserializer<String> valueDeserializer = 
mock(Deserializer.class);
+        final Serde<String> valueSerde = 
Serdes.serdeFrom(Serdes.String().serializer(), valueDeserializer);
+        when(valueDeserializer.deserialize(anyString(), any(Headers.class), 
any(byte[].class))).thenReturn("value");
+
+        final InMemoryWindowStore inner = new InMemoryWindowStore(
+            "flush-listener-inner",
+            RETENTION_PERIOD,
+            WINDOW_SIZE_MS,
+            false,
+            STORE_TYPE
+        );
+        final CachingWindowStore cachingStore = new CachingWindowStore(inner, 
WINDOW_SIZE_MS, 100L);
+        final MeteredWindowStore<String, String> metered = new 
MeteredWindowStore<>(
+            cachingStore,
+            WINDOW_SIZE_MS,
+            STORE_TYPE,
+            new MockTime(),
+            Serdes.String(),
+            valueSerde
+        );
+        metered.init(context, metered);
+        assertTrue(metered.setFlushListener(record -> { }, false));
+
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add(headerKey, "new".getBytes(StandardCharsets.UTF_8));
+        
+        context.setRecordContext(new ProcessorRecordContext(TIMESTAMP, 0L, 0, 
"topic", headers));
+        metered.put(KEY, "value", TIMESTAMP);
+
+        final ArgumentCaptor<Headers> headersCaptor = 
ArgumentCaptor.forClass(Headers.class);
+        verify(valueDeserializer).deserialize(anyString(), 
headersCaptor.capture(), any(byte[].class));
+
+        final Header capturedLastHeader = 
headersCaptor.getValue().lastHeader(headerKey);
+        assertThat(capturedLastHeader, not(nullValue()));
+        assertThat(new String(capturedLastHeader.value(), 
StandardCharsets.UTF_8), equalTo("new"));
+    }
+
     @Test
     public void shouldNotSetFlushListenerOnWrappedNoneCachingStore() {
         assertFalse(store.setFlushListener(null, false));

Reply via email to