Copilot commented on code in PR #61679:
URL: https://github.com/apache/doris/pull/61679#discussion_r2980681947
##########
be/test/exec/pipeline/pipeline_task_test.cpp:
##########
@@ -1534,4 +1538,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
}
}
+// Test for the race condition fix between _wake_up_early and _is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+ {
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+ }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+                                               profile.get(), shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+ {
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ }
+ _query_ctx->get_execution_dependency()->set_ready();
+
+    // Get the sink's finish dependency and block it to simulate a pending async operation
+    // (e.g. runtime filter size sync RPC in flight).
+    auto* sink_finish_dep =
+            _runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+ EXPECT_NE(sink_finish_dep, nullptr);
+ sink_finish_dep->block();
+
+ // Drive the task to EOS so it will enter the Defer's pending-finish check.
+ task->_operators.front()->cast<DummyOperator>()._eos = true;
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ // EOS reached but still blocked on finish dependency: not done yet.
+ EXPECT_TRUE(task->_eos);
+ EXPECT_FALSE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ }
+
+    // Now unblock the finish dependency (simulates the async op completing) and activate the
+    // debug point. The debug point fires inside the else-if branch — after _is_pending_finish()
+    // returns false but before the second _wake_up_early read — and calls set_wake_up_early() +
+    // terminate(). This precisely reproduces the race where Thread B's writes land between
+    // Thread A's two reads of _wake_up_early.
+ sink_finish_dep->set_ready();
+ config::enable_debug_points = true;
+    DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ EXPECT_TRUE(task->_eos);
+ EXPECT_TRUE(done);
+        // The key assertion: even though the task took the else-if path (not the
+        // if(_wake_up_early) path), operator terminate() must have been called because the
+        // second read of _wake_up_early correctly observed the value set by the debug point.
+        EXPECT_TRUE(task->_wake_up_early);
+        EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
+ EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
+ }
+ DebugPoints::instance()->clear();
Review Comment:
`DebugPoints::instance()->clear()` clears *all* debug points globally, which
can interfere with other tests in the same binary if they rely on debug points.
Since this test only adds one point, prefer
`DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if")`
and leave unrelated debug points intact.
```suggestion
DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if");
```
##########
be/src/exec/pipeline/pipeline_task.cpp:
##########
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
if (_wake_up_early) {
- terminate();
- THROW_IF_ERROR(_root->terminate(_state));
- THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
*done = true;
} else if (_eos && !_spilling &&
           (fragment_context->is_canceled() || !_is_pending_finish())) {
+            // Debug point for testing the race condition fix: inject set_wake_up_early() +
+            // terminate() here to simulate Thread B writing A then B between Thread A's two
+ // reads of _wake_up_early.
+ DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+ set_wake_up_early();
+ terminate();
+ });
*done = true;
}
+
+        // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
+        // above, not before. This ordering is critical to avoid a race condition:
+        //
+        // Pipeline::make_all_runnable() writes in this order:
+        //   (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
+        //
+        // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
+        // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
+        // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
+        // then set *done=true without ever calling operator terminate(), causing close() to run
+        // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
+        // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+        //
+        // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
+        // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
+        // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
Review Comment:
The explanatory comment says `_is_pending_finish()` becomes false “due to
_always_ready”, but `_is_pending_finish()` ultimately checks
`Dependency::_ready` via `Dependency::is_blocked_by()`. `set_always_ready()`
does set `_always_ready`, but it unblocks tasks by calling `set_ready()`
(setting `_ready=true`). Consider rewording this comment to reference
`_ready`/`set_ready()` so the described mechanism matches the actual code path.
```suggestion
        // (A) set_wake_up_early() -> (B) terminate() [marks the finish dependency ready
        //     via set_always_ready()/set_ready(), which sets Dependency::_ready=true]
        //
        // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
        // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
        // then Thread A reads _is_pending_finish()=false (because the finish dependency's
        // underlying Dependency::_ready flag is now true). Thread A would then set *done=true
        // without ever calling operator terminate(), causing close() to run on operators that
        // were never properly terminated (e.g. RuntimeFilterProducer still in
        // WAITING_FOR_SYNCED_SIZE state when insert() is called).
        //
        // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
        // if Thread A observes B's effect (Dependency::_ready=true on the finish dependency), it
        // is guaranteed to also observe A's effect (_wake_up_early=true) on this second read,
        // ensuring terminate() is called.
```
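To make the ordering argument concrete, here is a minimal, self-contained sketch (not Doris code; the names `wake_up_early`, `finish_ready`, `writer`, and `reader` are illustrative) of the pattern being relied on, assuming the two flags behave like atomics with at least release/acquire semantics: the writer publishes A then B, the reader checks B first and A second, so observing B guarantees observing A.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// "A": stands in for _wake_up_early being set by set_wake_up_early().
std::atomic<bool> wake_up_early{false};
// "B": stands in for the finish dependency being marked ready by terminate().
std::atomic<bool> finish_ready{false};

// Plays the role of Pipeline::make_all_runnable(): write A, then B.
void writer() {
    wake_up_early.store(true, std::memory_order_release); // (A)
    finish_ready.store(true, std::memory_order_release);  // (B)
}

// Plays the role of the Defer block in execute(): read B first, then A.
void reader() {
    if (finish_ready.load(std::memory_order_acquire)) {
        // The release store to B synchronizes-with this acquire load, so the
        // earlier store to A is also visible: seeing B implies seeing A.
        assert(wake_up_early.load(std::memory_order_acquire));
    }
}

int main() {
    std::thread t1(writer);
    std::thread t2(reader);
    t1.join();
    t2.join();
    return 0;
}
```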
##########
be/test/exec/pipeline/pipeline_task_test.cpp:
##########
@@ -1534,4 +1538,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
}
}
+// Test for the race condition fix between _wake_up_early and _is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+ auto num_instances = 1;
+ auto pip_id = 0;
+ auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+ {
+ OperatorPtr source_op;
+ source_op.reset(new DummyOperator());
+ EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+ int op_id = 1;
+ int node_id = 2;
+ int dest_id = 3;
+ DataSinkOperatorPtr sink_op;
+ sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+ EXPECT_TRUE(pip->set_sink(sink_op).ok());
+ }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+ _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+                                               profile.get(), shared_state_map, task_id);
+ task->_exec_time_slice = 10'000'000'000ULL;
+ {
+ std::vector<TScanRangeParams> scan_range;
+ int sender_id = 0;
+ TDataSink tsink;
+ EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+ }
+ _query_ctx->get_execution_dependency()->set_ready();
+
+    // Get the sink's finish dependency and block it to simulate a pending async operation
+    // (e.g. runtime filter size sync RPC in flight).
+    auto* sink_finish_dep =
+            _runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+ EXPECT_NE(sink_finish_dep, nullptr);
+ sink_finish_dep->block();
+
+ // Drive the task to EOS so it will enter the Defer's pending-finish check.
+ task->_operators.front()->cast<DummyOperator>()._eos = true;
+ {
+ bool done = false;
+ EXPECT_TRUE(task->execute(&done).ok());
+ // EOS reached but still blocked on finish dependency: not done yet.
+ EXPECT_TRUE(task->_eos);
+ EXPECT_FALSE(done);
+ EXPECT_FALSE(task->_wake_up_early);
+ }
+
+    // Now unblock the finish dependency (simulates the async op completing) and activate the
+    // debug point. The debug point fires inside the else-if branch — after _is_pending_finish()
+    // returns false but before the second _wake_up_early read — and calls set_wake_up_early() +
+    // terminate(). This precisely reproduces the race where Thread B's writes land between
+    // Thread A's two reads of _wake_up_early.
+ sink_finish_dep->set_ready();
+ config::enable_debug_points = true;
+    DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+ {
Review Comment:
This test flips the global `config::enable_debug_points` flag but restores
it to `false` unconditionally. If another test (or future setup) enables debug
points, this will leak state across tests. Prefer capturing the original value
and restoring it (e.g., via RAII/Defer), similar to `SpillableDebugPointHelper`
in `be/test/exec/operator/spillable_operator_test_helper.h`.
##########
be/src/exec/pipeline/pipeline_task.cpp:
##########
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
if (_wake_up_early) {
- terminate();
- THROW_IF_ERROR(_root->terminate(_state));
- THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
*done = true;
} else if (_eos && !_spilling &&
           (fragment_context->is_canceled() || !_is_pending_finish())) {
+            // Debug point for testing the race condition fix: inject set_wake_up_early() +
+            // terminate() here to simulate Thread B writing A then B between Thread A's two
+ // reads of _wake_up_early.
+ DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+ set_wake_up_early();
+ terminate();
Review Comment:
The injected debug-point block calls `terminate()`, but `terminate()` is
called again immediately afterwards when `_wake_up_early` is re-checked. If
`terminate()` ever becomes non-idempotent, this debug point will change
behavior (and can mask issues). Consider removing the `terminate()` call from
the debug-point injection and only toggling `_wake_up_early`, or otherwise
guard so `terminate()` runs only once per `execute()` call when the debug point
is enabled.
```suggestion
            // Debug point for testing the race condition fix: inject set_wake_up_early() here
            // to simulate Thread B writing A then B between Thread A's two reads of
            // _wake_up_early. terminate() will be invoked once below when _wake_up_early is
            // re-checked, to avoid double-calling terminate() in the debug path.
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
set_wake_up_early();
```