This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 725811e1d1 [Improvement](pipeline) Terminate early for short-circuit join (#23378) (#23396) 725811e1d1 is described below commit 725811e1d15fcee0bed6accf08b5ac55e1813822 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Thu Aug 24 12:32:12 2023 +0800 [Improvement](pipeline) Terminate early for short-circuit join (#23378) (#23396) --- be/src/exec/exec_node.h | 2 ++ be/src/pipeline/exec/operator.h | 4 ++++ be/src/pipeline/pipeline_task.cpp | 8 +++---- be/src/pipeline/pipeline_task.h | 41 ++++++++++++++++++++++++++++++++-- be/src/vec/exec/join/vjoin_node_base.h | 2 ++ 5 files changed, 51 insertions(+), 6 deletions(-) diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index ad7eb83074..ae7b40dc20 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -134,6 +134,8 @@ public: bool can_read() const { return _can_read; } + [[nodiscard]] virtual bool can_terminate_early() { return false; } + // Sink Data to ExecNode to do some stock work, both need impl with method: get_result // `eos` means source is exhausted, exec node should do some finalize work // Eg: Aggregation, Sort diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index acf55cb7bc..38ed45ed89 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -199,6 +199,8 @@ public: virtual bool can_write() { return false; } // for sink + [[nodiscard]] virtual bool can_terminate_early() { return false; } + /** * The main method to execute a pipeline task. * Now it is a pull-based pipeline and operators pull data from its child by this method. 
@@ -321,6 +323,8 @@ public: ~StreamingOperator() override = default; + [[nodiscard]] bool can_terminate_early() override { return _node->can_terminate_early(); } + Status prepare(RuntimeState* state) override { _node->increase_ref(); _use_projection = _node->has_output_row_descriptor(); diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 4d0fb49de8..645028a3dc 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -222,11 +222,11 @@ Status PipelineTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); return Status::OK(); } - if (!_source->can_read()) { + if (!source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); return Status::OK(); } - if (!_sink->can_write()) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); return Status::OK(); } @@ -234,11 +234,11 @@ Status PipelineTask::execute(bool* eos) { this->set_begin_execute_time(); while (!_fragment_context->is_canceled()) { - if (_data_state != SourceState::MORE_DATA && !_source->can_read()) { + if (_data_state != SourceState::MORE_DATA && !source_can_read()) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); break; } - if (!_sink->can_write()) { + if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); break; } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 62de1fd281..34382a3f7c 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -152,13 +152,50 @@ public: return false; } - bool source_can_read() { return _source->can_read(); } + bool source_can_read() { + return _source->can_read() || ignore_blocking_source(); + ; + } bool runtime_filters_are_ready_or_timeout() { return _source->runtime_filters_are_ready_or_timeout(); } - bool sink_can_write() { return _sink->can_write(); } + bool sink_can_write() { return _sink->can_write() || ignore_blocking_sink(); } + /** + * Consider the query plan 
below: + * + * ExchangeSource JoinBuild1 + * \ / + * JoinProbe1 (Right Outer) JoinBuild2 + * \ / + * JoinProbe2 (Right Outer) + * | + * Sink + * + * Assume JoinBuild1/JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource + * because we have a determined conclusion that JoinProbe1/JoinProbe2 will also output 0 rows. + * + * Assume JoinBuild2 outputs > 0 rows, this pipeline task may be blocked by Sink because JoinProbe2 will + * produce more data. + * + * Assume JoinBuild2 outputs 0 rows, this pipeline task should not be blocked by ExchangeSource + * and Sink because JoinProbe2 will always produce 0 rows and terminate early. + * + * In a nutshell, we should follow the rules: + * 1. if any operator in pipeline can terminate early, this task should never be blocked by source operator. + * 2. if the last operator (except sink) can terminate early, this task should never be blocked by sink operator. + */ + [[nodiscard]] bool ignore_blocking_sink() { return _root->can_terminate_early(); } + + [[nodiscard]] bool ignore_blocking_source() { + for (size_t i = 1; i < _operators.size(); i++) { + if (_operators[i]->can_terminate_early()) { + return true; + } + } + return false; + } Status finalize(); diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h index 120e77785e..8756c24d20 100644 --- a/be/src/vec/exec/join/vjoin_node_base.h +++ b/be/src/vec/exec/join/vjoin_node_base.h @@ -74,6 +74,8 @@ public: virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + [[nodiscard]] bool can_terminate_early() override { return _short_circuit_for_probe; } + protected: // Construct the intermediate blocks to store the results from join operation. void _construct_mutable_join_block(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org