This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 725811e1d1 [Improvement](pipeline) Terminate early for short-circuit join (#23378) (#23396)
725811e1d1 is described below

commit 725811e1d15fcee0bed6accf08b5ac55e1813822
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 24 12:32:12 2023 +0800

    [Improvement](pipeline) Terminate early for short-circuit join (#23378) (#23396)
---
 be/src/exec/exec_node.h                |  2 ++
 be/src/pipeline/exec/operator.h        |  4 ++++
 be/src/pipeline/pipeline_task.cpp      |  8 +++----
 be/src/pipeline/pipeline_task.h        | 41 ++++++++++++++++++++++++++++++++--
 be/src/vec/exec/join/vjoin_node_base.h |  2 ++
 5 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index ad7eb83074..ae7b40dc20 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -134,6 +134,8 @@ public:
 
     bool can_read() const { return _can_read; }
 
+    [[nodiscard]] virtual bool can_terminate_early() { return false; }
+
     // Sink Data to ExecNode to do some stock work, both need impl with method: get_result
     // `eos` means source is exhausted, exec node should do some finalize work
     // Eg: Aggregation, Sort
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index acf55cb7bc..38ed45ed89 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -199,6 +199,8 @@ public:
 
     virtual bool can_write() { return false; } // for sink
 
+    [[nodiscard]] virtual bool can_terminate_early() { return false; }
+
     /**
      * The main method to execute a pipeline task.
      * Now it is a pull-based pipeline and operators pull data from its child by this method.
@@ -321,6 +323,8 @@ public:
 
     ~StreamingOperator() override = default;
 
+    [[nodiscard]] bool can_terminate_early() override { return _node->can_terminate_early(); }
+
     Status prepare(RuntimeState* state) override {
         _node->increase_ref();
         _use_projection = _node->has_output_row_descriptor();
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 4d0fb49de8..645028a3dc 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -222,11 +222,11 @@ Status PipelineTask::execute(bool* eos) {
             set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
             return Status::OK();
         }
-        if (!_source->can_read()) {
+        if (!source_can_read()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
             return Status::OK();
         }
-        if (!_sink->can_write()) {
+        if (!sink_can_write()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SINK);
             return Status::OK();
         }
@@ -234,11 +234,11 @@ Status PipelineTask::execute(bool* eos) {
 
     this->set_begin_execute_time();
     while (!_fragment_context->is_canceled()) {
-        if (_data_state != SourceState::MORE_DATA && !_source->can_read()) {
+        if (_data_state != SourceState::MORE_DATA && !source_can_read()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
             break;
         }
-        if (!_sink->can_write()) {
+        if (!sink_can_write()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SINK);
             break;
         }
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 62de1fd281..34382a3f7c 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -152,13 +152,50 @@ public:
         return false;
     }
 
-    bool source_can_read() { return _source->can_read(); }
+    bool source_can_read() {
+        return _source->can_read() || ignore_blocking_source();
+        ;
+    }
 
     bool runtime_filters_are_ready_or_timeout() {
         return _source->runtime_filters_are_ready_or_timeout();
     }
 
-    bool sink_can_write() { return _sink->can_write(); }
+    bool sink_can_write() { return _sink->can_write() || ignore_blocking_sink(); }
+    /**
+     * Consider the query plan below:
+     *
+     *      ExchangeSource     JoinBuild1
+     *            \              /
+     *         JoinProbe1 (Right Outer)    JoinBuild2
+     *                   \                   /
+     *                 JoinProbe2 (Right Outer)
+     *                          |
+     *                        Sink
+     *
+     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource
+     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows.
+     *
+     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will
+     * produce more data.
+     *
+     * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be blocked by ExchangeSource
+     * and Sink because JoinProbe2 will always produce 0 rows and terminate early.
+     *
+     * In a nutshell, we should follow the rules:
+     * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator.
+     * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator.
+     */
+    [[nodiscard]] bool ignore_blocking_sink() { return _root->can_terminate_early(); }
+
+    [[nodiscard]] bool ignore_blocking_source() {
+        for (size_t i = 1; i < _operators.size(); i++) {
+            if (_operators[i]->can_terminate_early()) {
+                return true;
+            }
+        }
+        return false;
+    }
 
     Status finalize();
 
diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h
index 120e77785e..8756c24d20 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -74,6 +74,8 @@ public:
 
     virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
 
+    [[nodiscard]] bool can_terminate_early() override { return _short_circuit_for_probe; }
+
 protected:
     // Construct the intermediate blocks to store the results from join operation.
     void _construct_mutable_join_block();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to