github-actions[bot] commented on code in PR #61679:
URL: https://github.com/apache/doris/pull/61679#discussion_r2980699413


##########
be/src/exec/pipeline/pipeline_task.cpp:
##########
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
 
         // If task is woke up early, we should terminate all operators, and this task could be closed immediately.
         if (_wake_up_early) {
-            terminate();
-            THROW_IF_ERROR(_root->terminate(_state));
-            THROW_IF_ERROR(_sink->terminate(_state));
             _eos = true;
             *done = true;
         } else if (_eos && !_spilling &&
                    (fragment_context->is_canceled() || !_is_pending_finish())) {
+            // Debug point for testing the race condition fix: inject set_wake_up_early() +
+            // terminate() here to simulate Thread B writing A then B between Thread A's two
+            // reads of _wake_up_early.
+            DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+                set_wake_up_early();
+                terminate();
+            });
             *done = true;
         }
+
+        // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
+        // above, not before. This ordering is critical to avoid a race condition:
+        //
+        // Pipeline::make_all_runnable() writes in this order:
+        //   (A) set_wake_up_early()  ->  (B) terminate() [sets finish_dep._always_ready]
+        //
+        // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
+        // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
+        // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
+        // then set *done=true without ever calling operator terminate(), causing close() to run
+        // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
+        // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+        //
+        // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
+        // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
+        // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
+        if (_wake_up_early) {
+            terminate();
+            THROW_IF_ERROR(_root->terminate(_state));

Review Comment:
   **Advisory (pre-existing):** Per `be/src/common/AGENTS.md`, `THROW_IF_ERROR` should be kept out of `Defer` blocks and destructors; the recommendation there is to use `WARN_IF_ERROR` instead. The Doris `Defer` implementation does handle this safely (it checks `std::uncaught_exceptions()` and swallows exceptions thrown during stack unwinding), but as a consequence any error returned by `_root->terminate()` or `_sink->terminate()` while the stack is unwinding is silently lost.
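   To make the "silently lost" point concrete, here is a minimal C++17 model, assuming a scope guard that, like the `Defer` described above, swallows exceptions from its callback while another exception is already unwinding. `SketchDefer`, `SketchStatus`, `throw_if_error`, `warn_if_error`, `run_with_cleanup` and `g_warnings` are hypothetical stand-ins, not the Doris `Defer`, `Status`, `THROW_IF_ERROR` or `WARN_IF_ERROR`; the warning "log" is just a vector so the effect can be inspected.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical Status stand-in: an empty message means OK.
struct SketchStatus {
    std::string error;
    bool ok() const { return error.empty(); }
};

// Collected warnings, standing in for LOG(WARNING) output.
std::vector<std::string> g_warnings;

// Throwing check, in the spirit of THROW_IF_ERROR.
void throw_if_error(const SketchStatus& st) {
    if (!st.ok()) throw std::runtime_error(st.error);
}

// Warn-and-continue check, in the spirit of WARN_IF_ERROR.
void warn_if_error(const SketchStatus& st, const std::string& prefix) {
    if (!st.ok()) g_warnings.push_back(prefix + ": " + st.error);
}

// Hypothetical scope guard that runs a callback on scope exit. While another
// exception is unwinding, it swallows anything the callback throws, because a
// second exception escaping a destructor would call std::terminate().
class SketchDefer {
public:
    explicit SketchDefer(std::function<void()> fn)
            : fn_(std::move(fn)), uncaught_at_ctor_(std::uncaught_exceptions()) {}
    SketchDefer(const SketchDefer&) = delete;
    SketchDefer& operator=(const SketchDefer&) = delete;

    ~SketchDefer() noexcept(false) {
        if (std::uncaught_exceptions() > uncaught_at_ctor_) {
            try {
                fn_();
            } catch (...) {
                // Dropped: nothing records this secondary error.
            }
        } else {
            fn_();  // normal scope exit: let any exception reach the caller
        }
    }

private:
    std::function<void()> fn_;
    int uncaught_at_ctor_;
};

// The body fails, then the deferred "terminate" cleanup fails as well.
// Returns the message of the exception that reaches the caller.
std::string run_with_cleanup(bool use_warn) {
    const SketchStatus terminate_status{"terminate failed"};
    try {
        SketchDefer cleanup([&] {
            if (use_warn) {
                warn_if_error(terminate_status, "terminate");
            } else {
                throw_if_error(terminate_status);
            }
        });
        throw std::runtime_error("execute failed");  // starts stack unwinding
    } catch (const std::exception& e) {
        return e.what();
    }
    return "";
}

void check_cleanup_scenarios() {
    g_warnings.clear();
    assert(run_with_cleanup(/*use_warn=*/false) == "execute failed");
    assert(g_warnings.empty());  // the terminate error left no trace
    assert(run_with_cleanup(/*use_warn=*/true) == "execute failed");
    assert(g_warnings.size() == 1 && g_warnings[0] == "terminate: terminate failed");
}
```

   In both modes the caller only sees the primary `execute failed` error. With the throwing cleanup the `terminate failed` error disappears entirely, while the warn-and-continue variant at least leaves a record of it.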
   
   This is a pre-existing pattern (the original code made the same calls in the same `Defer`), so it is not a blocker for this PR. Since the code is being moved anyway, though, this would be a good opportunity to switch to `WARN_IF_ERROR`, provided operator terminate failures really are best-effort cleanup.
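   The ordering argument in the NOTE comment of the hunk can be replayed deterministically, much as the `DBUG_EXECUTE_IF` debug point does. This is a single-threaded sketch under stated assumptions: `TaskModel`, `Outcome`, `Hook`, `execute_old`, `execute_new` and `check_race_scenario` are hypothetical names, the real `PipelineTask` logic is reduced to the two flags involved, and the hook stands in for Thread B running `Pipeline::make_all_runnable()` between Thread A's reads. It only shows why reading (B) before the second read of (A) closes the window. In the real multi-threaded code the guarantee additionally assumes the writes and reads are ordered, for example `std::atomic` with the default sequentially consistent ordering or a release store of (B) paired with an acquire load; the sketch does not check that.

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Hypothetical, simplified model of the PipelineTask fields involved in the race.
// Names echo the PR; types and structure are assumptions for illustration.
struct TaskModel {
    std::atomic<bool> wake_up_early{false};  // write (A)
    std::atomic<bool> always_ready{false};   // write (B): finish dependency ready
    bool eos = true;                         // assume the task already reached EOS

    bool is_pending_finish() const { return !always_ready.load(); }

    // Writer side, in the order described for Pipeline::make_all_runnable().
    void make_all_runnable() {
        wake_up_early.store(true);  // (A) set_wake_up_early()
        always_ready.store(true);   // (B) terminate() marks the finish dependency ready
    }
};

struct Outcome {
    bool done = false;
    bool terminated = false;  // did operator terminate() run?
};

// Runs between the first read of (A) and the read of (B), simulating Thread B.
using Hook = std::function<void(TaskModel&)>;

// Old ordering: terminate() only inside the first _wake_up_early branch.
Outcome execute_old(TaskModel& t, const Hook& interleave) {
    Outcome out;
    const bool woke = t.wake_up_early.load();  // read (A)
    interleave(t);
    if (woke) {
        out.terminated = true;
        out.done = true;
    } else if (t.eos && !t.is_pending_finish()) {  // read (B)
        out.done = true;  // finishes without terminate()
    }
    return out;
}

// New ordering: read (B) first, then re-read (A) before deciding to terminate.
Outcome execute_new(TaskModel& t, const Hook& interleave) {
    Outcome out;
    const bool woke = t.wake_up_early.load();  // first read of (A)
    interleave(t);
    if (woke) {
        out.done = true;
    } else if (t.eos && !t.is_pending_finish()) {  // read (B)
        out.done = true;
    }
    if (t.wake_up_early.load()) {  // second read of (A), after (B)
        out.terminated = true;
    }
    return out;
}

// Replays the problematic interleaving against both orderings.
void check_race_scenario() {
    const Hook thread_b = [](TaskModel& t) { t.make_all_runnable(); };

    TaskModel old_task;
    const Outcome old_out = execute_old(old_task, thread_b);
    assert(old_out.done && !old_out.terminated);  // the bug: done, never terminated

    TaskModel new_task;
    const Outcome new_out = execute_new(new_task, thread_b);
    assert(new_out.done && new_out.terminated);  // the fix: terminate() still runs
}
```

   In the problematic interleaving, `execute_old` reports `done` without `terminated`, which corresponds to `close()` running on operators that were never terminated. `execute_new` still reaches the terminate branch because its second read of `wake_up_early` happens after it has already observed `always_ready`.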



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
