This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5729bb614fb MINOR: code cleanup (#21767)
5729bb614fb is described below
commit 5729bb614fbd3397a7894d3831fb46ddc555a27b
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 16 02:46:23 2026 -0700
MINOR: code cleanup (#21767)
Remove unnecessary overrides and unused code.
Reviewers: TengYao Chi <[email protected]>
---
.../internals/MeteredTimestampedKeyValueStore.java | 16 ---------
.../MeteredTimestampedKeyValueStoreTest.java | 15 --------
.../kafka/test/InternalMockProcessorContext.java | 40 ----------------------
3 files changed, 71 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 4a4533cd804..bbc7214a842 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -98,22 +98,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
}
}
- RawAndDeserializedValue<V> getWithBinary(final K key) {
- try {
- return maybeMeasureLatency(
- () -> {
- final byte[] rawValue = wrapped().get(serializeKey(key));
- return new RawAndDeserializedValue<>(rawValue,
deserializeValue(rawValue));
- },
- time,
- getSensor
- );
- } catch (final ProcessorStateException e) {
- final String message = String.format(e.getMessage(), key);
- throw new ProcessorStateException(message, e);
- }
- }
-
public boolean putIfDifferentValues(
final K key,
final ValueAndTimestamp<V> newValue,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 7583d452d33..602be7a9e6f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -40,7 +40,6 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
import org.apache.kafka.test.KeyValueIteratorStub;
import org.junit.jupiter.api.Test;
@@ -61,8 +60,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -233,18 +230,6 @@ public class MeteredTimestampedKeyValueStoreTest {
assertTrue((Double) metric.metricValue() > 0);
}
- @Test
- public void shouldGetWithBinary() {
- setUp();
- when(inner.get(KEY_BYTES)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
-
- init();
-
- final RawAndDeserializedValue<String> valueWithBinary =
metered.getWithBinary(KEY);
- assertEquals(VALUE_AND_TIMESTAMP, valueWithBinary.value);
- assertArrayEquals(VALUE_AND_TIMESTAMP_BYTES, valueWithBinary.rawValue);
- }
-
@Test
public void shouldNotPutIfSameValuesAndGreaterTimestamp() {
setUp();
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 2fa2cee1f6c..0f4012f779a 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -397,14 +397,6 @@ public class InternalMockProcessorContext<KOut, VOut>
this.timestamp = timestamp;
}
- @Override
- public long timestamp() {
- if (recordContext == null) {
- return timestamp;
- }
- return recordContext.timestamp();
- }
-
@Override
public long currentSystemTimeMs() {
return time.milliseconds();
@@ -415,38 +407,6 @@ public class InternalMockProcessorContext<KOut, VOut>
throw new UnsupportedOperationException("this method is not supported
in InternalMockProcessorContext");
}
- @Override
- public String topic() {
- if (recordContext == null) {
- return null;
- }
- return recordContext.topic();
- }
-
- @Override
- public int partition() {
- if (recordContext == null) {
- return -1;
- }
- return recordContext.partition();
- }
-
- @Override
- public long offset() {
- if (recordContext == null) {
- return -1L;
- }
- return recordContext.offset();
- }
-
- @Override
- public Headers headers() {
- if (recordContext == null) {
- return new RecordHeaders();
- }
- return recordContext.headers();
- }
-
@Override
public TaskType taskType() {
return taskType;