This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 96164f3bdc [pipelinex](sort) Fix expression initialization order (#23405)
96164f3bdc is described below

commit 96164f3bdcac9dc8dfebfe318de6c3b2f94f6480
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Aug 24 17:29:24 2023 +0800

    [pipelinex](sort) Fix expression initialization order (#23405)
---
 be/src/pipeline/exec/operator.h              |  4 ++
 be/src/pipeline/exec/sort_sink_operator.cpp  |  3 +-
 be/src/pipeline/pipeline.cpp                 |  4 +-
 be/src/pipeline/pipeline_task.h              | 74 ++++++++++++++--------------
 be/src/pipeline/pipeline_x/pipeline_x_task.h | 17 ++++++-
 5 files changed, 60 insertions(+), 42 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index c6e41982a6..a03297bcf2 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -603,6 +603,10 @@ public:
 
     Status finalize(RuntimeState* state) override { return Status::OK(); }
 
+    [[nodiscard]] bool can_terminate_early() override { return false; }
+
+    [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { 
return false; }
+
     bool can_read() override {
         LOG(FATAL) << "should not reach here!";
         return false;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index 12f0d8a753..182ce7bd71 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -33,6 +33,7 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _dependency = (SortDependency*)info.dependency;
     _shared_state = (SortSharedState*)_dependency->shared_state();
 
+    RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
     _profile = p._pool->add(new RuntimeProfile("SortSinkLocalState"));
     switch (p._algorithm) {
     case SortAlgorithm::HEAP_SORT: {
@@ -70,7 +71,7 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
     _child_get_next_timer = ADD_TIMER(_profile, "ChildGetResultTime");
     _sink_timer = ADD_TIMER(_profile, "PartialSortTotalTime");
 
-    return p._vsort_exec_exprs.clone(state, _vsort_exec_exprs);
+    return Status::OK();
 }
 
 SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 1d1dd627ec..69eaba3fbb 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -63,8 +63,8 @@ Status Pipeline::add_operator(OperatorXPtr& op) {
 
 Status Pipeline::prepare(RuntimeState* state) {
     // TODO
-    RETURN_IF_ERROR(_operators[_operators.size() - 1]->prepare(state));
-    RETURN_IF_ERROR(_operators[_operators.size() - 1]->open(state));
+    RETURN_IF_ERROR(_operators.back()->prepare(state));
+    RETURN_IF_ERROR(_operators.back()->open(state));
     RETURN_IF_ERROR(_sink_x->prepare(state));
     RETURN_IF_ERROR(_sink_x->open(state));
     return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index dea3cd27ff..57d7659197 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -155,48 +155,13 @@ public:
         return false;
     }
 
-    virtual bool source_can_read() { return _source->can_read() || 
ignore_blocking_source(); }
+    virtual bool source_can_read() { return _source->can_read() || 
_ignore_blocking_source(); }
 
     virtual bool runtime_filters_are_ready_or_timeout() {
         return _source->runtime_filters_are_ready_or_timeout();
     }
 
-    /**
-     * Consider the query plan below:
-     *
-     *      ExchangeSource     JoinBuild1
-     *            \              /
-     *         JoinProbe1 (Right Outer)    JoinBuild2
-     *                   \                   /
-     *                 JoinProbe2 (Right Outer)
-     *                          |
-     *                        Sink
-     *
-     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should 
not be blocked by ExchangeSource
-     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will 
also output 0 rows.
-     *
-     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked 
by Sink because JoinProbe2 will
-     * produce more data.
-     *
-     * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be 
blocked by ExchangeSource
-     * and Sink because JoinProbe2 will always produce 0 rows and terminate 
early.
-     *
-     * In a nutshell, we should follow the rules:
-     * 1. if any operator in pipeline can terminate early, this task should 
never be blocked by source operator.
-     * 2. if the last operator (except sink) can terminate early, this task 
should never be blocked by sink operator.
-     */
-    [[nodiscard]] virtual bool ignore_blocking_sink() { return 
_root->can_terminate_early(); }
-
-    [[nodiscard]] virtual bool ignore_blocking_source() {
-        for (size_t i = 1; i < _operators.size(); i++) {
-            if (_operators[i]->can_terminate_early()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    virtual bool sink_can_write() { return _sink->can_write() || 
ignore_blocking_sink(); }
+    virtual bool sink_can_write() { return _sink->can_write() || 
_ignore_blocking_sink(); }
 
     virtual Status finalize();
 
@@ -381,6 +346,41 @@ protected:
     RuntimeProfile::Counter* _pip_task_total_timer;
 
 private:
+    /**
+     * Consider the query plan below:
+     *
+     *      ExchangeSource     JoinBuild1
+     *            \              /
+     *         JoinProbe1 (Right Outer)    JoinBuild2
+     *                   \                   /
+     *                 JoinProbe2 (Right Outer)
+     *                          |
+     *                        Sink
+     *
+     * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should 
not be blocked by ExchangeSource
+     * because we have a determined conclusion that JoinProbe1/JoinProbe2 will 
also output 0 rows.
+     *
+     * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked 
by Sink because JoinProbe2 will
+     * produce more data.
+     *
+     * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be 
blocked by ExchangeSource
+     * and Sink because JoinProbe2 will always produce 0 rows and terminate 
early.
+     *
+     * In a nutshell, we should follow the rules:
+     * 1. if any operator in pipeline can terminate early, this task should 
never be blocked by source operator.
+     * 2. if the last operator (except sink) can terminate early, this task 
should never be blocked by sink operator.
+     */
+    [[nodiscard]] bool _ignore_blocking_sink() { return 
_root->can_terminate_early(); }
+
+    [[nodiscard]] bool _ignore_blocking_source() {
+        for (size_t i = 1; i < _operators.size(); i++) {
+            if (_operators[i]->can_terminate_early()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     Operators _operators; // left is _source, right is _root
     OperatorPtr _source;
     OperatorPtr _root;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index 74688fdc90..1453b10ba2 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -63,13 +63,15 @@ public:
     // must be call after all pipeline task is finish to release resource
     Status close() override;
 
-    bool source_can_read() override { return _source->can_read(_state); }
+    bool source_can_read() override {
+        return _source->can_read(_state) || _ignore_blocking_source();
+    }
 
     bool runtime_filters_are_ready_or_timeout() override {
         return _source->runtime_filters_are_ready_or_timeout();
     }
 
-    bool sink_can_write() override { return _sink->can_write(_state); }
+    bool sink_can_write() override { return _sink->can_write(_state) || 
_ignore_blocking_sink(); }
 
     Status finalize() override;
 
@@ -100,6 +102,17 @@ public:
     }
 
 private:
+    [[nodiscard]] bool _ignore_blocking_sink() { return 
_root->can_terminate_early(_state); }
+
+    [[nodiscard]] bool _ignore_blocking_source() {
+        for (size_t i = 1; i < _operators.size(); i++) {
+            if (_operators[i]->can_terminate_early(_state)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     using DependencyMap = std::map<int, DependencySPtr>;
     Status _open() override;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to