This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 80542aaaab test(amber): fix ConcurrentModificationException flake in RegionExecutionCoordinatorSpec (#5562)
80542aaaab is described below

commit 80542aaaab476b675b10dbd54787c75982913b91
Author: Kunwoo (Chris) <[email protected]>
AuthorDate: Thu Jun 11 22:17:54 2026 -0700

    test(amber): fix ConcurrentModificationException flake in RegionExecutionCoordinatorSpec (#5562)
    
    ### What changes were proposed in this PR?
    
    `RegionExecutionCoordinatorSpec`'s *"retry EndWorker failures…"* test
    polled the `ControllerRpcProbe.calls` buffer from the test thread
    (`waitUntil(endWorkerCalls.size >= 2)`) while the coordinator's 200 ms
    `EndWorker` retry appended to it from the kill-retry timer thread.
    Whenever that read overlapped an append, it tripped Scala 2.13's
    `MutationTracker` check, which surfaced as a non-deterministic
    `java.util.ConcurrentModificationException`.
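    
    A minimal sketch of the check involved, assuming a Scala 2.13 release
    whose `ArrayBuffer` iterators carry the `MutationTracker` check. The
    cross-thread race is timing-dependent, but the same exception fires
    deterministically on a single thread when the buffer is appended to
    between two steps of an iterator. `MutationCheckDemo` and its entries
    are hypothetical and do not come from the spec.
    
    ```scala
    import java.util.ConcurrentModificationException
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical demo, not part of RegionExecutionCoordinatorSpec.
    object MutationCheckDemo {
      /** True if appending mid-iteration makes the iterator throw. */
      def appendDuringIterationThrows(): Boolean = {
        val calls = ArrayBuffer("EndWorker#1") // plain, non-thread-safe buffer
        val it = calls.iterator                // captures the current mutation count
        it.next()                              // consume the first entry
        calls += "EndWorker#2"                 // stands in for the timer-thread append
        try {
          it.hasNext                           // re-checks the mutation count
          false
        } catch {
          case _: ConcurrentModificationException => true
        }
      }
    }
    ```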
    
    The `calls` buffer is test-only — production has no such buffer and
    never reads it — so the race is a property of the test, not the source.
    Rather than make the test helper thread-safe, this fixes the test: it
    waits on a `CountDownLatch` (counted down from the probe callback once
    the retry's `EndWorker` is recorded) instead of polling, so the test
    thread never iterates the buffer while the timer thread appends. The
    real timer-thread retry still runs, so the production path is exercised
    faithfully — the accesses are just ordered (append → latch → read)
    instead of overlapping. No production code is changed;
    `ControllerRpcProbe` keeps its plain `ArrayBuffer`.
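    
    The latch ordering can be sketched outside the spec under a few
    assumptions: `FakeProbe`, `LatchOrderedRetry`, and `simulate` are
    hypothetical names; a `ScheduledExecutorService` stands in for the
    coordinator's kill-retry timer; and, as described for the real probe,
    the call is recorded before the response callback runs. The first
    attempt fails on the calling thread, the retry succeeds on the timer
    thread and counts the latch down, and the calling thread reads the
    buffer only after `await` returns.
    
    ```scala
    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicInteger
    import scala.collection.mutable.ArrayBuffer
    
    // Hypothetical stand-in for ControllerRpcProbe: a plain ArrayBuffer plus a
    // response callback that returns true when the EndWorker attempt succeeds.
    final class FakeProbe(respond: Int => Boolean) {
      val calls: ArrayBuffer[Int] = ArrayBuffer.empty[Int]
      def endWorker(attempt: Int): Boolean = {
        calls += attempt // record the call first...
        respond(attempt) // ...then run the callback, which may count the latch down
      }
    }
    
    object LatchOrderedRetry {
      /** Returns the calls the "test thread" sees once the retry is recorded. */
      def simulate(): List[Int] = {
        val attempts = new AtomicInteger(0)
        val retryAttempted = new CountDownLatch(1)
        val probe = new FakeProbe(_ =>
          if (attempts.incrementAndGet() == 1) false // first attempt fails
          else { retryAttempted.countDown(); true }  // the retry succeeds
        )
        val timer = Executors.newSingleThreadScheduledExecutor()
        try {
          // First attempt on the calling thread; on failure, retry from the
          // timer thread after 200 ms.
          if (!probe.endWorker(1)) {
            timer.schedule(
              new Runnable { def run(): Unit = { probe.endWorker(2); () } },
              200,
              TimeUnit.MILLISECONDS
            )
          }
          // Block on the latch instead of polling probe.calls.
          assert(
            retryAttempted.await(5, TimeUnit.SECONDS),
            "EndWorker was not retried within the deadline"
          )
          probe.calls.toList // read only after the timer thread's append
        } finally timer.shutdownNow()
      }
    }
    ```
    
    Per the `java.util.concurrent` memory-consistency rules, actions before
    `countDown` happen-before a successful `await` returns, so the timer
    thread's append is visible to the reader, not merely ordered before it.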
    
    ### Any related issues, documentation, discussions?
    
    Resolves #5546
    
    ### How was this PR tested?
    
    `RegionExecutionCoordinatorSpec` + `WorkflowExecutionCoordinatorSpec` →
    10/10 pass. The retry test is race-free by construction: its only reads
    of the call buffer happen after the latch `await` returns — i.e. after
    the timer thread has finished appending — so no read can overlap an
    append.
    
    ```
    sbt 'WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec'
    ```
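    
    The deadline in the updated assertion relies on
    `CountDownLatch.await(long, TimeUnit)` returning `false`, rather than
    throwing, once the timeout elapses, so wrapping it in `assert` turns a
    missing retry into a prompt, labeled failure instead of a hang. A
    minimal sketch: `AwaitWithDeadline` and `check` are hypothetical, and
    it uses Scala's `Predef.assert` with a fixed millisecond timeout,
    whereas the spec uses ScalaTest's `assert` and its own `testTimeout`.
    
    ```scala
    import java.util.concurrent.{CountDownLatch, TimeUnit}
    
    // Hypothetical helper mirroring the spec's assert(latch.await(...), message).
    object AwaitWithDeadline {
      /** None if the latch opened in time, otherwise the assertion's message. */
      def check(latch: CountDownLatch, timeoutMs: Long): Option[String] =
        try {
          // await returns false once the timeout elapses with the count above zero.
          assert(
            latch.await(timeoutMs, TimeUnit.MILLISECONDS),
            "EndWorker was not retried within the deadline"
          )
          None
        } catch {
          case e: AssertionError => Some(e.getMessage)
        }
    }
    ```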
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Anthropic Claude Opus 4.8)
---
 .../scheduling/RegionExecutionCoordinatorSpec.scala          | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
index 6efbe5e4ca..9e6cb227e5 100644
--- a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinatorSpec.scala
@@ -35,6 +35,7 @@ import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.flatspec.AnyFlatSpecLike
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic
 
 /**
@@ -84,11 +85,17 @@ class RegionExecutionCoordinatorSpec
 
   it should "retry EndWorker failures and delay gracefulStop until a retry succeeds" in {
     val attempts = new atomic.AtomicInteger(0)
+    // The first EndWorker is sent on the test thread; the retry is sent later from the coordinator's
+    // kill-retry timer thread. Block on this latch — counted down from the probe callback once the
+    // retry's call has been recorded — instead of polling `endWorkerCalls` from the test thread, so
+    // the test never iterates the call buffer while the timer thread is appending to it.
+    val retryAttempted = new CountDownLatch(1)
     val fixture = createSingleRegionFixture(endWorkerResponse =
       _ =>
         if (attempts.incrementAndGet() == 1) {
           Some(transientEndWorkerFailure)
         } else {
+          retryAttempted.countDown()
           None
         }
     )
@@ -96,7 +103,10 @@ class RegionExecutionCoordinatorSpec
     launchRegion(fixture.coordinator)
     val completion = requestRegionCompletion(fixture.coordinator)
 
-    waitUntil(fixture.rpcProbe.endWorkerCalls.size >= 2)
+    assert(
+      retryAttempted.await(testTimeout.inMilliseconds, TimeUnit.MILLISECONDS),
+      "EndWorker was not retried within the deadline"
+    )
     assert(completion.poll.isEmpty)
     assert(!fixture.coordinator.isCompleted)
     assert(fixture.actorRefService.hasActorRef(fixture.workerId))
