Liu created FLINK-39387:
---------------------------

             Summary: Fix flaky scheduler benchmark tests caused by thread 
assertion failure in async TDD creation
                 Key: FLINK-39387
                 URL: https://issues.apache.org/jira/browse/FLINK-39387
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
            Reporter: Liu


h2. Summary

Fix flaky scheduler benchmark tests caused by asynchronous TDD creation
h3. Problem

After 
[FLINK-38114|https://issues.apache.org/jira/browse/FLINK-38114]
 introduced asynchronous TDD (TaskDeploymentDescriptor) creation in 
{{Execution.deploy()}}, all 21 scheduler benchmark tests (e.g., 
{{PartitionReleaseInBatchJobBenchmarkTest}}) became flaky, failing with the 
following error:
{code:java|title=Error Message}
IllegalStateException: Cannot leave terminal state CANCELED to transition to 
SCHEDULED.
{code}
h3. Root Cause

{{FLINK-38114}} changed {{Execution.deploy()}} to use an asynchronous flow:
{code:java}
// step 1: runs on scheduledExecutorService
CompletableFuture.supplyAsync(initOffloadedTaskRestoreRef(...), executor)
    // step 2: dispatched to mainThreadExecutor
    .thenComposeAsync(tryGetTaskDeploymentDescriptorForSlot(slot), jobMasterMainThreadExecutor)
{code}
{{SchedulerBenchmarkUtils.createAndInitScheduler()}} uses 
{{ComponentMainThreadExecutorServiceAdapter.forMainThread()}} as the 
{{mainThreadExecutor}}. Internally, it wraps a 
{{DirectScheduledExecutorService}} whose {{execute()}} method runs the command 
*on the calling thread* and includes an assertion:
{code:java}
assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
{code}
When step 1 completes on the {{scheduledExecutorService}} thread, 
{{thenComposeAsync}} dispatches the step 2 callback by calling 
{{jobMasterMainThreadExecutor.execute(callback)}} from that thread. Since 
{{DirectScheduledExecutorService.execute()}} runs inline on the calling thread 
(which is the {{scheduledExecutorService}} thread, *not* the test main thread), 
the assertion fails with an {{AssertionError}}.
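
The threading behavior described above can be reproduced outside Flink with plain {{java.util.concurrent}} classes. The following is a minimal sketch, not Flink code: the class name {{DirectExecutorThreadDemo}}, the thread name {{background}}, and the latch are assumptions made for illustration. The latch only keeps step 1 pending until step 2 has been registered, so that the callback is dispatched by the completing thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DirectExecutorThreadDemo {

    /** Returns the name of the thread that ran the step 2 callback. */
    static String runDemo() {
        // Stand-in for scheduledExecutorService (hypothetical thread name).
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        // Stand-in for a direct executor: runs the command on the calling thread.
        Executor direct = Runnable::run;
        CountDownLatch step2Registered = new CountDownLatch(1);
        try {
            CompletableFuture<String> step1 =
                    CompletableFuture.supplyAsync(
                            () -> {
                                try {
                                    // Keep step 1 pending until step 2 is registered.
                                    step2Registered.await();
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                                return "restore-ref";
                            },
                            background);

            // Step 2 is "dispatched" to the direct executor, which runs it inline
            // on whichever thread calls execute().
            CompletableFuture<String> step2 =
                    step1.thenComposeAsync(
                            ignored ->
                                    CompletableFuture.completedFuture(
                                            Thread.currentThread().getName()),
                            direct);

            step2Registered.countDown();
            return step2.join();
        } finally {
            background.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("step 2 ran on: " + runDemo());
    }
}
```

In this sketch the callback runs on the {{background}} thread rather than on the thread that built the pipeline. In general, if the first stage has already completed when {{thenComposeAsync}} is called, the executor is invoked from the registering thread instead, which may be one reason the failure is intermittent rather than deterministic.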

This triggers the following chain:
 # {{AssertionError}} -> {{markFailed()}} -> {{processFail()}}
 # {{notifySchedulerNgAboutInternalTaskFailure()}} -> scheduler failover
 # Scheduler cancels all executions in the affected region (including sink 
executions)
 # Subsequent {{transitionState(SCHEDULED)}} on sink executions fails because 
they are already in {{CANCELED}} state.

h3. Fix

Following the same approach as 
[FLINK-38970|https://issues.apache.org/jira/browse/FLINK-38970]
 (which fixed the identical issue in {{AdaptiveBatchSchedulerTest}}), 
replace {{ComponentMainThreadExecutorServiceAdapter.forMainThread()}} in 
{{SchedulerBenchmarkUtils}} with a {{SynchronousComponentMainThreadExecutor}} 
that:
 * Uses {{DirectScheduledExecutorService}} to execute tasks synchronously on 
the calling thread.
 * Overrides {{assertRunningInMainThread()}} as a no-op, skipping strict thread 
identity checks.
 * Avoids {{AssertionError}} when {{CompletableFuture}} callbacks are 
dispatched from background threads.
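
The idea behind the fix can be sketched with stand-in classes. This is an illustration under stated assumptions, not the actual {{SynchronousComponentMainThreadExecutor}}: {{StrictDirectExecutor}}, {{LenientDirectExecutor}}, and the helper method are hypothetical names, and the strict check throws explicitly so that the sketch behaves the same whether or not JVM assertions are enabled.

```java
import java.util.concurrent.Executor;

public class LenientMainThreadExecutorSketch {

    /** Hypothetical strict executor: runs inline, but only on the expected thread. */
    static class StrictDirectExecutor implements Executor {
        private final Thread mainThread;

        StrictDirectExecutor(Thread mainThread) {
            this.mainThread = mainThread;
        }

        void assertRunningInMainThread() {
            if (Thread.currentThread() != mainThread) {
                throw new AssertionError(
                        "Not running in main thread: " + Thread.currentThread().getName());
            }
        }

        @Override
        public void execute(Runnable command) {
            assertRunningInMainThread();
            command.run(); // direct execution on the calling thread
        }
    }

    /** Hypothetical lenient variant: same inline execution, thread check is a no-op. */
    static class LenientDirectExecutor extends StrictDirectExecutor {
        LenientDirectExecutor(Thread mainThread) {
            super(mainThread);
        }

        @Override
        void assertRunningInMainThread() {
            // Intentionally empty: skip the strict thread identity check.
        }
    }

    /** Submits a callback from a background thread; returns true if the callback ran. */
    static boolean callbackRunsFromBackgroundThread(Executor executor) {
        boolean[] ran = {false};
        Thread background =
                new Thread(
                        () -> {
                            try {
                                executor.execute(() -> ran[0] = true);
                            } catch (AssertionError e) {
                                // The strict executor rejects the foreign thread.
                            }
                        },
                        "background");
        background.start();
        try {
            background.join(); // join() makes the write to ran[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran[0];
    }

    public static void main(String[] args) {
        Thread main = Thread.currentThread();
        System.out.println(
                "strict:  " + callbackRunsFromBackgroundThread(new StrictDirectExecutor(main)));
        System.out.println(
                "lenient: " + callbackRunsFromBackgroundThread(new LenientDirectExecutor(main)));
    }
}
```

The trade-off is that the lenient variant no longer detects genuine main-thread violations, which seems acceptable for benchmark test setup but would not be appropriate for production code.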

h3. Affected Tests

All 21 scheduler benchmark tests that use 
{{SchedulerBenchmarkUtils.createAndInitScheduler()}}:
 * {{PartitionReleaseInBatchJobBenchmarkTest}}
 * {{DeployingDownstreamTasksInBatchJobBenchmarkTest}}
 * {{SchedulingDownstreamTasksInBatchJobBenchmarkTest}}
 * ... and others under {{org.apache.flink.runtime.scheduler.benchmark}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
