Bill Bejeck created KAFKA-20396:
-----------------------------------

             Summary: After unclean shutdown Kafka Streams fails to start
                 Key: KAFKA-20396
                 URL: https://issues.apache.org/jira/browse/KAFKA-20396
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 4.3.0
            Reporter: Bill Bejeck
            Assignee: Eduwer Camacaro
             Fix For: 4.3.0


Related to KIP-1035

After an unclean shutdown, the Kafka Streams instance fails to start and does 
not properly handle the invalid state recorded for the state store. For 
RocksDBStore, we now save the state of a state store in a ColumnFamily, but 
after an unclean shutdown that stored state remains open, and during 
initialization we throw an exception if EOS is enabled.

This looks like a regression: the exception stops the whole instance, and the 
instance does not start again.
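
The failure mode described above can be modeled with a small toy example. This 
is only a sketch and not Kafka Streams code: ToyStore, Status, and 
openWithRecovery are hypothetical names, and an in-memory field stands in for 
the state kept in the ColumnFamily. It assumes the desired handling is the one 
named in the WARN message below (treat the store as corrupted, wipe the local 
state, and re-initialize) rather than letting the exception stop the instance.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class UncleanShutdownSketch {

    // Hypothetical marker standing in for the state saved with the store.
    enum Status { OPEN, CLOSED }

    // Toy stand-in for a persistent store; fields model what survives a restart.
    static class ToyStore {
        Status status = Status.CLOSED;
        final Map<String, String> data = new HashMap<>();

        // Mirrors the reported behavior: under EOS, a store that is still
        // marked OPEN from the previous run is rejected during init.
        void open(boolean eosEnabled) {
            if (eosEnabled && status == Status.OPEN) {
                throw new IllegalStateException("store didn't find a valid state");
            }
            status = Status.OPEN;
        }

        // A clean shutdown would reset the marker.
        void close() {
            status = Status.CLOSED;
        }

        // Drop local contents so they can be rebuilt from scratch.
        void wipe() {
            data.clear();
            status = Status.CLOSED;
        }
    }

    // Assumed recovery path: wipe and reopen instead of propagating the
    // exception. Returns true if the local state had to be wiped.
    static boolean openWithRecovery(ToyStore store, boolean eosEnabled) {
        try {
            store.open(eosEnabled);
            return false;
        } catch (IllegalStateException e) {
            store.wipe();
            store.open(eosEnabled);
            return true;
        }
    }

    public static void main(String[] args) {
        ToyStore store = new ToyStore();
        store.open(true);
        store.data.put("k", "v");
        // Simulated unclean shutdown: close() is never called.
        boolean wiped = openWithRecovery(store, true);
        System.out.println("wiped=" + wiped + " size=" + store.data.size());
    }
}
{code}

In this model, calling store.open(true) directly on the second start throws, 
which corresponds to the stack trace below; openWithRecovery instead comes back 
up with an empty store.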


{code:text}
10:38:18 WARN  [STREAMS] ProcessorStateManager - stream-client [my-cluster-1-core] State store core-store did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 1_9 before re-bootstrapping
10:38:18 WARN  [STREAMS] StateDirectory - Failed to register startup state stores for task 1_9: Tasks [1_9] are corrupted and hence need to be re-initialized
Exception in thread "main" org.apache.kafka.streams.errors.ProcessorStateException: State store core-store didn't find a valid state, since under EOS it has the risk of getting uncommitted data in stores
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:261)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:170)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:77)
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:48)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:77)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:144)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:263)
        at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:144)
        at org.apache.kafka.streams.processor.internals.StateDirectory.initializeStartupStores(StateDirectory.java:265)
        at org.apache.kafka.streams.KafkaStreams.start(KafkaStreams.java:1397)
{code}




