This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 8ecf69b09b [pipeline](regression) nested loop join test get error result in pipeline engine and refactor the code for need more input data (#15208) 8ecf69b09b is described below commit 8ecf69b09b034612967223a35cb65bad5c742967 Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Dec 21 19:03:51 2022 +0800 [pipeline](regression) nested loop join test get error result in pipeline engine and refactor the code for need more input data (#15208) --- be/src/pipeline/exec/operator.h | 22 +++--- be/src/pipeline/exec/repeat_operator.cpp | 11 +++ be/src/pipeline/exec/repeat_operator.h | 4 ++ be/src/vec/exec/join/vhash_join_node.cpp | 29 ++++---- be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +- be/src/vec/exec/scan/vscan_node.cpp | 3 +- be/src/vec/exec/vrepeat_node.cpp | 90 +++++++++++-------------- be/src/vec/exec/vrepeat_node.h | 5 +- be/src/vec/exec/vtable_function_node.cpp | 34 ++++------ be/src/vec/exec/vtable_function_node.h | 4 +- 10 files changed, 95 insertions(+), 109 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 652486aa34..58c36b3a51 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -432,24 +432,24 @@ public: auto& child = StreamingOperator<OperatorBuilderType>::_child; if (node->need_more_input_data()) { + _child_block->clear_column_data(); RETURN_IF_ERROR(child->get_block(state, _child_block.get(), _child_source_state)); source_state = _child_source_state; - if (_child_block->rows() == 0 && source_state != SourceState::FINISHED) { + if (_child_block->rows() == 0 && _child_source_state != SourceState::FINISHED) { return Status::OK(); } node->prepare_for_next(); - node->push(state, _child_block.get(), source_state == SourceState::FINISHED); + node->push(state, _child_block.get(), _child_source_state == SourceState::FINISHED); } - bool eos = false; - RETURN_IF_ERROR(node->pull(state, block, &eos)); - if (eos) { - source_state = SourceState::FINISHED; - _child_block->clear_column_data(); - } else if (!node->need_more_input_data()) { - source_state = SourceState::MORE_DATA; - } else { - _child_block->clear_column_data(); + if (!node->need_more_input_data()) { + bool eos = false; + RETURN_IF_ERROR(node->pull(state, block, &eos)); + if (eos) { + source_state = SourceState::FINISHED; + } else if (!node->need_more_input_data()) { + source_state = SourceState::MORE_DATA; + } } return Status::OK(); } diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp index def1f6da9d..d2c9f0a1e2 100644 --- a/be/src/pipeline/exec/repeat_operator.cpp +++ b/be/src/pipeline/exec/repeat_operator.cpp @@ -23,4 +23,15 @@ namespace doris::pipeline { OPERATOR_CODE_GENERATOR(RepeatOperator, StatefulOperator) +Status RepeatOperator::prepare(doris::RuntimeState* state) { + // just for speed up, the way is dangerous + _child_block.reset(_node->get_child_block()); + return StatefulOperator::prepare(state); +} + +Status RepeatOperator::close(doris::RuntimeState* state) { + _child_block.release(); + return StatefulOperator::close(state); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/repeat_operator.h b/be/src/pipeline/exec/repeat_operator.h index 15707ea39c..b397ea05d3 100644 --- a/be/src/pipeline/exec/repeat_operator.h +++ b/be/src/pipeline/exec/repeat_operator.h @@ -37,6 +37,10 @@ public: class RepeatOperator final : public StatefulOperator<RepeatOperatorBuilder> { public: RepeatOperator(OperatorBuilderBase* operator_builder, ExecNode* repeat_node); + + Status prepare(RuntimeState* state) override; + + Status close(RuntimeState* state) override; }; } // namespace pipeline diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 337aec7b1e..fc2a7ed4dc 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -597,24 +597,19 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo *eos = true; return Status::OK(); } - if (need_more_input_data()) { + while (need_more_input_data()) { prepare_for_next(); - do { - SCOPED_TIMER(_probe_next_timer); - RETURN_IF_ERROR_AND_CHECK_SPAN( - child(0)->get_next_after_projects( - state, &_probe_block, &_probe_eos, - std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, - bool*)) & - ExecNode::get_next, - _children[0], std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)), - child(0)->get_next_span(), _probe_eos); - } while (_probe_block.rows() == 0 && !_probe_eos); - - if (_probe_block.rows() != 0) { - RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos)); - } + SCOPED_TIMER(_probe_next_timer); + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(0)->get_next_after_projects( + state, &_probe_block, &_probe_eos, + std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + ExecNode::get_next, + _children[0], std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)), + child(0)->get_next_span(), _probe_eos); + + RETURN_IF_ERROR(push(state, &_probe_block, _probe_eos)); } return pull(state, output_block, eos); diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 82cb6f8605..7fc43fcf19 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -671,7 +671,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block, } bool VNestedLoopJoinNode::need_more_input_data() const { - return _need_more_input_data; + return _need_more_input_data and !_left_side_eos; } void VNestedLoopJoinNode::release_resource(doris::RuntimeState* state) { diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 8d436bb733..2f35cd47fc 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -262,8 +262,7 @@ Status VScanNode::_acquire_runtime_filter(bool wait) { !_runtime_filter_ctxs[i].apply_mark) { _blocked_by_rf = true; } else if (!_runtime_filter_ctxs[i].apply_mark) { - DCHECK(!_blocked_by_rf && - runtime_filter->current_state() != RuntimeFilterState::NOT_READY); + DCHECK(runtime_filter->current_state() != RuntimeFilterState::NOT_READY); _is_all_rf_applied = false; } } diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 961e5aad4b..01db7d29ef 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -55,7 +55,6 @@ Status VRepeatNode::prepare(RuntimeState* state) { for (const auto& slot_desc : _output_tuple_desc->slots()) { _output_slots.push_back(slot_desc); } - _child_block.reset(new Block()); return Status::OK(); } @@ -181,50 +180,51 @@ Status VRepeatNode::pull(doris::RuntimeState* state, vectorized::Block* output_b } DCHECK(output_block->rows() == 0); - if (!_intermediate_block || _intermediate_block->rows() == 0) { - return Status::OK(); - } + if (_intermediate_block && _intermediate_block->rows() > 0) { + RETURN_IF_ERROR( + get_repeated_block(_intermediate_block.get(), _repeat_id_idx, output_block)); - RETURN_IF_ERROR(get_repeated_block(_intermediate_block.get(), _repeat_id_idx, output_block)); + _repeat_id_idx++; - _repeat_id_idx++; - - int size = _repeat_id_list.size(); - if (_repeat_id_idx >= size) { - _intermediate_block->clear(); - release_block_memory(*_child_block); - _repeat_id_idx = 0; + int size = _repeat_id_list.size(); + if (_repeat_id_idx >= size) { + _intermediate_block->clear(); + release_block_memory(_child_block); + _repeat_id_idx = 0; + } } + *eos = _child_eos && _child_block.rows() == 0; reached_limit(output_block, eos); COUNTER_SET(_rows_returned_counter, _num_rows_returned); return Status::OK(); } Status VRepeatNode::push(RuntimeState* state, vectorized::Block* input_block, bool eos) { - if (input_block->rows() == 0) { - return Status::OK(); - } + _child_eos = eos; DCHECK(!_intermediate_block || _intermediate_block->rows() == 0); DCHECK(!_expr_ctxs.empty()); - _intermediate_block.reset(new Block()); - - for (auto expr : _expr_ctxs) { - int result_column_id = -1; - RETURN_IF_ERROR(expr->execute(input_block, &result_column_id)); - DCHECK(result_column_id != -1); - input_block->get_by_position(result_column_id).column = - input_block->get_by_position(result_column_id) - .column->convert_to_full_column_if_const(); - _intermediate_block->insert(input_block->get_by_position(result_column_id)); + + if (input_block->rows() > 0) { + _intermediate_block.reset(new Block()); + + for (auto expr : _expr_ctxs) { + int result_column_id = -1; + RETURN_IF_ERROR(expr->execute(input_block, &result_column_id)); + DCHECK(result_column_id != -1); + input_block->get_by_position(result_column_id).column = + input_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + _intermediate_block->insert(input_block->get_by_position(result_column_id)); + } + DCHECK_EQ(_expr_ctxs.size(), _intermediate_block->columns()); } - DCHECK_EQ(_expr_ctxs.size(), _intermediate_block->columns()); return Status::OK(); } bool VRepeatNode::need_more_input_data() { - return !_intermediate_block || _intermediate_block->rows() == 0; + return !_child_block.rows() && !_child_eos; } Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { @@ -241,26 +241,17 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { DCHECK(_repeat_id_idx <= (int)v.size()); } DCHECK(block->rows() == 0); - - if (need_more_input_data()) { - while (_child_block->rows() == 0 && !_child_eos) { - RETURN_IF_ERROR_AND_CHECK_SPAN( - child(0)->get_next_after_projects( - state, _child_block.get(), &_child_eos, - std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, - bool*)) & - ExecNode::get_next, - _children[0], std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)), - child(0)->get_next_span(), _child_eos); - } - - if (_child_eos and _child_block->rows() == 0) { - *eos = true; - return Status::OK(); - } - - push(state, _child_block.get(), *eos); + while (need_more_input_data()) { + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(0)->get_next_after_projects( + state, &_child_block, &_child_eos, + std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + ExecNode::get_next, + _children[0], std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)), + child(0)->get_next_span(), _child_eos); + + push(state, &_child_block, _child_eos); } return pull(state, block, eos); @@ -294,9 +285,4 @@ void VRepeatNode::debug_string(int indentation_level, std::stringstream* out) co *out << ")"; } -void VRepeatNode::_release_mem() { - _child_block = nullptr; - _intermediate_block = nullptr; -} - } // namespace doris::vectorized diff --git a/be/src/vec/exec/vrepeat_node.h b/be/src/vec/exec/vrepeat_node.h index 53eb025cc1..394690d729 100644 --- a/be/src/vec/exec/vrepeat_node.h +++ b/be/src/vec/exec/vrepeat_node.h @@ -46,6 +46,7 @@ public: Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override; Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) override; bool need_more_input_data(); + Block* get_child_block() { return &_child_block; } protected: virtual void debug_string(int indentation_level, std::stringstream* out) const override; @@ -53,8 +54,6 @@ protected: private: Status get_repeated_block(Block* child_block, int repeat_id_idx, Block* output_block); - void _release_mem(); - // Slot id set used to indicate those slots need to set to null. std::vector<std::set<SlotId>> _slot_id_set_list; // all slot id @@ -65,7 +64,7 @@ private: TupleId _output_tuple_id; const TupleDescriptor* _output_tuple_desc; - std::unique_ptr<Block> _child_block {}; + Block _child_block; std::unique_ptr<Block> _intermediate_block {}; std::vector<SlotDescriptor*> _output_slots; diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 9336040f9a..c26bfdba21 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -83,29 +83,20 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos RETURN_IF_CANCELLED(state); // if child_block is empty, get data from child. - if (need_more_input_data()) { - while (_child_block.rows() == 0 && !_child_eos) { - RETURN_IF_ERROR_AND_CHECK_SPAN( - child(0)->get_next_after_projects( - state, &_child_block, &_child_eos, - std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, - bool*)) & - ExecNode::get_next, - _children[0], std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3)), - child(0)->get_next_span(), _child_eos); - } - if (_child_eos && _child_block.rows() == 0) { - *eos = true; - return Status::OK(); - } - - push(state, &_child_block, *eos); + while (need_more_input_data()) { + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(0)->get_next_after_projects( + state, &_child_block, &_child_eos, + std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + ExecNode::get_next, + _children[0], std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3)), + child(0)->get_next_span(), _child_eos); + + push(state, &_child_block, _child_eos); } - pull(state, block, eos); - - return Status::OK(); + return pull(state, block, eos); } Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output_block, bool* eos) { @@ -204,6 +195,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); + *eos = _child_eos && _cur_child_offset == -1; return Status::OK(); } diff --git a/be/src/vec/exec/vtable_function_node.h b/be/src/vec/exec/vtable_function_node.h index 451a35c739..c831e55856 100644 --- a/be/src/vec/exec/vtable_function_node.h +++ b/be/src/vec/exec/vtable_function_node.h @@ -30,10 +30,10 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; Status prepare(RuntimeState* state) override; Status get_next(RuntimeState* state, Block* block, bool* eos) override; - - bool need_more_input_data() { return !_child_block.rows(); } + bool need_more_input_data() { return !_child_block.rows() && !_child_eos; } Status push(RuntimeState*, vectorized::Block* input_block, bool eos) override { + _child_eos = eos; if (input_block->rows() == 0) { return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org