This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2975ef9d9b [Opt](pipeline) Refactor the short circuit of join pipeline 
(#23639) (#23728)
2975ef9d9b is described below

commit 2975ef9d9b7959da89724e6297edc16ea16063e8
Author: HappenLee <[email protected]>
AuthorDate: Fri Sep 1 07:58:28 2023 +0800

    [Opt](pipeline) Refactor the short circuit of join pipeline (#23639) 
(#23728)
    
    * [Opt](pipeline) Refactor the short circuit of join pipeline
---
 be/src/pipeline/pipeline.cpp                  |  6 +--
 be/src/pipeline/pipeline.h                    | 77 ++++++++++++++++++++++-----
 be/src/pipeline/pipeline_fragment_context.cpp |  8 ++-
 be/src/pipeline/pipeline_task.cpp             | 12 +++--
 be/src/pipeline/pipeline_task.h               | 48 ++---------------
 5 files changed, 83 insertions(+), 68 deletions(-)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index ccc7786442..80efbd154a 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -32,14 +32,14 @@ void Pipeline::_init_profile() {
     _pipeline_profile.reset(new RuntimeProfile(ss.str()));
 }
 
-Status Pipeline::build_operators(Operators& operators) {
+Status Pipeline::build_operators() {
     OperatorPtr pre;
     for (auto& operator_t : _operator_builders) {
         auto o = operator_t->build_operator();
         if (pre) {
             o->set_child(pre);
         }
-        operators.emplace_back(o);
+        _operators.emplace_back(o);
         pre = std::move(o);
     }
     return Status::OK();
@@ -64,4 +64,4 @@ Status Pipeline::set_sink(OperatorBuilderPtr& sink_) {
     return Status::OK();
 }
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 73b2c3850c..759e4fce2c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -45,31 +45,52 @@ using PipelineId = uint32_t;
 
 class Pipeline : public std::enable_shared_from_this<Pipeline> {
     friend class PipelineTask;
+    friend class PipelineXTask;
 
 public:
     Pipeline() = delete;
     explicit Pipeline(PipelineId pipeline_id, 
std::weak_ptr<PipelineFragmentContext> context)
-            : _complete_dependency(0), _pipeline_id(pipeline_id), 
_context(context) {
+            : _pipeline_id(pipeline_id), _context(context) {
         _init_profile();
     }
 
     void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
-        pipeline->_parents.push_back(weak_from_this());
-        _dependencies.push_back(pipeline);
+        pipeline->_parents.push_back({_operator_builders.size(), 
weak_from_this()});
+        _dependencies.push_back({_operator_builders.size(), pipeline});
     }
 
     // If all dependencies are finished, this pipeline task should be 
scheduled.
     // e.g. Hash join probe task will be scheduled once Hash join build task 
is finished.
-    bool finish_one_dependency(int dependency_core_id) {
-        DCHECK(_complete_dependency < _dependencies.size());
-        bool finish = _complete_dependency.fetch_add(1) == 
_dependencies.size() - 1;
-        if (finish) {
+    void finish_one_dependency(int dep_opr, int dependency_core_id) {
+        std::lock_guard l(_depend_mutex);
+        if (!_operators.empty() && _operators[dep_opr - 
1]->can_terminate_early()) {
+            _always_can_read = true;
+            _always_can_write = (dep_opr == _operators.size());
+
+            for (int i = 0; i < _dependencies.size(); ++i) {
+                if (dep_opr == _dependencies[i].first) {
+                    _dependencies.erase(_dependencies.begin(), 
_dependencies.begin() + i + 1);
+                    break;
+                }
+            }
+        } else {
+            for (int i = 0; i < _dependencies.size(); ++i) {
+                if (dep_opr == _dependencies[i].first) {
+                    _dependencies.erase(_dependencies.begin() + i);
+                    break;
+                }
+            }
+        }
+
+        if (_dependencies.empty()) {
             _previous_schedule_id = dependency_core_id;
         }
-        return finish;
     }
 
-    bool has_dependency() { return _complete_dependency.load() < 
_dependencies.size(); }
+    bool has_dependency() {
+        std::lock_guard l(_depend_mutex);
+        return !_dependencies.empty();
+    }
 
     Status add_operator(OperatorBuilderPtr& op);
 
@@ -77,25 +98,53 @@ public:
 
     OperatorBuilderBase* sink() { return _sink.get(); }
 
-    Status build_operators(Operators&);
+    Status build_operators();
 
     RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
 
 private:
     void _init_profile();
-    std::atomic<uint32_t> _complete_dependency;
 
     OperatorBuilders _operator_builders; // left is _source, right is _root
     OperatorBuilderPtr _sink;            // put block to sink
 
-    std::vector<std::weak_ptr<Pipeline>> _parents;
-    std::vector<std::shared_ptr<Pipeline>> _dependencies;
+    std::mutex _depend_mutex;
+    std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
+    std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
 
     PipelineId _pipeline_id;
     std::weak_ptr<PipelineFragmentContext> _context;
     int _previous_schedule_id = -1;
 
     std::unique_ptr<RuntimeProfile> _pipeline_profile;
+
+    Operators _operators;
+    /**
+     * Consider the query plan below:
+     *
+     *      ExchangeSource     JoinBuild1
+     *            \              /
+     *         JoinProbe1 (Right Outer)    JoinBuild2
+     *                   \                   /
+     *                 JoinProbe2 (Right Outer)
+     *                          |
+     *                        Sink
+     *
+     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should 
not be blocked by ExchangeSource
+     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will 
also output 0 rows.
+     *
+     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked 
by Sink because JoinProbe2 will
+     * produce more data.
+     *
+     * Assume both JoinBuild1 and JoinBuild2 output 0 rows; this pipeline task should not be 
blocked by ExchangeSource
+     * and Sink because JoinProbe2 will always produce 0 rows and terminate 
early.
+     *
+     * In a nutshell, we should follow the rules:
+     * 1. if any operator in pipeline can terminate early, this task should 
never be blocked by source operator.
+     * 2. if the last operator (except sink) can terminate early, this task 
should never be blocked by sink operator.
+     */
+    bool _always_can_read = false;
+    bool _always_can_write = false;
 };
 
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index a5a93775a9..0e84131401 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -337,11 +337,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
         // TODO pipeline 1 need to add new interface for exec node and operator
         sink->init(request.fragment.output_sink);
 
-        Operators operators;
-        RETURN_IF_ERROR(pipeline->build_operators(operators));
-        auto task =
-                std::make_unique<PipelineTask>(pipeline, _total_tasks++, 
_runtime_state.get(),
-                                               operators, sink, this, 
pipeline->pipeline_profile());
+        RETURN_IF_ERROR(pipeline->build_operators());
+        auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++, 
_runtime_state.get(),
+                                                   sink, this, 
pipeline->pipeline_profile());
         sink->set_child(task->get_root());
         _tasks.emplace_back(std::move(task));
         _runtime_profile->add_child(pipeline->pipeline_profile(), true, 
nullptr);
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 645028a3dc..7a16c07d13 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -44,12 +44,18 @@ class TaskGroup;
 namespace doris::pipeline {
 
 PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t index, 
RuntimeState* state,
-                           Operators& operators, OperatorPtr& sink,
-                           PipelineFragmentContext* fragment_context,
+                           OperatorPtr& sink, PipelineFragmentContext* 
fragment_context,
                            RuntimeProfile* parent_profile)
         : _index(index),
           _pipeline(pipeline),
-          _operators(operators),
+          _prepared(false),
+          _opened(false),
+          _state(state),
+          _cur_state(PipelineTaskState::NOT_READY),
+          _data_state(SourceState::DEPEND_ON_SOURCE),
+          _fragment_context(fragment_context),
+          _parent_profile(parent_profile),
+          _operators(pipeline->_operators),
           _source(_operators.front()),
           _root(_operators.back()),
           _sink(sink),
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 34382a3f7c..5e7b436202 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -110,9 +110,8 @@ class TaskQueue;
 // The class do the pipeline task. Minest schdule union by task scheduler
 class PipelineTask {
 public:
-    PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, 
Operators& operators,
-                 OperatorPtr& sink, PipelineFragmentContext* fragment_context,
-                 RuntimeProfile* parent_profile);
+    PipelineTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state, 
OperatorPtr& sink,
+                 PipelineFragmentContext* fragment_context, RuntimeProfile* 
parent_profile);
 
     Status prepare(RuntimeState* state);
 
@@ -152,50 +151,13 @@ public:
         return false;
     }
 
-    bool source_can_read() {
-        return _source->can_read() || ignore_blocking_source();
-        ;
-    }
+    bool source_can_read() { return _source->can_read() || 
_pipeline->_always_can_read; }
 
     bool runtime_filters_are_ready_or_timeout() {
         return _source->runtime_filters_are_ready_or_timeout();
     }
 
-    bool sink_can_write() { return _sink->can_write() || 
ignore_blocking_sink(); }
-    /**
-     * Consider the query plan below:
-     *
-     *      ExchangeSource     JoinBuild1
-     *            \              /
-     *         JoinProbe1 (Right Outer)    JoinBuild2
-     *                   \                   /
-     *                 JoinProbe2 (Right Outer)
-     *                          |
-     *                        Sink
-     *
-     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should 
not be blocked by ExchangeSource
-     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will 
also output 0 rows.
-     *
-     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked 
by Sink because JoinProbe2 will
-     * produce more data.
-     *
-     * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be 
blocked by ExchangeSource
-     * and Sink because JoinProbe2 will always produce 0 rows and terminate 
early.
-     *
-     * In a nutshell, we should follow the rules:
-     * 1. if any operator in pipeline can terminate early, this task should 
never be blocked by source operator.
-     * 2. if the last operator (except sink) can terminate early, this task 
should never be blocked by sink operator.
-     */
-    [[nodiscard]] bool ignore_blocking_sink() { return 
_root->can_terminate_early(); }
-
-    [[nodiscard]] bool ignore_blocking_source() {
-        for (size_t i = 1; i < _operators.size(); i++) {
-            if (_operators[i]->can_terminate_early()) {
-                return true;
-            }
-        }
-        return false;
-    }
+    bool sink_can_write() { return _sink->can_write() || 
_pipeline->_always_can_write; }
 
     Status finalize();
 
@@ -283,7 +245,7 @@ public:
 private:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
-            p.lock()->finish_one_dependency(_previous_schedule_id);
+            p.second.lock()->finish_one_dependency(p.first, 
_previous_schedule_id);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to