This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e08498eb559 [fix](pipeline) Fix wake up early without terminate call (#63539)
e08498eb559 is described below

commit e08498eb5592bc04ef4af5340622349019bce543
Author: Pxl <[email protected]>
AuthorDate: Tue May 26 14:30:18 2026 +0800

    [fix](pipeline) Fix wake up early without terminate call (#63539)
    
    ### What problem does this PR solve?
    
    Issue Number: None
    
    Related PR: #61679
    
    Problem Summary: Backport #61679 to branch-4.0. A pipeline task can race
    with downstream early wake-up: one thread may observe `_wake_up_early`
    as false, then another thread sets `_wake_up_early` and unblocks finish
    dependencies, and the first thread later sees `_is_pending_finish()` as
    false and finishes without calling operator `terminate()`. For hash join
    build tasks this can leave runtime filter producers in
    `WAITING_FOR_SYNCED_SIZE`; during close/build, `insert()` expects
    `WAITING_FOR_DATA` and reports an invalid runtime filter producer state.
    This change moves operator termination after the pending-finish check so
    the second `_wake_up_early` read observes the early wake-up and
    terminates operators before close.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Unit Test
    - `build-support/check-format.sh be/src/pipeline/pipeline_task.cpp
    be/test/pipeline/pipeline_task_test.cpp`
    - `ninja -C be/build_Release
    src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o`
    - `ninja -C be/ut_build_ASAN
    src/exec/CMakeFiles/Exec.dir/pipeline/pipeline_task.cpp.o
    test/CMakeFiles/doris_be_test.dir/exec/pipeline/pipeline_task_test.cpp.o
    test/doris_be_test`
    - `be/ut_build_ASAN/test/doris_be_test
    --gtest_filter=PipelineTaskTest.TEST_TERMINATE_RACE_FIX
    --gtest_print_time=true`
    - Behavior changed: No
    - Does this need documentation: No
---
 be/src/pipeline/pipeline_task.cpp       | 32 +++++++++++--
 be/test/pipeline/pipeline_task_test.cpp | 79 +++++++++++++++++++++++++++++++++
 2 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index dbe52cbabc0..afd40596241 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -409,15 +409,41 @@ Status PipelineTask::execute(bool* done) {
 
         // If task is woke up early, we should terminate all operators, and this task could be closed immediately.
         if (_wake_up_early) {
-            terminate();
-            THROW_IF_ERROR(_root->terminate(_state));
-            THROW_IF_ERROR(_sink->terminate(_state));
             _eos = true;
             *done = true;
         } else if (_eos && !_spilling &&
                    (fragment_context->is_canceled() || !_is_pending_finish())) 
{
+            // Debug point for testing the race condition fix: inject set_wake_up_early() +
+            // terminate() here to simulate Thread B writing A then B between Thread A's two
+            // reads of _wake_up_early.
+            DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+                set_wake_up_early();
+                terminate();
+            });
             *done = true;
         }
+
+        // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
+        // above, not before. This ordering is critical to avoid a race condition:
+        //
+        // Pipeline::make_all_runnable() writes in this order:
+        //   (A) set_wake_up_early()  ->  (B) terminate() [sets finish_dep._always_ready]
+        //
+        // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
+        // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
+        // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
+        // then set *done=true without ever calling operator terminate(), causing close() to run
+        // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
+        // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+        //
+        // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
+        // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
+        // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
+        if (_wake_up_early) {
+            terminate();
+            THROW_IF_ERROR(_root->terminate(_state));
+            THROW_IF_ERROR(_sink->terminate(_state));
+        }
     }};
     const auto query_id = _state->query_id();
     // If this task is already EOS and block is empty (which means we already output all blocks),
diff --git a/be/test/pipeline/pipeline_task_test.cpp b/be/test/pipeline/pipeline_task_test.cpp
index 8a8e16e3c4d..a629344363a 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -18,6 +18,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "common/config.h"
 #include "common/status.h"
 #include "dummy_task_queue.h"
 #include "pipeline/dependency.h"
@@ -30,6 +31,7 @@
 #include "testutil/mock/mock_thread_mem_tracker_mgr.h"
 #include "testutil/mock/mock_workload_group_mgr.h"
 #include "thrift_builder.h"
+#include "util/debug_points.h"
 
 namespace doris::pipeline {
 
@@ -1186,4 +1188,81 @@ TEST_F(PipelineTaskTest, TEST_INJECT_SHARED_STATE) {
     }
 }
 
+// Test for the race condition fix between _wake_up_early and _is_pending_finish().
+//
+// The race: Pipeline::make_all_runnable() writes in order (A) set_wake_up_early -> (B) terminate()
+// [sets finish_dep._always_ready]. In execute()'s Defer block, if Thread A reads _wake_up_early=false
+// (A), then Thread B writes A and B, then Thread A reads _is_pending_finish()=false (due to
+// _always_ready from B), Thread A would set *done=true without calling operator terminate().
+//
+// The fix: terminate() is called after _is_pending_finish() in the Defer. So if Thread A sees B's
+// effect (_always_ready=true), it must also see A's effect (_wake_up_early=true) on the subsequent
+// read, ensuring terminate() is always called.
+//
+// This test uses a debug point injected into the else-if branch to simulate the exact bad timing:
+// the debug point fires set_wake_up_early() + terminate() after _is_pending_finish() returns false
+// (due to finish_dep being naturally unblocked) but before the second _wake_up_early check.
+TEST_F(PipelineTaskTest, TEST_TERMINATE_RACE_FIX) { // Regression test: an early wake-up injected after the pending-finish check must still terminate both operators.
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, num_instances);
+    { // Assemble a minimal pipeline: one DummyOperator as source and one DummySinkOperatorX as sink.
+        OperatorPtr source_op;
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map; // Left empty: this test registers no shared state.
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, _runtime_state.get(), _context,
+                                               profile.get(), shared_state_map, task_id);
+    task->_exec_time_slice = 10'000'000'000ULL; // NOTE(review): very large slice, presumably so execute() is not cut short by time-slice expiry -- confirm.
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+    }
+    _query_ctx->get_execution_dependency()->set_ready(); // Mark the query-level execution dependency as ready.
+
+    auto* sink_finish_dep =
+            _runtime_state->get_sink_local_state()->cast<DummySinkLocalState>().finishdependency();
+    EXPECT_NE(sink_finish_dep, nullptr); // NOTE(review): EXPECT_NE is non-fatal, so a null pointer would still be dereferenced on the next line; ASSERT_NE may be intended.
+    sink_finish_dep->block(); // Block the sink's finish dependency; the first execute() below is expected to hit EOS without finishing.
+
+    task->_operators.front()->cast<DummyOperator>()._eos = true; // Flag the dummy source operator as EOS.
+    { // First run: EOS is reached, but the task is not done and no early wake-up has been recorded.
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+    }
+
+    sink_finish_dep->set_ready(); // Unblock the finish dependency before the second run.
+    config::enable_debug_points = true; // Turn debug points on globally for the injection below.
+    DebugPoints::instance()->add("PipelineTask::execute.wake_up_early_in_else_if"); // Arm the point in execute()'s else-if branch, which calls set_wake_up_early() + terminate().
+    { // Second run: the injected wake-up must yield done == true with source and sink both terminated.
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_TRUE(task->_eos);
+        EXPECT_TRUE(done);
+        EXPECT_TRUE(task->_wake_up_early);
+        EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._terminated);
+        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._terminated);
+    }
+    DebugPoints::instance()->clear(); // NOTE(review): clear() presumably drops every registered debug point, not only the one added here -- confirm.
+    config::enable_debug_points = false; // NOTE(review): forces the flag to false instead of restoring its value from before the test.
+}
+
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to