mjsax commented on code in PR #21430:
URL: https://github.com/apache/kafka/pull/21430#discussion_r2785966889


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -530,6 +530,152 @@ def verify_metadata_no_upgraded_yet(self, 
end_of_upgrade_message):
             if len(found) > 0:
                 raise Exception("Kafka Streams failed with 'group member 
upgraded to metadata 8 too early'")
 
+
+
+    @cluster(num_nodes=6)
+    @matrix(from_version=[str(LATEST_3_9)],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_rocksdb_format_v5_upgrade(self, from_version, metadata_quorum):
+        """
+        Tests RocksDB file format v5 → v6 upgrade compatibility (KAFKA-20096).

Review Comment:
   Not sure we need this test. We already have upgrade tests that cover this 
implicitly.



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -530,6 +530,152 @@ def verify_metadata_no_upgraded_yet(self, 
end_of_upgrade_message):
             if len(found) > 0:
                 raise Exception("Kafka Streams failed with 'group member 
upgraded to metadata 8 too early'")
 
+
+
+    @cluster(num_nodes=6)
+    @matrix(from_version=[str(LATEST_3_9)],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_rocksdb_format_v5_upgrade(self, from_version, metadata_quorum):
+        """
+        Tests RocksDB file format v5 → v6 upgrade compatibility (KAFKA-20096).
+        
+        Background:
+        - Kafka 3.9 uses RocksDB 7.9.2 which writes format v5
+        - Kafka 4.0+ uses RocksDB 9.7.3 which defaults to format v6
+        - RocksDB 9.7.3 CAN read format v5 (backward compatible)
+        
+        Test flow:
+        1. Start 3 Kafka Streams instances with version 3.9
+        2. Process data to create state stores in format v5
+        3. Rolling upgrade each instance to DEV_VERSION (4.0+)
+        4. Verify upgrade succeeds (new RocksDB reads old format)
+        5. Verify processing continues correctly
+        
+        Expected result: Upgrade succeeds without errors
+        """
+        to_version = str(DEV_VERSION)
+        
+        # === SETUP PHASE ===
+        # Create Kafka broker and Streams services
+        self.set_up_services()
+        
+        # Start data producer (continuously produces records to 'data' topic)
+        self.driver.start()
+        self.logger.info("Data driver started - producing test records")
+        
+        # === PHASE 1: Start all nodes with OLD version (3.9) ===
+        # This creates state stores in RocksDB format v5
+        self.logger.info(f"PHASE 1: Starting all 3 Streams instances with 
version {from_version}")
+        self.logger.info(f"         RocksDB 7.9.2 will create state stores in 
format v5")
+        self.start_all_nodes_with(from_version, {})
+        
+        # Give time to process records and build state stores
+        # State stores will be written in format v5 on disk at:
+        # 
/mnt/streams-upgrade-test/kafka-streams/<app-id>/0_0/rocksdb/<store-name>/
+        self.logger.info("Waiting 30 seconds for state stores to be populated 
with format v5 data...")
+        time.sleep(30)
+        
+        # === PHASE 2: Rolling upgrade to NEW version (4.0+) ===
+        # Each instance upgraded one-by-one to minimize downtime
+        self.logger.info(f"PHASE 2: Rolling upgrade to version {to_version}")
+        self.logger.info(f"         RocksDB 9.7.3 must successfully READ 
format v5 state stores")
+        
+        counter = 1
+        random.seed()
+        random.shuffle(self.processors)  # Randomize upgrade order
+        for p in self.processors:
+            # CRITICAL: CLEAN_NODE_ENABLED = False
+            # This keeps existing state stores intact (format v5 files remain 
on disk)
+            # New RocksDB 9.7.3 must be able to read these v5 files
+            p.CLEAN_NODE_ENABLED = False
+            
+            self.logger.info(f"  Upgrading instance #{counter} to 
{to_version}")
+            self.logger.info(f"    - Stop instance (running {from_version})")
+            self.logger.info(f"    - Start instance with {to_version}")
+            self.logger.info(f"    - RocksDB 9.7.3 will open existing format 
v5 state stores")
+            
+            # upgrade_from=None means no metadata migration needed (no 
protocol change)
+            # We're only testing RocksDB format compatibility here
+            self.do_stop_start_bounce(p, None, to_version, counter, {})
+            
+            self.logger.info(f"    ✓ Instance #{counter} successfully upgraded 
and processing")
+            counter = counter + 1
+        
+        # === PHASE 3: Verification ===
+        # If we reach here, all instances upgraded successfully
+        # This means RocksDB 9.7.3 successfully read format v5 state stores
+        self.logger.info("PHASE 3: Verification")
+        self.logger.info("  Waiting 30 seconds to verify continued 
processing...")
+        time.sleep(30)
+        
+        # The do_stop_start_bounce already verified that each instance 
processes records
+        # (it waits for "processed * records from topic=data" in logs)
+        self.logger.info("  ✓ All instances processing records successfully")
+        self.logger.info("  ✓ RocksDB 9.7.3 successfully read format v5 state 
stores")
+        self.logger.info("  ✓ Upgrade test PASSED")
+        
+        # === CLEANUP ===
+        self.stop_and_await()
+        self.logger.info("Test completed successfully")
+
+    @cluster(num_nodes=6)
+    @matrix(to_version=[str(LATEST_3_9)],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_rocksdb_format_v5_downgrade(self, to_version, metadata_quorum):
+        """
+        Tests RocksDB file format v6 → v5 downgrade compatibility 
(KAFKA-20096).

Review Comment:
   Not sure if we can really downgrade from v6 to v5.
   
   The idea of the test would be: run an older KS version (which only 
understands v5), upgrade to the latest KS but configure it _not_ to use v6 and 
stay on v5 instead, and finally downgrade to the older KS version again.
   
   Because we configured the latest KS to stay on v5, the downgrade should then 
work.



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -530,6 +530,152 @@ def verify_metadata_no_upgraded_yet(self, 
end_of_upgrade_message):
             if len(found) > 0:
                 raise Exception("Kafka Streams failed with 'group member 
upgraded to metadata 8 too early'")
 
+
+
+    @cluster(num_nodes=6)
+    @matrix(from_version=[str(LATEST_3_9)],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_rocksdb_format_v5_upgrade(self, from_version, metadata_quorum):
+        """
+        Tests RocksDB file format v5 → v6 upgrade compatibility (KAFKA-20096).
+        
+        Background:
+        - Kafka 3.9 uses RocksDB 7.9.2 which writes format v5
+        - Kafka 4.0+ uses RocksDB 9.7.3 which defaults to format v6
+        - RocksDB 9.7.3 CAN read format v5 (backward compatible)
+        
+        Test flow:
+        1. Start 3 Kafka Streams instances with version 3.9
+        2. Process data to create state stores in format v5
+        3. Rolling upgrade each instance to DEV_VERSION (4.0+)
+        4. Verify upgrade succeeds (new RocksDB reads old format)
+        5. Verify processing continues correctly
+        
+        Expected result: Upgrade succeeds without errors
+        """
+        to_version = str(DEV_VERSION)
+        
+        # === SETUP PHASE ===
+        # Create Kafka broker and Streams services
+        self.set_up_services()
+        
+        # Start data producer (continuously produces records to 'data' topic)
+        self.driver.start()
+        self.logger.info("Data driver started - producing test records")
+        
+        # === PHASE 1: Start all nodes with OLD version (3.9) ===
+        # This creates state stores in RocksDB format v5
+        self.logger.info(f"PHASE 1: Starting all 3 Streams instances with 
version {from_version}")
+        self.logger.info(f"         RocksDB 7.9.2 will create state stores in 
format v5")
+        self.start_all_nodes_with(from_version, {})
+        
+        # Give time to process records and build state stores
+        # State stores will be written in format v5 on disk at:
+        # 
/mnt/streams-upgrade-test/kafka-streams/<app-id>/0_0/rocksdb/<store-name>/
+        self.logger.info("Waiting 30 seconds for state stores to be populated 
with format v5 data...")
+        time.sleep(30)
+        
+        # === PHASE 2: Rolling upgrade to NEW version (4.0+) ===
+        # Each instance upgraded one-by-one to minimize downtime
+        self.logger.info(f"PHASE 2: Rolling upgrade to version {to_version}")
+        self.logger.info(f"         RocksDB 9.7.3 must successfully READ 
format v5 state stores")
+        
+        counter = 1
+        random.seed()
+        random.shuffle(self.processors)  # Randomize upgrade order
+        for p in self.processors:
+            # CRITICAL: CLEAN_NODE_ENABLED = False
+            # This keeps existing state stores intact (format v5 files remain 
on disk)
+            # New RocksDB 9.7.3 must be able to read these v5 files
+            p.CLEAN_NODE_ENABLED = False
+            
+            self.logger.info(f"  Upgrading instance #{counter} to 
{to_version}")
+            self.logger.info(f"    - Stop instance (running {from_version})")
+            self.logger.info(f"    - Start instance with {to_version}")
+            self.logger.info(f"    - RocksDB 9.7.3 will open existing format 
v5 state stores")
+            
+            # upgrade_from=None means no metadata migration needed (no 
protocol change)
+            # We're only testing RocksDB format compatibility here
+            self.do_stop_start_bounce(p, None, to_version, counter, {})
+            
+            self.logger.info(f"    ✓ Instance #{counter} successfully upgraded 
and processing")
+            counter = counter + 1
+        
+        # === PHASE 3: Verification ===
+        # If we reach here, all instances upgraded successfully
+        # This means RocksDB 9.7.3 successfully read format v5 state stores
+        self.logger.info("PHASE 3: Verification")
+        self.logger.info("  Waiting 30 seconds to verify continued 
processing...")
+        time.sleep(30)
+        
+        # The do_stop_start_bounce already verified that each instance 
processes records
+        # (it waits for "processed * records from topic=data" in logs)
+        self.logger.info("  ✓ All instances processing records successfully")
+        self.logger.info("  ✓ RocksDB 9.7.3 successfully read format v5 state 
stores")
+        self.logger.info("  ✓ Upgrade test PASSED")
+        
+        # === CLEANUP ===
+        self.stop_and_await()
+        self.logger.info("Test completed successfully")
+
+    @cluster(num_nodes=6)
+    @matrix(to_version=[str(LATEST_3_9)],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_rocksdb_format_v5_downgrade(self, to_version, metadata_quorum):
+        """
+        Tests RocksDB file format v6 → v5 downgrade compatibility 
(KAFKA-20096).

Review Comment:
   We should actually add one more test: start with an old version of KS (and 
thus the old file format), upgrade to the new KS version (staying on the old 
file format), then do another rolling bounce that migrates to the new file 
format.
   
   This test would mimic the actual upgrade path users would take if they want 
to play it safe before migrating to the new file format.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to