Copilot commented on code in PR #435:
URL: https://github.com/apache/fluss-rust/pull/435#discussion_r2901160043


##########
bindings/python/test/conftest.py:
##########
@@ -20,14 +20,25 @@
 If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster.
 Otherwise, a Fluss cluster is started automatically via testcontainers.
 
+The first pytest-xdist worker to run starts the cluster; other workers
+detect it via port check and reuse it (matching the C++ test pattern).
+Containers are cleaned up after all workers finish via pytest_unconfigure.
+
 Run with:
-    uv run maturin develop && uv run pytest test/ -v
+    uv run maturin develop && uv run pytest test/ -v -n auto
 """
 
+import asyncio
 import os
 import socket
+import subprocess
 import time
 
+# Disable testcontainers Ryuk reaper — it would kill containers when the
+# first xdist worker exits, while other workers are still running.
+# We handle cleanup ourselves in pytest_unconfigure.
+os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true")

Review Comment:
   Ryuk is disabled unconditionally at import time. That changes behavior for 
non-xdist runs too and can leave orphaned containers if a local single-process 
test run is interrupted before pytest_unconfigure executes. Consider disabling 
Ryuk only when xdist is active (e.g., when PYTEST_XDIST_WORKER is set), and 
otherwise keep the default reaper behavior.
   ```suggestion
   # Disable testcontainers Ryuk reaper for xdist workers — it would kill
   # containers when the first xdist worker exits, while other workers are
   # still running. We handle cleanup ourselves in pytest_unconfigure.
   if os.environ.get("PYTEST_XDIST_WORKER"):
       os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true")
   ```



##########
bindings/python/test/conftest.py:
##########
@@ -37,47 +48,77 @@
 FLUSS_VERSION = "0.9.0-incubating"
 BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
 
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]
+
 
 def _wait_for_port(host, port, timeout=60):
     """Wait for a TCP port to become available."""
     start = time.time()
     while time.time() - start < timeout:
         try:
             with socket.create_connection((host, port), timeout=1):
-                return
+                return True
         except (ConnectionRefusedError, TimeoutError, OSError):
             time.sleep(1)
-    raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+    return False
 
 
[email protected](scope="session")
-def fluss_cluster():
-    """Start a Fluss cluster using testcontainers, or use an existing one."""
-    if BOOTSTRAP_SERVERS_ENV:
-        yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+    """Wait for all cluster ports to become available."""
+    deadline = time.time() + timeout
+    for port in ALL_PORTS:
+        remaining = deadline - time.time()
+        if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining):
+            return False
+    return True
+
+
+def _run_cmd(cmd):
+    """Run a shell command, return exit code."""
+    return subprocess.run(cmd, shell=True, capture_output=True).returncode
+
+
+def _start_cluster():
+    """Start the Fluss Docker cluster via testcontainers.
+
+    If another worker already started the cluster (detected via port check),
+    reuse it. If container creation fails (name conflict from a racing worker),
+    wait for the other worker's cluster to become ready.
+    """
+    # Reuse cluster started by another parallel worker or previous run.
+    if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1):

Review Comment:
   The reuse path returns as soon as the plaintext port is connectable, but 
other listeners (and/or TabletServer readiness) may still be initializing. 
Since some tests create their own connections without retry logic, this can 
make parallel runs flaky. Consider reusing only after _all_ports_ready() (or a 
lightweight Fluss-level readiness check) succeeds instead of checking a single 
port.
   ```suggestion
       # Only reuse once all expected ports are ready, to avoid flakiness when
       # other listeners or TabletServer are still initializing.
       if _all_ports_ready(timeout=5):
   ```



##########
bindings/python/test/conftest.py:
##########
@@ -37,47 +48,77 @@
 FLUSS_VERSION = "0.9.0-incubating"
 BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
 
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]
+
 
 def _wait_for_port(host, port, timeout=60):
     """Wait for a TCP port to become available."""
     start = time.time()
     while time.time() - start < timeout:
         try:
             with socket.create_connection((host, port), timeout=1):
-                return
+                return True
         except (ConnectionRefusedError, TimeoutError, OSError):
             time.sleep(1)
-    raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+    return False
 
 
[email protected](scope="session")
-def fluss_cluster():
-    """Start a Fluss cluster using testcontainers, or use an existing one."""
-    if BOOTSTRAP_SERVERS_ENV:
-        yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+    """Wait for all cluster ports to become available."""
+    deadline = time.time() + timeout
+    for port in ALL_PORTS:
+        remaining = deadline - time.time()
+        if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining):
+            return False
+    return True
+
+
+def _run_cmd(cmd):
+    """Run a shell command, return exit code."""
+    return subprocess.run(cmd, shell=True, capture_output=True).returncode
+
+
+def _start_cluster():
+    """Start the Fluss Docker cluster via testcontainers.
+
+    If another worker already started the cluster (detected via port check),
+    reuse it. If container creation fails (name conflict from a racing worker),
+    wait for the other worker's cluster to become ready.
+    """
+    # Reuse cluster started by another parallel worker or previous run.
+    if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1):
+        print("Reusing existing cluster via port check.")
         return
 
     from testcontainers.core.container import DockerContainer
-    from testcontainers.core.network import Network
 
-    network = Network()
-    network.create()
+    print("Starting Fluss cluster via testcontainers...")
 
-    zookeeper = (
-        DockerContainer("zookeeper:3.9.2")
-        .with_network(network)
-        .with_name("zookeeper-python-test")
-    )
+    # Create a named network via Docker CLI (idempotent, avoids orphaned
+    # random-named networks when multiple xdist workers race).
+    _run_cmd(f"docker network create {NETWORK_NAME} 2>/dev/null || true")

Review Comment:
   The `docker network create ... 2>/dev/null || true` invocation is 
POSIX-shell specific and depends on shell redirection/`||`. If these tests are 
expected to run on non-POSIX shells (e.g., Windows), this will fail. Consider 
performing the idempotent network creation using `subprocess.run([...])` and 
handling the 'already exists' error code/message explicitly, without shell 
operators.



##########
bindings/python/test/conftest.py:
##########
@@ -37,47 +48,77 @@
 FLUSS_VERSION = "0.9.0-incubating"
 BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS")
 
+# Container / network names
+NETWORK_NAME = "fluss-python-test-network"
+ZOOKEEPER_NAME = "zookeeper-python-test"
+COORDINATOR_NAME = "coordinator-server-python-test"
+TABLET_SERVER_NAME = "tablet-server-python-test"
+
+# Fixed host ports (must match across workers)
+COORDINATOR_PORT = 9123
+TABLET_SERVER_PORT = 9124
+PLAIN_CLIENT_PORT = 9223
+PLAIN_CLIENT_TABLET_PORT = 9224
+
+ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT]
+
 
 def _wait_for_port(host, port, timeout=60):
     """Wait for a TCP port to become available."""
     start = time.time()
     while time.time() - start < timeout:
         try:
             with socket.create_connection((host, port), timeout=1):
-                return
+                return True
         except (ConnectionRefusedError, TimeoutError, OSError):
             time.sleep(1)
-    raise TimeoutError(f"Port {port} on {host} not available after {timeout}s")
+    return False
 
 
[email protected](scope="session")
-def fluss_cluster():
-    """Start a Fluss cluster using testcontainers, or use an existing one."""
-    if BOOTSTRAP_SERVERS_ENV:
-        yield (BOOTSTRAP_SERVERS_ENV, BOOTSTRAP_SERVERS_ENV)
+def _all_ports_ready(timeout=60):
+    """Wait for all cluster ports to become available."""
+    deadline = time.time() + timeout
+    for port in ALL_PORTS:
+        remaining = deadline - time.time()
+        if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining):
+            return False
+    return True
+
+
+def _run_cmd(cmd):
+    """Run a shell command, return exit code."""
+    return subprocess.run(cmd, shell=True, capture_output=True).returncode
+
+

Review Comment:
   _run_cmd uses shell=True and discards stdout/stderr. Even though the current 
command is built from constants, shell=True plus suppressed output makes 
failures harder to diagnose and relies on shell semantics. Prefer running 
docker via an argv list (no shell) and include stderr in the error path (or 
raise when the command fails).
   ```suggestion
       """Run a command, return exit code.
   
       The command is executed without a shell. On failure, stderr is printed
       to aid debugging while preserving the return-code-based API.
       """
       # Support both string and list commands without relying on shell parsing.
       if isinstance(cmd, str):
           # Simple whitespace split is sufficient for constant commands used here.
           cmd_to_run = cmd.split()
       else:
           cmd_to_run = cmd
   
       result = subprocess.run(cmd_to_run, shell=False, capture_output=True, text=True)
       if result.returncode != 0:
           print(f"Command {cmd!r} failed with exit code {result.returncode}")
           if result.stderr:
               print(result.stderr)
       return result.returncode
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to