Bill Bejeck created KAFKA-20396:
-----------------------------------
Summary: After unclean shutdown Kafka Streams fails to start
Key: KAFKA-20396
URL: https://issues.apache.org/jira/browse/KAFKA-20396
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 4.3.0
Reporter: Bill Bejeck
Assignee: Eduwer Camacaro
Fix For: 4.3.0
Related to KIP-1035
After an unclean shutdown, the Kafka Streams instance fails to start because it
does not properly handle the invalid state recorded for the state store. For
RocksDBStore, we now save the store's status in a ColumnFamily. After an
unclean shutdown that status still marks the store as open, and during
initialization we throw an exception if EOS is enabled.
This looks like a regression: the exception stops the whole instance, and the
instance does not start again.
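To make the failure mode concrete, here is a minimal, self-contained sketch of the check described above. It is a toy model, not Kafka Streams code: `ToyStore`, `Status`, `InvalidStoreStateException`, and `openWithRecovery` are hypothetical names invented for illustration, and the in-memory map stands in for the RocksDB store and its status ColumnFamily. The recovery path mirrors what the WARN log in this report says should happen (wipe the local state and re-bootstrap); it is one possible handling, not necessarily the actual fix.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of the startup check described in this report; not Kafka Streams code. */
public class StoreOpenSketch {

    /** Hypothetical status marker persisted alongside the store data. */
    enum Status { OPEN, CLOSED }

    /** Hypothetical stand-in for the exception thrown during initialization. */
    static class InvalidStoreStateException extends RuntimeException {
        InvalidStoreStateException(String message) {
            super(message);
        }
    }

    /** Hypothetical in-memory stand-in for a persistent state store. */
    static class ToyStore {
        final Map<String, String> data = new HashMap<>();
        Status persistedStatus = Status.CLOSED;

        /** Reported behavior: a leftover OPEN marker under EOS raises an exception. */
        void open(boolean eosEnabled) {
            if (eosEnabled && persistedStatus == Status.OPEN) {
                throw new InvalidStoreStateException("store didn't find a valid state");
            }
            persistedStatus = Status.OPEN;
        }

        /** A clean shutdown resets the marker; an unclean shutdown never gets here. */
        void close() {
            persistedStatus = Status.CLOSED;
        }

        /** Discard local data so it can be rebuilt, e.g. from the changelog. */
        void wipe() {
            data.clear();
            persistedStatus = Status.CLOSED;
        }
    }

    /** Treats the invalid state as recoverable instead of aborting startup. */
    static boolean openWithRecovery(ToyStore store, boolean eosEnabled) {
        try {
            store.open(eosEnabled);
            return false; // opened without wiping
        } catch (InvalidStoreStateException e) {
            store.wipe();
            store.open(eosEnabled);
            return true; // local state was wiped and the store reopened
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        store.open(true);
        store.data.put("k", "v");
        // Simulate an unclean shutdown: close() is never called, so the marker stays OPEN.
        boolean wiped = openWithRecovery(store, true);
        System.out.println("wiped=" + wiped + ", entries=" + store.data.size());
    }
}
```

In this model, calling `store.open(true)` directly after the simulated unclean shutdown throws, which corresponds to the startup failure shown in the stack trace; `openWithRecovery` instead wipes the toy store and reopens it.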
{code:text}
10:38:18 WARN [STREAMS] ProcessorStateManager - stream-client [my-cluster-1-core] State store core-store did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 1_9 before re-bootstrapping
10:38:18 WARN [STREAMS] StateDirectory - Failed to register startup state stores for task 1_9: Tasks [1_9] are corrupted and hence need to be re-initialized
Exception in thread "main" org.apache.kafka.streams.errors.ProcessorStateException: State store core-store didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:261)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:170)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:77)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:48)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:77)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:144)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:263)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:144)
    at org.apache.kafka.streams.processor.internals.StateDirectory.initializeStartupStores(StateDirectory.java:265)
    at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:1397)
{code}