This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch repair_outer_join_0714 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/repair_outer_join_0714 by this push: new 3918e1fdd8 save be code (#10835) 3918e1fdd8 is described below commit 3918e1fdd8d5eef84942e4f5867022d892773495 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Jul 14 16:06:02 2022 +0800 save be code (#10835) Co-authored-by: lihaopeng <lihaop...@baidu.com> --- be/src/exec/mysql_scan_node.cpp | 2 +- be/src/runtime/mysql_table_writer.cpp | 3 +- be/src/vec/exec/join/vhash_join_node.cpp | 73 +++++++++++++++----------------- be/src/vec/exec/join/vhash_join_node.h | 6 +-- 4 files changed, 40 insertions(+), 44 deletions(-) diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp index e28d7a1603..f0a24da123 100644 --- a/be/src/exec/mysql_scan_node.cpp +++ b/be/src/exec/mysql_scan_node.cpp @@ -138,7 +138,7 @@ Status MysqlScanNode::write_text_slot(char* value, int value_length, SlotDescrip if (!_text_converter->write_slot(slot, _tuple, value, value_length, true, false, _tuple_pool.get())) { return Status::InternalError("Fail to convert mysql value:'{}' to {} on column:`{}`", value, - slot->type(), slot->col_name()); + slot->type().type, slot->col_name()); } return Status::OK(); diff --git a/be/src/runtime/mysql_table_writer.cpp b/be/src/runtime/mysql_table_writer.cpp index 3f921db2a4..0c93f3246a 100644 --- a/be/src/runtime/mysql_table_writer.cpp +++ b/be/src/runtime/mysql_table_writer.cpp @@ -146,8 +146,7 @@ Status MysqlTableWriter::insert_row(TupleRow* row) { } default: { - return Status::InternalError("can't convert this type to mysql type. type = {}", - _output_expr_ctxs[i]->root()->type()); + return Status::InternalError("can't convert this type to mysql type. type = {}", 1); } } } diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 803b954ce6..5f15982c78 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -672,6 +672,9 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids : std::vector<SlotId> {}), + _intermediate_row_desc( + descs, tnode.hash_join_node.vintermediate_tuple_id_list, + std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())), _output_row_desc(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}) { _runtime_filter_descs = tnode.runtime_filters; init_join_op(); @@ -779,8 +782,32 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { } Status HashJoinNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::prepare(state)); - SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker()); + DCHECK(_runtime_profile.get() != nullptr); + _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); + _rows_returned_rate = runtime_profile()->add_derived_counter( + ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, + std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter, + runtime_profile()->total_time_counter()), + ""); + _mem_tracker = MemTracker::create_tracker(-1, "ExecNode:" + _runtime_profile->name(), + state->instance_mem_tracker(), + MemTrackerLevel::VERBOSE, _runtime_profile.get()); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + _expr_mem_tracker = MemTracker::create_tracker(-1, "ExecNode:Exprs:" + _runtime_profile->name(), + _mem_tracker); + + if (_vconjunct_ctx_ptr) { + RETURN_IF_ERROR( + (*_vconjunct_ctx_ptr)->prepare(state, _intermediate_row_desc, expr_mem_tracker())); + } + RETURN_IF_ERROR( + Expr::prepare(_conjunct_ctxs, state, _intermediate_row_desc, expr_mem_tracker())); + + // TODO(zc): + // AddExprCtxsToFree(_conjunct_ctxs); + for (int i = 0; i < _children.size(); ++i) { + RETURN_IF_ERROR(_children[i]->prepare(state)); + } _hash_table_mem_tracker = MemTracker::create_virtual_tracker(-1, "VSetOperationNode:HashTable"); // Build phase @@ -814,12 +841,11 @@ Status HashJoinNode::prepare(RuntimeState* state) { // _vother_join_conjuncts are evaluated in the context of the rows produced by this node if (_vother_join_conjunct_ptr) { - RETURN_IF_ERROR( - (*_vother_join_conjunct_ptr) - ->prepare(state, _row_desc_for_other_join_conjunt, expr_mem_tracker())); + RETURN_IF_ERROR((*_vother_join_conjunct_ptr) + ->prepare(state, _intermediate_row_desc, expr_mem_tracker())); } - - RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _row_descriptor, expr_mem_tracker())); + RETURN_IF_ERROR( + VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc, expr_mem_tracker())); // right table data types _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc()); @@ -992,39 +1018,10 @@ void HashJoinNode::_prepare_probe_block() { } void HashJoinNode::_construct_mutable_join_block() { - const auto& mutable_block_desc = - _have_other_join_conjunct ? _row_desc_for_other_join_conjunt : _row_descriptor; - - // TODO: Support Intermediate tuple in FE to delete the dispose the convert null operation - // here - auto [start_convert_null, end_convert_null] = std::pair {0, 0}; - - switch (_join_op) { - case TJoinOp::LEFT_OUTER_JOIN: { - start_convert_null = child(0)->row_desc().num_materialized_slots(); - end_convert_null = child(0)->row_desc().num_materialized_slots() + - child(1)->row_desc().num_materialized_slots(); - break; - } - case TJoinOp::RIGHT_OUTER_JOIN: { - end_convert_null = child(0)->row_desc().num_materialized_slots(); - break; - } - case TJoinOp::FULL_OUTER_JOIN: { - end_convert_null = child(0)->row_desc().num_materialized_slots() + - child(1)->row_desc().num_materialized_slots(); - break; - } - default: - break; - } - + const auto& mutable_block_desc = _intermediate_row_desc; for (const auto tuple_desc : mutable_block_desc.tuple_descriptors()) { for (const auto slot_desc : tuple_desc->slots()) { - auto offset = _join_block.columns(); - auto type_ptr = (offset >= start_convert_null && offset < end_convert_null) - ? make_nullable(slot_desc->get_data_type_ptr()) - : slot_desc->get_data_type_ptr(); + auto type_ptr = slot_desc->get_data_type_ptr(); _join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()}); } } diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 2e7d12ba6c..25a9f35cef 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -243,6 +243,7 @@ private: std::vector<bool> _left_output_slot_flags; std::vector<bool> _right_output_slot_flags; + RowDescriptor _intermediate_row_desc; RowDescriptor _output_row_desc; private: @@ -260,7 +261,7 @@ private: void _hash_table_init(); - template <class HashTableContext> + static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128; void _prepare_probe_block(); @@ -270,8 +271,7 @@ private: static std::vector<uint16_t> _convert_block_to_null(Block& block); - template <class HashTableContext, bool ignore_null, bool build_unique> - + template <class HashTableContext> friend struct ProcessHashTableBuild; template <class HashTableContext, class JoinOpType, bool ignore_null> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org