This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 15793d8b096 KAFKA-20396: Fix RocksDBStore initialization error 
handling (#21946)
15793d8b096 is described below

commit 15793d8b0967e633a399b51e38da7351a914a9c3
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Apr 7 15:53:15 2026 -0500

    KAFKA-20396: Fix RocksDBStore initialization error handling (#21946)
    
    Throw a TaskCorruptedException when the initialization phase encounters
    an invalid state and EOS is enabled. ProcesorStateManager already
    handles this exception by wiping the state store out.
    
    Also, added a new refactor to the testing class for segment-based
    stores, which makes it easier to test for corruption in these stores.
    
    Reviewers: Nick Telfford <[email protected]>,  Bill Bejeck
     <[email protected]>, Matthias Sax <[email protected]>
---
 .../streams/errors/ProcessorStateException.java    |   2 +-
 .../streams/errors/TaskCorruptedException.java     |   6 +
 .../streams/state/internals/KeyValueSegments.java  |   1 +
 .../state/internals/LogicalKeyValueSegments.java   |   1 +
 .../streams/state/internals/RocksDBStore.java      |  22 ++-
 .../internals/SessionSegmentsWithHeaders.java      |   1 +
 .../state/internals/TimestampedSegments.java       |   1 +
 .../internals/TimestampedSegmentsWithHeaders.java  |   1 +
 .../state/internals/WindowSegmentsWithHeaders.java |   1 +
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  97 ++++++++++++-
 .../state/internals/AbstractSegmentsTest.java      | 152 +++++++++++++++++++++
 .../state/internals/KeyValueSegmentsTest.java      |  38 +-----
 .../internals/LogicalKeyValueSegmentsTest.java     |  33 +----
 .../streams/state/internals/RocksDBStoreTest.java  |   6 +-
 .../internals/SessionSegmentsWithHeadersTest.java  |  28 +---
 .../state/internals/TimestampedSegmentsTest.java   |  32 +----
 .../TimestampedSegmentsWithHeadersTest.java        |  30 +---
 .../kafka/test/InternalMockProcessorContext.java   |  18 +++
 18 files changed, 318 insertions(+), 152 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
index 1e40b79fd86..e09f773584a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.errors;
 
 /**
- * Indicates a processor state operation (e.g. put, get) has failed.
+ * Indicates a processor state operation (e.g. init, put, get) has failed.
  *
  * @see org.apache.kafka.streams.processor.StateStore
  */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 8fda0fb19b1..09f5f9a4f2b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -44,6 +44,12 @@ public class TaskCorruptedException extends StreamsException 
{
         this.corruptedTasks = corruptedTasks;
     }
 
+    public TaskCorruptedException(final Set<TaskId> corruptedTasks,
+                                  final ProcessorStateException e) {
+        super("Tasks " + corruptedTasks + " are corrupted and hence need to be 
re-initialized", e);
+        this.corruptedTasks = corruptedTasks;
+    }
+
     public Set<TaskId> corruptedTasks() {
         return corruptedTasks;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index d2def2c010f..84161c04760 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -42,6 +42,7 @@ class KeyValueSegments extends 
AbstractSegments<KeyValueSegment> {
 
     @Override
     protected void openSegmentDB(final KeyValueSegment segment, final 
StateStoreContext context) {
+        segment.setTaskId(context.taskId());
         segment.openDB(context.appConfigs(), context.stateDir());
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 01c91efd602..9fbdf762c8d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -93,6 +93,7 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
     @Override
     public void openExisting(final StateStoreContext context, final long 
streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), 
context.taskId());
+        physicalStore.setTaskId(context.taskId());
         physicalStore.openDB(context.appConfigs(), context.stateDir());
         position.merge(physicalStore.getPosition());
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a150d15c5d2..dcf91dec16b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -28,8 +28,10 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
 import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
 import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.query.Position;
@@ -140,6 +142,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
 
     protected StateStoreContext context;
     protected Position position;
+    private TaskId taskId;
 
     public RocksDBStore(final String name,
                         final String metricsScope) {
@@ -165,6 +168,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
     @Override
     public void init(final StateStoreContext stateStoreContext,
                      final StateStore root) {
+        this.taskId = stateStoreContext.taskId();
         // open the DB dir
         metricsRecorder.init(metricsImpl(stateStoreContext), 
stateStoreContext.taskId());
         openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
@@ -256,11 +260,15 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
                     // For segmented stores, the overall position is composed 
of multiple underlying stores, so merge this store's position into it.
                     position.merge(existingPositionOrEmpty);
                 }
-            } catch (final StreamsException fatal) {
-                final String fatalMessage = "State store " + name + " didn't 
find a valid state, since under EOS it has the risk of getting uncommitted data 
in stores";
+            } catch (final ProcessorStateException e) {
+                final String message = "State store " + name + " didn't find a 
valid state, since under EOS it has the risk of getting uncommitted data in 
stores";
+                throw new TaskCorruptedException(Set.of(taskId), new 
ProcessorStateException(message, e));
+            }  catch (final StreamsException fatal) {
+                final String fatalMessage = "Fatal error while opening store " 
+ name;
+                throw new ProcessorStateException(fatalMessage, fatal);
+            } catch (final RocksDBException fatal) {
+                final String fatalMessage = "Error opening store " + name;
                 throw new ProcessorStateException(fatalMessage, fatal);
-            } catch (final RocksDBException e) {
-                throw new ProcessorStateException("Error opening store " + 
name, e);
             }
         } catch (final RuntimeException e) {
             closeNativeResources();
@@ -283,6 +291,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         }
     }
 
+    void setTaskId(final TaskId taskId) {
+        this.taskId = taskId;
+    }
+
     private void addValueProvidersToMetricsRecorder() {
         final TableFormatConfig tableFormatConfig = 
userSpecifiedOptions.tableFormatConfig();
         if (tableFormatConfig instanceof 
BlockBasedTableConfigWithAccessibleCache) {
@@ -1001,7 +1013,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
         /**
          * Initializes the ColumnFamily.
          * @return the position of the store based on the data in the 
ColumnFamily. If no offset position is found, an empty position is returned.
-         * @throws StreamsException if an invalid state is found and 
ignoreInvalidState is false
+         * @throws ProcessorStateException if an invalid state is found and 
ignoreInvalidState is false
          */
         Position open(final RocksDBStore.DBAccessor accessor, final boolean 
ignoreInvalidState) throws RocksDBException, StreamsException;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
index 9d940a26453..88ef15e4643 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
@@ -42,6 +42,7 @@ class SessionSegmentsWithHeaders extends 
AbstractSegments<SessionSegmentWithHead
 
     @Override
     protected void openSegmentDB(final SessionSegmentWithHeaders segment, 
final StateStoreContext context) {
+        segment.setTaskId(context.taskId());
         segment.openDB(context.appConfigs(), context.stateDir());
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index f63b3f64d59..3c372cba4da 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -42,6 +42,7 @@ class TimestampedSegments extends 
AbstractSegments<TimestampedSegment> {
 
     @Override
     protected void openSegmentDB(final TimestampedSegment segment, final 
StateStoreContext context) {
+        segment.setTaskId(context.taskId());
         segment.openDB(context.appConfigs(), context.stateDir());
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
index 9b6433d041e..ba2cc0537ea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
@@ -42,6 +42,7 @@ class TimestampedSegmentsWithHeaders extends 
AbstractSegments<TimestampedSegment
 
     @Override
     protected void openSegmentDB(final TimestampedSegmentWithHeaders segment, 
final StateStoreContext context) {
+        segment.setTaskId(context.taskId());
         segment.openDB(context.appConfigs(), context.stateDir());
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
index c6fc08f467e..657dcde5c43 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
@@ -47,6 +47,7 @@ class WindowSegmentsWithHeaders extends 
AbstractSegments<WindowSegmentWithHeader
 
     @Override
     protected void openSegmentDB(final WindowSegmentWithHeaders segment, final 
StateStoreContext context) {
+        segment.setTaskId(context.taskId());
         segment.openDB(context.appConfigs(), context.stateDir());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index a9d794c81c5..c4ab91aac11 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -25,8 +25,12 @@ import 
org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -34,6 +38,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -54,6 +59,12 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
 import org.rocksdb.WriteBatch;
 
 import java.io.File;
@@ -70,6 +81,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.SimpleTimeZone;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -82,6 +94,7 @@ import static org.hamcrest.Matchers.hasEntry;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
@@ -97,6 +110,9 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
     final long retention = 1000;
     final long segmentInterval = 60_000L;
     final String storeName = "bytes-store";
+    private final Serializer<String> stringSerializer = new StringSerializer();
+    private final Deserializer<String> stringDeserializer = new 
StringDeserializer();
+    private final Serializer<Long> longSerializer = new LongSerializer();
 
     public SegmentedBytesStore.KeySchema schema;
 
@@ -133,13 +149,7 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         bytesStore = getBytesStore();
 
         stateDir = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext<>(
-            stateDir,
-            Serdes.String(),
-            Serdes.Long(),
-            new MockRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
-        );
+        context = getProcessorContext();
         bytesStore.init(context, bytesStore);
     }
 
@@ -148,6 +158,28 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         bytesStore.close();
     }
 
+    private InternalMockProcessorContext<?, ?> getProcessorContext() {
+        return new InternalMockProcessorContext<>(
+            stateDir,
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
+    }
+
+    private InternalMockProcessorContext<?, ?> getEOSProcessorContext() {
+        final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
+        streamsProps.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
+        return new InternalMockProcessorContext<>(
+                stateDir,
+                Serdes.String(),
+                Serdes.Long(),
+                new MockRecordCollector(),
+                new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
+                new StreamsConfig(streamsProps));
+    }
+
     abstract AbstractRocksDBSegmentedBytesStore<S> getBytesStore();
 
     abstract AbstractSegments<S> newSegments();
@@ -514,6 +546,57 @@ public abstract class 
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         shouldRestoreToByteStore();
     }
 
+    @ParameterizedTest
+    @MethodSource("getKeySchemas")
+    public void 
shouldThrowTaskCorruptedExceptionIfSegmentIsInInvalidState(final 
SegmentedBytesStore.KeySchema schema) throws Exception {
+        before(schema);
+        final InternalMockProcessorContext<?, ?> eosContext = 
getEOSProcessorContext();
+        bytesStore = getBytesStore();
+        bytesStore.init(eosContext, bytesStore);
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), 
serializeValue(30));
+        bytesStore.put(serializeKey(new Windowed<>("a", windows[1])), 
serializeValue(50));
+        bytesStore.close();
+        bytesStore = getBytesStore();
+        overwritePersistedStoreStatusToOpen();
+        final TaskCorruptedException thrown = 
assertThrows(TaskCorruptedException.class, () -> bytesStore.init(eosContext, 
bytesStore));
+        assertEquals("Tasks [0_0] are corrupted and hence need to be 
re-initialized", thrown.getMessage());
+    }
+
+    private void overwritePersistedStoreStatusToOpen() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+        final Long openState = 1L;
+
+
+        final String dbPath = new File(new File(stateDir, "bytes-store"), 
"bytes-store.0").getAbsolutePath();
+        final List<ColumnFamilyDescriptor> existingColumnFamilies = 
RocksDB.listColumnFamilies(new Options(), dbPath).stream()
+                .map(b -> new ColumnFamilyDescriptor(b, columnFamilyOptions))
+                .collect(Collectors.toList());
+        final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(existingColumnFamilies.size());
+        RocksDB db = null;
+        ColumnFamilyHandle offsetsColumnFamily = null;
+        try {
+            db = RocksDB.open(
+                    dbOptions,
+                    new File(new File(stateDir, "bytes-store"), 
"bytes-store.0").getAbsolutePath(),
+                    existingColumnFamilies,
+                    columnFamilies);
+            final byte[] statusKey = stringSerializer.serialize(null, 
"status");
+
+            offsetsColumnFamily = columnFamilies.get(columnFamilies.size() - 
1);
+            db.put(offsetsColumnFamily, statusKey, 
longSerializer.serialize(null, openState));
+        } finally {
+            if (db != null) {
+                db.close();
+            }
+            for (final ColumnFamilyHandle columnFamily : columnFamilies) {
+                columnFamily.close();
+            }
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
     private void shouldRestoreToByteStore() {
         bytesStore.init(context, bytesStore);
         // 0 segments initially.
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java
new file mode 100644
index 00000000000..d729d07f181
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+abstract class AbstractSegmentsTest<S extends Segments> {
+    protected S segments;
+    protected InternalMockProcessorContext<?, ?> context;
+    private final Serializer<String> stringSerializer = new StringSerializer();
+    private final Serializer<Long> longSerializer = new LongSerializer();
+
+
+    abstract S getSegments();
+
+
+    @BeforeEach
+    public void setUp() {
+        context = getProcessorContext();
+        segments = getSegments();
+        segments.openExisting(context, 0L);
+    }
+
+    private InternalMockProcessorContext<?, ?> getProcessorContext() {
+        return new InternalMockProcessorContext<>(
+                TestUtils.tempDirectory(),
+                Serdes.String(),
+                Serdes.Long(),
+                new MockRecordCollector(),
+                new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
+                new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
+    }
+
+    private InternalMockProcessorContext<?, ?> getEOSProcessorContext() {
+        final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
+        streamsProps.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE_V2);
+        return new InternalMockProcessorContext<>(
+                TestUtils.tempDirectory(),
+                Serdes.String(),
+                Serdes.Long(),
+                new MockRecordCollector(),
+                new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())),
+                new StreamsConfig(streamsProps));
+    }
+
+    @AfterEach
+    public void tearDown() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldThrowTaskCorruptedExceptionIfSegmentIsInInvalidState() 
throws Exception {
+        context = getEOSProcessorContext();
+        segments = getSegments();
+        segments.openExisting(context, 0L);
+        final Segment segment = segments.getOrCreateSegmentIfLive(0, context, 
1L);
+
+        assertTrue(segment.isOpen());
+        segments.close();
+
+        simulateUncleanShutdownForAllSegments();
+        segments = getSegments();
+        final TaskCorruptedException thrown = 
assertThrows(TaskCorruptedException.class, () -> segments.openExisting(context, 
0L));
+        assertEquals("Tasks [0_0] are corrupted and hence need to be 
re-initialized", thrown.getMessage());
+    }
+
+    private void simulateUncleanShutdownForAllSegments() throws Exception {
+        for (final File dbDir : 
Objects.requireNonNull(context.stateDir().listFiles())) {
+            for (final File storeDir : 
Objects.requireNonNull(dbDir.listFiles())) {
+                final DBOptions dbOptions = new DBOptions();
+                final ColumnFamilyOptions columnFamilyOptions = new 
ColumnFamilyOptions();
+                final Long openState = 1L;
+                final String dbPath = storeDir.getAbsolutePath();
+                final List<ColumnFamilyDescriptor> existingColumnFamilies = 
RocksDB.listColumnFamilies(new Options(), dbPath).stream()
+                        .map(b -> new ColumnFamilyDescriptor(b, 
columnFamilyOptions))
+                        .collect(Collectors.toList());
+                final List<ColumnFamilyHandle> columnFamilies = new 
ArrayList<>(existingColumnFamilies.size());
+                RocksDB db = null;
+                ColumnFamilyHandle offsetsColumnFamily = null;
+                try {
+                    db = RocksDB.open(
+                            dbOptions,
+                            storeDir.getAbsolutePath(),
+                            existingColumnFamilies,
+                            columnFamilies);
+                    final byte[] statusKey = stringSerializer.serialize(null, 
"status");
+
+                    offsetsColumnFamily = 
columnFamilies.get(columnFamilies.size() - 1);
+                    db.put(offsetsColumnFamily, statusKey, 
longSerializer.serialize(null, openState));
+                } finally {
+                    if (db != null) {
+                        db.close();
+                    }
+                    for (final ColumnFamilyHandle columnFamily : 
columnFamilies) {
+                        columnFamily.close();
+                    }
+                    dbOptions.close();
+                    columnFamilyOptions.close();
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index ee0c41802bb..4bc468d9785 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -16,16 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -43,34 +33,18 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class KeyValueSegmentsTest {
+public class KeyValueSegmentsTest extends 
AbstractSegmentsTest<KeyValueSegments> {
 
     private static final int NUM_SEGMENTS = 5;
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private static final String METRICS_SCOPE = "test-state-id";
-    private InternalMockProcessorContext context;
-    private KeyValueSegments segments;
-    private File stateDirectory;
     private final String storeName = "test";
 
-    @BeforeEach
-    public void createContext() {
-        stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext<>(
-            stateDirectory,
-            Serdes.String(),
-            Serdes.Long(),
-            new MockRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
-        );
-        segments = new KeyValueSegments(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
-        segments.openExisting(context, -1L);
-    }
 
-    @AfterEach
-    public void close() {
-        segments.close();
+    @Override
+    KeyValueSegments getSegments() {
+        return new KeyValueSegments(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
     }
 
     @Test
@@ -294,7 +268,7 @@ public class KeyValueSegmentsTest {
 
         segments = new KeyValueSegments(storeName,  METRICS_SCOPE, 
NUM_SEGMENTS * segmentInterval, segmentInterval);
 
-        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + 
File.separator + storeName;
+        final String storeDirectoryPath = context.stateDir().getAbsolutePath() 
+ File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
         //noinspection ResultOfMethodCallIgnored
         storeDirectory.mkdirs();
@@ -319,7 +293,7 @@ public class KeyValueSegmentsTest {
 
     @Test
     public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() 
throws Exception {
-        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + 
File.separator + storeName;
+        final String storeDirectoryPath = context.stateDir().getAbsolutePath() 
+ File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
         //noinspection ResultOfMethodCallIgnored
         storeDirectory.mkdirs();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 2fbbfe4e947..95cb3fe1fda 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -16,20 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -43,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class LogicalKeyValueSegmentsTest {
+public class LogicalKeyValueSegmentsTest extends 
AbstractSegmentsTest<LogicalKeyValueSegments> {
 
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
@@ -51,32 +42,16 @@ public class LogicalKeyValueSegmentsTest {
     private static final String METRICS_SCOPE = "metrics-scope";
     private static final String DB_FILE_DIR = "rocksdb";
 
-    private InternalMockProcessorContext<?, ?> context;
 
-    private LogicalKeyValueSegments segments;
-
-    @BeforeEach
-    public void setUp() {
-        context = new InternalMockProcessorContext<>(
-            TestUtils.tempDirectory(),
-            Serdes.String(),
-            Serdes.Long(),
-            new MockRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
-        );
-        segments = new LogicalKeyValueSegments(
+    @Override
+    public LogicalKeyValueSegments getSegments() {
+        return new LogicalKeyValueSegments(
             STORE_NAME,
             DB_FILE_DIR,
             RETENTION_PERIOD,
             SEGMENT_INTERVAL,
             new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
         );
-        segments.openExisting(context, 0L);
-    }
-
-    @AfterEach
-    public void tearDown() {
-        segments.close();
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 128534a1bb8..7734646a5c2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
@@ -273,8 +274,9 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
 
         overwritePersistedStoreStatusToOpen();
 
-        final ProcessorStateException stateException = 
assertThrows(ProcessorStateException.class, () -> rocksDBStore.init(eosContext, 
rocksDBStore));
-        assertEquals("State store " + DB_NAME + " didn't find a valid state, 
since under EOS it has the risk of getting uncommitted data in stores", 
stateException.getMessage());
+        final TaskCorruptedException stateException = 
assertThrows(TaskCorruptedException.class, () -> rocksDBStore.init(eosContext, 
rocksDBStore));
+        assertEquals("Tasks [0_0] are corrupted and hence need to be 
re-initialized", stateException.getMessage());
+        assertEquals("State store " + DB_NAME + " didn't find a valid state, 
since under EOS it has the risk of getting uncommitted data in stores", 
stateException.getCause().getMessage());
         rocksDBStore.close();
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
index eea7d60a16f..bf2c298b80c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
@@ -16,16 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -36,28 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class SessionSegmentsWithHeadersTest {
+public class SessionSegmentsWithHeadersTest extends 
AbstractSegmentsTest<SessionSegmentsWithHeaders> {
 
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private static final String METRICS_SCOPE = "test-state-id";
-    private InternalMockProcessorContext context;
-    private SessionSegmentsWithHeaders segments;
-    private File stateDirectory;
     private final String storeName = "test";
 
-    @BeforeEach
-    public void createContext() {
-        stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext<>(
-            stateDirectory,
-            Serdes.String(),
-            Serdes.Long(),
-            new MockRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
-        );
-        segments = new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
-        segments.openExisting(context, -1L);
+    @Override
+    public SessionSegmentsWithHeaders getSegments() {
+        return new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
     }
 
     @AfterEach
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index f9c608dbb93..1ccafcd0105 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -16,16 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -43,29 +35,17 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TimestampedSegmentsTest {
+public class TimestampedSegmentsTest extends 
AbstractSegmentsTest<TimestampedSegments> {
 
     private static final int NUM_SEGMENTS = 5;
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private static final String METRICS_SCOPE = "test-state-id";
-    private InternalMockProcessorContext context;
-    private TimestampedSegments segments;
-    private File stateDirectory;
     private final String storeName = "test";
 
-    @BeforeEach
-    public void createContext() {
-        stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext<>(
-            stateDirectory,
-            Serdes.String(),
-            Serdes.Long(),
-            new MockRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
-        );
-        segments = new TimestampedSegments(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
-        segments.openExisting(context, -1L);
+    @Override
+    TimestampedSegments getSegments() {
+        return new TimestampedSegments(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
     }
 
     @AfterEach
@@ -295,7 +275,7 @@ public class TimestampedSegmentsTest {
 
         segments = new TimestampedSegments(storeName, METRICS_SCOPE, 
NUM_SEGMENTS * segmentInterval, segmentInterval);
 
-        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + 
File.separator + storeName;
+        final String storeDirectoryPath = context.stateDir().getAbsolutePath() 
+ File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
         //noinspection ResultOfMethodCallIgnored
         storeDirectory.mkdirs();
@@ -320,7 +300,7 @@ public class TimestampedSegmentsTest {
 
     @Test
     public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() 
throws Exception {
-        final String storeDirectoryPath = stateDirectory.getAbsolutePath() + 
File.separator + storeName;
+        final String storeDirectoryPath = context.stateDir().getAbsolutePath() 
+ File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
         //noinspection ResultOfMethodCallIgnored
         storeDirectory.mkdirs();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
index b3651cf7b25..1e1b40e7c6a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
@@ -16,16 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
-
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
@@ -36,28 +27,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TimestampedSegmentsWithHeadersTest {
-
+public class TimestampedSegmentsWithHeadersTest extends 
AbstractSegmentsTest<TimestampedSegmentsWithHeaders> {
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private static final String METRICS_SCOPE = "test-state-id";
-    private InternalMockProcessorContext context;
-    private TimestampedSegmentsWithHeaders segments;
-    private File stateDirectory;
     private final String storeName = "test";
 
-    @BeforeEach
-    public void createContext() {
-        stateDirectory = TestUtils.tempDirectory();
-        context = new InternalMockProcessorContext<>(
-            stateDirectory,
-            Serdes.String(),
-            Serdes.Long(),
-            new MockRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
-        );
-        segments = new TimestampedSegmentsWithHeaders(storeName, 
METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
-        segments.openExisting(context, -1L);
+    @Override
+    TimestampedSegmentsWithHeaders getSegments() {
+        return new TimestampedSegmentsWithHeaders(storeName, METRICS_SCOPE, 
RETENTION_PERIOD, SEGMENT_INTERVAL);
     }
 
     @AfterEach
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 0f4012f779a..03796f56be5 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -202,6 +202,24 @@ public class InternalMockProcessorContext<KOut, VOut>
         );
     }
 
+    public InternalMockProcessorContext(final File stateDir,
+                                        final Serde<?> keySerde,
+                                        final Serde<?> valueSerde,
+                                        final RecordCollector collector,
+                                        final ThreadCache cache,
+                                        final StreamsConfig config) {
+        this(
+                stateDir,
+                keySerde,
+                valueSerde,
+                new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
+                config,
+                () -> collector,
+                cache,
+                Time.SYSTEM
+        );
+    }
+
     public InternalMockProcessorContext(final File stateDir,
                                         final Serde<?> keySerde,
                                         final Serde<?> valueSerde,


Reply via email to