This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 794d2b625fa KAFKA-20396: Fix RocksDBStore initialization error
handling (#21946)
794d2b625fa is described below
commit 794d2b625fa83ea2208c96682239ca8261d5cc3d
Author: Eduwer Camacaro <[email protected]>
AuthorDate: Tue Apr 7 15:53:15 2026 -0500
KAFKA-20396: Fix RocksDBStore initialization error handling (#21946)
Throw a TaskCorruptedException when store initialization encounters
an invalid state and EOS is enabled. ProcessorStateManager already
handles this exception by wiping out the state store.
Also, refactored the tests for segment-based stores into a shared
abstract base class (AbstractSegmentsTest), which makes it easier to
test these stores for corruption.
Reviewers: Nick Telfford <[email protected]>, Bill Bejeck
<[email protected]>, Matthias Sax <[email protected]>
---
.../streams/errors/ProcessorStateException.java | 2 +-
.../streams/errors/TaskCorruptedException.java | 6 +
.../streams/state/internals/KeyValueSegments.java | 1 +
.../state/internals/LogicalKeyValueSegments.java | 1 +
.../streams/state/internals/RocksDBStore.java | 22 ++-
.../internals/SessionSegmentsWithHeaders.java | 1 +
.../state/internals/TimestampedSegments.java | 1 +
.../internals/TimestampedSegmentsWithHeaders.java | 1 +
.../state/internals/WindowSegmentsWithHeaders.java | 1 +
.../AbstractRocksDBSegmentedBytesStoreTest.java | 97 ++++++++++++-
.../state/internals/AbstractSegmentsTest.java | 152 +++++++++++++++++++++
.../state/internals/KeyValueSegmentsTest.java | 38 +-----
.../internals/LogicalKeyValueSegmentsTest.java | 33 +----
.../streams/state/internals/RocksDBStoreTest.java | 6 +-
.../internals/SessionSegmentsWithHeadersTest.java | 28 +---
.../state/internals/TimestampedSegmentsTest.java | 32 +----
.../TimestampedSegmentsWithHeadersTest.java | 30 +---
.../kafka/test/InternalMockProcessorContext.java | 18 +++
18 files changed, 318 insertions(+), 152 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
index 1e40b79fd86..e09f773584a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.errors;
/**
- * Indicates a processor state operation (e.g. put, get) has failed.
+ * Indicates a processor state operation (e.g. init, put, get) has failed.
*
* @see org.apache.kafka.streams.processor.StateStore
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 8fda0fb19b1..09f5f9a4f2b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -44,6 +44,12 @@ public class TaskCorruptedException extends StreamsException
{
this.corruptedTasks = corruptedTasks;
}
+ public TaskCorruptedException(final Set<TaskId> corruptedTasks,
+ final ProcessorStateException e) {
+ super("Tasks " + corruptedTasks + " are corrupted and hence need to be
re-initialized", e);
+ this.corruptedTasks = corruptedTasks;
+ }
+
public Set<TaskId> corruptedTasks() {
return corruptedTasks;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index d2def2c010f..84161c04760 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -42,6 +42,7 @@ class KeyValueSegments extends
AbstractSegments<KeyValueSegment> {
@Override
protected void openSegmentDB(final KeyValueSegment segment, final
StateStoreContext context) {
+ segment.setTaskId(context.taskId());
segment.openDB(context.appConfigs(), context.stateDir());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index 01c91efd602..9fbdf762c8d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -93,6 +93,7 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
@Override
public void openExisting(final StateStoreContext context, final long
streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context),
context.taskId());
+ physicalStore.setTaskId(context.taskId());
physicalStore.openDB(context.appConfigs(), context.stateDir());
position.merge(physicalStore.getPosition());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 7b616eb6b88..d99e9a9c346 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -28,8 +28,10 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
import
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.query.Position;
@@ -140,6 +142,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
protected StateStoreContext context;
protected Position position;
+ private TaskId taskId;
public RocksDBStore(final String name,
final String metricsScope) {
@@ -165,6 +168,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
@Override
public void init(final StateStoreContext stateStoreContext,
final StateStore root) {
+ this.taskId = stateStoreContext.taskId();
// open the DB dir
metricsRecorder.init(metricsImpl(stateStoreContext),
stateStoreContext.taskId());
openDB(stateStoreContext.appConfigs(), stateStoreContext.stateDir());
@@ -256,11 +260,15 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
// For segmented stores, the overall position is composed
of multiple underlying stores, so merge this store's position into it.
position.merge(existingPositionOrEmpty);
}
- } catch (final StreamsException fatal) {
- final String fatalMessage = "State store " + name + " didn't
find a valid state, since under EOS it has the risk of getting uncommitted data
in stores";
+ } catch (final ProcessorStateException e) {
+ final String message = "State store " + name + " didn't find a
valid state, since under EOS it has the risk of getting uncommitted data in
stores";
+ throw new TaskCorruptedException(Set.of(taskId), new
ProcessorStateException(message, e));
+ } catch (final StreamsException fatal) {
+ final String fatalMessage = "Fatal error while opening store "
+ name;
+ throw new ProcessorStateException(fatalMessage, fatal);
+ } catch (final RocksDBException fatal) {
+ final String fatalMessage = "Error opening store " + name;
throw new ProcessorStateException(fatalMessage, fatal);
- } catch (final RocksDBException e) {
- throw new ProcessorStateException("Error opening store " +
name, e);
}
} catch (final RuntimeException e) {
closeNativeResources();
@@ -283,6 +291,10 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
}
}
+ void setTaskId(final TaskId taskId) {
+ this.taskId = taskId;
+ }
+
private void addValueProvidersToMetricsRecorder() {
final TableFormatConfig tableFormatConfig =
userSpecifiedOptions.tableFormatConfig();
if (tableFormatConfig instanceof
BlockBasedTableConfigWithAccessibleCache) {
@@ -1001,7 +1013,7 @@ public class RocksDBStore implements KeyValueStore<Bytes,
byte[]>, BatchWritingS
/**
* Initializes the ColumnFamily.
* @return the position of the store based on the data in the
ColumnFamily. If no offset position is found, an empty position is returned.
- * @throws StreamsException if an invalid state is found and
ignoreInvalidState is false
+ * @throws ProcessorStateException if an invalid state is found and
ignoreInvalidState is false
*/
Position open(final RocksDBStore.DBAccessor accessor, final boolean
ignoreInvalidState) throws RocksDBException, StreamsException;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
index 9d940a26453..88ef15e4643 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
@@ -42,6 +42,7 @@ class SessionSegmentsWithHeaders extends
AbstractSegments<SessionSegmentWithHead
@Override
protected void openSegmentDB(final SessionSegmentWithHeaders segment,
final StateStoreContext context) {
+ segment.setTaskId(context.taskId());
segment.openDB(context.appConfigs(), context.stateDir());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index f63b3f64d59..3c372cba4da 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -42,6 +42,7 @@ class TimestampedSegments extends
AbstractSegments<TimestampedSegment> {
@Override
protected void openSegmentDB(final TimestampedSegment segment, final
StateStoreContext context) {
+ segment.setTaskId(context.taskId());
segment.openDB(context.appConfigs(), context.stateDir());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
index 9b6433d041e..ba2cc0537ea 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
@@ -42,6 +42,7 @@ class TimestampedSegmentsWithHeaders extends
AbstractSegments<TimestampedSegment
@Override
protected void openSegmentDB(final TimestampedSegmentWithHeaders segment,
final StateStoreContext context) {
+ segment.setTaskId(context.taskId());
segment.openDB(context.appConfigs(), context.stateDir());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
index c6fc08f467e..657dcde5c43 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowSegmentsWithHeaders.java
@@ -47,6 +47,7 @@ class WindowSegmentsWithHeaders extends
AbstractSegments<WindowSegmentWithHeader
@Override
protected void openSegmentDB(final WindowSegmentWithHeaders segment, final
StateStoreContext context) {
+ segment.setTaskId(context.taskId());
segment.openDB(context.appConfigs(), context.stateDir());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index a9d794c81c5..c4ab91aac11 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -25,8 +25,12 @@ import
org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.record.internal.RecordBatch;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -34,6 +38,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
@@ -54,6 +59,12 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
import org.rocksdb.WriteBatch;
import java.io.File;
@@ -70,6 +81,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SimpleTimeZone;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -82,6 +94,7 @@ import static org.hamcrest.Matchers.hasEntry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -97,6 +110,9 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
final long retention = 1000;
final long segmentInterval = 60_000L;
final String storeName = "bytes-store";
+ private final Serializer<String> stringSerializer = new StringSerializer();
+ private final Deserializer<String> stringDeserializer = new
StringDeserializer();
+ private final Serializer<Long> longSerializer = new LongSerializer();
public SegmentedBytesStore.KeySchema schema;
@@ -133,13 +149,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore = getBytesStore();
stateDir = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext<>(
- stateDir,
- Serdes.String(),
- Serdes.Long(),
- new MockRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics()))
- );
+ context = getProcessorContext();
bytesStore.init(context, bytesStore);
}
@@ -148,6 +158,28 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
bytesStore.close();
}
+ private InternalMockProcessorContext<?, ?> getProcessorContext() {
+ return new InternalMockProcessorContext<>(
+ stateDir,
+ Serdes.String(),
+ Serdes.Long(),
+ new MockRecordCollector(),
+ new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
+ }
+
+ private InternalMockProcessorContext<?, ?> getEOSProcessorContext() {
+ final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
+ streamsProps.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+ return new InternalMockProcessorContext<>(
+ stateDir,
+ Serdes.String(),
+ Serdes.Long(),
+ new MockRecordCollector(),
+ new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
+ new StreamsConfig(streamsProps));
+ }
+
abstract AbstractRocksDBSegmentedBytesStore<S> getBytesStore();
abstract AbstractSegments<S> newSegments();
@@ -514,6 +546,57 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
shouldRestoreToByteStore();
}
+ @ParameterizedTest
+ @MethodSource("getKeySchemas")
+ public void
shouldThrowTaskCorruptedExceptionIfSegmentIsInInvalidState(final
SegmentedBytesStore.KeySchema schema) throws Exception {
+ before(schema);
+ final InternalMockProcessorContext<?, ?> eosContext =
getEOSProcessorContext();
+ bytesStore = getBytesStore();
+ bytesStore.init(eosContext, bytesStore);
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[0])),
serializeValue(30));
+ bytesStore.put(serializeKey(new Windowed<>("a", windows[1])),
serializeValue(50));
+ bytesStore.close();
+ bytesStore = getBytesStore();
+ overwritePersistedStoreStatusToOpen();
+ final TaskCorruptedException thrown =
assertThrows(TaskCorruptedException.class, () -> bytesStore.init(eosContext,
bytesStore));
+ assertEquals("Tasks [0_0] are corrupted and hence need to be
re-initialized", thrown.getMessage());
+ }
+
+ private void overwritePersistedStoreStatusToOpen() throws Exception {
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+ final Long openState = 1L;
+
+
+ final String dbPath = new File(new File(stateDir, "bytes-store"),
"bytes-store.0").getAbsolutePath();
+ final List<ColumnFamilyDescriptor> existingColumnFamilies =
RocksDB.listColumnFamilies(new Options(), dbPath).stream()
+ .map(b -> new ColumnFamilyDescriptor(b, columnFamilyOptions))
+ .collect(Collectors.toList());
+ final List<ColumnFamilyHandle> columnFamilies = new
ArrayList<>(existingColumnFamilies.size());
+ RocksDB db = null;
+ ColumnFamilyHandle offsetsColumnFamily = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ new File(new File(stateDir, "bytes-store"),
"bytes-store.0").getAbsolutePath(),
+ existingColumnFamilies,
+ columnFamilies);
+ final byte[] statusKey = stringSerializer.serialize(null,
"status");
+
+ offsetsColumnFamily = columnFamilies.get(columnFamilies.size() -
1);
+ db.put(offsetsColumnFamily, statusKey,
longSerializer.serialize(null, openState));
+ } finally {
+ if (db != null) {
+ db.close();
+ }
+ for (final ColumnFamilyHandle columnFamily : columnFamilies) {
+ columnFamily.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+
private void shouldRestoreToByteStore() {
bytesStore.init(context, bytesStore);
// 0 segments initially.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java
new file mode 100644
index 00000000000..d729d07f181
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSegmentsTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+abstract class AbstractSegmentsTest<S extends Segments> {
+ protected S segments;
+ protected InternalMockProcessorContext<?, ?> context;
+ private final Serializer<String> stringSerializer = new StringSerializer();
+ private final Serializer<Long> longSerializer = new LongSerializer();
+
+
+ abstract S getSegments();
+
+
+ @BeforeEach
+ public void setUp() {
+ context = getProcessorContext();
+ segments = getSegments();
+ segments.openExisting(context, 0L);
+ }
+
+ private InternalMockProcessorContext<?, ?> getProcessorContext() {
+ return new InternalMockProcessorContext<>(
+ TestUtils.tempDirectory(),
+ Serdes.String(),
+ Serdes.Long(),
+ new MockRecordCollector(),
+ new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
+ }
+
+ private InternalMockProcessorContext<?, ?> getEOSProcessorContext() {
+ final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
+ streamsProps.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
+ return new InternalMockProcessorContext<>(
+ TestUtils.tempDirectory(),
+ Serdes.String(),
+ Serdes.Long(),
+ new MockRecordCollector(),
+ new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics())),
+ new StreamsConfig(streamsProps));
+ }
+
+ @AfterEach
+ public void tearDown() {
+ segments.close();
+ }
+
+ @Test
+ public void shouldThrowTaskCorruptedExceptionIfSegmentIsInInvalidState()
throws Exception {
+ context = getEOSProcessorContext();
+ segments = getSegments();
+ segments.openExisting(context, 0L);
+ final Segment segment = segments.getOrCreateSegmentIfLive(0, context,
1L);
+
+ assertTrue(segment.isOpen());
+ segments.close();
+
+ simulateUncleanShutdownForAllSegments();
+ segments = getSegments();
+ final TaskCorruptedException thrown =
assertThrows(TaskCorruptedException.class, () -> segments.openExisting(context,
0L));
+ assertEquals("Tasks [0_0] are corrupted and hence need to be
re-initialized", thrown.getMessage());
+ }
+
+ private void simulateUncleanShutdownForAllSegments() throws Exception {
+ for (final File dbDir :
Objects.requireNonNull(context.stateDir().listFiles())) {
+ for (final File storeDir :
Objects.requireNonNull(dbDir.listFiles())) {
+ final DBOptions dbOptions = new DBOptions();
+ final ColumnFamilyOptions columnFamilyOptions = new
ColumnFamilyOptions();
+ final Long openState = 1L;
+ final String dbPath = storeDir.getAbsolutePath();
+ final List<ColumnFamilyDescriptor> existingColumnFamilies =
RocksDB.listColumnFamilies(new Options(), dbPath).stream()
+ .map(b -> new ColumnFamilyDescriptor(b,
columnFamilyOptions))
+ .collect(Collectors.toList());
+ final List<ColumnFamilyHandle> columnFamilies = new
ArrayList<>(existingColumnFamilies.size());
+ RocksDB db = null;
+ ColumnFamilyHandle offsetsColumnFamily = null;
+ try {
+ db = RocksDB.open(
+ dbOptions,
+ storeDir.getAbsolutePath(),
+ existingColumnFamilies,
+ columnFamilies);
+ final byte[] statusKey = stringSerializer.serialize(null,
"status");
+
+ offsetsColumnFamily =
columnFamilies.get(columnFamilies.size() - 1);
+ db.put(offsetsColumnFamily, statusKey,
longSerializer.serialize(null, openState));
+ } finally {
+ if (db != null) {
+ db.close();
+ }
+ for (final ColumnFamilyHandle columnFamily :
columnFamilies) {
+ columnFamily.close();
+ }
+ dbOptions.close();
+ columnFamilyOptions.close();
+ }
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index ee0c41802bb..4bc468d9785 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -16,16 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -43,34 +33,18 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class KeyValueSegmentsTest {
+public class KeyValueSegmentsTest extends
AbstractSegmentsTest<KeyValueSegments> {
private static final int NUM_SEGMENTS = 5;
private static final long SEGMENT_INTERVAL = 100L;
private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
private static final String METRICS_SCOPE = "test-state-id";
- private InternalMockProcessorContext context;
- private KeyValueSegments segments;
- private File stateDirectory;
private final String storeName = "test";
- @BeforeEach
- public void createContext() {
- stateDirectory = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext<>(
- stateDirectory,
- Serdes.String(),
- Serdes.Long(),
- new MockRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics()))
- );
- segments = new KeyValueSegments(storeName, METRICS_SCOPE,
RETENTION_PERIOD, SEGMENT_INTERVAL);
- segments.openExisting(context, -1L);
- }
- @AfterEach
- public void close() {
- segments.close();
+ @Override
+ KeyValueSegments getSegments() {
+ return new KeyValueSegments(storeName, METRICS_SCOPE,
RETENTION_PERIOD, SEGMENT_INTERVAL);
}
@Test
@@ -294,7 +268,7 @@ public class KeyValueSegmentsTest {
segments = new KeyValueSegments(storeName, METRICS_SCOPE,
NUM_SEGMENTS * segmentInterval, segmentInterval);
- final String storeDirectoryPath = stateDirectory.getAbsolutePath() +
File.separator + storeName;
+ final String storeDirectoryPath = context.stateDir().getAbsolutePath()
+ File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
//noinspection ResultOfMethodCallIgnored
storeDirectory.mkdirs();
@@ -319,7 +293,7 @@ public class KeyValueSegmentsTest {
@Test
public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat()
throws Exception {
- final String storeDirectoryPath = stateDirectory.getAbsolutePath() +
File.separator + storeName;
+ final String storeDirectoryPath = context.stateDir().getAbsolutePath()
+ File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
//noinspection ResultOfMethodCallIgnored
storeDirectory.mkdirs();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
index 2fbbfe4e947..95cb3fe1fda 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java
@@ -16,20 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -43,7 +34,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class LogicalKeyValueSegmentsTest {
+public class LogicalKeyValueSegmentsTest extends
AbstractSegmentsTest<LogicalKeyValueSegments> {
private static final long SEGMENT_INTERVAL = 100L;
private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
@@ -51,32 +42,16 @@ public class LogicalKeyValueSegmentsTest {
private static final String METRICS_SCOPE = "metrics-scope";
private static final String DB_FILE_DIR = "rocksdb";
- private InternalMockProcessorContext<?, ?> context;
- private LogicalKeyValueSegments segments;
-
- @BeforeEach
- public void setUp() {
- context = new InternalMockProcessorContext<>(
- TestUtils.tempDirectory(),
- Serdes.String(),
- Serdes.Long(),
- new MockRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics()))
- );
- segments = new LogicalKeyValueSegments(
+ @Override
+ public LogicalKeyValueSegments getSegments() {
+ return new LogicalKeyValueSegments(
STORE_NAME,
DB_FILE_DIR,
RETENTION_PERIOD,
SEGMENT_INTERVAL,
new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME)
);
- segments.openExisting(context, 0L);
- }
-
- @AfterEach
- public void tearDown() {
- segments.close();
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 128534a1bb8..7734646a5c2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
@@ -273,8 +274,9 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
overwritePersistedStoreStatusToOpen();
- final ProcessorStateException stateException = assertThrows(ProcessorStateException.class, () -> rocksDBStore.init(eosContext, rocksDBStore));
- assertEquals("State store " + DB_NAME + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores", stateException.getMessage());
+ final TaskCorruptedException stateException = assertThrows(TaskCorruptedException.class, () -> rocksDBStore.init(eosContext, rocksDBStore));
+ assertEquals("Tasks [0_0] are corrupted and hence need to be re-initialized", stateException.getMessage());
+ assertEquals("State store " + DB_NAME + " didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores", stateException.getCause().getMessage());
rocksDBStore.close();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
index eea7d60a16f..bf2c298b80c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
@@ -16,16 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -36,28 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class SessionSegmentsWithHeadersTest {
+public class SessionSegmentsWithHeadersTest extends AbstractSegmentsTest<SessionSegmentsWithHeaders> {
private static final long SEGMENT_INTERVAL = 100L;
private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
private static final String METRICS_SCOPE = "test-state-id";
- private InternalMockProcessorContext context;
- private SessionSegmentsWithHeaders segments;
- private File stateDirectory;
private final String storeName = "test";
- @BeforeEach
- public void createContext() {
- stateDirectory = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext<>(
- stateDirectory,
- Serdes.String(),
- Serdes.Long(),
- new MockRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
- );
- segments = new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
- segments.openExisting(context, -1L);
+ @Override
+ public SessionSegmentsWithHeaders getSegments() {
+ return new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
}
@AfterEach
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
index f9c608dbb93..1ccafcd0105 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java
@@ -16,16 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -43,29 +35,17 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TimestampedSegmentsTest {
+public class TimestampedSegmentsTest extends AbstractSegmentsTest<TimestampedSegments> {
private static final int NUM_SEGMENTS = 5;
private static final long SEGMENT_INTERVAL = 100L;
private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
private static final String METRICS_SCOPE = "test-state-id";
- private InternalMockProcessorContext context;
- private TimestampedSegments segments;
- private File stateDirectory;
private final String storeName = "test";
- @BeforeEach
- public void createContext() {
- stateDirectory = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext<>(
- stateDirectory,
- Serdes.String(),
- Serdes.Long(),
- new MockRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
- );
- segments = new TimestampedSegments(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
- segments.openExisting(context, -1L);
+ @Override
+ TimestampedSegments getSegments() {
+ return new TimestampedSegments(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
}
@AfterEach
@@ -295,7 +275,7 @@ public class TimestampedSegmentsTest {
segments = new TimestampedSegments(storeName, METRICS_SCOPE, NUM_SEGMENTS * segmentInterval, segmentInterval);
- final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
+ final String storeDirectoryPath = context.stateDir().getAbsolutePath() + File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
//noinspection ResultOfMethodCallIgnored
storeDirectory.mkdirs();
@@ -320,7 +300,7 @@ public class TimestampedSegmentsTest {
@Test
public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Exception {
- final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
+ final String storeDirectoryPath = context.stateDir().getAbsolutePath() + File.separator + storeName;
final File storeDirectory = new File(storeDirectoryPath);
//noinspection ResultOfMethodCallIgnored
storeDirectory.mkdirs();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
index b3651cf7b25..1e1b40e7c6a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
@@ -16,16 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
-import org.apache.kafka.test.InternalMockProcessorContext;
-import org.apache.kafka.test.MockRecordCollector;
-import org.apache.kafka.test.TestUtils;
-
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -36,28 +27,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TimestampedSegmentsWithHeadersTest {
-
+public class TimestampedSegmentsWithHeadersTest extends AbstractSegmentsTest<TimestampedSegmentsWithHeaders> {
private static final long SEGMENT_INTERVAL = 100L;
private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
private static final String METRICS_SCOPE = "test-state-id";
- private InternalMockProcessorContext context;
- private TimestampedSegmentsWithHeaders segments;
- private File stateDirectory;
private final String storeName = "test";
- @BeforeEach
- public void createContext() {
- stateDirectory = TestUtils.tempDirectory();
- context = new InternalMockProcessorContext<>(
- stateDirectory,
- Serdes.String(),
- Serdes.Long(),
- new MockRecordCollector(),
- new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
- );
- segments = new TimestampedSegmentsWithHeaders(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
- segments.openExisting(context, -1L);
+ @Override
+ TimestampedSegmentsWithHeaders getSegments() {
+ return new TimestampedSegmentsWithHeaders(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
}
@AfterEach
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 0f4012f779a..03796f56be5 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -202,6 +202,24 @@ public class InternalMockProcessorContext<KOut, VOut>
);
}
+ public InternalMockProcessorContext(final File stateDir,
+ final Serde<?> keySerde,
+ final Serde<?> valueSerde,
+ final RecordCollector collector,
+ final ThreadCache cache,
+ final StreamsConfig config) {
+ this(
+ stateDir,
+ keySerde,
+ valueSerde,
+ new StreamsMetricsImpl(new Metrics(), "mock", new MockTime()),
+ config,
+ () -> collector,
+ cache,
+ Time.SYSTEM
+ );
+ }
+
public InternalMockProcessorContext(final File stateDir,
final Serde<?> keySerde,
final Serde<?> valueSerde,