This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9701457c364 [refine](SetOperator) refine some SetOperator code. (#49772) 9701457c364 is described below commit 9701457c364c3f2bec6e2d2adc2da4676110350f Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Thu Apr 17 11:26:18 2025 +0800 [refine](SetOperator) refine some SetOperator code. (#49772) ### What problem does this PR solve? Modified some parts of the SetOperator code. 1. Child expressions are now taken from the local state (local_state._child_exprs) instead of the operator. 2. Abstracted out the get_hash_table_size function into SetSharedState. 3. Removed some unreachable code. 4. The source operator now builds its output with append_data_by_selector instead of per-row insert_from, to improve speed. --- be/src/pipeline/dependency.cpp | 13 ++++++++ be/src/pipeline/dependency.h | 1 + be/src/pipeline/exec/set_probe_sink_operator.cpp | 42 ++++++++++-------------- be/src/pipeline/exec/set_sink_operator.cpp | 30 +++++------------ be/src/pipeline/exec/set_source_operator.cpp | 35 +++++++++++--------- be/src/pipeline/exec/set_source_operator.h | 5 ++- 6 files changed, 62 insertions(+), 64 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index c8a9f5ed528..5624959f630 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -432,6 +432,19 @@ Status SetSharedState::update_build_not_ignore_null(const vectorized::VExprConte return Status::OK(); } +size_t SetSharedState::get_hash_table_size() const { + size_t hash_table_size = 0; + std::visit( + [&](auto&& arg) { + using HashTableCtxType = std::decay_t<decltype(arg)>; + if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { + hash_table_size = arg.hash_table->size(); + } + }, + hash_table_variants->method_variant); + return hash_table_size; +} + Status SetSharedState::hash_table_init() { std::vector<vectorized::DataTypePtr> data_types; for (size_t i = 0; i != child_exprs_lists[0].size(); ++i) { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index cbb2e043a2a..ef8ed63eb16
100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -676,6 +676,7 @@ public: // If a calculation involves both nullable and non-nullable columns, the final output should be a nullable column Status update_build_not_ignore_null(const vectorized::VExprContextSPtrs& ctxs); + size_t get_hash_table_size() const; /// init in both upstream side. //The i-th result expr list refers to the i-th child. std::vector<vectorized::VExprContextSPtrs> child_exprs_lists; diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 5abf8b36ab0..c2c9e6741e8 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -144,22 +144,25 @@ Status SetProbeSinkOperatorX<is_intersect>::_extract_probe_column( vectorized::ColumnRawPtrs& raw_ptrs, int child_id) { auto& build_not_ignore_null = local_state._shared_state->build_not_ignore_null; - for (size_t i = 0; i < _child_exprs.size(); ++i) { + auto& child_exprs = local_state._child_exprs; + for (size_t i = 0; i < child_exprs.size(); ++i) { int result_col_id = -1; - RETURN_IF_ERROR(_child_exprs[i]->execute(&block, &result_col_id)); + RETURN_IF_ERROR(child_exprs[i]->execute(&block, &result_col_id)); block.get_by_position(result_col_id).column = block.get_by_position(result_col_id).column->convert_to_full_column_if_const(); - auto column = block.get_by_position(result_col_id).column.get(); - - if (auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column)) { - auto& col_nested = nullable->get_nested_column(); - if (build_not_ignore_null[i]) { //same as build column - raw_ptrs[i] = nullable; - } else { - raw_ptrs[i] = &col_nested; + const auto* column = block.get_by_position(result_col_id).column.get(); + + if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column)) { + if (!build_not_ignore_null[i]) { + return Status::InternalError( + "SET operator expects a 
nullable : {} column in column {}, but the " + "computed " + "output is a nullable : {} column", + build_not_ignore_null[i], i, + nullable->get_nested_column_ptr()->is_nullable()); } - + raw_ptrs[i] = nullable; } else { if (build_not_ignore_null[i]) { auto column_ptr = make_nullable(block.get_by_position(result_col_id).column, false); @@ -179,22 +182,10 @@ template <bool is_intersect> void SetProbeSinkOperatorX<is_intersect>::_finalize_probe( SetProbeSinkLocalState<is_intersect>& local_state) { auto& valid_element_in_hash_tbl = local_state._shared_state->valid_element_in_hash_tbl; - auto& hash_table_variants = local_state._shared_state->hash_table_variants; - if (_cur_child_id != (local_state._shared_state->child_quantity - 1)) { _refresh_hash_table(local_state); - if constexpr (is_intersect) { - valid_element_in_hash_tbl = 0; - } else { - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - valid_element_in_hash_tbl = arg.hash_table->size(); - } - }, - hash_table_variants->method_variant); - } + uint64_t hash_table_size = local_state._shared_state->get_hash_table_size(); + valid_element_in_hash_tbl = is_intersect ? 
0 : hash_table_size; local_state._probe_columns.resize( local_state._shared_state->child_exprs_lists[_cur_child_id + 1].size()); local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1] @@ -256,6 +247,7 @@ void SetProbeSinkOperatorX<is_intersect>::_refresh_hash_table( } arg.hash_table = std::move(tmp_hash_table); } else if (is_intersect) { + DCHECK_EQ(valid_element_in_hash_tbl, arg.hash_table->size()); while (iter != iter_end) { auto& mapped = iter->get_second(); auto* it = &mapped; diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 6c5b4483915..41fd67aabf8 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -26,19 +26,6 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" -uint64_t get_hash_table_size(const auto& hash_table_variant) { - uint64_t hash_table_size = 0; - std::visit( - [&](auto&& arg) { - using HashTableCtxType = std::decay_t<decltype(arg)>; - if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - hash_table_size = arg.hash_table->size(); - } - }, - hash_table_variant); - return hash_table_size; -} - template <bool is_intersect> Status SetSinkLocalState<is_intersect>::terminate(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); @@ -58,8 +45,7 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status exec_s if (!_terminated && _runtime_filter_producer_helper && !state->is_cancelled()) { try { RETURN_IF_ERROR(_runtime_filter_producer_helper->process( - state, &_shared_state->build_block, - get_hash_table_size(_shared_state->hash_table_variants->method_variant))); + state, &_shared_state->build_block, _shared_state->get_hash_table_size())); } catch (Exception& e) { return Status::InternalError( "rf process meet error: {}, _terminated: {}, _finish_dependency: {}", @@ -105,12 +91,12 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized::Blo 
local_state._mutable_block.clear(); if (eos) { - uint64_t hash_table_size = get_hash_table_size( - local_state._shared_state->hash_table_variants->method_variant); + uint64_t hash_table_size = local_state._shared_state->get_hash_table_size(); valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size; local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1] ->set_ready(); + DCHECK_GT(_child_quantity, 1); RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size( state, hash_table_size, local_state._finish_dependency)); } @@ -152,16 +138,18 @@ template <bool is_intersect> Status SetSinkOperatorX<is_intersect>::_extract_build_column( SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs, size_t& rows) { - std::vector<int> result_locs(local_state._child_exprs.size(), -1); + // use local state child exprs + auto& child_expr = local_state._child_exprs; + std::vector<int> result_locs(child_expr.size(), -1); bool is_all_const = true; - for (size_t i = 0; i < local_state._child_exprs.size(); ++i) { - RETURN_IF_ERROR(local_state._child_exprs[i]->execute(&block, &result_locs[i])); + for (size_t i = 0; i < child_expr.size(); ++i) { + RETURN_IF_ERROR(child_expr[i]->execute(&block, &result_locs[i])); is_all_const &= is_column_const(*block.get_by_position(result_locs[i]).column); } rows = is_all_const ? 
1 : rows; - for (size_t i = 0; i < local_state._child_exprs.size(); ++i) { + for (size_t i = 0; i < child_expr.size(); ++i) { size_t result_col_id = result_locs[i]; if (is_all_const) { diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index bc2dc32d577..6d464513b1f 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -127,34 +127,42 @@ Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable( vectorized::Block* output_block, const int batch_size, bool* eos) { size_t left_col_len = local_state._left_table_data_types.size(); hash_table_ctx.init_iterator(); - auto block_size = 0; + local_state._result_indexs.clear(); + local_state._result_indexs.reserve(batch_size); - auto add_result = [&local_state, &block_size, this](auto value) { + auto add_result = [&local_state](auto value) { auto* it = &value; if constexpr (is_intersect) { if (it->visited) { //intersected: have done probe, so visited values it's the result - _add_result_columns(local_state, value, block_size); + local_state._result_indexs.push_back(value.row_num); } } else { if (!it->visited) { //except: haven't visited values it's the needed result - _add_result_columns(local_state, value, block_size); + local_state._result_indexs.push_back(value.row_num); } } }; auto& iter = hash_table_ctx.iterator; - for (; iter != hash_table_ctx.hash_table->end() && block_size < batch_size; ++iter) { + while (iter != hash_table_ctx.hash_table->end() && + local_state._result_indexs.size() < batch_size) { add_result(iter->get_second()); + ++iter; } *eos = iter == hash_table_ctx.hash_table->end(); if (*eos && hash_table_ctx.hash_table->has_null_key_data()) { auto value = hash_table_ctx.hash_table->template get_null_key_data<RowRefWithFlag>(); + // If the hashmap can store nulldata, the return value is RowRefWithFlag, otherwise it is char* + static_assert(std::is_same_v<RowRefWithFlag, 
std::decay_t<decltype(value)>> || + std::is_same_v<char*, std::decay_t<decltype(value)>>); if constexpr (std::is_same_v<RowRefWithFlag, std::decay_t<decltype(value)>>) { add_result(value); } } + local_state._add_result_columns(); + if (!output_block->mem_reuse()) { for (int i = 0; i < left_col_len; ++i) { output_block->insert( @@ -169,18 +177,15 @@ Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable( } template <bool is_intersect> -void SetSourceOperatorX<is_intersect>::_add_result_columns( - SetSourceLocalState<is_intersect>& local_state, RowRefWithFlag& value, int& block_size) { - auto& build_col_idx = local_state._shared_state->build_col_idx; - auto& build_block = local_state._shared_state->build_block; - - for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) { - auto& column = *build_block.get_by_position(idx->second).column; - local_state._mutable_cols[idx->first]->insert_from(column, value.row_num); +void SetSourceLocalState<is_intersect>::_add_result_columns() { + auto& build_col_idx = _shared_state->build_col_idx; + auto& build_block = _shared_state->build_block; + + for (auto& idx : build_col_idx) { + const auto& column = *build_block.get_by_position(idx.second).column; + column.append_data_by_selector(_mutable_cols[idx.first], _result_indexs); } - block_size++; } - template class SetSourceLocalState<true>; template class SetSourceLocalState<false>; template class SetSourceOperatorX<true>; diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index 20cfd885e04..a023888de58 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -41,6 +41,7 @@ public: Status open(RuntimeState* state) override; private: + void _add_result_columns(); friend class SetSourceOperatorX<is_intersect>; friend class OperatorX<SetSourceLocalState<is_intersect>>; std::vector<vectorized::MutableColumnPtr> _mutable_cols; @@ -49,6 +50,7 @@ private: 
RuntimeProfile::Counter* _get_data_timer = nullptr; RuntimeProfile::Counter* _filter_timer = nullptr; + vectorized::IColumn::Selector _result_indexs; }; template <bool is_intersect> @@ -90,9 +92,6 @@ private: Status _get_data_in_hashtable(SetSourceLocalState<is_intersect>& local_state, HashTableContext& hash_table_ctx, vectorized::Block* output_block, const int batch_size, bool* eos); - - void _add_result_columns(SetSourceLocalState<is_intersect>& local_state, RowRefWithFlag& value, - int& block_size); const size_t _child_quantity; }; #include "common/compile_check_end.h" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org