This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch cp_1213_2
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4131d3c111d654497296dcedf8292cd72a7f9aa2
Author: BiteTheDDDDt <pxl...@qq.com>
AuthorDate: Fri Dec 13 15:28:52 2024 +0800

    make sink operator process eos signals after wake_up_early
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp   | 58 ++++++++++-------
 be/src/pipeline/pipeline.cpp                   |  7 +-
 be/src/pipeline/pipeline_task.cpp              |  6 +-
 be/src/pipeline/pipeline_task.h                |  5 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 88 +++++++++++++-------------
 be/src/pipeline/pipeline_x/pipeline_x_task.h   | 12 ++--
 6 files changed, 101 insertions(+), 75 deletions(-)
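
Summary of the change, before the per-file diffs: a pipeline task that is woken up early (because its downstream needs no more data) should still deliver a final eos to its sink operator instead of returning before the sink ever sees eos. The following minimal, self-contained C++ sketch only illustrates that ordering; the types and members (FakeSink, FakeTask, wake_up_early, eos_sent) are invented for this email and are not Doris code.

    #include <atomic>
    #include <iostream>

    // Hypothetical stand-ins for a pipeline task and its sink operator.
    struct FakeSink {
        bool closed = false;
        void sink(bool eos) {
            // On eos the sink can finish its side work (e.g. publish runtime filters).
            if (eos) std::cout << "sink received eos, finishing\n";
        }
        void close() { closed = true; std::cout << "sink closed\n"; }
    };

    struct FakeTask {
        FakeSink sink;
        std::atomic<bool> wake_up_early{false};
        std::atomic<bool> eos_sent{false};

        void set_wake_up_early() { wake_up_early = true; }

        // Even when woken early, deliver eos to the sink before closing it,
        // mirroring the intent of the commit subject.
        void execute() {
            if (eos_sent) return;
            bool eos = wake_up_early.load();   // early wake-up forces eos
            // ... normally: pull a block from the operators and push it to the sink ...
            if (eos) {
                sink.sink(/*eos=*/true);       // the sink still processes the eos signal
                eos_sent = true;
            }
        }
        void close() { sink.close(); }
    };

    int main() {
        FakeTask task;
        task.set_wake_up_early();  // e.g. the probe side needs no more build data
        task.execute();            // eos reaches the sink anyway
        task.close();
    }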

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index dff377d62d5..1fee397cfa8 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -137,34 +137,48 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         }
     }};
 
-    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
+    if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() ||
+        !p.get_local_state(state)._eos) {
         return Base::close(state, exec_status);
     }
 
-    if (state->get_task()->wake_up_by_downstream()) {
-        if (_should_build_hash_table) {
-            // partitial ignore rf to make global rf work
+    try {
+        if (state->get_task()->wake_up_early()) {
+            // partial ignore rf to make global rf work or ignore useless rf
             RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
             RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
-        } else {
-            // do not publish filter coz local rf not inited and useless
-            return Base::close(state, exec_status);
-        }
-    } else if (_should_build_hash_table) {
-        auto* block = _shared_state->build_block.get();
-        uint64_t hash_table_size = block ? block->rows() : 0;
-        {
-            SCOPED_TIMER(_runtime_filter_init_timer);
-            RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
-            RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
-        }
-        if (hash_table_size > 1) {
-            SCOPED_TIMER(_runtime_filter_compute_timer);
-            _runtime_filter_slots->insert(block);
+        } else if (_should_build_hash_table) {
+            auto* block = _shared_state->build_block.get();
+            uint64_t hash_table_size = block ? block->rows() : 0;
+            {
+                SCOPED_TIMER(_runtime_filter_init_timer);
+                RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
+                RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+            }
+            if (hash_table_size > 1) {
+                SCOPED_TIMER(_runtime_filter_compute_timer);
+                _runtime_filter_slots->insert(block);
+            }
         }
+
+        SCOPED_TIMER(_publish_runtime_filter_timer);
+        RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
+    } catch (Exception& e) {
+        bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
+                                               !p._shared_hash_table_context->complete_build_stage;
+        bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
+                                                   p._shared_hashtable_controller &&
+                                                   !p._shared_hash_table_context->signaled;
+
+        return Status::InternalError(
+                "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
+                "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
+                "blocked_by_shared_hash_table_signal: "
+                "{}",
+                e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
+                _finish_dependency->debug_string(), blocked_by_complete_build_stage,
+                blocked_by_shared_hash_table_signal);
     }
-    SCOPED_TIMER(_publish_runtime_filter_timer);
-    RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
     return Base::close(state, exec_status);
 }
 
@@ -536,7 +550,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
 
-    local_state._eos = eos;
     if (local_state._should_build_hash_table) {
         // If eos or have already met a null value using short-circuit strategy, we do not need to pull
         // data from probe side.
@@ -654,6 +667,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     }
 
     if (eos) {
+        local_state._eos = eos;
         local_state.init_short_circuit_for_probe();
         // Since the comparison of null values is meaningless, null aware left anti/semi join should not output null
         // when the build side is not empty.
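
A hedged reading of the hashjoin_build_sink.cpp hunks above: _eos is now set only when sink() actually receives eos, close() skips the runtime-filter work unless that flag is set, and the filter path is wrapped in try/catch so a thrown Exception is reported as an InternalError with diagnostic context instead of escaping. The sketch below mirrors that shape with simplified, invented types (Status, BuildSinkLocalState); it is not the real Doris API.

    #include <exception>
    #include <string>

    // Minimal stand-ins; this Status is a simplified placeholder, not Doris' Status.
    struct Status {
        bool ok = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status InternalError(std::string m) { return {false, std::move(m)}; }
    };

    struct BuildSinkLocalState {
        bool eos = false;            // set only when sink() actually received eos
        bool should_build_hash_table = true;

        Status sink(bool is_eos) {
            // ... consume the build block ...
            if (is_eos) eos = true;  // mirrors moving `local_state._eos = eos;` under `if (eos)`
            return Status::OK();
        }

        Status close() {
            // Skip runtime-filter work entirely if the build never reached eos
            // (e.g. the task was cancelled or woken up early before finishing).
            if (!eos) return Status::OK();
            try {
                // ... init / insert / publish runtime filters ...
                return Status::OK();
            } catch (const std::exception& e) {
                // Surface the failure with context instead of letting it escape.
                return Status::InternalError(std::string("rf process met error: ") + e.what());
            }
        }
    };
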
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 2431e64d158..450cb0c123d 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -104,7 +104,12 @@ void Pipeline::make_all_runnable() {
     if (_sink_x->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
-                task->clear_blocking_state(true);
+                task->set_wake_up_early();
+            }
+        }
+        for (auto* task : _tasks) {
+            if (task) {
+                task->clear_blocking_state();
             }
         }
     }
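
The pipeline.cpp hunk above replaces the single call to clear_blocking_state(true) with two passes: first every task gets set_wake_up_early(), then every task is unblocked. A plausible reason, not stated in the diff, is that an unblocked task may start running immediately, so all wake-up flags should already be set and visible by then. A small sketch with an invented Task type:

    #include <atomic>
    #include <vector>

    // Hypothetical task type for illustration only.
    struct Task {
        std::atomic<bool> wake_up_early{false};
        void set_wake_up_early() { wake_up_early = true; }
        void clear_blocking_state() { /* mark dependencies ready so the task gets scheduled */ }
    };

    // Two-pass wake-up in the spirit of Pipeline::make_all_runnable(): every task has
    // its flag set before any task is actually unblocked, so a task that starts running
    // right away cannot observe a peer that is runnable but not yet flagged.
    void make_all_runnable(std::vector<Task*>& tasks) {
        for (auto* t : tasks) {
            if (t) t->set_wake_up_early();
        }
        for (auto* t : tasks) {
            if (t) t->clear_blocking_state();
        }
    }
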
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 13b672868b3..eb2073e8f8c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -325,7 +325,7 @@ Status PipelineTask::execute(bool* eos) {
     return status;
 }
 
-Status PipelineTask::close(Status exec_status) {
+Status PipelineTask::close(Status exec_status, bool close_sink) {
     int64_t close_ns = 0;
     Defer defer {[&]() {
         if (_task_queue) {
@@ -335,7 +335,9 @@ Status PipelineTask::close(Status exec_status) {
     Status s;
     {
         SCOPED_RAW_TIMER(&close_ns);
-        s = _sink->close(_state);
+        if (close_sink) {
+            s = _sink->close(_state);
+        }
         for (auto& op : _operators) {
             auto tem = op->close(_state);
             if (!tem.ok() && s.ok()) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 661b16c99ed..08fdb32d7fe 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -133,7 +133,7 @@ public:
 
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
-    virtual Status close(Status exec_status);
+    virtual Status close(Status exec_status, bool close_sink = true);
 
     void put_in_runnable_queue() {
         _schedule_time++;
@@ -293,7 +293,8 @@ public:
 
     PipelineId pipeline_id() const { return _pipeline->id(); }
 
-    virtual void clear_blocking_state(bool wake_up_by_downstream = false) {}
+    virtual void clear_blocking_state() {}
+    virtual void set_wake_up_early() {}
 
 protected:
     void _finish_p_dependency() {
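
pipeline_task.cpp and pipeline_task.h above: close() gains a close_sink parameter (defaulting to true) so the operator chain can be closed early while the sink is left for the scheduler to close later. A simplified sketch of that shape, using placeholder Status/Operator/Sink types rather than the real interfaces:

    #include <string>
    #include <vector>

    // Simplified placeholders, not the real Doris interfaces.
    struct Status {
        bool ok = true;
        std::string msg;
        static Status OK() { return {}; }
    };

    struct Operator { Status close() { return Status::OK(); } };
    struct Sink     { Status close() { return Status::OK(); } };

    struct Task {
        Sink sink;
        std::vector<Operator> operators;

        // close_sink defaults to true; passing false closes only the operator chain,
        // leaving the sink to be closed later (e.g. by the scheduler's finish work).
        Status close(Status /*exec_status*/, bool close_sink = true) {
            Status s = Status::OK();
            if (close_sink) {
                s = sink.close();
            }
            for (auto& op : operators) {
                Status tmp = op.close();
                if (!tmp.ok && s.ok) {
                    s = tmp;  // keep the first failure, as in the patch
                }
            }
            return s;
        }
    };
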
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 6be1c6a1492..5aba572e361 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -226,6 +226,10 @@ Status PipelineXTask::_open() {
 }
 
 Status PipelineXTask::execute(bool* eos) {
+    if (_eos) {
+        *eos = true;
+        return Status::OK();
+    }
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_TIMER(_exec_timer);
     SCOPED_ATTACH_TASK(_state);
@@ -247,28 +251,23 @@ Status PipelineXTask::execute(bool* eos) {
             cpu_qs->add_cpu_nanos(delta_cpu_time);
         }
     }};
-    *eos = _sink->is_finished(_state) || _wake_up_by_downstream || is_final_state(_cur_state);
-    if (*eos) {
-        return Status::OK();
-    }
+
     if (has_dependency()) {
         set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
         return Status::OK();
     }
-    if (_wake_up_by_downstream) {
-        *eos = true;
-        return Status::OK();
-    }
     if (_runtime_filter_blocked_dependency() != nullptr) {
         set_state(PipelineTaskState::BLOCKED_FOR_RF);
         return Status::OK();
     }
-    if (_wake_up_by_downstream) {
-        *eos = true;
-        return Status::OK();
-    }
+
     // The status must be runnable
     if (!_opened) {
+        if (_wake_up_early) {
+            *eos = true;
+            _eos = true;
+            return Status::OK();
+        }
         {
             SCOPED_RAW_TIMER(&time_spent);
             RETURN_IF_ERROR(_open());
@@ -277,20 +276,20 @@ Status PipelineXTask::execute(bool* eos) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
             return Status::OK();
         }
-        if (_wake_up_by_downstream) {
-            *eos = true;
-            return Status::OK();
-        }
         if (!sink_can_write()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SINK);
             return Status::OK();
         }
-        if (_wake_up_by_downstream) {
-            *eos = true;
-            return Status::OK();
-        }
     }
 
+    auto set_wake_up_and_dep_ready = [&]() {
+        if (wake_up_early()) {
+            return;
+        }
+        set_wake_up_early();
+        clear_blocking_state();
+    };
+
     Status status = Status::OK();
     set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
@@ -298,18 +297,10 @@ Status PipelineXTask::execute(bool* eos) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
             break;
         }
-        if (_wake_up_by_downstream) {
-            *eos = true;
-            return Status::OK();
-        }
         if (!sink_can_write()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SINK);
             break;
         }
-        if (_wake_up_by_downstream) {
-            *eos = true;
-            return Status::OK();
-        }
 
         /// When a task is cancelled,
         /// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
@@ -336,10 +327,15 @@ Status PipelineXTask::execute(bool* eos) {
                     Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
             return status;
         });
+
+        if (_sink->is_finished(_state)) {
+            set_wake_up_and_dep_ready();
+        }
+        // `_dry_run` means the sink operator needs no more data
+        *eos = wake_up_early() || _dry_run;
+
         // Pull block from operator chain
-        if (_dry_run || _sink->is_finished(_state)) {
-            *eos = true;
-        } else {
+        if (!*eos) {
             SCOPED_TIMER(_get_block_timer);
             _get_block_counter->update(1);
             try {
@@ -353,11 +349,14 @@ Status PipelineXTask::execute(bool* eos) {
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
             status = _sink->sink(_state, block, *eos);
-            if (!status.is<ErrorCode::END_OF_FILE>()) {
-                RETURN_IF_ERROR(status);
+            if (status.is<ErrorCode::END_OF_FILE>()) {
+                set_wake_up_and_dep_ready();
+            } else if (!status) {
+                return status;
             }
-            *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
+
             if (*eos) { // just return, the scheduler will do finish work
+                RETURN_IF_ERROR(close(status, false));
                 break;
             }
         }
@@ -425,7 +424,7 @@ void PipelineXTask::finalize() {
     _le_state_map.clear();
 }
 
-Status PipelineXTask::close(Status exec_status) {
+Status PipelineXTask::close(Status exec_status, bool close_sink) {
     int64_t close_ns = 0;
     Defer defer {[&]() {
         if (_task_queue) {
@@ -435,7 +434,10 @@ Status PipelineXTask::close(Status exec_status) {
     Status s;
     {
         SCOPED_RAW_TIMER(&close_ns);
-        s = _sink->close(_state, exec_status);
+        if (close_sink) {
+            _task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false");
+            s = _sink->close(_state, exec_status);
+        }
         for (auto& op : _operators) {
             auto tem = op->close(_state);
             if (!tem.ok() && s.ok()) {
@@ -468,13 +470,13 @@ std::string PipelineXTask::debug_string() {
     // If at the same time FE cancel this pipeline task and logging debug_string before _blocked_dep is cleared,
     // it will think _blocked_dep is not nullptr and call _blocked_dep->debug_string().
     auto* cur_blocked_dep = _blocked_dep;
-    fmt::format_to(
-            debug_string_buffer,
-            "PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
-            "= {}s], _wake_up_by_downstream = {}, block dependency = {}, is running = "
-            "{}\noperators: ",
-            (void*)this, get_state_name(_cur_state), _dry_run, elapsed, _wake_up_by_downstream,
-            cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", is_running());
+    fmt::format_to(debug_string_buffer,
+                   "PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
+                   "= {}s], _wake_up_early = {}, block dependency = {}, is running = "
+                   "{}\noperators: ",
+                   (void*)this, get_state_name(_cur_state), _dry_run, elapsed, _wake_up_early,
+                   cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL",
+                   is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
                        _opened && !_finished ? _operators[i]->debug_string(_state, i)
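
pipeline_x_task.cpp above: execute() now routes both "sink already finished" and "sink returned END_OF_FILE" through one helper that sets _wake_up_early and releases the blocking dependencies; eos is then recomputed from that flag, the operator chain is closed via close(status, false), and the new _eos flag makes later execute() calls return immediately. The sketch below imitates that control flow with invented names (SketchTask, Code, execute_once); it simplifies the real loop and is not Doris code.

    #include <atomic>

    enum class Code { OK, END_OF_FILE, ERROR };

    struct SketchTask {
        std::atomic<bool> wake_up_early{false};
        std::atomic<bool> eos{false};

        void set_wake_up_early() { wake_up_early = true; }
        void clear_blocking_state() { /* release blocked dependencies */ }

        // Mirrors the set_wake_up_and_dep_ready lambda added in execute().
        void set_wake_up_and_dep_ready() {
            if (wake_up_early) {
                return;                       // already woken up, nothing to do
            }
            set_wake_up_early();
            clear_blocking_state();
        }

        Code execute_once(bool sink_finished, Code sink_result, bool dry_run) {
            if (eos) {
                return Code::OK;              // a task that reached eos stays finished
            }
            if (sink_finished) {
                set_wake_up_and_dep_ready();  // the sink needs no more input
            }
            bool local_eos = wake_up_early || dry_run;

            // ... pull a block only when !local_eos, then push it to the sink ...
            if (sink_result == Code::END_OF_FILE) {
                // EOF from the sink counts as an early wake-up; the next call
                // recomputes eos from the flag and finishes the task.
                set_wake_up_and_dep_ready();
            } else if (sink_result == Code::ERROR) {
                return Code::ERROR;
            }

            if (local_eos) {
                eos = true;                   // later execute() calls short-circuit
                // the patch also closes the operator chain here: close(status, false)
            }
            return Code::OK;
        }
    };
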
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 42dca15076a..0127e01b323 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -69,7 +69,7 @@ public:
 
     // if the pipeline create a bunch of pipeline task
     // must be call after all pipeline task is finish to release resource
-    Status close(Status exec_status) override;
+    Status close(Status exec_status, bool close_sink = true) override;
 
     Status close_sink(Status exec_status);
     bool source_can_read() override {
@@ -138,8 +138,7 @@ public:
 
     int task_id() const { return _index; };
 
-    void clear_blocking_state(bool wake_up_by_downstream = false) override {
-        _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
+    void clear_blocking_state() override {
         _state->get_query_ctx()->get_execution_dependency()->set_always_ready();
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
@@ -177,7 +176,9 @@ public:
 
     static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes);
 
-    bool wake_up_by_downstream() const { return _wake_up_by_downstream; }
+    bool wake_up_early() const { return _wake_up_early; }
+
+    void set_wake_up_early() override { _wake_up_early = true; }
 
 private:
     friend class RuntimeFilterDependency;
@@ -255,7 +256,8 @@ private:
 
     std::atomic<bool> _finished {false};
     std::mutex _dependency_lock;
-    std::atomic<bool> _wake_up_by_downstream = false;
+    std::atomic<bool> _wake_up_early = false;
+    std::atomic<bool> _eos = false;
 };
 
 } // namespace doris::pipeline
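
pipeline_x_task.h above: _wake_up_by_downstream becomes _wake_up_early and a separate _eos flag is added; both remain std::atomic<bool>, presumably because one thread (for example the pipeline waking its tasks in make_all_runnable()) sets the flag while the executor thread reads it. A tiny illustrative reader/writer snippet, unrelated to the real scheduling code:

    #include <atomic>
    #include <thread>

    struct Flags {
        std::atomic<bool> wake_up_early{false};
        std::atomic<bool> eos{false};
    };

    int main() {
        Flags f;
        // Writer: another component decides this task should wind down early.
        std::thread waker([&f] { f.wake_up_early.store(true, std::memory_order_release); });

        // Reader: the executor keeps working until it observes the flag.
        while (!f.wake_up_early.load(std::memory_order_acquire)) {
            // ... run one iteration of the task loop ...
        }
        f.eos.store(true);  // record that the final eos round has been done
        waker.join();
        return 0;
    }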


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
