HappenLee commented on code in PR #24461: URL: https://github.com/apache/doris/pull/24461#discussion_r1336330717
########## be/src/vec/exec/join/process_hash_table_probe_impl.h: ########## @@ -207,883 +208,601 @@ void ProcessHashTableProbe<JoinOpType>::_pre_serialize_key( } template <int JoinOpType> -template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> +template <typename KeyGetter> +KeyGetter ProcessHashTableProbe<JoinOpType>::_init_probe_side(size_t probe_rows, + bool with_other_join_conjuncts) { + _right_col_idx = _join_context->_is_right_semi_anti && !with_other_join_conjuncts + ? 0 + : _join_context->_left_table_data_types->size(); + _right_col_len = _join_context->_right_table_data_types->size(); + _row_count_from_last_probe = 0; + + _probe_indexs.clear(); + _build_block_rows.clear(); + _build_block_offsets.clear(); + _visited_map.clear(); + _same_to_prev.clear(); + if (with_other_join_conjuncts) { + _probe_indexs.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE); + // use in right join to change visited state after exec the vother join conjunct + _visited_map.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE); + _same_to_prev.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE); + } + _build_block_rows.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE); + _build_block_offsets.reserve(probe_rows * PROBE_SIDE_EXPLODE_RATE); + + KeyGetter key_getter(*_join_context->_probe_columns, _join_context->_probe_key_sz, nullptr); + + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) { + if (*_join_context->_ready_probe == false) { + _pre_serialize_key(*_join_context->_probe_columns, probe_rows, _probe_keys); + } + key_getter.set_serialized_keys(_probe_keys.data()); + } + + return key_getter; +} + +template <int JoinOpType> +template <bool need_null_map_for_probe, typename HashTableType, typename Keys> +void ProcessHashTableProbe<JoinOpType>::_probe_hash(const Keys& keys, HashTableType& hash_table_ctx, + ConstNullMapPtr null_map) { + if (*_join_context->_ready_probe) { + return; + } + SCOPED_TIMER(_search_hashtable_timer); + 
_probe_side_hash_values.resize(keys.size()); + for (size_t k = 0; k < keys.size(); ++k) { + if constexpr (need_null_map_for_probe) { + if ((*null_map)[k]) { + continue; + } + } + _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash(keys[k]); + } + *_join_context->_ready_probe = true; +} + +template <int JoinOpType> +template <typename Mapped, bool with_other_join_conjuncts> +ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType>::_probe_row_match(int& current_offset, + int& probe_index, + size_t& probe_size, + bool& all_match_one) { + auto& probe_row_match_iter = + std::get<ForwardIterator<Mapped>>(*_join_context->_probe_row_match_iter); + if (!probe_row_match_iter.ok()) { + return probe_row_match_iter; + } + + SCOPED_TIMER(_search_hashtable_timer); + for (; probe_row_match_iter.ok() && current_offset < _batch_size; ++probe_row_match_iter) { + _emplace_element(probe_row_match_iter->block_offset, probe_row_match_iter->row_num, + current_offset); + _probe_indexs.emplace_back(probe_index); + if constexpr (with_other_join_conjuncts) { + _visited_map.emplace_back(&probe_row_match_iter->visited); + } + } + + _row_count_from_last_probe = current_offset; + all_match_one &= (current_offset == 1); + if (!probe_row_match_iter.ok()) { + ++probe_index; + } + probe_size = 1; + + return probe_row_match_iter; +} + +template <int JoinOpType> +void ProcessHashTableProbe<JoinOpType>::_emplace_element(int8_t block_offset, int32_t block_row, + int& current_offset) { + _build_block_offsets.emplace_back(block_offset); + _build_block_rows.emplace_back(block_row); + current_offset++; +} + +template <int JoinOpType> +template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType, + bool with_other_conjuncts, bool is_mark_join> Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, - Block* output_block, size_t probe_rows, - bool is_mark_join) { + Block* output_block, size_t 
probe_rows) { auto& probe_index = *_join_context->_probe_index; - auto& probe_raw_ptrs = *_join_context->_probe_columns; - _probe_indexs.resize(_batch_size); - if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { - _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - } using KeyGetter = typename HashTableType::State; using Mapped = typename HashTableType::Mapped; - int right_col_idx = - _join_context->_is_right_semi_anti ? 0 : _join_context->_left_table_data_types->size(); - int right_col_len = _join_context->_right_table_data_types->size(); - - KeyGetter key_getter(probe_raw_ptrs, _join_context->_probe_key_sz, nullptr); - - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) { - if (probe_index == 0) { - _pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys); - } - key_getter.set_serialized_keys(_probe_keys.data()); - } + KeyGetter key_getter = _init_probe_side<KeyGetter>(probe_rows, with_other_conjuncts); auto& mcol = mutable_block.mutable_columns(); - int current_offset = 0; constexpr auto is_right_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN; constexpr auto probe_all = JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - bool all_match_one = true; int last_probe_index = probe_index; + + int current_offset = 0; + bool all_match_one = true; size_t probe_size = 0; - auto& probe_row_match_iter = - std::get<ForwardIterator<Mapped>>(*_join_context->_probe_row_match_iter); + auto& probe_row_match_iter = _probe_row_match<Mapped, with_other_conjuncts>( + current_offset, probe_index, probe_size, all_match_one); + + bool is_the_last_sub_block = false; + if (with_other_conjuncts && probe_size != 0) { + is_the_last_sub_block = !probe_row_match_iter.ok(); + _same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - 1; ++i) { + 
_same_to_prev.emplace_back(true); + } + } + + const auto& keys = key_getter.get_keys(); + int multi_matched_output_row_count = 0; + _probe_hash<need_null_map_for_probe, HashTableType>(keys, hash_table_ctx, null_map); + { SCOPED_TIMER(_search_hashtable_timer); - if constexpr (!is_right_semi_anti_join) { - // handle ramaining matched rows from last probe row - if (probe_row_match_iter.ok()) { - for (; probe_row_match_iter.ok() && current_offset < _batch_size; - ++probe_row_match_iter) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = probe_row_match_iter->block_offset; - _build_block_rows[current_offset] = probe_row_match_iter->row_num; - _probe_indexs[current_offset] = probe_index; + using FindResult = decltype(key_getter.find_key(hash_table_ctx.hash_table, 0, *_arena)); + FindResult empty = {nullptr, false}; + while (current_offset < _batch_size && probe_index < probe_rows) { + if constexpr (ignore_null && need_null_map_for_probe) { + if ((*null_map)[probe_index]) { + if constexpr (probe_all) { + // only full outer / left outer need insert the data of right table + _emplace_element(-1, -1, current_offset); + _probe_indexs.emplace_back(probe_index); + + if constexpr (with_other_conjuncts) { + _same_to_prev.emplace_back(false); + _visited_map.emplace_back(nullptr); + } } else { - _build_block_offsets.emplace_back(probe_row_match_iter->block_offset); - _build_block_rows.emplace_back(probe_row_match_iter->row_num); - _probe_indexs.template emplace_back(probe_index); + all_match_one = false; } - ++current_offset; - } - all_match_one &= (current_offset == 1); - if (!probe_row_match_iter.ok()) { - ++probe_index; + probe_index++; + continue; } - probe_size = 1; } - } - auto empty = decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index, - *_arena)) {nullptr, false}; + const auto& find_result = + need_null_map_for_probe && (*null_map)[probe_index] + ? 
empty + : key_getter.find_key_with_hash(hash_table_ctx.hash_table, + _probe_side_hash_values[probe_index], + keys[probe_index]); + if (LIKELY(probe_index + HASH_MAP_PREFETCH_DIST < probe_rows) && + !(need_null_map_for_probe && (*null_map)[probe_index + HASH_MAP_PREFETCH_DIST])) { + key_getter.template prefetch_by_hash<true>( + hash_table_ctx.hash_table, + _probe_side_hash_values[probe_index + HASH_MAP_PREFETCH_DIST]); + } - if (current_offset < _batch_size) { - if (*(_join_context->_ready_probe_index) < probe_rows) { - _probe_side_hash_values.resize(probe_rows); - for (size_t k = *(_join_context->_ready_probe_index); k < probe_rows; ++k) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[k]) { - continue; - } - } - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< - KeyGetter>::value) { - _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash( - key_getter.get_key_holder(k, *_arena).key); - } else { - _probe_side_hash_values[k] = hash_table_ctx.hash_table.hash( - key_getter.get_key_holder(k, *_arena)); - } + auto current_probe_index = probe_index; + if constexpr (!with_other_conjuncts && + (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::LEFT_SEMI_JOIN)) { + bool need_go_ahead = + (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^ find_result.is_found(); + if constexpr (is_mark_join) { + ++current_offset; + assert_cast<ColumnVector<UInt8>&>(*mcol[mcol.size() - 1]) + .get_data() + .template push_back(need_go_ahead); + } else { + current_offset += need_go_ahead; } - *(_join_context->_ready_probe_index) = probe_rows; - } - while (probe_index < probe_rows) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - 
_build_block_rows[current_offset] = -1; - _probe_indexs[current_offset] = probe_index; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - _probe_indexs.template emplace_back(probe_index); - } - ++current_offset; + ++probe_index; + } else { + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + auto origin_offset = current_offset; + + // For mark join, if euqual-matched tuple count for one probe row + // excceeds batch size, it's difficult to implement the logic to + // split them into multiple sub blocks and handle them, keep the original + // logic for now. + if constexpr (is_mark_join && with_other_conjuncts) { + for (auto it = mapped.begin(); it.ok(); ++it) { + _emplace_element(it->block_offset, it->row_num, current_offset); + _visited_map.emplace_back(&it->visited); } - probe_index++; - all_match_one = false; - if constexpr (probe_all) { - if (current_offset >= _batch_size) { - break; + ++probe_index; + } else if constexpr (with_other_conjuncts || !is_right_semi_anti_join) { + auto multi_match_last_offset = current_offset; + auto it = mapped.begin(); + for (; it.ok() && current_offset < _batch_size; ++it) { + _emplace_element(it->block_offset, it->row_num, current_offset); + + if constexpr (with_other_conjuncts) { + _visited_map.emplace_back(&it->visited); } } - continue; - } - } - int last_offset = current_offset; - auto find_result = (need_null_map_for_probe && (*null_map)[probe_index]) - ? 
empty - : key_getter.find_key_with_hash( - hash_table_ctx.hash_table, - _probe_side_hash_values[probe_index], - probe_index, *_arena); - if (probe_index + HASH_MAP_PREFETCH_DIST < probe_rows) { - key_getter.template prefetch_by_hash<true>( - hash_table_ctx.hash_table, - _probe_side_hash_values[probe_index + HASH_MAP_PREFETCH_DIST]); - } - - auto current_probe_index = probe_index; - if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - if (is_mark_join) { - ++current_offset; - assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1]) - .get_data() - .template push_back(!find_result.is_found()); - } else { - if (!find_result.is_found()) { - ++current_offset; + probe_row_match_iter = it; + if (!it.ok()) { + // If all matched rows for the current probe row are handled, + // advance to next probe row. + // If not(which means it excceed batch size), probe_index is not increased and + // remaining matched rows for the current probe row will be + // handled in the next call of this function + ++probe_index; + } else if constexpr (with_other_conjuncts) { + // If not(which means it excceed batch size), probe_index is not increased and + // remaining matched rows for the current probe row will be + // handled in the next call of this function + multi_matched_output_row_count = + current_offset - multi_match_last_offset; } - } - ++probe_index; - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - if (is_mark_join) { - ++current_offset; - assert_cast<doris::vectorized::ColumnVector<UInt8>&>(*mcol[mcol.size() - 1]) - .get_data() - .template push_back(find_result.is_found()); } else { - if (find_result.is_found()) { - ++current_offset; - } + ++probe_index; + } + if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + mapped.visited = true; } - ++probe_index; - } else { - DCHECK(!is_mark_join); - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - // TODO: Iterators 
are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.is_single() == 1) { - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - mapped.visited = true; - } - if constexpr (!is_right_semi_anti_join) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - } else { - _build_block_offsets.emplace_back(mapped.block_offset); - _build_block_rows.emplace_back(mapped.row_num); - } - ++current_offset; - } - ++probe_index; - } else { - if constexpr (!is_right_semi_anti_join) { - auto it = mapped.begin(); - for (; it.ok() && current_offset < _batch_size; ++it) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - } - probe_row_match_iter = it; - if (!it.ok()) { - // If all matched rows for the current probe row are handled, - // advance to next probe row. 
- // If not(which means it excceed batch size), probe_index is not increased and - // remaining matched rows for the current probe row will be - // handled in the next call of this function - ++probe_index; - } - } else { - ++probe_index; - } - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - mapped.visited = true; - } + if constexpr (with_other_conjuncts) { + _same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - origin_offset - 1; ++i) { + _same_to_prev.emplace_back(true); } - } else { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } - ++probe_index; } - } + } else if constexpr (probe_all || JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + (JoinOpType == TJoinOp::LEFT_SEMI_JOIN && is_mark_join)) { + // only full outer / left outer need insert the data of right table + _emplace_element(-1, -1, current_offset); - uint32_t count = (uint32_t)(current_offset - last_offset); - if (LIKELY(current_offset < _probe_indexs.size())) { - for (int i = last_offset; i < current_offset; ++i) { - _probe_indexs[i] = current_probe_index; + if constexpr (with_other_conjuncts) { + _same_to_prev.emplace_back(false); + _visited_map.emplace_back(nullptr); } + ++probe_index; } else { - for (int i = last_offset; i < _probe_indexs.size(); ++i) { - _probe_indexs[i] = current_probe_index; - } - _probe_indexs.resize(current_offset, current_probe_index); - } - all_match_one &= (count == 1); - if (current_offset >= _batch_size) { - break; + ++probe_index; } } - probe_size = probe_index - last_probe_index + probe_row_match_iter.ok(); + all_match_one &= (current_offset == _probe_indexs.size() + 1); + 
_probe_indexs.resize(current_offset, current_probe_index); } + probe_size = probe_index - last_probe_index + probe_row_match_iter.ok(); } - { - SCOPED_TIMER(_build_side_output_timer); - build_side_output_column(mcol, right_col_idx, right_col_len, - *_join_context->_right_output_slot_flags, current_offset); - } + build_side_output_column(mcol, *_join_context->_right_output_slot_flags, current_offset, + with_other_conjuncts); - if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && - JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { - SCOPED_TIMER(_probe_side_output_timer); + if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && + JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) { RETURN_IF_CATCH_EXCEPTION(probe_side_output_column( mcol, *_join_context->_left_output_slot_flags, current_offset, last_probe_index, - probe_size, all_match_one, false)); + probe_size, all_match_one, with_other_conjuncts)); } output_block->swap(mutable_block.to_block()); + if constexpr (with_other_conjuncts) { + return do_other_join_conjuncts(output_block, is_mark_join, multi_matched_output_row_count, + is_the_last_sub_block); + } + return Status::OK(); } template <int JoinOpType> -template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> -Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( - HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, - Block* output_block, size_t probe_rows, bool is_mark_join) { - auto& probe_index = *_join_context->_probe_index; - auto& probe_raw_ptrs = *_join_context->_probe_columns; - if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { - _probe_indexs.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); +Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts( + Block* output_block, bool is_mark_join, int 
multi_matched_output_row_count, + bool is_the_last_sub_block) { + // dispose the other join conjunct exec + auto row_count = output_block->rows(); + if (!row_count) { + return Status::OK(); } - using KeyGetter = typename HashTableType::State; - using Mapped = typename HashTableType::Mapped; - if constexpr (std::is_same_v<Mapped, RowRefListWithFlags>) { - constexpr auto probe_all = - JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - KeyGetter key_getter(probe_raw_ptrs, _join_context->_probe_key_sz, nullptr); - - if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) { - if (probe_index == 0) { - _pre_serialize_key(probe_raw_ptrs, probe_rows, _probe_keys); + SCOPED_TIMER(_join_context->_process_other_join_conjunct_timer); + int orig_columns = output_block->columns(); + IColumn::Filter other_conjunct_filter(row_count, 1); + bool can_be_filter_all = false; + RETURN_IF_ERROR(VExprContext::execute_conjuncts(*_join_context->_other_join_conjuncts, nullptr, + output_block, &other_conjunct_filter, + &can_be_filter_all)); + + auto result_column_id = output_block->columns(); + auto filter_column = ColumnVector<UInt8>::create(); Review Comment: It would be better to reuse the memory of the filter column. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org