iit2009060 commented on code in PR #21430:
URL: https://github.com/apache/kafka/pull/21430#discussion_r2810691129
##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -530,6 +530,152 @@ def verify_metadata_no_upgraded_yet(self,
end_of_upgrade_message):
if len(found) > 0:
raise Exception("Kafka Streams failed with 'group member
upgraded to metadata 8 too early'")
+
+
+ @cluster(num_nodes=6)
+ @matrix(from_version=[str(LATEST_3_9)],
+ metadata_quorum=[quorum.combined_kraft])
+ def test_rocksdb_format_v5_upgrade(self, from_version, metadata_quorum):
+ """
+ Tests RocksDB file format v5 → v6 upgrade compatibility (KAFKA-20096).
+
+ Background:
+ - Kafka 3.9 uses RocksDB 7.9.2 which writes format v5
+ - Kafka 4.0+ uses RocksDB 9.7.3 which defaults to format v6
+ - RocksDB 9.7.3 CAN read format v5 (backward compatible)
+
+ Test flow:
+ 1. Start 3 Kafka Streams instances with version 3.9
+ 2. Process data to create state stores in format v5
+ 3. Rolling upgrade each instance to DEV_VERSION (4.0+)
+ 4. Verify upgrade succeeds (new RocksDB reads old format)
+ 5. Verify processing continues correctly
+
+ Expected result: Upgrade succeeds without errors
+ """
+ to_version = str(DEV_VERSION)
+
+ # === SETUP PHASE ===
+ # Create Kafka broker and Streams services
+ self.set_up_services()
+
+ # Start data producer (continuously produces records to 'data' topic)
+ self.driver.start()
+ self.logger.info("Data driver started - producing test records")
+
+ # === PHASE 1: Start all nodes with OLD version (3.9) ===
+ # This creates state stores in RocksDB format v5
+ self.logger.info(f"PHASE 1: Starting all 3 Streams instances with
version {from_version}")
+ self.logger.info(f" RocksDB 7.9.2 will create state stores in
format v5")
+ self.start_all_nodes_with(from_version, {})
+
+ # Give time to process records and build state stores
+ # State stores will be written in format v5 on disk at:
+ #
/mnt/streams-upgrade-test/kafka-streams/<app-id>/0_0/rocksdb/<store-name>/
+ self.logger.info("Waiting 30 seconds for state stores to be populated
with format v5 data...")
+ time.sleep(30)
+
+ # === PHASE 2: Rolling upgrade to NEW version (4.0+) ===
+ # Each instance upgraded one-by-one to minimize downtime
+ self.logger.info(f"PHASE 2: Rolling upgrade to version {to_version}")
+ self.logger.info(f" RocksDB 9.7.3 must successfully READ
format v5 state stores")
+
+ counter = 1
+ random.seed()
+ random.shuffle(self.processors) # Randomize upgrade order
+ for p in self.processors:
+ # CRITICAL: CLEAN_NODE_ENABLED = False
+ # This keeps existing state stores intact upg(f upgraormat v5
files remain on disk)
+ # New RocksDB 9.7.3 must be able to read these v5 files
+ p.CLEAN_NODE_ENABLED = False
+
+ self.logger.info(f" Upgrading instance #{counter} to
{to_version}")
+ self.logger.info(f" - Stop instance (running {from_version})")
+ self.logger.info(f" - Start instance with {to_version}")
+ self.logger.info(f" - RocksDB 9.7.3 will open existing format
v5 state stores")
+
+ # upgrade_from=None means no metadata migration needed (no
protocol change)
+ # We're only testing RocksDB format compatibility here
+ self.do_stop_start_bounce(p, None, to_version, counter, {})
+
+ self.logger.info(f" ✓ Instance #{counter} successfully upgraded
and processing")
+ counter = counter + 1
+
+ # === PHASE 3: Verification ===
+ # If we reach here, all instances upgraded successfully
+ # This means RocksDB 9.7.3 successfully read format v5 state stores
+ self.logger.info("PHASE 3: Verification")
+ self.logger.info(" Waiting 30 seconds to verify continued
processing...")
+ time.sleep(30)
+
+ # The do_stop_start_bounce already verified that each instance
processes records
+ # (it waits for "processed * records from topic=data" in logs)
+ self.logger.info(" ✓ All instances processing records successfully")
+ self.logger.info(" ✓ RocksDB 9.7.3 successfully read format v5 state
stores")
+ self.logger.info(" ✓ Upgrade test PASSED")
+
+ # === CLEANUP ===
+ self.stop_and_await()
+ self.logger.info("Test completed successfully")
+
+ @cluster(num_nodes=6)
+ @matrix(to_version=[str(LATEST_3_9)],
+ metadata_quorum=[quorum.combined_kraft])
+ def test_rocksdb_format_v5_downgrade(self, to_version, metadata_quorum):
+ """
+ Tests RocksDB file format v6 → v5 downgrade compatibility
(KAFKA-20096).
Review Comment:
Yes, I have tested on my local , it does not work i.e downgrade from v6 to
v5 does not work.
`HUTDOWN_CLIENT. The streams client is going to shut down now.
(org.apache.kafka.streams.KafkaStreams)
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store
data-store at location /mnt/streams/StreamsUpgradeTest/0_2/rocksdb/data-store
at
org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:330)
at org.apache.kafka.st
`
Yes added test for safe downgrade and safe upgrade.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]