Copilot commented on code in PR #61679:
URL: https://github.com/apache/doris/pull/61679#discussion_r2980681947


##########
be/test/exec/pipeline/pipeline_task_test.cpp:
##########
@@ -1534,4 +1538,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
     }
 }
 
+// Test for the race condition fix between _wake_up_early and 
_is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A) 
set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A 
reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads 
_is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling 
operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So 
if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect 
(_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate 
the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after 
_is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second 
_wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    {
+        OperatorPtr source_op;
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    task->_exec_time_slice = 10'000'000'000ULL;
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    }
+    _query_ctx->get_execution_dependency()->set_ready();
+
+    // Get the sink's finish dependency and block it to simulate a pending 
async operation
+    // (e.g. runtime filter size sync RPC in flight).
+    auto* sink_finish_dep =
+            
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+    EXPECT_NE(sink_finish_dep, nullptr);
+    sink_finish_dep->block();
+
+    // Drive the task to EOS so it will enter the Defer's pending-finish check.
+    task->_operators.front()->cast<DummyOperator>()._eos = true;
+    {
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        // EOS reached but still blocked on finish dependency: not done yet.
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+    }
+
+    // Now unblock the finish dependency (simulates the async op completing) 
and activate the
+    // debug point. The debug point fires inside the else-if branch — after 
_is_pending_finish()
+    // returns false but before the second _wake_up_early read — and calls 
set_wake_up_early() +
+    // terminate(). This precisely reproduces the race where Thread B's writes 
land between
+    // Thread A's two reads of _wake_up_early.
+    sink_finish_dep->set_ready();
+    config::enable_debug_points = true;
+    
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+    {
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_TRUE(done);
+        // The key assertion: even though the task took the else-if path (not 
the
+        // if(_wake_up_early) path), operator terminate() must have been 
called because the
+        // second read of _wake_up_early correctly observed the value set by 
the debug point.
+        EXPECT_TRUE(task->_wake_up_early);
+        
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
+        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
+    }
+    DebugPoints::instance()->clear();

Review Comment:
   `DebugPoints::instance()->clear()` clears *all* debug points globally, which 
can interfere with other tests in the same binary if they rely on debug points. 
Since this test only adds one point, prefer 
`DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if")`
 and leave unrelated debug points intact.
   ```suggestion
       
DebugPoints::instance()->remove("PipelineTask::execute.wake_up_early_in_else_if");
   ```



##########
be/src/exec/pipeline/pipeline_task.cpp:
##########
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
 
         // If task is woke up early, we should terminate all operators, and 
this task could be closed immediately.
         if (_wake_up_early) {
-            terminate();
-            THROW_IF_ERROR(_root->terminate(_state));
-            THROW_IF_ERROR(_sink->terminate(_state));
             _eos = true;
             *done = true;
         } else if (_eos && !_spilling &&
                    (fragment_context->is_canceled() || !_is_pending_finish())) 
{
+            // Debug point for testing the race condition fix: inject 
set_wake_up_early() +
+            // terminate() here to simulate Thread B writing A then B between 
Thread A's two
+            // reads of _wake_up_early.
+            DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+                set_wake_up_early();
+                terminate();
+            });
             *done = true;
         }
+
+        // NOTE: The terminate() call is intentionally placed AFTER the 
_is_pending_finish() check
+        // above, not before. This ordering is critical to avoid a race 
condition:
+        //
+        // Pipeline::make_all_runnable() writes in this order:
+        //   (A) set_wake_up_early()  ->  (B) terminate() [sets 
finish_dep._always_ready]
+        //
+        // If we checked _wake_up_early (A) before _is_pending_finish() (B), 
there would be a
+        // window where Thread A reads _wake_up_early=false, then Thread B 
writes both A and B,
+        // then Thread A reads _is_pending_finish()=false (due to 
_always_ready). Thread A would
+        // then set *done=true without ever calling operator terminate(), 
causing close() to run
+        // on operators that were never properly terminated (e.g. 
RuntimeFilterProducer still in
+        // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+        //
+        // By reading _is_pending_finish() (B) before the second read of 
_wake_up_early (A),
+        // if Thread A observes B's effect (_always_ready=true), it is 
guaranteed to also observe
+        // A's effect (_wake_up_early=true) on this second read, ensuring 
terminate() is called.

Review Comment:
   The explanatory comment says `_is_pending_finish()` becomes false “due to 
_always_ready”, but `_is_pending_finish()` ultimately checks 
`Dependency::_ready` via `Dependency::is_blocked_by()`. `set_always_ready()` 
does set `_always_ready`, but it unblocks tasks by calling `set_ready()` 
(setting `_ready=true`). Consider rewording this comment to reference 
`_ready`/`set_ready()` so the described mechanism matches the actual code path.
   ```suggestion
           //   (A) set_wake_up_early()  ->  (B) terminate() [marks the finish 
dependency ready
           //       via set_always_ready()/set_ready(), which sets 
Dependency::_ready=true]
           //
           // If we checked _wake_up_early (A) before _is_pending_finish() (B), 
there would be a
           // window where Thread A reads _wake_up_early=false, then Thread B 
writes both A and B,
           // then Thread A reads _is_pending_finish()=false (because the 
finish dependency's
           // underlying Dependency::_ready flag is now true). Thread A would 
then set *done=true
           // without ever calling operator terminate(), causing close() to run 
on operators that
           // were never properly terminated (e.g. RuntimeFilterProducer still 
in
           // WAITING_FOR_SYNCED_SIZE state when insert() is called).
           //
           // By reading _is_pending_finish() (B) before the second read of 
_wake_up_early (A),
           // if Thread A observes B's effect (Dependency::_ready=true on the 
finish dependency), it
           // is guaranteed to also observe A's effect (_wake_up_early=true) on 
this second read,
           // ensuring terminate() is called.
   ```



##########
be/test/exec/pipeline/pipeline_task_test.cpp:
##########
@@ -1534,4 +1538,93 @@ TEST_F(PipelineTaskTest, TEST_REVOKE_MEMORY) {
     }
 }
 
+// Test for the race condition fix between _wake_up_early and 
_is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A) 
set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A 
reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads 
_is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling 
operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So 
if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect 
(_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate 
the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after 
_is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second 
_wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) {
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    {
+        OperatorPtr source_op;
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    task->_exec_time_slice = 10'000'000'000ULL;
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    }
+    _query_ctx->get_execution_dependency()->set_ready();
+
+    // Get the sink's finish dependency and block it to simulate a pending 
async operation
+    // (e.g. runtime filter size sync RPC in flight).
+    auto* sink_finish_dep =
+            
_runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+    EXPECT_NE(sink_finish_dep, nullptr);
+    sink_finish_dep->block();
+
+    // Drive the task to EOS so it will enter the Defer's pending-finish check.
+    task->_operators.front()->cast<DummyOperator>()._eos = true;
+    {
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        // EOS reached but still blocked on finish dependency: not done yet.
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+    }
+
+    // Now unblock the finish dependency (simulates the async op completing) 
and activate the
+    // debug point. The debug point fires inside the else-if branch — after 
_is_pending_finish()
+    // returns false but before the second _wake_up_early read — and calls 
set_wake_up_early() +
+    // terminate(). This precisely reproduces the race where Thread B's writes 
land between
+    // Thread A's two reads of _wake_up_early.
+    sink_finish_dep->set_ready();
+    config::enable_debug_points = true;
+    
DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if");
+    {

Review Comment:
   This test sets the global `config::enable_debug_points` flag to `true` and 
later resets it to `false` unconditionally. If another test (or a future 
fixture setup) has already enabled debug points, that unconditional reset 
overrides its setting and changes behavior for whatever runs afterwards. Prefer 
capturing the original value and restoring it (e.g., via RAII/Defer), similar 
to `SpillableDebugPointHelper` in 
`be/test/exec/operator/spillable_operator_test_helper.h`.



##########
be/src/exec/pipeline/pipeline_task.cpp:
##########
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
 
         // If task is woke up early, we should terminate all operators, and 
this task could be closed immediately.
         if (_wake_up_early) {
-            terminate();
-            THROW_IF_ERROR(_root->terminate(_state));
-            THROW_IF_ERROR(_sink->terminate(_state));
             _eos = true;
             *done = true;
         } else if (_eos && !_spilling &&
                    (fragment_context->is_canceled() || !_is_pending_finish())) 
{
+            // Debug point for testing the race condition fix: inject 
set_wake_up_early() +
+            // terminate() here to simulate Thread B writing A then B between 
Thread A's two
+            // reads of _wake_up_early.
+            DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+                set_wake_up_early();
+                terminate();

Review Comment:
   The injected debug-point block calls `terminate()`, and `terminate()` is 
then called a second time immediately afterwards, when `_wake_up_early` is 
re-checked. If `terminate()` ever becomes non-idempotent, the debug-point path 
will behave differently from the production path (and can mask issues). 
Consider removing the `terminate()` call from the debug-point injection and 
only setting `_wake_up_early`, or otherwise add a guard so that `terminate()` 
runs only once per `execute()` call when the debug point is enabled.
   ```suggestion
               // Debug point for testing the race condition fix: inject 
set_wake_up_early() here
               // to simulate Thread B writing A then B between Thread A's two 
reads of
               // _wake_up_early. terminate() will be invoked once below when 
_wake_up_early is
               // re-checked, to avoid double-calling terminate() in the debug 
path.
               
DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
                   set_wake_up_early();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to