This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 96164f3bdc [pipelinex](sort) Fix expression initialization order (#23405) 96164f3bdc is described below commit 96164f3bdcac9dc8dfebfe318de6c3b2f94f6480 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Aug 24 17:29:24 2023 +0800 [pipelinex](sort) Fix expression initialization order (#23405) --- be/src/pipeline/exec/operator.h | 4 ++ be/src/pipeline/exec/sort_sink_operator.cpp | 3 +- be/src/pipeline/pipeline.cpp | 4 +- be/src/pipeline/pipeline_task.h | 74 ++++++++++++++-------------- be/src/pipeline/pipeline_x/pipeline_x_task.h | 17 ++++++- 5 files changed, 60 insertions(+), 42 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index c6e41982a6..a03297bcf2 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -603,6 +603,10 @@ public: Status finalize(RuntimeState* state) override { return Status::OK(); } + [[nodiscard]] bool can_terminate_early() override { return false; } + + [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } + bool can_read() override { LOG(FATAL) << "should not reach here!"; return false; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 12f0d8a753..182ce7bd71 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -33,6 +33,7 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _dependency = (SortDependency*)info.dependency; _shared_state = (SortSharedState*)_dependency->shared_state(); + RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs)); _profile = p._pool->add(new RuntimeProfile("SortSinkLocalState")); switch (p._algorithm) { case SortAlgorithm::HEAP_SORT: { @@ -70,7 +71,7 @@ Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { _child_get_next_timer = ADD_TIMER(_profile, "ChildGetResultTime"); _sink_timer = ADD_TIMER(_profile, "PartialSortTotalTime"); - return p._vsort_exec_exprs.clone(state, _vsort_exec_exprs); + return Status::OK(); } SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode, diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 1d1dd627ec..69eaba3fbb 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -63,8 +63,8 @@ Status Pipeline::add_operator(OperatorXPtr& op) { Status Pipeline::prepare(RuntimeState* state) { // TODO - RETURN_IF_ERROR(_operators[_operators.size() - 1]->prepare(state)); - RETURN_IF_ERROR(_operators[_operators.size() - 1]->open(state)); + RETURN_IF_ERROR(_operators.back()->prepare(state)); + RETURN_IF_ERROR(_operators.back()->open(state)); RETURN_IF_ERROR(_sink_x->prepare(state)); RETURN_IF_ERROR(_sink_x->open(state)); return Status::OK(); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index dea3cd27ff..57d7659197 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -155,48 +155,13 @@ public: return false; } - virtual bool source_can_read() { return _source->can_read() || ignore_blocking_source(); } + virtual bool source_can_read() { return _source->can_read() || _ignore_blocking_source(); } virtual bool runtime_filters_are_ready_or_timeout() { return _source->runtime_filters_are_ready_or_timeout(); } - /** - * Consider the query plan below: - * - * ExchangeSource JoinBuild1 - * \ / - * JoinProbe1 (Right Outer) JoinBuild2 - * \ / - * JoinProbe2 (Right Outer) - * | - * Sink - * - * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource - * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows. - * - * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will - * produce more data. - * - * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be blocked by ExchangeSource - * and Sink because JoinProbe2 will always produce 0 rows and terminate early. - * - * In a nutshell, we should follow the rules: - * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator. - * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator. - */ - [[nodiscard]] virtual bool ignore_blocking_sink() { return _root->can_terminate_early(); } - - [[nodiscard]] virtual bool ignore_blocking_source() { - for (size_t i = 1; i < _operators.size(); i++) { - if (_operators[i]->can_terminate_early()) { - return true; - } - } - return false; - } - - virtual bool sink_can_write() { return _sink->can_write() || ignore_blocking_sink(); } + virtual bool sink_can_write() { return _sink->can_write() || _ignore_blocking_sink(); } virtual Status finalize(); @@ -381,6 +346,41 @@ protected: RuntimeProfile::Counter* _pip_task_total_timer; private: + /** + * Consider the query plan below: + * + * ExchangeSource JoinBuild1 + * \ / + * JoinProbe1 (Right Outer) JoinBuild2 + * \ / + * JoinProbe2 (Right Outer) + * | + * Sink + * + * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource + * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows. + * + * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will + * produce more data. + * + * Assume both JoinBuild2 outputs 0 rows this pipeline task should not be blocked by ExchangeSource + * and Sink because JoinProbe2 will always produce 0 rows and terminate early. + * + * In a nutshell, we should follow the rules: + * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator. + * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator. + */ + [[nodiscard]] bool _ignore_blocking_sink() { return _root->can_terminate_early(); } + + [[nodiscard]] bool _ignore_blocking_source() { + for (size_t i = 1; i < _operators.size(); i++) { + if (_operators[i]->can_terminate_early()) { + return true; + } + } + return false; + } + Operators _operators; // left is _source, right is _root OperatorPtr _source; OperatorPtr _root; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 74688fdc90..1453b10ba2 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -63,13 +63,15 @@ public: // must be call after all pipeline task is finish to release resource Status close() override; - bool source_can_read() override { return _source->can_read(_state); } + bool source_can_read() override { + return _source->can_read(_state) || _ignore_blocking_source(); + } bool runtime_filters_are_ready_or_timeout() override { return _source->runtime_filters_are_ready_or_timeout(); } - bool sink_can_write() override { return _sink->can_write(_state); } + bool sink_can_write() override { return _sink->can_write(_state) || _ignore_blocking_sink(); } Status finalize() override; @@ -100,6 +102,17 @@ public: } private: + [[nodiscard]] bool _ignore_blocking_sink() { return _root->can_terminate_early(_state); } + + [[nodiscard]] bool _ignore_blocking_source() { + for (size_t i = 1; i < _operators.size(); i++) { + if (_operators[i]->can_terminate_early(_state)) { + return true; + } + } + return false; + } + using DependencyMap = std::map<int, DependencySPtr>; Status _open() override; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org