Liu created FLINK-39387:
---------------------------
Summary: Fix flaky scheduler benchmark tests caused by thread assertion failure in async TDD creation
Key: FLINK-39387
URL: https://issues.apache.org/jira/browse/FLINK-39387
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Reporter: Liu
h2. Summary
Fix flaky scheduler benchmark tests caused by asynchronous TDD creation
h3. Problem
After [FLINK-38114|https://issues.apache.org/jira/browse/FLINK-38114] introduced asynchronous TDD (TaskDeploymentDescriptor) creation in {{Execution.deploy()}}, all 21 scheduler benchmark tests (e.g., {{PartitionReleaseInBatchJobBenchmarkTest}}) became flaky with the following error:
{code:java|title=Error Message}
IllegalStateException: Cannot leave terminal state CANCELED to transition to SCHEDULED.
{code}
h3. Root Cause
{{FLINK-38114}} changed {{Execution.deploy()}} to use an asynchronous flow:
{code:java}
CompletableFuture.supplyAsync(initOffloadedTaskRestoreRef(...), executor)                            // step 1: runs on scheduledExecutorService
    .thenComposeAsync(tryGetTaskDeploymentDescriptorForSlot(slot), jobMasterMainThreadExecutor);  // step 2: dispatched to mainThreadExecutor
{code}
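A minimal, self-contained sketch of the dispatch behaviour described above, using only {{java.util.concurrent}} rather than Flink classes. {{RecordingDirectExecutor}} and {{AsyncDispatchDemo}} are hypothetical names: the former stands in for the direct main-thread executor, and {{backgroundPool}} plays the role of {{scheduledExecutorService}}. The sketch records which thread calls {{execute()}} for the step 2 callback. With the JDK's {{CompletableFuture}} implementation, if step 1 is still running when {{thenComposeAsync}} is attached, {{execute()}} is called by the background thread that completes step 1; if step 1 has already finished, it is called by the thread that attaches the callback. That ordering dependence is one plausible reason the failure is intermittent rather than deterministic; this is an inference from {{CompletableFuture}} semantics, not something verified against the Flink code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical direct executor: runs commands inline and remembers the calling thread. */
class RecordingDirectExecutor implements Executor {
    final AtomicReference<Thread> caller = new AtomicReference<>();

    @Override
    public void execute(Runnable command) {
        caller.set(Thread.currentThread()); // thread that dispatched the callback
        command.run();                      // run inline, like a direct executor
    }
}

class AsyncDispatchDemo {
    /** Returns the thread that invoked execute() for the step 2 callback. */
    static Thread dispatchingThread(boolean step1FinishesFirst) {
        ExecutorService backgroundPool = Executors.newSingleThreadExecutor();
        RecordingDirectExecutor mainThreadExecutor = new RecordingDirectExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // step 1: runs on the background pool, blocked until the latch is released
            CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
                awaitRelease(release);
                return "restoreRef";
            }, backgroundPool);

            if (step1FinishesFirst) {
                release.countDown();
                step1.join(); // step 1 is complete before the callback is attached
            }

            // step 2: dispatched to the "main thread" executor
            CompletableFuture<String> step2 = step1.thenComposeAsync(
                    ref -> CompletableFuture.completedFuture("tdd(" + ref + ")"),
                    mainThreadExecutor);

            release.countDown(); // no-op if step 1 was already released
            step2.join();
            return mainThreadExecutor.caller.get();
        } finally {
            backgroundPool.shutdownNow();
        }
    }

    private static void awaitRelease(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("step 1 still running: " + dispatchingThread(false).getName());
        System.out.println("step 1 already done:  " + dispatchingThread(true).getName());
    }
}
```

The latch only exists to force each ordering deterministically; in the real deploy path the ordering depends on timing.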
{{SchedulerBenchmarkUtils.createAndInitScheduler()}} uses {{ComponentMainThreadExecutorServiceAdapter.forMainThread()}} as the {{mainThreadExecutor}}. Internally, the returned adapter wraps a {{DirectScheduledExecutorService}} whose {{execute()}} method runs the command *on the calling thread* and includes an assertion:
{code:java}
assert MainThreadValidatorUtil.isRunningInExpectedThread(main);
{code}
When step 1 completes on the {{scheduledExecutorService}} thread, {{thenComposeAsync}} dispatches the step 2 callback by calling {{jobMasterMainThreadExecutor.execute(callback)}}. Since {{DirectScheduledExecutorService.execute()}} runs inline on the calling thread (here the {{scheduledExecutorService}} thread, *not* the test main thread), the assertion fails with an {{AssertionError}}.
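The failing check can be reproduced in isolation. The sketch below is a hypothetical, simplified analogue of the executor returned by {{forMainThread()}}, not the Flink class: {{MainThreadCheckingExecutor}} and {{ThreadCheckDemo}} are invented names. It captures the constructing thread as "main" and, like the {{assert}} shown above, rejects any {{execute()}} call arriving from another thread. An explicit {{throw new AssertionError(...)}} is used instead of {{assert}} so the check fires even when the JVM runs without {{-ea}}.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical analogue of the forMainThread() executor: inline execution plus a thread check. */
class MainThreadCheckingExecutor implements Executor {
    private final Thread main = Thread.currentThread(); // thread that created the executor

    @Override
    public void execute(Runnable command) {
        // Mirrors `assert MainThreadValidatorUtil.isRunningInExpectedThread(main)`
        if (Thread.currentThread() != main) {
            throw new AssertionError("execute() called from " + Thread.currentThread().getName()
                    + " instead of " + main.getName());
        }
        command.run(); // runs inline on the calling thread
    }
}

class ThreadCheckDemo {
    /** Calls execute() from a background thread and returns the error it raised, or null. */
    static Throwable executeFromBackgroundThread() {
        MainThreadCheckingExecutor executor = new MainThreadCheckingExecutor();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread background = new Thread(() -> {
            try {
                executor.execute(() -> { });
            } catch (Throwable t) {
                failure.set(t); // the thread check rejected the call
            }
        }, "scheduledExecutorService-thread");
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(executeFromBackgroundThread());
    }
}
```

Calling {{execute()}} on the thread that created the executor runs the command normally; only the cross-thread call fails.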
This triggers the following chain:
# {{AssertionError}} -> {{markFailed()}} -> {{processFail()}}
# {{notifySchedulerNgAboutInternalTaskFailure()}} -> scheduler failover
# Scheduler cancels all executions in the affected region (including sink executions)
# Subsequent {{transitionState(SCHEDULED)}} on sink executions fails because
they are already in {{CANCELED}} state.
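The last step of the chain is a plain state-machine guard. The following is a simplified, hypothetical model of it, not Flink's {{Execution}} class: {{ExecutionStateSketch}} is a reduced enum in which {{FINISHED}}, {{CANCELED}} and {{FAILED}} are treated as terminal, and {{SketchExecution.transitionState()}} throws an {{IllegalStateException}} worded like the error message above whenever it is asked to leave a terminal state.

```java
/** Hypothetical, reduced set of execution states. */
enum ExecutionStateSketch {
    CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED;

    boolean isTerminal() {
        return this == FINISHED || this == CANCELED || this == FAILED;
    }
}

/** Hypothetical model of the guard that rejects transitions out of a terminal state. */
class SketchExecution {
    private ExecutionStateSketch state = ExecutionStateSketch.CREATED;

    ExecutionStateSketch state() {
        return state;
    }

    void transitionState(ExecutionStateSketch target) {
        if (state.isTerminal()) {
            throw new IllegalStateException(
                    "Cannot leave terminal state " + state + " to transition to " + target + ".");
        }
        state = target;
    }
}

class TerminalStateDemo {
    /** Replays steps 3 and 4 of the chain for one sink execution; returns the error message. */
    static String cancelThenSchedule() {
        SketchExecution sink = new SketchExecution();
        sink.transitionState(ExecutionStateSketch.CANCELED); // step 3: failover cancels the region
        try {
            sink.transitionState(ExecutionStateSketch.SCHEDULED); // step 4: later scheduling attempt
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelThenSchedule());
    }
}
```

The guard itself is correct behaviour; the benchmark failure only surfaces here because the earlier thread assertion pushed the sink executions into {{CANCELED}}.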
h3. Fix
Following the same approach as [FLINK-38970|https://issues.apache.org/jira/browse/FLINK-38970] (which fixed the identical issue in {{AdaptiveBatchSchedulerTest}}), replace {{ComponentMainThreadExecutorServiceAdapter.forMainThread()}} in {{SchedulerBenchmarkUtils}} with a {{SynchronousComponentMainThreadExecutor}} that:
* Uses {{DirectScheduledExecutorService}} to execute tasks synchronously on the calling thread.
* Overrides {{assertRunningInMainThread()}} as a no-op, skipping strict thread-identity checks.
* Avoids {{AssertionError}} when {{CompletableFuture}} callbacks are dispatched from background threads.
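A reduced sketch of the proposed executor, under stated assumptions. Flink's real {{ComponentMainThreadExecutor}} has more methods than shown here, so a hypothetical {{MainThreadExecutorLike}} interface keeps only {{execute()}} and {{assertRunningInMainThread()}}. The class name follows the proposal, but the body is illustrative rather than the actual patch: it runs every command inline (as {{DirectScheduledExecutorService}} does) and makes the thread assertion a no-op, so a step 2 callback dispatched from a background thread simply runs there instead of failing. {{SynchronousExecutorDemo}} is likewise a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, reduced stand-in for Flink's ComponentMainThreadExecutor. */
interface MainThreadExecutorLike extends Executor {
    void assertRunningInMainThread();
}

/** Sketch of the proposed executor: synchronous execution, no thread-identity check. */
class SynchronousComponentMainThreadExecutor implements MainThreadExecutorLike {
    @Override
    public void execute(Runnable command) {
        command.run(); // inline on whichever thread dispatches the command
    }

    @Override
    public void assertRunningInMainThread() {
        // intentionally a no-op: benchmark callbacks may arrive from background threads
    }
}

class SynchronousExecutorDemo {
    /** Runs the two-step flow with a background step 1 and returns step 2's result. */
    static String deploy() {
        ExecutorService backgroundPool = Executors.newSingleThreadExecutor();
        MainThreadExecutorLike mainThreadExecutor = new SynchronousComponentMainThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> "restoreRef", backgroundPool) // step 1
                    .thenComposeAsync(ref -> {
                        // a strict implementation could fail here when called off the main thread
                        mainThreadExecutor.assertRunningInMainThread();
                        return CompletableFuture.completedFuture("tdd(" + ref + ")");
                    }, mainThreadExecutor) // step 2
                    .join();
        } finally {
            backgroundPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deploy());
    }
}
```

The trade-off is that the benchmarks no longer verify main-thread confinement at all, which seems acceptable for tests that measure scheduling cost rather than threading correctness.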
h3. Affected Tests
All 21 scheduler benchmark tests that use {{SchedulerBenchmarkUtils.createAndInitScheduler()}}:
* {{PartitionReleaseInBatchJobBenchmarkTest}}
* {{DeployingDownstreamTasksInBatchJobBenchmarkTest}}
* {{SchedulingDownstreamTasksInBatchJobBenchmarkTest}}
* ... and others under {{org.apache.flink.runtime.scheduler.benchmark}}