This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6f8b8840f40 [Bug](pipeline) make sink operator process eos signals after wake_up_early (#45207) (#46374)
6f8b8840f40 is described below

commit 6f8b8840f4009da73c5944e0b33eaebff229162e
Author: Pxl <x...@selectdb.com>
AuthorDate: Fri Jan 3 18:37:27 2025 +0800

    [Bug](pipeline) make sink operator process eos signals after wake_up_early (#45207) (#46374)
    
    1. Make the sink operator process eos signals after `wake_up_early` is set.
    2. Set `wake_up_early` when the pipeline task meets `wake_up_by_downstream`,
       when the sink reaches its limit, or when the sink gets an EOF status.
    3. Close the non-sink operators after the sink meets eos.
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/exprs/runtime_filter.cpp                  |   6 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp     |  57 ++++++-----
 be/src/pipeline/exec/hashjoin_probe_operator.cpp |   2 +-
 be/src/pipeline/pipeline.cpp                     |   7 +-
 be/src/pipeline/pipeline_task.cpp                | 116 +++++++++++------------
 be/src/pipeline/pipeline_task.h                  |  11 ++-
 6 files changed, 108 insertions(+), 91 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f5e9c96184f..2169ec727b2 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -498,12 +498,12 @@ public:
         switch (_filter_type) {
         case RuntimeFilterType::IN_FILTER: {
             if (!_context->hybrid_set) {
-                _context->ignored = true;
+                set_ignored();
                 return Status::OK();
             }
             _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
             if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
-                _context->ignored = true;
+                set_ignored();
                 // release in filter
                 _context->hybrid_set.reset();
             }
@@ -1332,7 +1332,7 @@ void IRuntimeFilter::set_synced_size(uint64_t 
global_size) {
 }
 
 void IRuntimeFilter::set_ignored() {
-    _wrapper->_context->ignored = true;
+    _wrapper->set_ignored();
 }
 
 bool IRuntimeFilter::get_ignored() {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 991127d83fe..4fa9f9a95a6 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -123,35 +123,48 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
         }
     }};
 
-    if (!_runtime_filter_slots || _runtime_filters.empty() || 
state->is_cancelled()) {
+    if (!_runtime_filter_slots || _runtime_filters.empty() || 
state->is_cancelled() || !_eos) {
         return Base::close(state, exec_status);
     }
 
-    if (state->get_task()->wake_up_by_downstream()) {
-        if (_should_build_hash_table) {
-            // partitial ignore rf to make global rf work
+    try {
+        if (state->get_task()->wake_up_early()) {
+            // partitial ignore rf to make global rf work or ignore useless rf
             RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, 
_finish_dependency));
             RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
-        } else {
-            // do not publish filter coz local rf not inited and useless
-            return Base::close(state, exec_status);
-        }
-    } else if (_should_build_hash_table) {
-        auto* block = _shared_state->build_block.get();
-        uint64_t hash_table_size = block ? block->rows() : 0;
-        {
-            SCOPED_TIMER(_runtime_filter_init_timer);
-            RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
-            RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
-        }
-        if (hash_table_size > 1) {
-            SCOPED_TIMER(_runtime_filter_compute_timer);
-            _runtime_filter_slots->insert(block);
+        } else if (_should_build_hash_table) {
+            auto* block = _shared_state->build_block.get();
+            uint64_t hash_table_size = block ? block->rows() : 0;
+            {
+                SCOPED_TIMER(_runtime_filter_init_timer);
+                RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
+                RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+            }
+            if (hash_table_size > 1) {
+                SCOPED_TIMER(_runtime_filter_compute_timer);
+                _runtime_filter_slots->insert(block);
+            }
         }
+
+        SCOPED_TIMER(_publish_runtime_filter_timer);
+        RETURN_IF_ERROR(_runtime_filter_slots->publish(state, 
!_should_build_hash_table));
+    } catch (Exception& e) {
+        bool blocked_by_complete_build_stage = p._shared_hashtable_controller 
&&
+                                               
!p._shared_hash_table_context->complete_build_stage;
+        bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
+                                                   
p._shared_hashtable_controller &&
+                                                   
!p._shared_hash_table_context->signaled;
+
+        return Status::InternalError(
+                "rf process meet error: {}, wake_up_early: {}, 
should_build_hash_table: "
+                "{}, _finish_dependency: {}, blocked_by_complete_build_stage: 
{}, "
+                "blocked_by_shared_hash_table_signal: "
+                "{}",
+                e.to_string(), state->get_task()->wake_up_early(), 
_should_build_hash_table,
+                _finish_dependency->debug_string(), 
blocked_by_complete_build_stage,
+                blocked_by_shared_hash_table_signal);
     }
 
-    SCOPED_TIMER(_publish_runtime_filter_timer);
-    RETURN_IF_ERROR(_runtime_filter_slots->publish(state, 
!_should_build_hash_table));
     return Base::close(state, exec_status);
 }
 
@@ -516,7 +529,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
-    local_state._eos = eos;
     if (local_state._should_build_hash_table) {
         // If eos or have already met a null value using short-circuit 
strategy, we do not need to pull
         // data from probe side.
@@ -622,6 +634,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
     }
 
     if (eos) {
+        local_state._eos = true;
         local_state.init_short_circuit_for_probe();
         // Since the comparison of null values is meaningless, null aware left 
anti/semi join should not output null
         // when the build side is not empty.
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 756a151394b..4f9184db8a5 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -241,7 +241,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
         // If we use a short-circuit strategy, should return block directly by 
add additional null data.
         auto block_rows = local_state._probe_block.rows();
         if (local_state._probe_eos && block_rows == 0) {
-            *eos = local_state._probe_eos;
+            *eos = true;
             return Status::OK();
         }
 
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 96da754daa5..6c39d361e59 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -112,7 +112,12 @@ void Pipeline::make_all_runnable() {
     if (_sink->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
-                task->clear_blocking_state(true);
+                task->set_wake_up_early();
+            }
+        }
+        for (auto* task : _tasks) {
+            if (task) {
+                task->clear_blocking_state();
             }
         }
     }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 04a48d93be0..9d83c475778 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -22,6 +22,7 @@
 #include <glog/logging.h>
 #include <stddef.h>
 
+#include <algorithm>
 #include <ostream>
 #include <vector>
 
@@ -227,9 +228,6 @@ bool PipelineTask::_wait_to_start() {
     _blocked_dep = _execution_dep->is_blocked_by(this);
     if (_blocked_dep != nullptr) {
         static_cast<Dependency*>(_blocked_dep)->start_watcher();
-        if (_wake_up_by_downstream) {
-            _eos = true;
-        }
         return true;
     }
 
@@ -237,9 +235,6 @@ bool PipelineTask::_wait_to_start() {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
-            if (_wake_up_by_downstream) {
-                _eos = true;
-            }
             return true;
         }
     }
@@ -261,9 +256,6 @@ bool PipelineTask::_is_blocked() {
                 _blocked_dep = dep->is_blocked_by(this);
                 if (_blocked_dep != nullptr) {
                     _blocked_dep->start_watcher();
-                    if (_wake_up_by_downstream) {
-                        _eos = true;
-                    }
                     return true;
                 }
             }
@@ -283,9 +275,6 @@ bool PipelineTask::_is_blocked() {
         _blocked_dep = op_dep->is_blocked_by(this);
         if (_blocked_dep != nullptr) {
             _blocked_dep->start_watcher();
-            if (_wake_up_by_downstream) {
-                _eos = true;
-            }
             return true;
         }
     }
@@ -293,15 +282,15 @@ bool PipelineTask::_is_blocked() {
 }
 
 Status PipelineTask::execute(bool* eos) {
-    SCOPED_TIMER(_task_profile->total_time_counter());
-    SCOPED_TIMER(_exec_timer);
-    SCOPED_ATTACH_TASK(_state);
-    _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
-    *eos = _eos;
     if (_eos) {
-        // If task is waken up by finish dependency, `_eos` is set to true by 
last execution, and we should return here.
+        *eos = true;
         return Status::OK();
     }
+
+    SCOPED_TIMER(_task_profile->total_time_counter());
+    SCOPED_TIMER(_exec_timer);
+    SCOPED_ATTACH_TASK(_state);
+
     int64_t time_spent = 0;
     DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
         Status status = Status::Error<INTERNAL_ERROR>("fault_inject 
pipeline_task execute failed");
@@ -324,27 +313,31 @@ Status PipelineTask::execute(bool* eos) {
     if (_wait_to_start()) {
         return Status::OK();
     }
-    if (_wake_up_by_downstream) {
-        _eos = true;
-        *eos = true;
-        return Status::OK();
-    }
+
     // The status must be runnable
     if (!_opened && !_fragment_context->is_canceled()) {
+        if (_wake_up_early) {
+            *eos = true;
+            _eos = true;
+            return Status::OK();
+        }
         RETURN_IF_ERROR(_open());
     }
 
+    auto set_wake_up_and_dep_ready = [&]() {
+        if (wake_up_early()) {
+            return;
+        }
+        set_wake_up_early();
+        clear_blocking_state();
+    };
+
     _task_profile->add_info_string("TaskState", "Runnable");
     _task_profile->add_info_string("BlockedByDependency", "");
     while (!_fragment_context->is_canceled()) {
         if (_is_blocked()) {
             return Status::OK();
         }
-        if (_wake_up_by_downstream) {
-            _eos = true;
-            *eos = true;
-            return Status::OK();
-        }
 
         /// When a task is cancelled,
         /// its blocking state will be cleared and it will transition to a 
ready state (though it is not truly ready).
@@ -365,47 +358,47 @@ Status PipelineTask::execute(bool* eos) {
             RETURN_IF_ERROR(_sink->revoke_memory(_state));
             continue;
         }
-        *eos = _eos;
         DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
             Status status =
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task 
executing failed");
             return status;
         });
-        // `_dry_run` means sink operator need no more data
         // `_sink->is_finished(_state)` means sink operator should be finished
-        if (_dry_run || _sink->is_finished(_state)) {
-            *eos = true;
-            _eos = true;
-        } else {
+        if (_sink->is_finished(_state)) {
+            set_wake_up_and_dep_ready();
+        }
+
+        // `_dry_run` means sink operator need no more data
+        *eos = wake_up_early() || _dry_run;
+        if (!*eos) {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
             RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, 
eos));
         }
 
+        if (*eos) {
+            RETURN_IF_ERROR(close(Status::OK(), false));
+        }
+
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
-            Status status = Status::OK();
-            // Define a lambda function to catch sink exception, because sink 
will check
-            // return error status with EOF, it is special, could not return 
directly.
-            auto sink_function = [&]() -> Status {
-                Status internal_st;
-                internal_st = _sink->sink(_state, block, *eos);
-                return internal_st;
-            };
-            status = sink_function();
-            if (!status.is<ErrorCode::END_OF_FILE>()) {
-                RETURN_IF_ERROR(status);
+            Status status = _sink->sink(_state, block, *eos);
+
+            if (status.is<ErrorCode::END_OF_FILE>()) {
+                set_wake_up_and_dep_ready();
+            } else if (!status) {
+                return status;
             }
-            *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
+
             if (*eos) { // just return, the scheduler will do finish work
-                _eos = true;
                 _task_profile->add_info_string("TaskState", "Finished");
+                _eos = true;
                 return Status::OK();
             }
         }
     }
 
-    static_cast<void>(get_task_queue()->push_back(this));
+    RETURN_IF_ERROR(get_task_queue()->push_back(this));
     return Status::OK();
 }
 
@@ -474,17 +467,14 @@ void PipelineTask::finalize() {
     _le_state_map.clear();
 }
 
-Status PipelineTask::close(Status exec_status) {
+Status PipelineTask::close(Status exec_status, bool close_sink) {
     int64_t close_ns = 0;
-    Defer defer {[&]() {
-        if (_task_queue) {
-            _task_queue->update_statistics(this, close_ns);
-        }
-    }};
     Status s;
     {
         SCOPED_RAW_TIMER(&close_ns);
-        s = _sink->close(_state, exec_status);
+        if (close_sink) {
+            s = _sink->close(_state, exec_status);
+        }
         for (auto& op : _operators) {
             auto tem = op->close(_state);
             if (!tem.ok() && s.ok()) {
@@ -493,10 +483,18 @@ Status PipelineTask::close(Status exec_status) {
         }
     }
     if (_opened) {
-        _fresh_profile_counter();
-        COUNTER_SET(_close_timer, close_ns);
+        COUNTER_UPDATE(_close_timer, close_ns);
         COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
     }
+
+    if (close_sink && _opened) {
+        _task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" 
: "false");
+        _fresh_profile_counter();
+    }
+
+    if (_task_queue) {
+        _task_queue->update_statistics(this, close_ns);
+    }
     return s;
 }
 
@@ -512,10 +510,10 @@ std::string PipelineTask::debug_string() {
     auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
     fmt::format_to(debug_string_buffer,
                    "PipelineTask[this = {}, id = {}, open = {}, eos = {}, 
finish = {}, dry run = "
-                   "{}, elapse time = {}s, _wake_up_by_downstream = {}], block 
dependency = {}, is "
+                   "{}, elapse time = {}s, _wake_up_early = {}], block 
dependency = {}, is "
                    "running = {}\noperators: ",
                    (void*)this, _index, _opened, _eos, _finalized, _dry_run, 
elapsed,
-                   _wake_up_by_downstream.load(),
+                   _wake_up_early.load(),
                    cur_blocked_dep && !_finalized ? 
cur_blocked_dep->debug_string() : "NULL",
                    is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index febc9634c49..94a553e2fa1 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -61,7 +61,7 @@ public:
 
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
-    Status close(Status exec_status);
+    Status close(Status exec_status, bool close_sink = true);
 
     PipelineFragmentContext* fragment_context() { return _fragment_context; }
 
@@ -135,11 +135,12 @@ public:
     int task_id() const { return _index; };
     bool is_finalized() const { return _finalized; }
 
-    void clear_blocking_state(bool wake_up_by_downstream = false) {
+    void set_wake_up_early() { _wake_up_early = true; }
+
+    void clear_blocking_state() {
         
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
-        _wake_up_by_downstream = _wake_up_by_downstream || 
wake_up_by_downstream;
         if (!_finalized) {
             _execution_dep->set_always_ready();
             for (auto* dep : _filter_dependencies) {
@@ -236,7 +237,7 @@ public:
 
     PipelineId pipeline_id() const { return _pipeline->id(); }
 
-    bool wake_up_by_downstream() const { return _wake_up_by_downstream; }
+    bool wake_up_early() const { return _wake_up_early; }
 
 private:
     friend class RuntimeFilterDependency;
@@ -318,7 +319,7 @@ private:
 
     std::atomic<bool> _running = false;
     std::atomic<bool> _eos = false;
-    std::atomic<bool> _wake_up_by_downstream = false;
+    std::atomic<bool> _wake_up_early = false;
 };
 
 } // namespace doris::pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to