fresh-borzoni commented on code in PR #435:
URL: https://github.com/apache/fluss-rust/pull/435#discussion_r2901616933
##########
bindings/python/test/conftest.py:
##########
@@ -37,47 +48,77 @@
FLUSS_VERSION = "0.9.0-incubating"
BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT,
PLAIN_CLIENT_TABLET_PORT]
+
def _wait_for_port(host, port, timeout=60):
"""Wait for a TCP port to become available."""
start = time.time()
while time.time() - start < timeout:
try:
with socket.create_connection((host, port), timeout=1):
- return
+ return True
except (ConnectionRefusedError, TimeoutError, OSError):
time.sleep(1)
- raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+ return False
[email protected](scope="session")
-def fluss_cluster():
- """Start a Fluss cluster using testcontainers, or use an existing one."""
- if BOOTSTRAP_SERVERS_ENV:
- yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+ """Wait for all cluster ports to become available."""
+ deadline = time.time() + timeout
+ for port in ALL_PORTS:
+ remaining = deadline - time.time()
+ if remaining <= 0 or not _wait_for_port("localhost", port,
timeout=remaining):
+ return False
+ return True
+
+
+def _run_cmd(cmd):
+ """Run a shell command, return exit code."""
+ return subprocess.run(cmd, shell=True, capture_output=True).returncode
+
+
+def _start_cluster():
+ """Start the Fluss Docker cluster via testcontainers.
+
+ If another worker already started the cluster (detected via port check),
+ reuse it. If container creation fails (name conflict from a racing worker),
+ wait for the other worker's cluster to become ready.
+ """
+ # Reuse cluster started by another parallel worker or previous run.
+ if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1):
Review Comment:
not applicable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]