This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fe8415a163 KAFKA-9195: Use Randomized State Directory Names in 
Streams System Tests (#21879)
6fe8415a163 is described below

commit 6fe8415a163b50efe84607a938b7eaaac92dcc6b
Author: Alieh Saeedi <[email protected]>
AuthorDate: Sat Mar 28 00:55:16 2026 +0100

    KAFKA-9195: Use Randomized State Directory Names in Streams System Tests 
(#21879)
    
    This PR fixes Streams system test state directory isolation
    
    Currently, Streams system tests set state.dir to PERSISTENT_ROOT
    (/mnt/streams). Since the state directory path is state.dir +
    application.id + task.id, tests with the same application ID can
    interfere with each other if state directories aren't properly cleaned
    up, leading to flaky failures.
    
    This PR sets state.dir to a randomized path inside PERSISTENT_ROOT
    (/mnt/streams/<uuid>) to ensure test isolation even when state cleanup
    is missed.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 tests/kafkatest/services/streams.py | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/tests/kafkatest/services/streams.py 
b/tests/kafkatest/services/streams.py
index 4b49017fc42..49ba32ad8e1 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -15,6 +15,7 @@
 
 import os.path
 import signal
+import uuid
 from . import streams_property
 from . import consumer_property
 from ducktape.services.service import Service
@@ -216,6 +217,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin, 
JmxMixin, Service):
                      'user_test_args3': user_test_args3,
                      'user_test_args4': user_test_args4}
         self.log_level = "DEBUG"
+        # Create a randomized state directory path to prevent test interference
+        self.state_dir = os.path.join(self.PERSISTENT_ROOT, str(uuid.uuid4()))
 
     @property
     def node(self):
@@ -299,7 +302,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, 
JmxMixin, Service):
         return cmd
 
     def prop_file(self):
-        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT, 
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
+        cfg = KafkaConfig(**{streams_property.STATE_DIR: self.state_dir, 
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
         return cfg.render()
 
     def start_node(self, node):
@@ -339,7 +342,7 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
         self.UPGRADE_FROM = upgrade_from
 
     def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+        properties = {streams_property.STATE_DIR: self.state_dir,
                       streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
                       streams_property.PROCESSING_GUARANTEE: 
self.PROCESSING_GUARANTEE,
                       streams_property.GROUP_PROTOCOL: self.GROUP_PROTOCOL,
@@ -427,7 +430,7 @@ class 
StreamsBrokerCompatibilityService(StreamsTestBaseService):
                                                                 processingMode)
 
     def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+        properties = {streams_property.STATE_DIR: self.state_dir,
                       streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
                       # the old broker (< 2.4) does not support configuration 
replication.factor=-1
                       "replication.factor": 1,
@@ -527,7 +530,7 @@ class 
StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
         self.JOIN_TOPIC = None
 
     def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+        properties = {streams_property.STATE_DIR: self.state_dir,
                       streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
                       'topology.optimization': self.OPTIMIZED_CONFIG,
                       'input.topic': self.INPUT_TOPIC,
@@ -567,7 +570,7 @@ class 
StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
 
     def prop_file(self):
         properties = self.extra_properties.copy()
-        properties[streams_property.STATE_DIR] = self.PERSISTENT_ROOT
+        properties[streams_property.STATE_DIR] = self.state_dir
         properties[streams_property.KAFKA_SERVERS] = 
self.kafka.bootstrap_servers()
 
         if self.UPGRADE_FROM is not None:
@@ -614,7 +617,7 @@ class 
StreamsNamedRepartitionTopicService(StreamsTestBaseService):
         self.AGGREGATION_TOPIC = None
 
     def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+        properties = {streams_property.STATE_DIR: self.state_dir,
                       streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
                       'input.topic': self.INPUT_TOPIC,
                       'aggregation.topic': self.AGGREGATION_TOPIC,
@@ -638,7 +641,7 @@ class StaticMemberTestService(StreamsTestBaseService):
         self.GROUP_INSTANCE_ID = group_instance_id
         self.NUM_THREADS = num_threads
     def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+        properties = {streams_property.STATE_DIR: self.state_dir,
                       streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
                       streams_property.NUM_THREADS: self.NUM_THREADS,
                       consumer_property.GROUP_INSTANCE_ID: 
self.GROUP_INSTANCE_ID,
@@ -704,7 +707,7 @@ class 
CooperativeRebalanceUpgradeService(StreamsTestBaseService):
         return cmd
 
     def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+        properties = {streams_property.STATE_DIR: self.state_dir,
                       streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
                       'source.topic': self.SOURCE_TOPIC,
                       'sink.topic': self.SINK_TOPIC,

Reply via email to