This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6fe8415a163 KAFKA-9195: Use Randomized State Directory Names in
Streams System Tests (#21879)
6fe8415a163 is described below
commit 6fe8415a163b50efe84607a938b7eaaac92dcc6b
Author: Alieh Saeedi <[email protected]>
AuthorDate: Sat Mar 28 00:55:16 2026 +0100
KAFKA-9195: Use Randomized State Directory Names in Streams System Tests
(#21879)
This PR fixes Streams system test state directory isolation
Currently, Streams system tests set state.dir to PERSISTENT_ROOT
(/mnt/streams). Since the state directory path is state.dir +
application.id + task.id, tests with the same application ID can
interfere with each other if state directories aren't properly cleaned
up, leading to flaky failures.
This PR sets state.dir to a randomized path inside PERSISTENT_ROOT
(/mnt/streams/<uuid>) to ensure test isolation even when state cleanup
is missed.
Reviewers: Matthias J. Sax <[email protected]>
---
tests/kafkatest/services/streams.py | 19 +++++++++++--------
1 file changed, 11 insertions(+), 8 deletions(-)
diff --git a/tests/kafkatest/services/streams.py
b/tests/kafkatest/services/streams.py
index 4b49017fc42..49ba32ad8e1 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -15,6 +15,7 @@
import os.path
import signal
+import uuid
from . import streams_property
from . import consumer_property
from ducktape.services.service import Service
@@ -216,6 +217,8 @@ class StreamsTestBaseService(KafkaPathResolverMixin,
JmxMixin, Service):
'user_test_args3': user_test_args3,
'user_test_args4': user_test_args4}
self.log_level = "DEBUG"
+ # Create a randomized state directory path to prevent test interference
+ self.state_dir = os.path.join(self.PERSISTENT_ROOT, str(uuid.uuid4()))
@property
def node(self):
@@ -299,7 +302,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin,
JmxMixin, Service):
return cmd
def prop_file(self):
- cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
+ cfg = KafkaConfig(**{streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers()})
return cfg.render()
def start_node(self, node):
@@ -339,7 +342,7 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
self.UPGRADE_FROM = upgrade_from
def prop_file(self):
- properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ properties = {streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS:
self.kafka.bootstrap_servers(),
streams_property.PROCESSING_GUARANTEE:
self.PROCESSING_GUARANTEE,
streams_property.GROUP_PROTOCOL: self.GROUP_PROTOCOL,
@@ -427,7 +430,7 @@ class
StreamsBrokerCompatibilityService(StreamsTestBaseService):
processingMode)
def prop_file(self):
- properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ properties = {streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS:
self.kafka.bootstrap_servers(),
# the old broker (< 2.4) does not support configuration
replication.factor=-1
"replication.factor": 1,
@@ -527,7 +530,7 @@ class
StreamsOptimizedUpgradeTestService(StreamsTestBaseService):
self.JOIN_TOPIC = None
def prop_file(self):
- properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ properties = {streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS:
self.kafka.bootstrap_servers(),
'topology.optimization': self.OPTIMIZED_CONFIG,
'input.topic': self.INPUT_TOPIC,
@@ -567,7 +570,7 @@ class
StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
def prop_file(self):
properties = self.extra_properties.copy()
- properties[streams_property.STATE_DIR] = self.PERSISTENT_ROOT
+ properties[streams_property.STATE_DIR] = self.state_dir
properties[streams_property.KAFKA_SERVERS] =
self.kafka.bootstrap_servers()
if self.UPGRADE_FROM is not None:
@@ -614,7 +617,7 @@ class
StreamsNamedRepartitionTopicService(StreamsTestBaseService):
self.AGGREGATION_TOPIC = None
def prop_file(self):
- properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ properties = {streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS:
self.kafka.bootstrap_servers(),
'input.topic': self.INPUT_TOPIC,
'aggregation.topic': self.AGGREGATION_TOPIC,
@@ -638,7 +641,7 @@ class StaticMemberTestService(StreamsTestBaseService):
self.GROUP_INSTANCE_ID = group_instance_id
self.NUM_THREADS = num_threads
def prop_file(self):
- properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ properties = {streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS:
self.kafka.bootstrap_servers(),
streams_property.NUM_THREADS: self.NUM_THREADS,
consumer_property.GROUP_INSTANCE_ID:
self.GROUP_INSTANCE_ID,
@@ -704,7 +707,7 @@ class
CooperativeRebalanceUpgradeService(StreamsTestBaseService):
return cmd
def prop_file(self):
- properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+ properties = {streams_property.STATE_DIR: self.state_dir,
streams_property.KAFKA_SERVERS:
self.kafka.bootstrap_servers(),
'source.topic': self.SOURCE_TOPIC,
'sink.topic': self.SINK_TOPIC,