This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 31a5e072e76 [refactor](pipelineX) Simplify set operation (#25502) 31a5e072e76 is described below commit 31a5e072e76eb6f5956c1fedc135cc6e95d982d8 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Tue Oct 17 15:11:46 2023 +0800 [refactor](pipelineX) Simplify set operation (#25502) --- be/src/pipeline/exec/set_probe_sink_operator.cpp | 6 +-- be/src/pipeline/exec/set_probe_sink_operator.h | 5 +- be/src/pipeline/exec/set_sink_operator.cpp | 6 +-- be/src/pipeline/exec/set_sink_operator.h | 6 ++- .../vec/common/hash_table/hash_table_set_build.h | 55 +++------------------- .../vec/common/hash_table/hash_table_set_probe.h | 52 +++----------------- be/src/vec/exec/vset_operation_node.cpp | 2 +- be/src/vec/exec/vset_operation_node.h | 3 ++ 8 files changed, 30 insertions(+), 105 deletions(-) diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp index 81c30d45d1e..aea9aff0e75 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.cpp +++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp @@ -131,9 +131,9 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized [&](auto&& arg) -> Status { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - vectorized::HashTableProbeX<HashTableCtxType, is_intersect> - process_hashtable_ctx(local_state, probe_rows); - return process_hashtable_ctx.mark_data_in_hashtable(local_state, arg); + vectorized::HashTableProbe<HashTableCtxType, is_intersect> + process_hashtable_ctx(&local_state, probe_rows); + return process_hashtable_ctx.mark_data_in_hashtable(arg); } else { LOG(FATAL) << "FATAL: uninited hash table"; } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 2c2f1ce1c60..45176fd0099 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ 
b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -31,7 +31,7 @@ class RuntimeState; namespace vectorized { class Block; template <class HashTableContext, bool is_intersected> -struct HashTableProbeX; +struct HashTableProbe; } // namespace vectorized namespace pipeline { @@ -81,11 +81,12 @@ public: : Base(parent, state) {} Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + int64_t* valid_element_in_hash_tbl() { return &_shared_state->valid_element_in_hash_tbl; } private: friend class SetProbeSinkOperatorX<is_intersect>; template <class HashTableContext, bool is_intersected> - friend struct vectorized::HashTableProbeX; + friend struct vectorized::HashTableProbe; //record insert column id during probe std::vector<uint16_t> _probe_column_inserted_id; diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp index 604729a4700..6725deffa14 100644 --- a/be/src/pipeline/exec/set_sink_operator.cpp +++ b/be/src/pipeline/exec/set_sink_operator.cpp @@ -116,9 +116,9 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block( [&](auto&& arg) { using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { - vectorized::HashTableBuildX<HashTableCtxType, is_intersect> - hash_table_build_process(rows, raw_ptrs, offset, state); - static_cast<void>(hash_table_build_process(local_state, arg)); + vectorized::HashTableBuild<HashTableCtxType, is_intersect> + hash_table_build_process(&local_state, rows, raw_ptrs, offset, state); + static_cast<void>(hash_table_build_process(arg, local_state._arena)); } else { LOG(FATAL) << "FATAL: uninited hash table"; } diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 945ec06891c..5383b1b3a55 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -29,7 +29,7 @@ class ExecNode; namespace vectorized { template <class 
HashTableContext, bool is_intersected> -struct HashTableBuildX; +struct HashTableBuild; } namespace pipeline { @@ -74,10 +74,12 @@ public: Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + int64_t* mem_used() { return &_shared_state->mem_used; }; + private: friend class SetSinkOperatorX<is_intersect>; template <class HashTableContext, bool is_intersected> - friend struct vectorized::HashTableBuildX; + friend struct vectorized::HashTableBuild; RuntimeProfile::Counter* _build_timer; // time to build hash table vectorized::MutableBlock _mutable_block; diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h index ff1fec3ab1c..e3c1ed27b1f 100644 --- a/be/src/vec/common/hash_table/hash_table_set_build.h +++ b/be/src/vec/common/hash_table/hash_table_set_build.h @@ -23,13 +23,13 @@ namespace doris::vectorized { template <class HashTableContext, bool is_intersect> struct HashTableBuild { - HashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, - VSetOperationNode<is_intersect>* operation_node, uint8_t offset, + template <typename Parent> + HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, RuntimeState* state) - : _rows(rows), + : _mem_used(parent->mem_used()), + _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), - _operation_node(operation_node), _state(state) {} Status operator()(HashTableContext& hash_table_ctx, Arena& arena) { @@ -39,7 +39,7 @@ struct HashTableBuild { Defer defer {[&]() { int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - _operation_node->_mem_used += bucket_bytes - old_bucket_bytes; + *_mem_used += bucket_bytes - old_bucket_bytes; }}; KeyGetter key_getter(_build_raw_ptrs); @@ -62,54 +62,11 @@ struct HashTableBuild { } private: + int64_t* _mem_used; const int _rows; const uint8_t _offset; ColumnRawPtrs& _build_raw_ptrs; - VSetOperationNode<is_intersect>* _operation_node; RuntimeState* _state; 
}; -template <class HashTableContext, bool is_intersect> -struct HashTableBuildX { - HashTableBuildX(int rows, ColumnRawPtrs& build_raw_ptrs, uint8_t offset, RuntimeState* state) - : _rows(rows), _offset(offset), _build_raw_ptrs(build_raw_ptrs), _state(state) {} - - Status operator()(pipeline::SetSinkLocalState<is_intersect>& local_state, - HashTableContext& hash_table_ctx) { - using KeyGetter = typename HashTableContext::State; - using Mapped = typename HashTableContext::Mapped; - int64_t old_bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - - Defer defer {[&]() { - int64_t bucket_bytes = hash_table_ctx.hash_table->get_buffer_size_in_bytes(); - local_state._shared_state->mem_used += bucket_bytes - old_bucket_bytes; - }}; - - KeyGetter key_getter(_build_raw_ptrs); - hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows); - - size_t k = 0; - auto creator = [&](const auto& ctor, auto& key, auto& origin) { - HashTableContext::try_presis_key(key, origin, local_state._arena); - ctor(key, Mapped {k, _offset}); - }; - auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset}; }; - - for (; k < _rows; ++k) { - if (k % CHECK_FRECUENCY == 0) { - RETURN_IF_CANCELLED(_state); - } - hash_table_ctx.lazy_emplace(key_getter, k, creator, creator_for_null_key); - } - return Status::OK(); - } - -private: - const int _rows; - const uint8_t _offset; - ColumnRawPtrs& _build_raw_ptrs; - RuntimeState* _state; - std::vector<size_t> _build_side_hash_values; -}; - } // namespace doris::vectorized diff --git a/be/src/vec/common/hash_table/hash_table_set_probe.h b/be/src/vec/common/hash_table/hash_table_set_probe.h index 4a79b86a146..eb00cca8561 100644 --- a/be/src/vec/common/hash_table/hash_table_set_probe.h +++ b/be/src/vec/common/hash_table/hash_table_set_probe.h @@ -24,10 +24,11 @@ namespace doris::vectorized { template <class HashTableContext, bool is_intersected> struct HashTableProbe { - HashTableProbe(VSetOperationNode<is_intersected>* 
operation_node, int probe_rows) - : _operation_node(operation_node), + template <typename Parent> + HashTableProbe(Parent* parent, int probe_rows) + : _valid_element_in_hash_tbl(parent->valid_element_in_hash_tbl()), _probe_rows(probe_rows), - _probe_raw_ptrs(operation_node->_probe_columns) {} + _probe_raw_ptrs(parent->_probe_columns) {} Status mark_data_in_hashtable(HashTableContext& hash_table_ctx) { using KeyGetter = typename HashTableContext::State; @@ -43,49 +44,9 @@ struct HashTableProbe { if (!(it->visited)) { it->visited = true; if constexpr (is_intersected) { //intersected - _operation_node->_valid_element_in_hash_tbl++; + (*_valid_element_in_hash_tbl)++; } else { - _operation_node->_valid_element_in_hash_tbl--; //except - } - } - } - } - } else { - LOG(FATAL) << "Invalid RowRefListType!"; - } - return Status::OK(); - } - -private: - VSetOperationNode<is_intersected>* _operation_node; - const size_t _probe_rows; - ColumnRawPtrs& _probe_raw_ptrs; - std::vector<StringRef> _probe_keys; -}; - -template <class HashTableContext, bool is_intersected> -struct HashTableProbeX { - HashTableProbeX(pipeline::SetProbeSinkLocalState<is_intersected>& local_state, int probe_rows) - : _probe_rows(probe_rows), _probe_raw_ptrs(local_state._probe_columns) {} - - Status mark_data_in_hashtable(pipeline::SetProbeSinkLocalState<is_intersected>& local_state, - HashTableContext& hash_table_ctx) { - using KeyGetter = typename HashTableContext::State; - - KeyGetter key_getter(_probe_raw_ptrs); - hash_table_ctx.init_serialized_keys(_probe_raw_ptrs, _probe_rows); - - if constexpr (std::is_same_v<typename HashTableContext::Mapped, RowRefListWithFlags>) { - for (int probe_index = 0; probe_index < _probe_rows; probe_index++) { - auto find_result = hash_table_ctx.find(key_getter, probe_index); - if (find_result.is_found()) { //if found, marked visited - auto it = find_result.get_mapped().begin(); - if (!(it->visited)) { - it->visited = true; - if constexpr (is_intersected) { //intersected - 
local_state._shared_state->valid_element_in_hash_tbl++; - } else { - local_state._shared_state->valid_element_in_hash_tbl--; //except + (*_valid_element_in_hash_tbl)--; //except } } } @@ -97,6 +58,7 @@ struct HashTableProbeX { } private: + int64_t* _valid_element_in_hash_tbl; const size_t _probe_rows; ColumnRawPtrs& _probe_raw_ptrs; std::vector<StringRef> _probe_keys; diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index 9b15db67b3c..d284385b8ed 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -320,7 +320,7 @@ Status VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_ using HashTableCtxType = std::decay_t<decltype(arg)>; if constexpr (!std::is_same_v<HashTableCtxType, std::monostate>) { HashTableBuild<HashTableCtxType, is_intersect> hash_table_build_process( - rows, raw_ptrs, this, offset, state); + this, rows, raw_ptrs, offset, state); st = hash_table_build_process(arg, _arena); } else { LOG(FATAL) << "FATAL: uninited hash table"; diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index dfd96430115..ff016469f49 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -73,6 +73,9 @@ public: bool is_child_finished(int child_id) const; + int64_t* valid_element_in_hash_tbl() { return &_valid_element_in_hash_tbl; } + int64_t* mem_used() { return &_mem_used; }; + private: void _finalize_probe(int child_id); //Todo: In build process of hashtable, It's same as join node. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org