[
https://issues.apache.org/jira/browse/KAFKA-20096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18057151#comment-18057151
]
Arpit Goyal edited comment on KAFKA-20096 at 2/8/26 12:39 PM:
--------------------------------------------------------------
[~mjsax] While testing, I figured out we never create rocksdb store because of
the topology we defined in StreamsUpgradeTest.
{code:java}
final KTable<String, Integer> dataTable = builder.table(
"data",
Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream(); {code}
When you create a KTable without explicit materialization , Streams optimiser
sees "This KTable is never actually queried, just passed through to a stream" →
No state store created!
After adding explicit materialization
{code:java}
final KTable<String, Integer> dataTable = builder.table(
"data",
Consumed.with(stringSerde, intSerde),
org.apache.kafka.streams.kstream.Materialized.as("data-store"));
final KStream<String, Integer> dataStream = dataTable.toStream(); {code}
I am able to see rocksdb store created within the
/mnt/streams/StreamUpgradeTest path.
I wrote two test:
# Upgrade path -[3.9 to 4.1]
Worked successfully.
{code:java}
[2026-02-08 12:27:33,351] DEBUG stream-thread
[StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1] Main
Consumer poll completed in 100 ms and fetched 0 records from partitions []
(org.apache.kafka.streams.processor.internals.StreamThread)
[2026-02-08 12:27:33,352] DEBUG stream-thread
[StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1] task
[0_0] Acquired state directory lock
(org.apache.kafka.streams.processor.internals.StreamTask)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName [name=put-rate,
group=stream-state-metrics, description=The average number of calls to put per
second,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName
[name=put-latency-avg, group=stream-state-metrics, description=The average
latency of calls to put,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName
[name=put-latency-max, group=stream-state-metrics, description=The maximum
latency of calls to put,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName
[name=put-if-absent-rate, group=stream-state-metrics, description=The average
number of calls to put-if-absent per second,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
{code}
2. Downgrade Path -[4.1 to 3.9]
Failed error
{code:java}
[2026-02-08 12:08:04,078] DEBUG stream-thread
[StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061-StreamThread-1]
standby-task [0_2] Acquired state directory lock
(org.apache.kafka.streams.processor.internals.StandbyTask)
[2026-02-08 12:08:04,203] ERROR stream-thread
[StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061-StreamThread-1] Get
exceptions for the following tasks:
{0_2=org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store data-store at location
/mnt/streams/StreamsUpgradeTest/0_2/rocksdb/data-store}
(org.apache.kafka.streams.processor.internals.TaskManager)
[2026-02-08 12:08:04,204] ERROR stream-client
[StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061] Encountered the
following exception during processing and the registered exception handler
opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
(org.apache.kafka.streams.KafkaStreams)
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store
data-store at location /mnt/streams/StreamsUpgradeTest/0_2/rocksdb/data-store
at
org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:330)
at
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:70)
at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:255)
at
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:176)
at
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
at
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
at
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:114)
at
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:159)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:159)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:234)
at
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
at
org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
at
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1016)
at
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:1001)
at
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:915)
{code}
was (Author: JIRAUSER301926):
[~mjsax] While testing, I figured out we never create rocksdb store because of
the topology we defined in StreamsUpgradeTest.
{code:java}
final KTable<String, Integer> dataTable = builder.table(
"data",
Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream(); {code}
When you create a KTable without explicit materialization , Streams optimiser
sees "This KTable is never actually queried, just passed through to a stream" →
No state store created!
After adding explicit materialization
{code:java}
final KTable<String, Integer> dataTable = builder.table(
"data",
Consumed.with(stringSerde, intSerde),
org.apache.kafka.streams.kstream.Materialized.as("data-store"));
final KStream<String, Integer> dataStream = dataTable.toStream(); {code}
I am able to see rocksdb store created within the
/mnt/streams/StreamUpgradeTest path.
I wrote two test:
# Upgrade path -[3.9 to 4.1]
Worked successfully.
{code:java}
[2026-02-08 12:27:33,351] DEBUG stream-thread
[StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1] Main
Consumer poll completed in 100 ms and fetched 0 records from partitions []
(org.apache.kafka.streams.processor.internals.StreamThread)
[2026-02-08 12:27:33,352] DEBUG stream-thread
[StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1] task
[0_0] Acquired state directory lock
(org.apache.kafka.streams.processor.internals.StreamTask)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName [name=put-rate,
group=stream-state-metrics, description=The average number of calls to put per
second,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName
[name=put-latency-avg, group=stream-state-metrics, description=The average
latency of calls to put,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName
[name=put-latency-max, group=stream-state-metrics, description=The maximum
latency of calls to put,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
[2026-02-08 12:27:33,352] DEBUG Registering metric MetricName
[name=put-if-absent-rate, group=stream-state-metrics, description=The average
number of calls to put-if-absent per second,
tags={thread-id=StreamsUpgradeTest-92a9095f-3b02-4c06-9fb9-32dae466aab6-StreamThread-1,
task-id=0_0, rocksdb-state-id=data-store}]
(org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter)
{code}
2. Downgrade Path -[4.1 to 3.9]
Failed error
{code:java}
[2026-02-08 12:08:04,078] DEBUG stream-thread
[StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061-StreamThread-1]
standby-task [0_2] Acquired state directory lock
(org.apache.kafka.streams.processor.internals.StandbyTask)
[2026-02-08 12:08:04,203] ERROR stream-thread
[StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061-StreamThread-1] Get
exceptions for the following tasks:
{0_2=org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store data-store at location
/mnt/streams/StreamsUpgradeTest/0_2/rocksdb/data-store}
(org.apache.kafka.streams.processor.internals.TaskManager)
[2026-02-08 12:08:04,204] ERROR stream-client
[StreamsUpgradeTest-3f7c5d37-8227-43f8-a0c6-5e9090529061] Encountered the
following exception during processing and the registered exception handler
opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
(org.apache.kafka.streams.KafkaStreams)
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store
data-store at location /mnt/streams/StreamsUpgradeTest/0_2/rocksdb/data-store
at
org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:330)
at
org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:70)
at
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:255)
at
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:176)
at
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
at
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
at
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:114)
at
org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:159)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:159)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:234)
at
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103)
at
org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
at
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1016)
at
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:1001)
at
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:915)
{code}
> RocksDB compatibility not documented
> ------------------------------------
>
> Key: KAFKA-20096
> URL: https://issues.apache.org/jira/browse/KAFKA-20096
> Project: Kafka
> Issue Type: Improvement
> Components: docs, streams, system tests
> Affects Versions: 4.0.0
> Reporter: Matthias J. Sax
> Assignee: Arpit Goyal
> Priority: Critical
>
> With 4.0 release, we upgraded RocksDB from 7.9 to 9.7
> (https://issues.apache.org/jira/browse/KAFKA-15443) – however, we did miss
> that RocksDB introduces a file format version bump from version 5 to version
> 6 with RocksDB 8.6.
> While this does not impact the upgrade path, it does impact the downgrade
> path. This limitation is no documented though. – We should also investigate
> `RocksDBConfigSetter` which seems to allow to configure the used file-format
> version via `tableconfig.setFormVersion`.
> We should also double check system test coverage for upgrade/downgrade path
> with RocksDB – ideally we should catch such issue; seems there is some
> testing gap.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)