This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1fad4a207aa [refactor](pipeline) Refactor non-pipeline code structure (#35900) 1fad4a207aa is described below commit 1fad4a207aa7d608dbd6ef51b3d551ebe3538e7e Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Wed Jun 5 19:48:17 2024 +0800 [refactor](pipeline) Refactor non-pipeline code structure (#35900) --- be/src/pipeline/common/agg_utils.h | 340 +++++++++++++++++++ .../data_gen_functions/vdata_gen_function_inf.h | 4 +- .../common}/data_gen_functions/vnumbers_tvf.cpp | 14 +- .../common}/data_gen_functions/vnumbers_tvf.h | 6 +- be/src/pipeline/common/join_utils.h | 68 ++++ .../common}/runtime_filter_consumer.cpp | 10 +- .../common}/runtime_filter_consumer.h | 9 +- be/src/pipeline/dependency.cpp | 20 +- be/src/pipeline/dependency.h | 85 ++--- be/src/pipeline/exec/aggregation_sink_operator.cpp | 38 +-- be/src/pipeline/exec/aggregation_sink_operator.h | 4 +- .../pipeline/exec/aggregation_source_operator.cpp | 1 + be/src/pipeline/exec/analytic_sink_operator.cpp | 1 + be/src/pipeline/exec/analytic_source_operator.cpp | 1 + be/src/pipeline/exec/datagen_operator.cpp | 7 +- be/src/pipeline/exec/datagen_operator.h | 4 +- .../distinct_streaming_aggregation_operator.cpp | 3 +- .../exec/distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/es_scan_operator.cpp | 14 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 29 +- be/src/pipeline/exec/hashjoin_build_sink.h | 60 +++- be/src/pipeline/exec/hashjoin_probe_operator.cpp | 5 +- be/src/pipeline/exec/hashjoin_probe_operator.h | 24 +- .../exec/join/cross_join_impl.cpp | 2 +- .../exec/join/full_outer_join_impl.cpp | 2 +- .../exec/join/inner_join_impl.cpp | 2 +- be/src/{vec => pipeline}/exec/join/join_op.h | 14 +- .../exec/join/left_anti_join_impl.cpp | 2 +- .../exec/join/left_outer_join_impl.cpp | 2 +- .../exec/join/left_semi_join_impl.cpp | 2 +- .../exec/join/null_aware_left_anti_join_impl.cpp | 2 +- .../exec/join/null_aware_left_semi_join_impl.cpp | 2 +- .../exec/join/process_hash_table_probe.h | 56 ++-- .../exec/join/process_hash_table_probe_impl.h | 192 +++++------ .../exec/join/right_anti_join_impl.cpp | 2 +- .../exec/join/right_outer_join_impl.cpp | 2 +- .../exec/join/right_semi_join_impl.cpp | 2 +- .../exec/multi_cast_data_stream_source.cpp | 7 +- .../pipeline/exec/multi_cast_data_stream_source.h | 5 +- be/src/pipeline/exec/olap_scan_operator.cpp | 2 +- .../exec/partitioned_aggregation_sink_operator.h | 1 + be/src/pipeline/exec/scan_operator.cpp | 2 +- be/src/pipeline/exec/scan_operator.h | 11 +- be/src/pipeline/exec/set_source_operator.cpp | 2 +- be/src/pipeline/exec/set_source_operator.h | 2 +- .../exec/streaming_aggregation_operator.cpp | 16 +- .../pipeline/exec/streaming_aggregation_operator.h | 4 +- be/src/runtime/fragment_mgr.cpp | 20 -- be/src/runtime/query_context.cpp | 21 +- be/src/runtime/query_context.h | 2 +- be/src/vec/common/hash_table/hash_map_context.h | 12 +- .../vec/common/hash_table/hash_table_set_build.h | 1 + be/src/vec/exec/join/vacquire_list.hpp | 54 --- be/src/vec/exec/join/vhash_join_node.h | 180 ---------- be/src/vec/exec/scan/vscan_node.h | 52 --- be/src/vec/exec/vaggregation_node.h | 366 --------------------- 56 files changed, 791 insertions(+), 1002 deletions(-) diff --git a/be/src/pipeline/common/agg_utils.h b/be/src/pipeline/common/agg_utils.h new file mode 100644 index 00000000000..e0435954b8b --- /dev/null +++ b/be/src/pipeline/common/agg_utils.h @@ -0,0 +1,340 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <variant> +#include <vector> + +#include "vec/common/arena.h" +#include "vec/common/hash_table/hash_map_context.h" +#include "vec/common/hash_table/hash_map_context_creator.h" +#include "vec/common/hash_table/hash_map_util.h" +#include "vec/common/hash_table/ph_hash_map.h" +#include "vec/common/hash_table/string_hash_map.h" + +namespace doris { +namespace pipeline { + +using AggregatedDataWithoutKey = vectorized::AggregateDataPtr; +using AggregatedDataWithStringKey = PHHashMap<StringRef, vectorized::AggregateDataPtr>; +using AggregatedDataWithShortStringKey = StringHashMap<vectorized::AggregateDataPtr>; +using AggregatedDataWithUInt8Key = PHHashMap<vectorized::UInt8, vectorized::AggregateDataPtr>; +using AggregatedDataWithUInt16Key = PHHashMap<vectorized::UInt16, vectorized::AggregateDataPtr>; +using AggregatedDataWithUInt32Key = + PHHashMap<vectorized::UInt32, vectorized::AggregateDataPtr, HashCRC32<vectorized::UInt32>>; +using AggregatedDataWithUInt64Key = + PHHashMap<vectorized::UInt64, vectorized::AggregateDataPtr, HashCRC32<vectorized::UInt64>>; +using AggregatedDataWithUInt128Key = PHHashMap<vectorized::UInt128, vectorized::AggregateDataPtr, + HashCRC32<vectorized::UInt128>>; +using AggregatedDataWithUInt256Key = PHHashMap<vectorized::UInt256, vectorized::AggregateDataPtr, + HashCRC32<vectorized::UInt256>>; +using AggregatedDataWithUInt136Key = PHHashMap<vectorized::UInt136, vectorized::AggregateDataPtr, + HashCRC32<vectorized::UInt136>>; + +using AggregatedDataWithUInt32KeyPhase2 = + PHHashMap<vectorized::UInt32, vectorized::AggregateDataPtr, + HashMixWrapper<vectorized::UInt32>>; +using AggregatedDataWithUInt64KeyPhase2 = + PHHashMap<vectorized::UInt64, vectorized::AggregateDataPtr, + HashMixWrapper<vectorized::UInt64>>; +using AggregatedDataWithUInt128KeyPhase2 = + PHHashMap<vectorized::UInt128, vectorized::AggregateDataPtr, + HashMixWrapper<vectorized::UInt128>>; +using AggregatedDataWithUInt256KeyPhase2 = + PHHashMap<vectorized::UInt256, vectorized::AggregateDataPtr, + HashMixWrapper<vectorized::UInt256>>; + +using AggregatedDataWithUInt136KeyPhase2 = + PHHashMap<vectorized::UInt136, vectorized::AggregateDataPtr, + HashMixWrapper<vectorized::UInt136>>; + +using AggregatedDataWithNullableUInt8Key = vectorized::DataWithNullKey<AggregatedDataWithUInt8Key>; +using AggregatedDataWithNullableUInt16Key = + vectorized::DataWithNullKey<AggregatedDataWithUInt16Key>; +using AggregatedDataWithNullableUInt32Key = + vectorized::DataWithNullKey<AggregatedDataWithUInt32Key>; +using AggregatedDataWithNullableUInt64Key = + vectorized::DataWithNullKey<AggregatedDataWithUInt64Key>; +using AggregatedDataWithNullableUInt32KeyPhase2 = + vectorized::DataWithNullKey<AggregatedDataWithUInt32KeyPhase2>; +using AggregatedDataWithNullableUInt64KeyPhase2 = + vectorized::DataWithNullKey<AggregatedDataWithUInt64KeyPhase2>; +using AggregatedDataWithNullableShortStringKey = + vectorized::DataWithNullKey<AggregatedDataWithShortStringKey>; +using AggregatedDataWithNullableUInt128Key = + vectorized::DataWithNullKey<AggregatedDataWithUInt128Key>; +using AggregatedDataWithNullableUInt128KeyPhase2 = + vectorized::DataWithNullKey<AggregatedDataWithUInt128KeyPhase2>; + +using AggregatedMethodVariants = std::variant< + std::monostate, vectorized::MethodSerialized<AggregatedDataWithStringKey>, + vectorized::MethodOneNumber<vectorized::UInt8, AggregatedDataWithUInt8Key>, + vectorized::MethodOneNumber<vectorized::UInt16, AggregatedDataWithUInt16Key>, + vectorized::MethodOneNumber<vectorized::UInt32, AggregatedDataWithUInt32Key>, + vectorized::MethodOneNumber<vectorized::UInt64, AggregatedDataWithUInt64Key>, + vectorized::MethodStringNoCache<AggregatedDataWithShortStringKey>, + vectorized::MethodOneNumber<vectorized::UInt128, AggregatedDataWithUInt128Key>, + vectorized::MethodOneNumber<vectorized::UInt32, AggregatedDataWithUInt32KeyPhase2>, + vectorized::MethodOneNumber<vectorized::UInt64, AggregatedDataWithUInt64KeyPhase2>, + vectorized::MethodOneNumber<vectorized::UInt128, AggregatedDataWithUInt128KeyPhase2>, + vectorized::MethodSingleNullableColumn< + vectorized::MethodOneNumber<vectorized::UInt8, AggregatedDataWithNullableUInt8Key>>, + vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber< + vectorized::UInt16, AggregatedDataWithNullableUInt16Key>>, + vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber< + vectorized::UInt32, AggregatedDataWithNullableUInt32Key>>, + vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber< + vectorized::UInt64, AggregatedDataWithNullableUInt64Key>>, + vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber< + vectorized::UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>, + vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber< + vectorized::UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>, + vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber< + vectorized::UInt128, AggregatedDataWithNullableUInt128Key>>, + vectorized::MethodSingleNullableColumn<vectorized::MethodOneNumber< + vectorized::UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>, + vectorized::MethodSingleNullableColumn< + vectorized::MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt64Key, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt64Key, true>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt128Key, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt128Key, true>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt256Key, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt256Key, true>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt136Key, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt136Key, true>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, false>, + vectorized::MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, true>>; + +struct AggregatedDataVariants + : public vectorized::DataVariants<AggregatedMethodVariants, + vectorized::MethodSingleNullableColumn, + vectorized::MethodOneNumber, vectorized::MethodKeysFixed, + vectorized::DataWithNullKey> { + AggregatedDataWithoutKey without_key = nullptr; + + template <bool nullable> + void init(Type type) { + _type = type; + switch (_type) { + case Type::without_key: + break; + case Type::serialized: + method_variant.emplace<vectorized::MethodSerialized<AggregatedDataWithStringKey>>(); + break; + case Type::int8_key: + emplace_single<vectorized::UInt8, AggregatedDataWithUInt8Key, nullable>(); + break; + case Type::int16_key: + emplace_single<vectorized::UInt16, AggregatedDataWithUInt16Key, nullable>(); + break; + case Type::int32_key: + emplace_single<vectorized::UInt32, AggregatedDataWithUInt32Key, nullable>(); + break; + case Type::int32_key_phase2: + emplace_single<vectorized::UInt32, AggregatedDataWithUInt32KeyPhase2, nullable>(); + break; + case Type::int64_key: + emplace_single<vectorized::UInt64, AggregatedDataWithUInt64Key, nullable>(); + break; + case Type::int64_key_phase2: + emplace_single<vectorized::UInt64, AggregatedDataWithUInt64KeyPhase2, nullable>(); + break; + case Type::int128_key: + emplace_single<vectorized::UInt128, AggregatedDataWithUInt128Key, nullable>(); + break; + case Type::int128_key_phase2: + emplace_single<vectorized::UInt128, AggregatedDataWithUInt128KeyPhase2, nullable>(); + break; + case Type::string_key: + if (nullable) { + method_variant.emplace< + vectorized::MethodSingleNullableColumn<vectorized::MethodStringNoCache< + AggregatedDataWithNullableShortStringKey>>>(); + } else { + method_variant.emplace< + vectorized::MethodStringNoCache<AggregatedDataWithShortStringKey>>(); + } + break; + default: + throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type, type={}", type); + } + } + + void init(Type type, bool is_nullable = false) { + if (is_nullable) { + init<true>(type); + } else { + init<false>(type); + } + } +}; + +using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>; +using ArenaUPtr = std::unique_ptr<vectorized::Arena>; + +struct AggregateDataContainer { +public: + AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states) + : _size_of_key(size_of_key), _size_of_aggregate_states(size_of_aggregate_states) {} + + int64_t memory_usage() const { return _arena_pool.size(); } + + template <typename KeyType> + vectorized::AggregateDataPtr append_data(const KeyType& key) { + DCHECK_EQ(sizeof(KeyType), _size_of_key); + // SUB_CONTAINER_CAPACITY should add a new sub container, and also expand when it is zero + if (UNLIKELY(_index_in_sub_container % SUB_CONTAINER_CAPACITY == 0)) { + _expand(); + } + + *reinterpret_cast<KeyType*>(_current_keys) = key; + auto aggregate_data = _current_agg_data; + ++_total_count; + ++_index_in_sub_container; + _current_agg_data += _size_of_aggregate_states; + _current_keys += _size_of_key; + return aggregate_data; + } + + template <typename Derived, bool IsConst> + class IteratorBase { + using Container = + std::conditional_t<IsConst, const AggregateDataContainer, AggregateDataContainer>; + + Container* container = nullptr; + uint32_t index; + uint32_t sub_container_index; + uint32_t index_in_sub_container; + + friend class HashTable; + + public: + IteratorBase() = default; + IteratorBase(Container* container_, uint32_t index_) + : container(container_), index(index_) { + sub_container_index = index / SUB_CONTAINER_CAPACITY; + index_in_sub_container = index - sub_container_index * SUB_CONTAINER_CAPACITY; + } + + bool operator==(const IteratorBase& rhs) const { return index == rhs.index; } + bool operator!=(const IteratorBase& rhs) const { return index != rhs.index; } + + Derived& operator++() { + index++; + index_in_sub_container++; + if (index_in_sub_container == SUB_CONTAINER_CAPACITY) { + index_in_sub_container = 0; + sub_container_index++; + } + return static_cast<Derived&>(*this); + } + + template <typename KeyType> + KeyType get_key() { + DCHECK_EQ(sizeof(KeyType), container->_size_of_key); + return ((KeyType*)(container->_key_containers[sub_container_index])) + [index_in_sub_container]; + } + + vectorized::AggregateDataPtr get_aggregate_data() { + return &(container->_value_containers[sub_container_index] + [container->_size_of_aggregate_states * + index_in_sub_container]); + } + }; + + class Iterator : public IteratorBase<Iterator, false> { + public: + using IteratorBase<Iterator, false>::IteratorBase; + }; + + class ConstIterator : public IteratorBase<ConstIterator, true> { + public: + using IteratorBase<ConstIterator, true>::IteratorBase; + }; + + ConstIterator begin() const { return ConstIterator(this, 0); } + + ConstIterator cbegin() const { return begin(); } + + Iterator begin() { return Iterator(this, 0); } + + ConstIterator end() const { return ConstIterator(this, _total_count); } + ConstIterator cend() const { return end(); } + Iterator end() { return Iterator(this, _total_count); } + + void init_once() { + if (_inited) { + return; + } + _inited = true; + iterator = begin(); + } + Iterator iterator; + +private: + void _expand() { + _index_in_sub_container = 0; + _current_keys = nullptr; + _current_agg_data = nullptr; + try { + _current_keys = _arena_pool.alloc(_size_of_key * SUB_CONTAINER_CAPACITY); + _key_containers.emplace_back(_current_keys); + + _current_agg_data = (vectorized::AggregateDataPtr)_arena_pool.alloc( + _size_of_aggregate_states * SUB_CONTAINER_CAPACITY); + _value_containers.emplace_back(_current_agg_data); + } catch (...) { + if (_current_keys) { + _key_containers.pop_back(); + _current_keys = nullptr; + } + if (_current_agg_data) { + _value_containers.pop_back(); + _current_agg_data = nullptr; + } + throw; + } + } + + static constexpr uint32_t SUB_CONTAINER_CAPACITY = 8192; + vectorized::Arena _arena_pool; + std::vector<char*> _key_containers; + std::vector<vectorized::AggregateDataPtr> _value_containers; + vectorized::AggregateDataPtr _current_agg_data = nullptr; + char* _current_keys = nullptr; + size_t _size_of_key {}; + size_t _size_of_aggregate_states {}; + uint32_t _index_in_sub_container {}; + uint32_t _total_count {}; + bool _inited = false; +}; + +} // namespace pipeline + +constexpr auto init_agg_hash_method = + init_hash_method<pipeline::AggregatedDataVariants, vectorized::AggregateDataPtr>; + +} // namespace doris diff --git a/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h b/be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h similarity index 97% rename from be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h rename to be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h index 515be45ad13..bb9ffdb74b7 100644 --- a/be/src/vec/exec/data_gen_functions/vdata_gen_function_inf.h +++ b/be/src/pipeline/common/data_gen_functions/vdata_gen_function_inf.h @@ -29,7 +29,7 @@ class RuntimeState; class Status; class TScanRangeParams; -namespace vectorized { +namespace pipeline { class VDataGenFunctionInf { public: @@ -51,6 +51,6 @@ protected: const TupleDescriptor* _tuple_desc = nullptr; }; -} // namespace vectorized +} // namespace pipeline } // namespace doris diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp similarity index 87% rename from be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp rename to be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp index d33d02aa953..3bbcb544abc 100644 --- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp +++ b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "vec/exec/data_gen_functions/vnumbers_tvf.h" +#include "pipeline/common/data_gen_functions/vnumbers_tvf.h" #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/PlanNodes_types.h> @@ -35,7 +35,7 @@ #include "vec/core/types.h" #include "vec/data_types/data_type.h" -namespace doris::vectorized { +namespace doris::pipeline { VNumbersTVF::VNumbersTVF(TupleId tuple_id, const TupleDescriptor* tuple_desc) : VDataGenFunctionInf(tuple_id, tuple_desc) {} @@ -59,7 +59,7 @@ Status VNumbersTVF::get_next(RuntimeState* state, vectorized::Block* block, bool *eos = true; continue; } - auto* column_res = assert_cast<ColumnInt64*>(columns[i].get()); //BIGINT + auto* column_res = assert_cast<vectorized::ColumnInt64*>(columns[i].get()); //BIGINT int64_t end_value = std::min((int64_t)(_next_number + batch_size), _total_numbers); if (_use_const) { column_res->insert_many_vals(_const_value, end_value - _next_number); @@ -78,9 +78,9 @@ Status VNumbersTVF::get_next(RuntimeState* state, vectorized::Block* block, bool } else { size_t n_columns = 0; for (const auto* slot_desc : _tuple_desc->slots()) { - block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]), - slot_desc->get_data_type_ptr(), - slot_desc->col_name())); + block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); } } return Status::OK(); @@ -97,4 +97,4 @@ Status VNumbersTVF::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ra return Status::OK(); } -} // namespace doris::vectorized +} // namespace doris::pipeline diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h similarity index 93% rename from be/src/vec/exec/data_gen_functions/vnumbers_tvf.h rename to be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h index 1968637fd36..bf8b117a378 100644 --- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.h +++ b/be/src/pipeline/common/data_gen_functions/vnumbers_tvf.h @@ -21,7 +21,7 @@ #include <vector> #include "common/global_types.h" -#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h" +#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h" namespace doris { @@ -30,7 +30,7 @@ class RuntimeState; class Status; class TScanRangeParams; -namespace vectorized { +namespace pipeline { class Block; class VNumbersTVF : public VDataGenFunctionInf { @@ -51,6 +51,6 @@ private: int64_t _next_number = 0; }; -} // namespace vectorized +} // namespace pipeline } // namespace doris diff --git a/be/src/pipeline/common/join_utils.h b/be/src/pipeline/common/join_utils.h new file mode 100644 index 00000000000..cd3374995f7 --- /dev/null +++ b/be/src/pipeline/common/join_utils.h @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <variant> +#include <vector> + +#include "vec/common/hash_table/hash_map_context_creator.h" +#include "vec/common/hash_table/hash_map_util.h" + +namespace doris::pipeline { +using JoinOpVariants = + std::variant<std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::LEFT_SEMI_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::LEFT_ANTI_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::LEFT_OUTER_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::FULL_OUTER_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_OUTER_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::CROSS_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_SEMI_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_ANTI_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>, + std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>; + +using I8HashTableContext = vectorized::PrimaryTypeHashTableContext<vectorized::UInt8>; +using I16HashTableContext = vectorized::PrimaryTypeHashTableContext<vectorized::UInt16>; +using I32HashTableContext = vectorized::PrimaryTypeHashTableContext<vectorized::UInt32>; +using I64HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt64>; +using I128HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt128>; +using I256HashTableContext = vectorized::PrimaryTypeHashTableContext<UInt256>; + +template <bool has_null> +using I64FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt64, has_null>; + +template <bool has_null> +using I128FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt128, has_null>; + +template <bool has_null> +using I256FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt256, has_null>; + +template <bool has_null> +using I136FixedKeyHashTableContext = vectorized::FixedKeyHashTableContext<UInt136, has_null>; + +using HashTableVariants = + std::variant<std::monostate, vectorized::SerializedHashTableContext, I8HashTableContext, + I16HashTableContext, I32HashTableContext, I64HashTableContext, + I128HashTableContext, I256HashTableContext, I64FixedKeyHashTableContext<true>, + I64FixedKeyHashTableContext<false>, I128FixedKeyHashTableContext<true>, + I128FixedKeyHashTableContext<false>, I256FixedKeyHashTableContext<true>, + I256FixedKeyHashTableContext<false>, I136FixedKeyHashTableContext<true>, + I136FixedKeyHashTableContext<false>>; + +} // namespace doris::pipeline diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/pipeline/common/runtime_filter_consumer.cpp similarity index 96% rename from be/src/vec/exec/runtime_filter_consumer.cpp rename to be/src/pipeline/common/runtime_filter_consumer.cpp index 8f3a0e9fac1..0e9c2d0f304 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/pipeline/common/runtime_filter_consumer.cpp @@ -15,16 +15,16 @@ // specific language governing permissions and limitations // under the License. -#include "vec/exec/runtime_filter_consumer.h" +#include "pipeline/common/runtime_filter_consumer.h" #include "pipeline/pipeline_task.h" -namespace doris::vectorized { +namespace doris::pipeline { RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, const std::vector<TRuntimeFilterDesc>& runtime_filters, const RowDescriptor& row_descriptor, - VExprContextSPtrs& conjuncts) + vectorized::VExprContextSPtrs& conjuncts) : _filter_id(filter_id), _runtime_filter_descs(runtime_filters), _row_descriptor_ref(row_descriptor), @@ -148,7 +148,7 @@ Status RuntimeFilterConsumer::_append_rf_into_conjuncts( } for (const auto& expr : vexprs) { - VExprContextSPtr conjunct = VExprContext::create_shared(expr); + vectorized::VExprContextSPtr conjunct = vectorized::VExprContext::create_shared(expr); RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref)); RETURN_IF_ERROR(conjunct->open(_state)); _rf_vexpr_set.insert(expr); @@ -202,4 +202,4 @@ void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) { _acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime"); } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/pipeline/common/runtime_filter_consumer.h similarity index 92% rename from be/src/vec/exec/runtime_filter_consumer.h rename to be/src/pipeline/common/runtime_filter_consumer.h index 7f06edcbf09..9bee6053f6f 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/pipeline/common/runtime_filter_consumer.h @@ -20,13 +20,14 @@ #include "exprs/runtime_filter.h" #include "pipeline/dependency.h" -namespace doris::vectorized { +namespace doris::pipeline { class RuntimeFilterConsumer { public: RuntimeFilterConsumer(const int32_t filter_id, const std::vector<TRuntimeFilterDesc>& runtime_filters, - const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts); + const RowDescriptor& row_descriptor, + vectorized::VExprContextSPtrs& conjuncts); ~RuntimeFilterConsumer() = default; Status init(RuntimeState* state, bool need_local_merge = false); @@ -65,7 +66,7 @@ protected: // Set to true if the runtime filter is ready. std::vector<bool> _runtime_filter_ready_flag; std::mutex _rf_locks; - phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set; + phmap::flat_hash_set<vectorized::VExprSPtr> _rf_vexpr_set; RuntimeState* _state = nullptr; private: @@ -85,4 +86,4 @@ private: RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 5ce5e0a56a3..68c00af409d 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -21,11 +21,14 @@ #include <mutex> #include "common/logging.h" +#include "exprs/runtime_filter.h" #include "pipeline/exec/multi_cast_data_streamer.h" #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_task.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" +#include "vec/exprs/vectorized_agg_fn.h" +#include "vec/exprs/vslot_ref.h" #include "vec/spill/spill_stream_manager.h" namespace doris::pipeline { @@ -297,7 +300,7 @@ Status AggSharedState::reset_hash_table() { RETURN_IF_ERROR(st); } - aggregate_data_container.reset(new vectorized::AggregateDataContainer( + aggregate_data_container.reset(new AggregateDataContainer( sizeof(typename HashTableType::key_type), ((total_size_of_aggregate_states + align_aggregate_states - 1) / align_aggregate_states) * @@ -376,4 +379,19 @@ MultiCastSharedState::MultiCastSharedState(const RowDescriptor& row_desc, Object : multi_cast_data_streamer(std::make_unique<pipeline::MultiCastDataStreamer>( row_desc, pool, cast_sender_count, true)) {} +int AggSharedState::get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) { + auto ctxs = evaluator->input_exprs_ctxs(); + CHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref()) + << "input_exprs_ctxs is invalid, input_exprs_ctx[0]=" + << ctxs[0]->root()->debug_string(); + return ((vectorized::VSlotRef*)ctxs[0]->root().get())->column_id(); +} + +Status AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) { + for (int i = 0; i < aggregate_evaluators.size(); ++i) { + aggregate_evaluators[i]->function()->destroy(data + offsets_of_aggregate_states[i]); + } + return Status::OK(); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index b7b4258f53a..46335caade5 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -29,16 +29,20 @@ #include "common/logging.h" #include "concurrentqueue.h" #include "gutil/integral_types.h" +#include "pipeline/common/agg_utils.h" +#include "pipeline/common/join_utils.h" #include "pipeline/exec/data_queue.h" -#include "vec/common/hash_table/hash_map_context_creator.h" +#include "pipeline/exec/join/process_hash_table_probe.h" #include "vec/common/sort/partition_sorter.h" #include "vec/common/sort/sorter.h" #include "vec/core/types.h" -#include "vec/exec/join/process_hash_table_probe.h" -#include "vec/exec/join/vhash_join_node.h" -#include "vec/exec/vaggregation_node.h" #include "vec/spill/spill_stream.h" +namespace doris::vectorized { +class AggFnEvaluator; +class VSlotRef; +} // namespace doris::vectorized + namespace doris::pipeline { class Dependency; @@ -294,7 +298,7 @@ struct AggSharedState : public BasicSharedState { ENABLE_FACTORY_CREATOR(AggSharedState) public: AggSharedState() { - agg_data = std::make_unique<vectorized::AggregatedDataVariants>(); + agg_data = std::make_unique<AggregatedDataVariants>(); agg_arena_pool = std::make_unique<vectorized::Arena>(); } ~AggSharedState() override { @@ -313,17 +317,11 @@ public: // We should call this function only at 1st phase. // 1st phase: is_merge=true, only have one SlotRef. // 2nd phase: is_merge=false, maybe have multiple exprs. - static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) { - auto ctxs = evaluator->input_exprs_ctxs(); - CHECK(ctxs.size() == 1 && ctxs[0]->root()->is_slot_ref()) - << "input_exprs_ctxs is invalid, input_exprs_ctx[0]=" - << ctxs[0]->root()->debug_string(); - return ((vectorized::VSlotRef*)ctxs[0]->root().get())->column_id(); - } + static int get_slot_column_id(const vectorized::AggFnEvaluator* evaluator); - vectorized::AggregatedDataVariantsUPtr agg_data = nullptr; - std::unique_ptr<vectorized::AggregateDataContainer> aggregate_data_container; - vectorized::ArenaUPtr agg_arena_pool; + AggregatedDataVariantsUPtr agg_data = nullptr; + std::unique_ptr<AggregateDataContainer> aggregate_data_container; + ArenaUPtr agg_arena_pool; std::vector<vectorized::AggFnEvaluator*> aggregate_evaluators; // group by k1,k2 vectorized::VExprContextSPtrs probe_expr_ctxs; @@ -445,12 +443,7 @@ private: agg_data_created_without_key = false; } } - Status _destroy_agg_status(vectorized::AggregateDataPtr data) { - for (int i = 0; i < aggregate_evaluators.size(); ++i) { - aggregate_evaluators[i]->function()->destroy(data + offsets_of_aggregate_states[i]); - } - return Status::OK(); - } + Status _destroy_agg_status(vectorized::AggregateDataPtr data); }; struct AggSpillPartition; @@ -613,19 +606,6 @@ public: std::vector<int64_t> ordey_by_column_idxs; }; -using JoinOpVariants = - std::variant<std::integral_constant<TJoinOp::type, TJoinOp::INNER_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::LEFT_SEMI_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::LEFT_ANTI_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::LEFT_OUTER_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::FULL_OUTER_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_OUTER_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::CROSS_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_SEMI_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::RIGHT_ANTI_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>, - std::integral_constant<TJoinOp::type, TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>; - struct JoinSharedState : public BasicSharedState { // For some join case, we can apply a short circuit strategy // 1. _has_null_in_build_side = true @@ -646,8 +626,7 @@ struct HashJoinSharedState : public JoinSharedState { std::shared_ptr<vectorized::Arena> arena = std::make_shared<vectorized::Arena>(); // maybe share hash table with other fragment instances - std::shared_ptr<vectorized::HashTableVariants> hash_table_variants = - std::make_shared<vectorized::HashTableVariants>(); + std::shared_ptr<HashTableVariants> hash_table_variants = std::make_shared<HashTableVariants>(); const std::vector<TupleDescriptor*> build_side_child_desc; size_t build_exprs_size = 0; std::shared_ptr<vectorized::Block> build_block; @@ -696,23 +675,23 @@ public: ~AsyncWriterDependency() override = default; }; -using SetHashTableVariants = std::variant< - std::monostate, - vectorized::MethodSerialized<HashMap<StringRef, vectorized::RowRefListWithFlags>>, - vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt8>, - vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt16>, - vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt32>, - vectorized::SetPrimaryTypeHashTableContext<UInt64>, - vectorized::SetPrimaryTypeHashTableContext<UInt128>, - vectorized::SetPrimaryTypeHashTableContext<UInt256>, - vectorized::SetFixedKeyHashTableContext<UInt64, true>, - vectorized::SetFixedKeyHashTableContext<UInt64, false>, - vectorized::SetFixedKeyHashTableContext<UInt128, true>, - vectorized::SetFixedKeyHashTableContext<UInt128, false>, - vectorized::SetFixedKeyHashTableContext<UInt256, true>, - vectorized::SetFixedKeyHashTableContext<UInt256, false>, - vectorized::SetFixedKeyHashTableContext<UInt136, true>, - vectorized::SetFixedKeyHashTableContext<UInt136, false>>; +using SetHashTableVariants = + std::variant<std::monostate, + vectorized::MethodSerialized<HashMap<StringRef, RowRefListWithFlags>>, + vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt8>, + vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt16>, + vectorized::SetPrimaryTypeHashTableContext<vectorized::UInt32>, + vectorized::SetPrimaryTypeHashTableContext<UInt64>, + vectorized::SetPrimaryTypeHashTableContext<UInt128>, + vectorized::SetPrimaryTypeHashTableContext<UInt256>, + vectorized::SetFixedKeyHashTableContext<UInt64, true>, + vectorized::SetFixedKeyHashTableContext<UInt64, false>, + vectorized::SetFixedKeyHashTableContext<UInt128, true>, + vectorized::SetFixedKeyHashTableContext<UInt128, false>, + vectorized::SetFixedKeyHashTableContext<UInt256, true>, + vectorized::SetFixedKeyHashTableContext<UInt256, false>, + vectorized::SetFixedKeyHashTableContext<UInt136, true>, + vectorized::SetFixedKeyHashTableContext<UInt136, false>>; struct SetSharedState : public BasicSharedState { ENABLE_FACTORY_CREATOR(SetSharedState) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 9870571fa5e..41498fd94fa 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -24,6 +24,7 @@ #include "pipeline/exec/operator.h" #include "runtime/primitive_type.h" #include "vec/common/hash_table/hash.h" +#include "vec/exprs/vectorized_agg_fn.h" namespace doris::pipeline { @@ -110,25 +111,24 @@ Status AggSinkLocalState::open(RuntimeState* state) { } else { RETURN_IF_ERROR(_init_hash_method(Base::_shared_state->probe_expr_ctxs)); - std::visit(vectorized::Overload { - [&](std::monostate& arg) { - throw doris::Exception(ErrorCode::INTERNAL_ERROR, - "uninited hash table"); - }, - [&](auto& agg_method) { - using HashTableType = std::decay_t<decltype(agg_method)>; - using KeyType = typename HashTableType::Key; - - /// some aggregate functions (like AVG for decimal) have align issues. - Base::_shared_state->aggregate_data_container = - std::make_unique<vectorized::AggregateDataContainer>( - - sizeof(KeyType), - ((p._total_size_of_aggregate_states + - p._align_aggregate_states - 1) / - p._align_aggregate_states) * - p._align_aggregate_states); - }}, + std::visit(vectorized::Overload {[&](std::monostate& arg) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, + "uninited hash table"); + }, + [&](auto& agg_method) { + using HashTableType = + std::decay_t<decltype(agg_method)>; + using KeyType = typename HashTableType::Key; + + /// some aggregate functions (like AVG for decimal) have align issues. + Base::_shared_state->aggregate_data_container = + std::make_unique<AggregateDataContainer>( + sizeof(KeyType), + ((p._total_size_of_aggregate_states + + p._align_aggregate_states - 1) / + p._align_aggregate_states) * + p._align_aggregate_states); + }}, _agg_data->method_variant); if (p._is_merge) { _executor = std::make_unique<Executor<false, true>>(); diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 2964cba9a1d..add6712453f 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -119,7 +119,7 @@ protected: vectorized::Block _preagg_block = vectorized::Block(); - vectorized::AggregatedDataVariants* _agg_data = nullptr; + AggregatedDataVariants* _agg_data = nullptr; vectorized::Arena* _agg_arena_pool = nullptr; std::unique_ptr<ExecutorBase> _executor = nullptr; @@ -156,7 +156,7 @@ public: bool require_data_distribution() const override { return _is_colocate; } size_t get_revocable_mem_size(RuntimeState* state) const; - vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) { + AggregatedDataVariants* get_agg_data(RuntimeState* state) { auto& local_state = get_local_state(state); return local_state._agg_data; } diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index cca9fefbdb2..5b371877f36 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -22,6 +22,7 @@ #include "common/exception.h" #include "pipeline/exec/operator.h" +#include "vec/exprs/vectorized_agg_fn.h" namespace doris::pipeline { diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 5358cffcff5..43859c7cebd 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -21,6 +21,7 @@ #include <string> #include "pipeline/exec/operator.h" +#include "vec/exprs/vectorized_agg_fn.h" namespace doris::pipeline { diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 6d30d02ff53..bc8f3279f92 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -21,6 +21,7 @@ #include "pipeline/exec/operator.h" #include "vec/columns/column_nullable.h" +#include "vec/exprs/vectorized_agg_fn.h" namespace doris::pipeline { diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 95b284c94b4..39e35ed8836 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -19,10 +19,11 @@ #include <memory> +#include "exprs/runtime_filter.h" +#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h" +#include "pipeline/common/data_gen_functions/vnumbers_tvf.h" #include "pipeline/exec/operator.h" #include "util/runtime_profile.h" -#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h" -#include "vec/exec/data_gen_functions/vnumbers_tvf.h" namespace doris { class RuntimeState; @@ -76,7 +77,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); auto& p = _parent->cast<DataGenSourceOperatorX>(); - _table_func = std::make_shared<vectorized::VNumbersTVF>(p._tuple_id, p._tuple_desc); + _table_func = std::make_shared<VNumbersTVF>(p._tuple_id, p._tuple_desc); _table_func->set_tuple_desc(p._tuple_desc); RETURN_IF_ERROR(_table_func->set_scan_ranges(info.scan_ranges)); diff --git a/be/src/pipeline/exec/datagen_operator.h b/be/src/pipeline/exec/datagen_operator.h index d4d649a6c06..8aeeea2a699 100644 --- a/be/src/pipeline/exec/datagen_operator.h +++ b/be/src/pipeline/exec/datagen_operator.h @@ -20,8 +20,8 @@ #include <stdint.h> #include "common/status.h" +#include "pipeline/common/data_gen_functions/vdata_gen_function_inf.h" #include "pipeline/exec/operator.h" -#include "vec/exec/data_gen_functions/vdata_gen_function_inf.h" namespace doris { class RuntimeState; @@ -43,7 +43,7 @@ public: private: friend class DataGenSourceOperatorX; - std::shared_ptr<vectorized::VDataGenFunctionInf> _table_func; + std::shared_ptr<VDataGenFunctionInf> _table_func; }; class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index f0cb738a303..73ce8ce5fb4 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -23,6 +23,7 @@ #include <utility> #include "common/compiler_util.h" // IWYU pragma: keep +#include "vec/exprs/vectorized_agg_fn.h" namespace doris { class ExecNode; @@ -61,7 +62,7 @@ DistinctStreamingAggLocalState::DistinctStreamingAggLocalState(RuntimeState* sta dummy_mapped_data(std::make_shared<char>('A')), batch_size(state->batch_size()), _agg_arena_pool(std::make_unique<vectorized::Arena>()), - _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()), + _agg_data(std::make_unique<AggregatedDataVariants>()), _agg_profile_arena(std::make_unique<vectorized::Arena>()), _child_block(vectorized::Block::create_unique()), _aggregated_block(vectorized::Block::create_unique()) {} diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index e8166446678..d6ff5fde0c5 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -73,7 +73,7 @@ private: bool _stop_emplace_flag = false; const int batch_size; std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr; - vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr; + AggregatedDataVariantsUPtr _agg_data = nullptr; std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index aab16ff3bff..9fcfdd999d4 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -81,12 +81,11 @@ Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca } properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range->shard_id); properties[ESScanReader::KEY_BATCH_SIZE] = - std::to_string(vectorized::RuntimeFilterConsumer::_state->batch_size()); + std::to_string(RuntimeFilterConsumer::_state->batch_size()); properties[ESScanReader::KEY_HOST_PORT] = get_host_and_port(es_scan_range->es_hosts); // push down limit to Elasticsearch // if predicate in _conjunct_ctxs can not be processed by Elasticsearch, we can not push down limit operator to Elasticsearch - if (p.limit() != -1 && - p.limit() <= vectorized::RuntimeFilterConsumer::_state->batch_size()) { + if (p.limit() != -1 && p.limit() <= RuntimeFilterConsumer::_state->batch_size()) { properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(p.limit()); } @@ -95,12 +94,11 @@ Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca properties, p._column_names, p._docvalue_context, &doc_value_mode); std::shared_ptr<vectorized::NewEsScanner> scanner = vectorized::NewEsScanner::create_shared( - vectorized::RuntimeFilterConsumer::_state, this, p._limit_per_scanner, p._tuple_id, - properties, p._docvalue_context, doc_value_mode, - vectorized::RuntimeFilterConsumer::_state->runtime_profile()); + RuntimeFilterConsumer::_state, this, p._limit_per_scanner, p._tuple_id, properties, + p._docvalue_context, doc_value_mode, + RuntimeFilterConsumer::_state->runtime_profile()); - RETURN_IF_ERROR( - scanner->prepare(vectorized::RuntimeFilterConsumer::_state, Base::_conjuncts)); + RETURN_IF_ERROR(scanner->prepare(RuntimeFilterConsumer::_state, Base::_conjuncts)); scanners->push_back(scanner); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index c77761b01bf..3ea10110084 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -289,10 +289,8 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, auto with_other_conjuncts) -> Status { using HashTableCtxType = std::decay_t<decltype(arg)>; using JoinOpType = std::decay_t<decltype(join_op)>; - vectorized::ProcessHashTableBuild<HashTableCtxType, - HashJoinBuildSinkLocalState> - hash_table_build_process(rows, raw_ptrs, this, state->batch_size(), - state); + ProcessHashTableBuild<HashTableCtxType> hash_table_build_process( + rows, raw_ptrs, this, state->batch_size(), state); auto old_hash_table_size = arg.hash_table->get_byte_size(); auto old_key_size = arg.serialized_keys_size(true); auto st = hash_table_build_process.template run< @@ -338,26 +336,22 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { switch (_build_expr_ctxs[0]->root()->result_type()) { case TYPE_BOOLEAN: case TYPE_TINYINT: - _shared_state->hash_table_variants - ->emplace<vectorized::I8HashTableContext>(); + _shared_state->hash_table_variants->emplace<I8HashTableContext>(); break; case TYPE_SMALLINT: - _shared_state->hash_table_variants - ->emplace<vectorized::I16HashTableContext>(); + _shared_state->hash_table_variants->emplace<I16HashTableContext>(); break; case TYPE_INT: case TYPE_FLOAT: case TYPE_DATEV2: - _shared_state->hash_table_variants - ->emplace<vectorized::I32HashTableContext>(); + _shared_state->hash_table_variants->emplace<I32HashTableContext>(); break; case TYPE_BIGINT: case TYPE_DOUBLE: case TYPE_DATETIME: case TYPE_DATE: case TYPE_DATETIMEV2: - _shared_state->hash_table_variants - ->emplace<vectorized::I64HashTableContext>(); + _shared_state->hash_table_variants->emplace<I64HashTableContext>(); break; case TYPE_LARGEINT: case TYPE_DECIMALV2: @@ -375,14 +369,11 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { : type_ptr->get_type_id(); vectorized::WhichDataType which(idx); if (which.is_decimal32()) { - _shared_state->hash_table_variants - ->emplace<vectorized::I32HashTableContext>(); + _shared_state->hash_table_variants->emplace<I32HashTableContext>(); } else if (which.is_decimal64()) { - _shared_state->hash_table_variants - ->emplace<vectorized::I64HashTableContext>(); + _shared_state->hash_table_variants->emplace<I64HashTableContext>(); } else { - _shared_state->hash_table_variants - ->emplace<vectorized::I128HashTableContext>(); + _shared_state->hash_table_variants->emplace<I128HashTableContext>(); } break; } @@ -606,7 +597,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } }, *local_state._shared_state->hash_table_variants, - *std::static_pointer_cast<vectorized::HashTableVariants>( + *std::static_pointer_cast<HashTableVariants>( _shared_hash_table_context->hash_table_variants)); local_state._shared_state->build_block = _shared_hash_table_context->block; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index fb66c5222ab..d785c20ee7f 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -17,8 +17,7 @@ #pragma once -#include <stdint.h> - +#include "exprs/runtime_filter_slots.h" #include "join_build_sink_operator.h" #include "operator.h" @@ -67,8 +66,8 @@ protected: const std::vector<int>& res_col_ids); friend class HashJoinBuildSinkOperatorX; friend class PartitionedHashJoinSinkLocalState; - template <class HashTableContext, typename Parent> - friend struct vectorized::ProcessHashTableBuild; + template <class HashTableContext> + friend struct ProcessHashTableBuild; // build expr vectorized::VExprContextSPtrs _build_expr_ctxs; @@ -176,4 +175,57 @@ private: const bool _need_local_merge; }; +template <class HashTableContext> +struct ProcessHashTableBuild { + ProcessHashTableBuild(int rows, vectorized::ColumnRawPtrs& build_raw_ptrs, + HashJoinBuildSinkLocalState* parent, int batch_size, RuntimeState* state) + : _rows(rows), + _build_raw_ptrs(build_raw_ptrs), + _parent(parent), + _batch_size(batch_size), + _state(state) {} + + template <int JoinOpType, bool ignore_null, bool short_circuit_for_null, + bool with_other_conjuncts> + Status run(HashTableContext& hash_table_ctx, vectorized::ConstNullMapPtr null_map, + bool* has_null_key) { + if (short_circuit_for_null || ignore_null) { + // first row is mocked and is null + for (uint32_t i = 1; i < _rows; i++) { + if ((*null_map)[i]) { + *has_null_key = true; + } + } + if (short_circuit_for_null && *has_null_key) { + return Status::OK(); + } + } + + SCOPED_TIMER(_parent->_build_table_insert_timer); + hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, _batch_size, + *has_null_key); + + hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, + null_map ? null_map->data() : nullptr, true, true, + hash_table_ctx.hash_table->get_bucket_size()); + hash_table_ctx.hash_table->template build<JoinOpType, with_other_conjuncts>( + hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows); + hash_table_ctx.bucket_nums.resize(_batch_size); + hash_table_ctx.bucket_nums.shrink_to_fit(); + + COUNTER_SET(_parent->_hash_table_memory_usage, + (int64_t)hash_table_ctx.hash_table->get_byte_size()); + COUNTER_SET(_parent->_build_arena_memory_usage, + (int64_t)hash_table_ctx.serialized_keys_size(true)); + return Status::OK(); + } + +private: + const uint32_t _rows; + vectorized::ColumnRawPtrs& _build_raw_ptrs; + HashJoinBuildSinkLocalState* _parent = nullptr; + int _batch_size; + RuntimeState* _state = nullptr; +}; + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 56630efba3a..002ead551f6 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -73,9 +73,8 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) { std::visit( [&](auto&& join_op_variants, auto have_other_join_conjunct) { using JoinOpType = std::decay_t<decltype(join_op_variants)>; - _process_hashtable_ctx_variants - ->emplace<vectorized::ProcessHashTableProbe<JoinOpType::value>>( - this, state->batch_size()); + _process_hashtable_ctx_variants->emplace<ProcessHashTableProbe<JoinOpType::value>>( + this, state->batch_size()); }, _shared_state->join_op_variants, vectorized::make_bool_variant(p._have_other_join_conjunct)); diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 310cf52fec6..028b0583167 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -30,17 +30,17 @@ namespace pipeline { class HashJoinProbeLocalState; using HashTableCtxVariants = - std::variant<std::monostate, vectorized::ProcessHashTableProbe<TJoinOp::INNER_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::CROSS_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>, - vectorized::ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>; + std::variant<std::monostate, ProcessHashTableProbe<TJoinOp::INNER_JOIN>, + ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>, + ProcessHashTableProbe<TJoinOp::LEFT_ANTI_JOIN>, + ProcessHashTableProbe<TJoinOp::LEFT_OUTER_JOIN>, + ProcessHashTableProbe<TJoinOp::FULL_OUTER_JOIN>, + ProcessHashTableProbe<TJoinOp::RIGHT_OUTER_JOIN>, + ProcessHashTableProbe<TJoinOp::CROSS_JOIN>, + ProcessHashTableProbe<TJoinOp::RIGHT_SEMI_JOIN>, + ProcessHashTableProbe<TJoinOp::RIGHT_ANTI_JOIN>, + ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>, + ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>; class HashJoinProbeOperatorX; class HashJoinProbeLocalState final @@ -85,7 +85,7 @@ private: Status _extract_join_column(vectorized::Block& block, const std::vector<int>& res_col_ids); friend class HashJoinProbeOperatorX; template <int JoinOpType> - friend struct vectorized::ProcessHashTableProbe; + friend struct ProcessHashTableProbe; int _probe_index = -1; uint32_t _build_index = 0; diff --git a/be/src/vec/exec/join/cross_join_impl.cpp b/be/src/pipeline/exec/join/cross_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/cross_join_impl.cpp rename to be/src/pipeline/exec/join/cross_join_impl.cpp index 74fbba20277..16c4bf7f583 100644 --- a/be/src/vec/exec/join/cross_join_impl.cpp +++ b/be/src/pipeline/exec/join/cross_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::CROSS_JOIN); diff --git a/be/src/vec/exec/join/full_outer_join_impl.cpp b/be/src/pipeline/exec/join/full_outer_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/full_outer_join_impl.cpp rename to be/src/pipeline/exec/join/full_outer_join_impl.cpp index 27868390970..0e1ddb58642 100644 --- a/be/src/vec/exec/join/full_outer_join_impl.cpp +++ b/be/src/pipeline/exec/join/full_outer_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::FULL_OUTER_JOIN); diff --git a/be/src/vec/exec/join/inner_join_impl.cpp b/be/src/pipeline/exec/join/inner_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/inner_join_impl.cpp rename to be/src/pipeline/exec/join/inner_join_impl.cpp index 6c1e0d399a5..21767ecafa0 100644 --- a/be/src/vec/exec/join/inner_join_impl.cpp +++ b/be/src/pipeline/exec/join/inner_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::INNER_JOIN); diff --git a/be/src/vec/exec/join/join_op.h b/be/src/pipeline/exec/join/join_op.h similarity index 92% rename from be/src/vec/exec/join/join_op.h rename to be/src/pipeline/exec/join/join_op.h index 62569270d9e..616753b72de 100644 --- a/be/src/vec/exec/join/join_op.h +++ b/be/src/pipeline/exec/join/join_op.h @@ -20,7 +20,7 @@ #include "vec/common/columns_hashing.h" #include "vec/core/block.h" -namespace doris::vectorized { +namespace doris::pipeline { /** * Now we have different kinds of RowRef for join operation. Overall, RowRef is the base class and * the class inheritance is below: @@ -72,7 +72,7 @@ struct Batch { bool full() const { return size == MAX_SIZE; } - Batch<RowRefType>* insert(RowRefType&& row_ref, Arena& pool) { + Batch<RowRefType>* insert(RowRefType&& row_ref, vectorized::Arena& pool) { if (full()) { auto batch = pool.alloc<Batch<RowRefType>>(); *batch = Batch<RowRefType>(this); @@ -132,7 +132,9 @@ struct RowRefList : RowRef { ForwardIterator<RowRefList> begin() { return ForwardIterator<RowRefList>(this); } /// insert element after current one - void insert(RowRefType&& row_ref, Arena& pool) { next.emplace_back(std::move(row_ref)); } + void insert(RowRefType&& row_ref, vectorized::Arena& pool) { + next.emplace_back(std::move(row_ref)); + } void clear() { next.clear(); } @@ -152,7 +154,7 @@ struct RowRefListWithFlag : RowRef { } /// insert element after current one - void insert(RowRefType&& row_ref, Arena& pool) { next.emplace_back(row_ref); } + void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); } void clear() { next.clear(); } @@ -174,7 +176,7 @@ struct RowRefListWithFlags : RowRefWithFlag { } /// insert element after current one - void insert(RowRefType&& row_ref, Arena& pool) { next.emplace_back(row_ref); } + void insert(RowRefType&& row_ref, vectorized::Arena& pool) { next.emplace_back(row_ref); } void clear() { next.clear(); } @@ -183,4 +185,4 @@ private: std::vector<RowRefType> next; }; -} // namespace doris::vectorized +} // namespace doris::pipeline diff --git a/be/src/vec/exec/join/left_anti_join_impl.cpp b/be/src/pipeline/exec/join/left_anti_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/left_anti_join_impl.cpp rename to be/src/pipeline/exec/join/left_anti_join_impl.cpp index e1e1037eeab..ab6a45442b2 100644 --- a/be/src/vec/exec/join/left_anti_join_impl.cpp +++ b/be/src/pipeline/exec/join/left_anti_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::LEFT_ANTI_JOIN); diff --git a/be/src/vec/exec/join/left_outer_join_impl.cpp b/be/src/pipeline/exec/join/left_outer_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/left_outer_join_impl.cpp rename to be/src/pipeline/exec/join/left_outer_join_impl.cpp index 681bd2b8164..4c8e2c5ac9f 100644 --- a/be/src/vec/exec/join/left_outer_join_impl.cpp +++ b/be/src/pipeline/exec/join/left_outer_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::LEFT_OUTER_JOIN); diff --git a/be/src/vec/exec/join/left_semi_join_impl.cpp b/be/src/pipeline/exec/join/left_semi_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/left_semi_join_impl.cpp rename to be/src/pipeline/exec/join/left_semi_join_impl.cpp index d06fe1783f6..8365403abfe 100644 --- a/be/src/vec/exec/join/left_semi_join_impl.cpp +++ b/be/src/pipeline/exec/join/left_semi_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::LEFT_SEMI_JOIN); diff --git a/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp b/be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp rename to be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp index 8b541d37495..834335f282f 100644 --- a/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp +++ b/be/src/pipeline/exec/join/null_aware_left_anti_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); diff --git a/be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp b/be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp rename to be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp index 98d39e61479..b079a0ec95f 100644 --- a/be/src/vec/exec/join/null_aware_left_semi_join_impl.cpp +++ b/be/src/pipeline/exec/join/null_aware_left_semi_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN); diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/pipeline/exec/join/process_hash_table_probe.h similarity index 73% rename from be/src/vec/exec/join/process_hash_table_probe.h rename to be/src/pipeline/exec/join/process_hash_table_probe.h index 11df66d4aaa..965d62192b2 100644 --- a/be/src/vec/exec/join/process_hash_table_probe.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe.h @@ -25,38 +25,40 @@ #include "vec/common/arena.h" namespace doris { -namespace pipeline { -class HashJoinProbeLocalState; -} namespace vectorized { - class Block; class MutableBlock; struct HashJoinProbeContext; +} // namespace vectorized +namespace pipeline { + +class HashJoinProbeLocalState; -using MutableColumnPtr = IColumn::MutablePtr; -using MutableColumns = std::vector<MutableColumnPtr>; +using MutableColumnPtr = vectorized::IColumn::MutablePtr; +using MutableColumns = std::vector<vectorized::MutableColumnPtr>; -using NullMap = ColumnUInt8::Container; -using ConstNullMapPtr = const NullMap*; +using NullMap = vectorized::ColumnUInt8::Container; +using ConstNullMapPtr = const vectorized::NullMap*; template <int JoinOpType> struct ProcessHashTableProbe { - ProcessHashTableProbe(pipeline::HashJoinProbeLocalState* parent, int batch_size); + ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size); ~ProcessHashTableProbe() = default; // output build side result column - void build_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags, - int size, bool have_other_join_conjunct, bool is_mark_join); + void build_side_output_column(vectorized::MutableColumns& mcol, + const std::vector<bool>& output_slot_flags, int size, + bool have_other_join_conjunct, bool is_mark_join); - void probe_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags, - int size, int last_probe_index, bool all_match_one, + void probe_side_output_column(vectorized::MutableColumns& mcol, + const std::vector<bool>& output_slot_flags, int size, + int last_probe_index, bool all_match_one, bool have_other_join_conjunct); template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, size_t probe_rows, - bool is_mark_join, bool have_other_join_conjunct); + vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, + size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); // Only process the join with no other join conjunct, because of no other join conjunt // the output block struct is same with mutable block. we can do more opt on it and simplify @@ -65,16 +67,17 @@ struct ProcessHashTableProbe { template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType, bool with_other_conjuncts, bool is_mark_join> Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, size_t probe_rows); + vectorized::MutableBlock& mutable_block, vectorized::Block* output_block, + size_t probe_rows); // In the presence of other join conjunct, the process of join become more complicated. // each matching join column need to be processed by other join conjunct. so the struct of mutable block // and output block may be different // The output result is determined by the other join conjunct result and same_to_prev struct - Status do_other_join_conjuncts(Block* output_block, std::vector<uint8_t>& visited, + Status do_other_join_conjuncts(vectorized::Block* output_block, std::vector<uint8_t>& visited, bool has_null_in_build_side); template <bool with_other_conjuncts> - Status do_mark_join_conjuncts(Block* output_block, size_t hash_table_bucket_size); + Status do_mark_join_conjuncts(vectorized::Block* output_block, size_t hash_table_bucket_size); template <typename HashTableType> typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows, @@ -84,8 +87,9 @@ struct ProcessHashTableProbe { // Process full outer join/ right join / right semi/anti join to output the join result // in hash table template <typename HashTableType> - Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block, - Block* output_block, bool* eos, bool is_mark_join); + Status process_data_in_hashtable(HashTableType& hash_table_ctx, + vectorized::MutableBlock& mutable_block, + vectorized::Block* output_block, bool* eos, bool is_mark_join); /// For null aware join with other conjuncts, if the probe key of one row on left side is null, /// we should make this row match with all rows in build side. @@ -93,8 +97,8 @@ struct ProcessHashTableProbe { pipeline::HashJoinProbeLocalState* _parent = nullptr; const int _batch_size; - const std::shared_ptr<Block>& _build_block; - std::unique_ptr<Arena> _arena; + const std::shared_ptr<vectorized::Block>& _build_block; + std::unique_ptr<vectorized::Arena> _arena; std::vector<StringRef> _probe_keys; std::vector<uint32_t> _probe_indexs; @@ -110,13 +114,13 @@ struct ProcessHashTableProbe { std::vector<int> _build_blocks_locs; // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN - ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr; + vectorized::ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr; // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN - ColumnUInt8::Container* _tuple_is_null_right_flags = nullptr; + vectorized::ColumnUInt8::Container* _tuple_is_null_right_flags = nullptr; size_t _serialized_key_buffer_size {0}; uint8_t* _serialized_key_buffer = nullptr; - std::unique_ptr<Arena> _serialize_key_arena; + std::unique_ptr<vectorized::Arena> _serialize_key_arena; std::vector<char> _probe_side_find_result; bool _have_other_join_conjunct; @@ -139,5 +143,5 @@ struct ProcessHashTableProbe { int _right_col_len; }; -} // namespace vectorized +} // namespace pipeline } // namespace doris diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h similarity index 83% rename from be/src/vec/exec/join/process_hash_table_probe_impl.h rename to be/src/pipeline/exec/join/process_hash_table_probe_impl.h index 8654f17591c..5e023f2c861 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h @@ -27,23 +27,22 @@ #include "vec/columns/column_filter_helper.h" #include "vec/columns/column_nullable.h" #include "vec/exprs/vexpr_context.h" -#include "vhash_join_node.h" -namespace doris::vectorized { +namespace doris::pipeline { template <int JoinOpType> -ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(pipeline::HashJoinProbeLocalState* parent, +ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState* parent, int batch_size) : _parent(parent), _batch_size(batch_size), _build_block(parent->build_block()), _tuple_is_null_left_flags(parent->is_outer_join() - ? &(reinterpret_cast<ColumnUInt8&>( + ? &(reinterpret_cast<vectorized::ColumnUInt8&>( *parent->_tuple_is_null_left_flag_column) .get_data()) : nullptr), _tuple_is_null_right_flags(parent->is_outer_join() - ? &(reinterpret_cast<ColumnUInt8&>( + ? &(reinterpret_cast<vectorized::ColumnUInt8&>( *parent->_tuple_is_null_right_flag_column) .get_data()) : nullptr), @@ -65,7 +64,7 @@ ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(pipeline::HashJoinProbe template <int JoinOpType> void ProcessHashTableProbe<JoinOpType>::build_side_output_column( - MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, + vectorized::MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, bool have_other_join_conjunct, bool is_mark_join) { SCOPED_TIMER(_build_side_output_timer); constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || @@ -104,7 +103,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column( const auto& column = *_build_block->safe_get_by_position(i).column; _build_column_has_null[i] = false; if (output_slot_flags[i] && column.is_nullable()) { - const auto& nullable = assert_cast<const ColumnNullable&>(column); + const auto& nullable = assert_cast<const vectorized::ColumnNullable&>(column); _build_column_has_null[i] = !simd::contain_byte( nullable.get_null_map_data().data() + 1, nullable.size() - 1, 1); _need_calculate_build_index_has_zero |= _build_column_has_null[i]; @@ -116,7 +115,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column( const auto& column = *_build_block->safe_get_by_position(i).column; if (output_slot_flags[i]) { if (!build_index_has_zero && _build_column_has_null[i]) { - assert_cast<ColumnNullable*>(mcol[i + _right_col_idx].get()) + assert_cast<vectorized::ColumnNullable*>(mcol[i + _right_col_idx].get()) ->insert_indices_from_not_has_null(column, _build_indexs.data(), _build_indexs.data() + size); } else { @@ -132,7 +131,7 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column( template <int JoinOpType> void ProcessHashTableProbe<JoinOpType>::probe_side_output_column( - MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, + vectorized::MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, int last_probe_index, bool all_match_one, bool have_other_join_conjunct) { SCOPED_TIMER(_probe_side_output_timer); auto& probe_block = _parent->_probe_block; @@ -189,9 +188,10 @@ template <int JoinOpType> template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType, bool with_other_conjuncts, bool is_mark_join> Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, - Block* output_block, size_t probe_rows) { + vectorized::ConstNullMapPtr null_map, + vectorized::MutableBlock& mutable_block, + vectorized::Block* output_block, + size_t probe_rows) { if (_right_col_len && !_build_block) { return Status::InternalError("build block is nullptr"); } @@ -361,7 +361,7 @@ size_t ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t probe */ template <int JoinOpType> template <bool with_other_conjuncts> -Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_block, +Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block* output_block, size_t hash_table_bucket_size) { DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || @@ -380,8 +380,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b auto mark_column_mutable = output_block->get_by_position(_parent->_mark_column_id).column->assume_mutable(); - auto& mark_column = assert_cast<ColumnNullable&>(*mark_column_mutable); - IColumn::Filter& filter = assert_cast<ColumnUInt8&>(mark_column.get_nested_column()).get_data(); + auto& mark_column = assert_cast<vectorized::ColumnNullable&>(*mark_column_mutable); + vectorized::IColumn::Filter& filter = + assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()).get_data(); if (_parent->_mark_join_conjuncts.empty()) { // For null aware anti/semi join, if the equal conjuncts was not matched and the build side has null value, @@ -391,8 +392,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b const bool should_be_null_if_build_side_has_null = *_has_null_in_build_side; mark_column.resize(row_count); - auto* filter_data = - assert_cast<ColumnUInt8&>(mark_column.get_nested_column()).get_data().data(); + auto* filter_data = assert_cast<vectorized::ColumnUInt8&>(mark_column.get_nested_column()) + .get_data() + .data(); auto* mark_null_map = mark_column.get_null_map_data().data(); int last_probe_matched = -1; for (size_t i = 0; i != row_count; ++i) { @@ -417,20 +419,21 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b memset(mark_null_map, 0, row_count); } } else { - RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_mark_join_conjuncts, output_block, - mark_column.get_null_map_column(), filter)); + RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts( + _parent->_mark_join_conjuncts, output_block, mark_column.get_null_map_column(), + filter)); } auto* mark_null_map = mark_column.get_null_map_data().data(); auto* mark_filter_data = filter.data(); if constexpr (with_other_conjuncts) { - IColumn::Filter other_conjunct_filter(row_count, 1); + vectorized::IColumn::Filter other_conjunct_filter(row_count, 1); { bool can_be_filter_all = false; - RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts, nullptr, - output_block, &other_conjunct_filter, - &can_be_filter_all)); + RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts( + _parent->_other_join_conjuncts, nullptr, output_block, &other_conjunct_filter, + &can_be_filter_all)); } DCHECK_EQ(filter.size(), other_conjunct_filter.size()); const auto* other_filter_data = other_conjunct_filter.data(); @@ -444,7 +447,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b } } - auto filter_column = ColumnUInt8::create(row_count, 0); + auto filter_column = vectorized::ColumnUInt8::create(row_count, 0); auto* __restrict filter_map = filter_column->get_data().data(); /** @@ -483,12 +486,13 @@ Status ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(Block* output_b } auto result_column_id = output_block->columns(); - output_block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), ""}); - return Block::filter_block(output_block, result_column_id, result_column_id); + output_block->insert( + {std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), ""}); + return vectorized::Block::filter_block(output_block, result_column_id, result_column_id); } template <int JoinOpType> -Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_block, +Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Block* output_block, std::vector<uint8_t>& visited, bool has_null_in_build_side) { // dispose the other join conjunct exec @@ -499,27 +503,28 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_ SCOPED_TIMER(_parent->_process_other_join_conjunct_timer); int orig_columns = output_block->columns(); - IColumn::Filter other_conjunct_filter(row_count, 1); + vectorized::IColumn::Filter other_conjunct_filter(row_count, 1); { bool can_be_filter_all = false; - RETURN_IF_ERROR(VExprContext::execute_conjuncts(_parent->_other_join_conjuncts, nullptr, - output_block, &other_conjunct_filter, - &can_be_filter_all)); + RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts( + _parent->_other_join_conjuncts, nullptr, output_block, &other_conjunct_filter, + &can_be_filter_all)); } - auto filter_column = ColumnUInt8::create(); + auto filter_column = vectorized::ColumnUInt8::create(); filter_column->get_data() = std::move(other_conjunct_filter); auto result_column_id = output_block->columns(); - output_block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), ""}); + output_block->insert( + {std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(), ""}); uint8_t* __restrict filter_column_ptr = - assert_cast<ColumnUInt8&>( + assert_cast<vectorized::ColumnUInt8&>( output_block->get_by_position(result_column_id).column->assume_mutable_ref()) .get_data() .data(); if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN) { - auto new_filter_column = ColumnUInt8::create(row_count); + auto new_filter_column = vectorized::ColumnUInt8::create(row_count); auto* __restrict filter_map = new_filter_column->get_data().data(); // process equal-conjuncts-matched tuples that are newly generated @@ -551,7 +556,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_ } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - auto new_filter_column = ColumnUInt8::create(row_count); + auto new_filter_column = vectorized::ColumnUInt8::create(row_count); auto* __restrict filter_map = new_filter_column->get_data().data(); for (size_t i = 0; i < row_count; ++i) { @@ -602,7 +607,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_ JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { orig_columns = _right_col_idx; } - RETURN_IF_ERROR(Block::filter_block(output_block, result_column_id, orig_columns)); + RETURN_IF_ERROR( + vectorized::Block::filter_block(output_block, result_column_id, orig_columns)); } return Status::OK(); @@ -610,15 +616,14 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(Block* output_ template <int JoinOpType> template <typename HashTableType> -Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType& hash_table_ctx, - MutableBlock& mutable_block, - Block* output_block, bool* eos, - bool is_mark_join) { +Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable( + HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block, + vectorized::Block* output_block, bool* eos, bool is_mark_join) { SCOPED_TIMER(_probe_process_hashtable_timer); auto& mcol = mutable_block.mutable_columns(); if (is_mark_join) { - std::unique_ptr<ColumnFilterHelper> mark_column = - std::make_unique<ColumnFilterHelper>(*mcol[mcol.size() - 1]); + std::unique_ptr<vectorized::ColumnFilterHelper> mark_column = + std::make_unique<vectorized::ColumnFilterHelper>(*mcol[mcol.size() - 1]); *eos = hash_table_ctx.hash_table->template iterate_map<JoinOpType, true>(_build_indexs, mark_column.get()); } else { @@ -651,7 +656,8 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN) { for (int i = 0; i < _right_col_idx; ++i) { - assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size); + assert_cast<vectorized::ColumnNullable*>(mcol[i].get()) + ->insert_many_defaults(block_size); } _tuple_is_null_left_flags->resize_fill(block_size, 1); } @@ -664,8 +670,9 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp template <int JoinOpType> template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, + vectorized::ConstNullMapPtr null_map, + vectorized::MutableBlock& mutable_block, + vectorized::Block* output_block, size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct) { Status res; @@ -675,7 +682,8 @@ Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx, have_other_join_conjunct, is_mark_join>( hash_table_ctx, null_map, mutable_block, output_block, probe_rows); }, - make_bool_variant(is_mark_join), make_bool_variant(have_other_join_conjunct)); + vectorized::make_bool_variant(is_mark_join), + vectorized::make_bool_variant(have_other_join_conjunct)); return res; } @@ -687,50 +695,50 @@ struct ExtractType<T(U)> { using Type = U; }; -#define INSTANTIATION(JoinOpType, T) \ - template Status \ - ProcessHashTableProbe<JoinOpType>::process<false, false, ExtractType<void(T)>::Type>( \ - ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - template Status \ - ProcessHashTableProbe<JoinOpType>::process<false, true, ExtractType<void(T)>::Type>( \ - ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - template Status \ - ProcessHashTableProbe<JoinOpType>::process<true, false, ExtractType<void(T)>::Type>( \ - ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - template Status \ - ProcessHashTableProbe<JoinOpType>::process<true, true, ExtractType<void(T)>::Type>( \ - ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ - MutableBlock & mutable_block, Block * output_block, size_t probe_rows, \ - bool is_mark_join, bool have_other_join_conjunct); \ - \ - template Status \ - ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>( \ - ExtractType<void(T)>::Type & hash_table_ctx, MutableBlock & mutable_block, \ - Block * output_block, bool* eos, bool is_mark_join); - -#define INSTANTIATION_FOR(JoinOpType) \ - template struct ProcessHashTableProbe<JoinOpType>; \ - \ - INSTANTIATION(JoinOpType, (SerializedHashTableContext)); \ - INSTANTIATION(JoinOpType, (I8HashTableContext)); \ - INSTANTIATION(JoinOpType, (I16HashTableContext)); \ - INSTANTIATION(JoinOpType, (I32HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64HashTableContext)); \ - INSTANTIATION(JoinOpType, (I128HashTableContext)); \ - INSTANTIATION(JoinOpType, (I256HashTableContext)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true>)); \ - INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false>)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true>)); \ - INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false>)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true>)); \ - INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false>)); \ - INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>)); \ +#define INSTANTIATION(JoinOpType, T) \ + template Status \ + ProcessHashTableProbe<JoinOpType>::process<false, false, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ + vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ + size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + template Status \ + ProcessHashTableProbe<JoinOpType>::process<false, true, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ + vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ + size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + template Status \ + ProcessHashTableProbe<JoinOpType>::process<true, false, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ + vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ + size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + template Status \ + ProcessHashTableProbe<JoinOpType>::process<true, true, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, vectorized::ConstNullMapPtr null_map, \ + vectorized::MutableBlock & mutable_block, vectorized::Block * output_block, \ + size_t probe_rows, bool is_mark_join, bool have_other_join_conjunct); \ + \ + template Status \ + ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, vectorized::MutableBlock & mutable_block, \ + vectorized::Block * output_block, bool* eos, bool is_mark_join); + +#define INSTANTIATION_FOR(JoinOpType) \ + template struct ProcessHashTableProbe<JoinOpType>; \ + \ + INSTANTIATION(JoinOpType, (vectorized::SerializedHashTableContext)); \ + INSTANTIATION(JoinOpType, (I8HashTableContext)); \ + INSTANTIATION(JoinOpType, (I16HashTableContext)); \ + INSTANTIATION(JoinOpType, (I32HashTableContext)); \ + INSTANTIATION(JoinOpType, (I64HashTableContext)); \ + INSTANTIATION(JoinOpType, (I128HashTableContext)); \ + INSTANTIATION(JoinOpType, (I256HashTableContext)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true>)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false>)); \ + INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<true>)); \ INSTANTIATION(JoinOpType, (I136FixedKeyHashTableContext<false>)); -} // namespace doris::vectorized +} // namespace doris::pipeline diff --git a/be/src/vec/exec/join/right_anti_join_impl.cpp b/be/src/pipeline/exec/join/right_anti_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/right_anti_join_impl.cpp rename to be/src/pipeline/exec/join/right_anti_join_impl.cpp index 1c4eec5cf76..e3cc943bf3b 100644 --- a/be/src/vec/exec/join/right_anti_join_impl.cpp +++ b/be/src/pipeline/exec/join/right_anti_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::RIGHT_ANTI_JOIN); diff --git a/be/src/vec/exec/join/right_outer_join_impl.cpp b/be/src/pipeline/exec/join/right_outer_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/right_outer_join_impl.cpp rename to be/src/pipeline/exec/join/right_outer_join_impl.cpp index e732639610e..65c6eaaab18 100644 --- a/be/src/vec/exec/join/right_outer_join_impl.cpp +++ b/be/src/pipeline/exec/join/right_outer_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::RIGHT_OUTER_JOIN); diff --git a/be/src/vec/exec/join/right_semi_join_impl.cpp b/be/src/pipeline/exec/join/right_semi_join_impl.cpp similarity index 96% rename from be/src/vec/exec/join/right_semi_join_impl.cpp rename to be/src/pipeline/exec/join/right_semi_join_impl.cpp index 0ee93734439..f89826d8845 100644 --- a/be/src/vec/exec/join/right_semi_join_impl.cpp +++ b/be/src/pipeline/exec/join/right_semi_join_impl.cpp @@ -19,7 +19,7 @@ #include "process_hash_table_probe_impl.h" -namespace doris::vectorized { +namespace doris::pipeline { INSTANTIATION_FOR(TJoinOp::RIGHT_SEMI_JOIN); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 03d0d380fcb..25bc28b5d43 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -28,10 +28,9 @@ namespace doris::pipeline { MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), - vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(), - parent->runtime_filter_descs(), - static_cast<Parent*>(parent)->_row_desc(), _conjuncts) { -} + RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(), + parent->runtime_filter_descs(), + static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {} Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index a772f1ca70f..8ecbd23764d 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -23,7 +23,7 @@ #include "common/status.h" #include "operator.h" -#include "vec/exec/runtime_filter_consumer.h" +#include "pipeline/common/runtime_filter_consumer.h" namespace doris { class RuntimeState; @@ -37,7 +37,7 @@ class MultiCastDataStreamer; class MultiCastDataStreamerSourceOperatorX; class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastSharedState>, - public vectorized::RuntimeFilterConsumer { + public RuntimeFilterConsumer { public: ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState); using Base = PipelineXLocalState<MultiCastSharedState>; @@ -85,7 +85,6 @@ public: Status prepare(RuntimeState* state) override { RETURN_IF_ERROR(Base::prepare(state)); - // RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state)); // init profile for runtime filter // RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile()); if (_t_data_stream_sink.__isset.output_exprs) { diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 5b458314d25..346540e03ec 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -27,10 +27,10 @@ #include "olap/parallel_scanner_builder.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" +#include "pipeline/common/runtime_filter_consumer.h" #include "pipeline/exec/scan_operator.h" #include "service/backend_options.h" #include "util/to_string.h" -#include "vec/exec/runtime_filter_consumer.h" #include "vec/exec/scan/new_olap_scanner.h" #include "vec/exec/scan/vscan_node.h" #include "vec/exprs/vcompound_pred.h" diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index af3b0fa4077..e5e44498ec0 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -18,6 +18,7 @@ #pragma once #include "aggregation_sink_operator.h" #include "pipeline/exec/operator.h" +#include "vec/exprs/vectorized_agg_fn.h" #include "vec/exprs/vexpr.h" #include "vec/spill/spill_stream_manager.h" diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 39e4b8dac26..3f1ce8e7bbb 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -22,6 +22,7 @@ #include <cstdint> #include <memory> +#include "pipeline/common/runtime_filter_consumer.h" #include "pipeline/exec/es_scan_operator.h" #include "pipeline/exec/file_scan_operator.h" #include "pipeline/exec/group_commit_scan_operator.h" @@ -30,7 +31,6 @@ #include "pipeline/exec/olap_scan_operator.h" #include "pipeline/exec/operator.h" #include "util/runtime_profile.h" -#include "vec/exec/runtime_filter_consumer.h" #include "vec/exprs/vcast_expr.h" #include "vec/exprs/vcompound_pred.h" #include "vec/exprs/vectorized_fn_call.h" diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 120c39bfa43..84db26da051 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -23,10 +23,15 @@ #include <string> #include "common/status.h" +#include "exprs/function_filter.h" #include "operator.h" +#include "pipeline/common/runtime_filter_consumer.h" #include "pipeline/dependency.h" #include "runtime/descriptors.h" #include "vec/exec/scan/vscan_node.h" +#include "vec/exprs/vectorized_fn_call.h" +#include "vec/exprs/vin_predicate.h" +#include "vec/utils/util.hpp" namespace doris::vectorized { class ScannerDelegate; @@ -55,12 +60,12 @@ struct FilterPredicates { std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters; }; -class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer { +class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterConsumer { public: ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState<>(state, parent), - vectorized::RuntimeFilterConsumer(parent->node_id(), parent->runtime_filter_descs(), - parent->row_descriptor(), _conjuncts) {} + RuntimeFilterConsumer(parent->node_id(), parent->runtime_filter_descs(), + parent->row_descriptor(), _conjuncts) {} ~ScanLocalStateBase() override = default; virtual bool ready_to_read() = 0; diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp index 0994350430b..c6a80f8d06c 100644 --- a/be/src/pipeline/exec/set_source_operator.cpp +++ b/be/src/pipeline/exec/set_source_operator.cpp @@ -144,7 +144,7 @@ Status SetSourceOperatorX<is_intersect>::_get_data_in_hashtable( template <bool is_intersect> void SetSourceOperatorX<is_intersect>::_add_result_columns( - SetSourceLocalState<is_intersect>& local_state, vectorized::RowRefListWithFlags& value, + SetSourceLocalState<is_intersect>& local_state, RowRefListWithFlags& value, int& block_size) { auto& build_col_idx = local_state._shared_state->build_col_idx; auto& build_block = local_state._shared_state->build_block; diff --git a/be/src/pipeline/exec/set_source_operator.h b/be/src/pipeline/exec/set_source_operator.h index 2bbc4257193..5157a2f9c97 100644 --- a/be/src/pipeline/exec/set_source_operator.h +++ b/be/src/pipeline/exec/set_source_operator.h @@ -81,7 +81,7 @@ private: const int batch_size, bool* eos); void _add_result_columns(SetSourceLocalState<is_intersect>& local_state, - vectorized::RowRefListWithFlags& value, int& block_size); + RowRefListWithFlags& value, int& block_size); const int _child_quantity; }; diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index bea67b3050a..837a33dc437 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -24,6 +24,8 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "pipeline/exec/operator.h" +#include "vec/exprs/vectorized_agg_fn.h" +#include "vec/exprs/vslot_ref.h" namespace doris { class RuntimeState; @@ -76,7 +78,7 @@ static constexpr int STREAMING_HT_MIN_REDUCTION_SIZE = StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), _agg_arena_pool(std::make_unique<vectorized::Arena>()), - _agg_data(std::make_unique<vectorized::AggregatedDataVariants>()), + _agg_data(std::make_unique<AggregatedDataVariants>()), _agg_profile_arena(std::make_unique<vectorized::Arena>()), _child_block(vectorized::Block::create_unique()), _pre_aggregated_block(vectorized::Block::create_unique()) {} @@ -164,13 +166,11 @@ Status StreamingAggLocalState::open(RuntimeState* state) { using KeyType = typename HashTableType::Key; /// some aggregate functions (like AVG for decimal) have align issues. - _aggregate_data_container = - std::make_unique<vectorized::AggregateDataContainer>( - sizeof(KeyType), - ((p._total_size_of_aggregate_states + - p._align_aggregate_states - 1) / - p._align_aggregate_states) * - p._align_aggregate_states); + _aggregate_data_container = std::make_unique<AggregateDataContainer>( + sizeof(KeyType), ((p._total_size_of_aggregate_states + + p._align_aggregate_states - 1) / + p._align_aggregate_states) * + p._align_aggregate_states); }}, _agg_data->method_variant); if (p._is_merge) { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 227536170ea..bf856435982 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -108,12 +108,12 @@ private: bool _should_expand_hash_table = true; int64_t _cur_num_rows_returned = 0; std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr; - vectorized::AggregatedDataVariantsUPtr _agg_data = nullptr; + AggregatedDataVariantsUPtr _agg_data = nullptr; std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr; - std::unique_ptr<vectorized::AggregateDataContainer> _aggregate_data_container = nullptr; + std::unique_ptr<AggregateDataContainer> _aggregate_data_container = nullptr; bool _should_limit_output = false; bool _reach_limit = false; size_t _input_num_rows = 0; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index bb3ef4ff0f8..41ba68e8cbb 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -817,14 +817,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) { return; } } - if (query_ctx->enable_pipeline_x_exec()) { - query_ctx->cancel_all_pipeline_context(reason); - } else { - for (auto it : all_instance_ids) { - cancel_instance(it, reason); - } - } - query_ctx->cancel(reason); { std::lock_guard<std::mutex> state_lock(_lock); @@ -862,7 +854,6 @@ void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reas void FragmentMgr::cancel_worker() { LOG(INFO) << "FragmentMgr cancel worker start working."; do { - std::vector<TUniqueId> to_cancel; std::vector<TUniqueId> queries_lost_coordinator; std::vector<TUniqueId> queries_timeout; @@ -937,17 +928,6 @@ void FragmentMgr::cancel_worker() { } } - // TODO(zhiqiang): It seems that timeout_canceled_fragment_count is - // designed to count canceled fragment of non-pipeline query. - timeout_canceled_fragment_count->increment(to_cancel.size()); - for (auto& id : to_cancel) { - cancel_instance(id, - Status::Error<ErrorCode::TIMEOUT>( - "FragmentMgr cancel worker going to cancel timeout instance ")); - LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance " - << print_id(id); - } - if (!queries_lost_coordinator.empty()) { LOG(INFO) << "There are " << queries_lost_coordinator.size() << " queries need to be cancelled, coordinator dead or restarted."; diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index a8efd4d9392..44bdaa5971a 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -213,6 +213,10 @@ void QueryContext::cancel(Status new_status, int fragment_id) { } set_ready_to_execute(new_status); + cancel_all_pipeline_context(new_status, fragment_id); +} + +void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) { std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel; { std::lock_guard<std::mutex> lock(_pipeline_map_write_lock); @@ -223,23 +227,6 @@ void QueryContext::cancel(Status new_status, int fragment_id) { ctx_to_cancel.push_back(f_context); } } - // Must not add lock here. There maybe dead lock because it will call fragment - // ctx cancel and fragment ctx will call query ctx cancel. - for (auto& f_context : ctx_to_cancel) { - if (auto pipeline_ctx = f_context.lock()) { - pipeline_ctx->cancel(new_status); - } - } -} - -void QueryContext::cancel_all_pipeline_context(const Status& reason) { - std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel; - { - std::lock_guard<std::mutex> lock(_pipeline_map_write_lock); - for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) { - ctx_to_cancel.push_back(f_context); - } - } for (auto& f_context : ctx_to_cancel) { if (auto pipeline_ctx = f_context.lock()) { pipeline_ctx->cancel(reason); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index dc7ea7e29bf..ee744a89466 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -99,7 +99,7 @@ public: [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); } - void cancel_all_pipeline_context(const Status& reason); + void cancel_all_pipeline_context(const Status& reason, int fragment_id = -1); std::string print_all_pipeline_context(); Status cancel_pipeline_context(const int fragment_id, const Status& reason); void set_pipeline_context(const int fragment_id, diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h index 8795d90553a..6ca0653f7a9 100644 --- a/be/src/vec/common/hash_table/hash_map_context.h +++ b/be/src/vec/common/hash_table/hash_map_context.h @@ -31,9 +31,12 @@ #include "vec/common/hash_table/string_hash_map.h" #include "vec/common/string_ref.h" #include "vec/core/types.h" -#include "vec/exec/join/join_op.h" #include "vec/utils/util.hpp" +namespace doris::pipeline { +struct RowRefListWithFlags; +} + namespace doris::vectorized { constexpr auto BITSIZE = 8; @@ -595,12 +598,13 @@ using FixedKeyHashTableContext = MethodKeysFixed<JoinHashMap<Key, HashCRC32<Key> template <class Key, bool has_null> using SetFixedKeyHashTableContext = - MethodKeysFixed<HashMap<Key, RowRefListWithFlags, HashCRC32<Key>>, has_null>; + MethodKeysFixed<HashMap<Key, pipeline::RowRefListWithFlags, HashCRC32<Key>>, has_null>; template <class T> using SetPrimaryTypeHashTableContext = - MethodOneNumber<T, HashMap<T, RowRefListWithFlags, HashCRC32<T>>>; + MethodOneNumber<T, HashMap<T, pipeline::RowRefListWithFlags, HashCRC32<T>>>; -using SetSerializedHashTableContext = MethodSerialized<HashMap<StringRef, RowRefListWithFlags>>; +using SetSerializedHashTableContext = + MethodSerialized<HashMap<StringRef, pipeline::RowRefListWithFlags>>; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h b/be/src/vec/common/hash_table/hash_table_set_build.h index 8101360a9f9..f9aeeeef14c 100644 --- a/be/src/vec/common/hash_table/hash_table_set_build.h +++ b/be/src/vec/common/hash_table/hash_table_set_build.h @@ -20,6 +20,7 @@ #include "vec/columns/column.h" namespace doris::vectorized { +constexpr size_t CHECK_FRECUENCY = 65536; template <class HashTableContext, bool is_intersect> struct HashTableBuild { template <typename Parent> diff --git a/be/src/vec/exec/join/vacquire_list.hpp b/be/src/vec/exec/join/vacquire_list.hpp deleted file mode 100644 index 6a3157cc384..00000000000 --- a/be/src/vec/exec/join/vacquire_list.hpp +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <memory> -#include <vector> - -namespace doris::vectorized { - -template <typename Element, int batch_size = 8> -struct AcquireList { - using Batch = Element[batch_size]; - - Element& acquire(Element&& element) { - if (_current_batch == nullptr) { - _current_batch.reset(new Element[batch_size]); - } - if (current_full()) { - _lst.emplace_back(std::move(_current_batch)); - _current_batch.reset(new Element[batch_size]); - _current_offset = 0; - } - - auto base_addr = _current_batch.get(); - base_addr[_current_offset] = std::move(element); - auto& ref = base_addr[_current_offset]; - _current_offset++; - return ref; - } - - void remove_last_element() { _current_offset--; } - -private: - bool current_full() { return _current_offset == batch_size; } - std::vector<std::unique_ptr<Element[]>> _lst; - std::unique_ptr<Element[]> _current_batch; - int _current_offset = 0; -}; -} // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h deleted file mode 100644 index 80d278a220f..00000000000 --- a/be/src/vec/exec/join/vhash_join_node.h +++ /dev/null @@ -1,180 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <gen_cpp/PlanNodes_types.h> -#include <stddef.h> -#include <stdint.h> - -#include <atomic> -#include <iosfwd> -#include <memory> -#include <string> -#include <unordered_map> -#include <variant> -#include <vector> - -#include "common/global_types.h" -#include "common/status.h" -#include "exprs/runtime_filter_slots.h" -#include "util/runtime_profile.h" -#include "vec/aggregate_functions/aggregate_function.h" -#include "vec/columns/column.h" -#include "vec/columns/columns_number.h" -#include "vec/common/arena.h" -#include "vec/common/columns_hashing.h" -#include "vec/common/hash_table/hash_map_context.h" -#include "vec/common/hash_table/hash_map_context_creator.h" -#include "vec/common/hash_table/partitioned_hash_map.h" -#include "vec/common/string_ref.h" -#include "vec/core/block.h" -#include "vec/core/types.h" -#include "vec/exec/join/join_op.h" // IWYU pragma: keep -#include "vec/exprs/vexpr_fwd.h" - -template <typename T> -struct HashCRC32; - -namespace doris { -class ObjectPool; -class DescriptorTbl; -class RuntimeState; - -namespace pipeline { -class HashJoinProbeLocalState; -class HashJoinBuildSinkLocalState; -} // namespace pipeline - -namespace vectorized { - -constexpr size_t CHECK_FRECUENCY = 65536; - -struct UInt128; -struct UInt256; -template <int JoinOpType> -struct ProcessHashTableProbe; - -template <typename Parent> -Status process_runtime_filter_build(RuntimeState* state, Block* block, Parent* parent, - bool is_global = false) { - if (parent->runtime_filters().empty()) { - return Status::OK(); - } - uint64_t rows = block->rows(); - { - SCOPED_TIMER(parent->_runtime_filter_init_timer); - RETURN_IF_ERROR(parent->_runtime_filter_slots->init_filters(state, rows)); - RETURN_IF_ERROR(parent->_runtime_filter_slots->ignore_filters(state)); - } - - if (!parent->_runtime_filter_slots->empty() && rows > 1) { - SCOPED_TIMER(parent->_runtime_filter_compute_timer); - parent->_runtime_filter_slots->insert(block); - } - { - SCOPED_TIMER(parent->_publish_runtime_filter_timer); - RETURN_IF_ERROR(parent->_runtime_filter_slots->publish()); - } - - return Status::OK(); -} - -using ProfileCounter = RuntimeProfile::Counter; - -template <class HashTableContext, typename Parent> -struct ProcessHashTableBuild { - ProcessHashTableBuild(int rows, ColumnRawPtrs& build_raw_ptrs, Parent* parent, int batch_size, - RuntimeState* state) - : _rows(rows), - _build_raw_ptrs(build_raw_ptrs), - _parent(parent), - _batch_size(batch_size), - _state(state) {} - - template <int JoinOpType, bool ignore_null, bool short_circuit_for_null, - bool with_other_conjuncts> - Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool* has_null_key) { - if (short_circuit_for_null || ignore_null) { - // first row is mocked and is null - for (uint32_t i = 1; i < _rows; i++) { - if ((*null_map)[i]) { - *has_null_key = true; - } - } - if (short_circuit_for_null && *has_null_key) { - return Status::OK(); - } - } - - SCOPED_TIMER(_parent->_build_table_insert_timer); - hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows, _batch_size, - *has_null_key); - - hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows, - null_map ? null_map->data() : nullptr, true, true, - hash_table_ctx.hash_table->get_bucket_size()); - hash_table_ctx.hash_table->template build<JoinOpType, with_other_conjuncts>( - hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows); - hash_table_ctx.bucket_nums.resize(_batch_size); - hash_table_ctx.bucket_nums.shrink_to_fit(); - - COUNTER_SET(_parent->_hash_table_memory_usage, - (int64_t)hash_table_ctx.hash_table->get_byte_size()); - COUNTER_SET(_parent->_build_arena_memory_usage, - (int64_t)hash_table_ctx.serialized_keys_size(true)); - return Status::OK(); - } - -private: - const uint32_t _rows; - ColumnRawPtrs& _build_raw_ptrs; - Parent* _parent = nullptr; - int _batch_size; - RuntimeState* _state = nullptr; -}; - -using I8HashTableContext = PrimaryTypeHashTableContext<UInt8>; -using I16HashTableContext = PrimaryTypeHashTableContext<UInt16>; -using I32HashTableContext = PrimaryTypeHashTableContext<UInt32>; -using I64HashTableContext = PrimaryTypeHashTableContext<UInt64>; -using I128HashTableContext = PrimaryTypeHashTableContext<UInt128>; -using I256HashTableContext = PrimaryTypeHashTableContext<UInt256>; - -template <bool has_null> -using I64FixedKeyHashTableContext = FixedKeyHashTableContext<UInt64, has_null>; - -template <bool has_null> -using I128FixedKeyHashTableContext = FixedKeyHashTableContext<UInt128, has_null>; - -template <bool has_null> -using I256FixedKeyHashTableContext = FixedKeyHashTableContext<UInt256, has_null>; - -template <bool has_null> -using I136FixedKeyHashTableContext = FixedKeyHashTableContext<UInt136, has_null>; - -using HashTableVariants = - std::variant<std::monostate, SerializedHashTableContext, I8HashTableContext, - I16HashTableContext, I32HashTableContext, I64HashTableContext, - I128HashTableContext, I256HashTableContext, I64FixedKeyHashTableContext<true>, - I64FixedKeyHashTableContext<false>, I128FixedKeyHashTableContext<true>, - I128FixedKeyHashTableContext<false>, I256FixedKeyHashTableContext<true>, - I256FixedKeyHashTableContext<false>, I136FixedKeyHashTableContext<true>, - I136FixedKeyHashTableContext<false>>; - -} // namespace vectorized -} // namespace doris diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index cd869cae5d2..70ac5970274 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -17,59 +17,7 @@ #pragma once -#include <fmt/format.h> -#include <gen_cpp/Exprs_types.h> -#include <gen_cpp/PlanNodes_types.h> -#include <parallel_hashmap/phmap.h> -#include <stdint.h> - -#include <functional> -#include <list> -#include <map> -#include <memory> -#include <string> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "common/global_types.h" -#include "common/object_pool.h" -#include "common/status.h" -#include "exec/olap_common.h" -#include "exprs/function_filter.h" -#include "runtime/define_primitive_type.h" -#include "runtime/query_context.h" -#include "runtime/runtime_state.h" -#include "util/runtime_profile.h" -#include "vec/exec/runtime_filter_consumer.h" -#include "vec/exec/scan/scanner_context.h" #include "vec/exec/scan/vscanner.h" -#include "vec/runtime/shared_scanner_controller.h" - -namespace doris { -class BitmapFilterFuncBase; -class BloomFilterFuncBase; -class DescriptorTbl; -class FunctionContext; -class HybridSetBase; -class IRuntimeFilter; -class SlotDescriptor; -class TScanRangeParams; -class TupleDescriptor; - -namespace vectorized { -class Block; -class VExpr; -class VExprContext; -class VInPredicate; -class VectorizedFnCall; -} // namespace vectorized -struct StringRef; -} // namespace doris - -namespace doris::pipeline { -class ScanOperator; -} namespace doris::vectorized { diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h deleted file mode 100644 index fd369d80278..00000000000 --- a/be/src/vec/exec/vaggregation_node.h +++ /dev/null @@ -1,366 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include <assert.h> -#include <glog/logging.h> -#include <stddef.h> -#include <stdint.h> - -#include <algorithm> -#include <functional> -#include <memory> -#include <ostream> -#include <string> -#include <type_traits> -#include <utility> -#include <variant> -#include <vector> - -#include "common/compiler_util.h" // IWYU pragma: keep -#include "common/global_types.h" -#include "common/status.h" -#include "util/runtime_profile.h" -#include "vec/aggregate_functions/aggregate_function.h" -#include "vec/columns/column.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_string.h" -#include "vec/columns/columns_number.h" -#include "vec/common/allocator.h" -#include "vec/common/arena.h" -#include "vec/common/assert_cast.h" -#include "vec/common/columns_hashing.h" -#include "vec/common/hash_table/hash.h" -#include "vec/common/hash_table/hash_map_context.h" -#include "vec/common/hash_table/hash_map_context_creator.h" -#include "vec/common/hash_table/hash_map_util.h" -#include "vec/common/hash_table/partitioned_hash_map.h" -#include "vec/common/hash_table/ph_hash_map.h" -#include "vec/common/hash_table/string_hash_map.h" -#include "vec/common/pod_array.h" -#include "vec/common/string_ref.h" -#include "vec/common/uint128.h" -#include "vec/core/block.h" -#include "vec/core/block_spill_reader.h" -#include "vec/core/block_spill_writer.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/core/types.h" -#include "vec/exprs/vectorized_agg_fn.h" -#include "vec/exprs/vexpr.h" -#include "vec/exprs/vexpr_context.h" -#include "vec/exprs/vslot_ref.h" - -namespace doris { -class TPlanNode; -class DescriptorTbl; -class ObjectPool; -class RuntimeState; -class TupleDescriptor; - -namespace pipeline { -class AggSinkOperator; -class AggSourceOperator; -class StreamingAggSinkOperator; -class StreamingAggSourceOperator; -} // namespace pipeline - -namespace vectorized { - -using AggregatedDataWithoutKey = AggregateDataPtr; -using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr>; -using AggregatedDataWithShortStringKey = StringHashMap<AggregateDataPtr>; -using AggregatedDataWithUInt8Key = PHHashMap<UInt8, AggregateDataPtr>; -using AggregatedDataWithUInt16Key = PHHashMap<UInt16, AggregateDataPtr>; -using AggregatedDataWithUInt32Key = PHHashMap<UInt32, AggregateDataPtr, HashCRC32<UInt32>>; -using AggregatedDataWithUInt64Key = PHHashMap<UInt64, AggregateDataPtr, HashCRC32<UInt64>>; -using AggregatedDataWithUInt128Key = PHHashMap<UInt128, AggregateDataPtr, HashCRC32<UInt128>>; -using AggregatedDataWithUInt256Key = PHHashMap<UInt256, AggregateDataPtr, HashCRC32<UInt256>>; -using AggregatedDataWithUInt136Key = PHHashMap<UInt136, AggregateDataPtr, HashCRC32<UInt136>>; - -using AggregatedDataWithUInt32KeyPhase2 = - PHHashMap<UInt32, AggregateDataPtr, HashMixWrapper<UInt32>>; -using AggregatedDataWithUInt64KeyPhase2 = - PHHashMap<UInt64, AggregateDataPtr, HashMixWrapper<UInt64>>; -using AggregatedDataWithUInt128KeyPhase2 = - PHHashMap<UInt128, AggregateDataPtr, HashMixWrapper<UInt128>>; -using AggregatedDataWithUInt256KeyPhase2 = - PHHashMap<UInt256, AggregateDataPtr, HashMixWrapper<UInt256>>; - -using AggregatedDataWithUInt136KeyPhase2 = - PHHashMap<UInt136, AggregateDataPtr, HashMixWrapper<UInt136>>; - -using AggregatedDataWithNullableUInt8Key = DataWithNullKey<AggregatedDataWithUInt8Key>; -using AggregatedDataWithNullableUInt16Key = DataWithNullKey<AggregatedDataWithUInt16Key>; -using AggregatedDataWithNullableUInt32Key = DataWithNullKey<AggregatedDataWithUInt32Key>; -using AggregatedDataWithNullableUInt64Key = DataWithNullKey<AggregatedDataWithUInt64Key>; -using AggregatedDataWithNullableUInt32KeyPhase2 = - DataWithNullKey<AggregatedDataWithUInt32KeyPhase2>; -using AggregatedDataWithNullableUInt64KeyPhase2 = - DataWithNullKey<AggregatedDataWithUInt64KeyPhase2>; -using AggregatedDataWithNullableShortStringKey = DataWithNullKey<AggregatedDataWithShortStringKey>; -using AggregatedDataWithNullableUInt128Key = DataWithNullKey<AggregatedDataWithUInt128Key>; -using AggregatedDataWithNullableUInt128KeyPhase2 = - DataWithNullKey<AggregatedDataWithUInt128KeyPhase2>; - -using AggregatedMethodVariants = std::variant< - std::monostate, MethodSerialized<AggregatedDataWithStringKey>, - MethodOneNumber<UInt8, AggregatedDataWithUInt8Key>, - MethodOneNumber<UInt16, AggregatedDataWithUInt16Key>, - MethodOneNumber<UInt32, AggregatedDataWithUInt32Key>, - MethodOneNumber<UInt64, AggregatedDataWithUInt64Key>, - MethodStringNoCache<AggregatedDataWithShortStringKey>, - MethodOneNumber<UInt128, AggregatedDataWithUInt128Key>, - MethodOneNumber<UInt32, AggregatedDataWithUInt32KeyPhase2>, - MethodOneNumber<UInt64, AggregatedDataWithUInt64KeyPhase2>, - MethodOneNumber<UInt128, AggregatedDataWithUInt128KeyPhase2>, - MethodSingleNullableColumn<MethodOneNumber<UInt8, AggregatedDataWithNullableUInt8Key>>, - MethodSingleNullableColumn<MethodOneNumber<UInt16, AggregatedDataWithNullableUInt16Key>>, - MethodSingleNullableColumn<MethodOneNumber<UInt32, AggregatedDataWithNullableUInt32Key>>, - MethodSingleNullableColumn<MethodOneNumber<UInt64, AggregatedDataWithNullableUInt64Key>>, - MethodSingleNullableColumn< - MethodOneNumber<UInt32, AggregatedDataWithNullableUInt32KeyPhase2>>, - MethodSingleNullableColumn< - MethodOneNumber<UInt64, AggregatedDataWithNullableUInt64KeyPhase2>>, - MethodSingleNullableColumn<MethodOneNumber<UInt128, AggregatedDataWithNullableUInt128Key>>, - MethodSingleNullableColumn< - MethodOneNumber<UInt128, AggregatedDataWithNullableUInt128KeyPhase2>>, - MethodSingleNullableColumn<MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>, - MethodKeysFixed<AggregatedDataWithUInt64Key, false>, - MethodKeysFixed<AggregatedDataWithUInt64Key, true>, - MethodKeysFixed<AggregatedDataWithUInt128Key, false>, - MethodKeysFixed<AggregatedDataWithUInt128Key, true>, - MethodKeysFixed<AggregatedDataWithUInt256Key, false>, - MethodKeysFixed<AggregatedDataWithUInt256Key, true>, - MethodKeysFixed<AggregatedDataWithUInt136Key, false>, - MethodKeysFixed<AggregatedDataWithUInt136Key, true>, - MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, false>, - MethodKeysFixed<AggregatedDataWithUInt64KeyPhase2, true>, - MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, false>, - MethodKeysFixed<AggregatedDataWithUInt128KeyPhase2, true>, - MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, false>, - MethodKeysFixed<AggregatedDataWithUInt256KeyPhase2, true>, - MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, false>, - MethodKeysFixed<AggregatedDataWithUInt136KeyPhase2, true>>; - -struct AggregatedDataVariants - : public DataVariants<AggregatedMethodVariants, MethodSingleNullableColumn, MethodOneNumber, - MethodKeysFixed, DataWithNullKey> { - AggregatedDataWithoutKey without_key = nullptr; - - template <bool nullable> - void init(Type type) { - _type = type; - switch (_type) { - case Type::without_key: - break; - case Type::serialized: - method_variant.emplace<MethodSerialized<AggregatedDataWithStringKey>>(); - break; - case Type::int8_key: - emplace_single<UInt8, AggregatedDataWithUInt8Key, nullable>(); - break; - case Type::int16_key: - emplace_single<UInt16, AggregatedDataWithUInt16Key, nullable>(); - break; - case Type::int32_key: - emplace_single<UInt32, AggregatedDataWithUInt32Key, nullable>(); - break; - case Type::int32_key_phase2: - emplace_single<UInt32, AggregatedDataWithUInt32KeyPhase2, nullable>(); - break; - case Type::int64_key: - emplace_single<UInt64, AggregatedDataWithUInt64Key, nullable>(); - break; - case Type::int64_key_phase2: - emplace_single<UInt64, AggregatedDataWithUInt64KeyPhase2, nullable>(); - break; - case Type::int128_key: - emplace_single<UInt128, AggregatedDataWithUInt128Key, nullable>(); - break; - case Type::int128_key_phase2: - emplace_single<UInt128, AggregatedDataWithUInt128KeyPhase2, nullable>(); - break; - case Type::string_key: - if (nullable) { - method_variant.emplace<MethodSingleNullableColumn< - MethodStringNoCache<AggregatedDataWithNullableShortStringKey>>>(); - } else { - method_variant.emplace<MethodStringNoCache<AggregatedDataWithShortStringKey>>(); - } - break; - default: - throw Exception(ErrorCode::INTERNAL_ERROR, "meet invalid key type, type={}", type); - } - } - - void init(Type type, bool is_nullable = false) { - if (is_nullable) { - init<true>(type); - } else { - init<false>(type); - } - } -}; - -using AggregatedDataVariantsUPtr = std::unique_ptr<AggregatedDataVariants>; -using ArenaUPtr = std::unique_ptr<Arena>; - -struct AggregateDataContainer { -public: - AggregateDataContainer(size_t size_of_key, size_t size_of_aggregate_states) - : _size_of_key(size_of_key), _size_of_aggregate_states(size_of_aggregate_states) {} - - int64_t memory_usage() const { return _arena_pool.size(); } - - template <typename KeyType> - AggregateDataPtr append_data(const KeyType& key) { - DCHECK_EQ(sizeof(KeyType), _size_of_key); - // SUB_CONTAINER_CAPACITY should add a new sub container, and also expand when it is zero - if (UNLIKELY(_index_in_sub_container % SUB_CONTAINER_CAPACITY == 0)) { - _expand(); - } - - *reinterpret_cast<KeyType*>(_current_keys) = key; - auto aggregate_data = _current_agg_data; - ++_total_count; - ++_index_in_sub_container; - _current_agg_data += _size_of_aggregate_states; - _current_keys += _size_of_key; - return aggregate_data; - } - - template <typename Derived, bool IsConst> - class IteratorBase { - using Container = - std::conditional_t<IsConst, const AggregateDataContainer, AggregateDataContainer>; - - Container* container = nullptr; - uint32_t index; - uint32_t sub_container_index; - uint32_t index_in_sub_container; - - friend class HashTable; - - public: - IteratorBase() = default; - IteratorBase(Container* container_, uint32_t index_) - : container(container_), index(index_) { - sub_container_index = index / SUB_CONTAINER_CAPACITY; - index_in_sub_container = index - sub_container_index * SUB_CONTAINER_CAPACITY; - } - - bool operator==(const IteratorBase& rhs) const { return index == rhs.index; } - bool operator!=(const IteratorBase& rhs) const { return index != rhs.index; } - - Derived& operator++() { - index++; - index_in_sub_container++; - if (index_in_sub_container == SUB_CONTAINER_CAPACITY) { - index_in_sub_container = 0; - sub_container_index++; - } - return static_cast<Derived&>(*this); - } - - template <typename KeyType> - KeyType get_key() { - DCHECK_EQ(sizeof(KeyType), container->_size_of_key); - return ((KeyType*)(container->_key_containers[sub_container_index])) - [index_in_sub_container]; - } - - AggregateDataPtr get_aggregate_data() { - return &(container->_value_containers[sub_container_index] - [container->_size_of_aggregate_states * - index_in_sub_container]); - } - }; - - class Iterator : public IteratorBase<Iterator, false> { - public: - using IteratorBase<Iterator, false>::IteratorBase; - }; - - class ConstIterator : public IteratorBase<ConstIterator, true> { - public: - using IteratorBase<ConstIterator, true>::IteratorBase; - }; - - ConstIterator begin() const { return ConstIterator(this, 0); } - - ConstIterator cbegin() const { return begin(); } - - Iterator begin() { return Iterator(this, 0); } - - ConstIterator end() const { return ConstIterator(this, _total_count); } - ConstIterator cend() const { return end(); } - Iterator end() { return Iterator(this, _total_count); } - - void init_once() { - if (_inited) { - return; - } - _inited = true; - iterator = begin(); - } - Iterator iterator; - -private: - void _expand() { - _index_in_sub_container = 0; - _current_keys = nullptr; - _current_agg_data = nullptr; - try { - _current_keys = _arena_pool.alloc(_size_of_key * SUB_CONTAINER_CAPACITY); - _key_containers.emplace_back(_current_keys); - - _current_agg_data = (AggregateDataPtr)_arena_pool.alloc(_size_of_aggregate_states * - SUB_CONTAINER_CAPACITY); - _value_containers.emplace_back(_current_agg_data); - } catch (...) { - if (_current_keys) { - _key_containers.pop_back(); - _current_keys = nullptr; - } - if (_current_agg_data) { - _value_containers.pop_back(); - _current_agg_data = nullptr; - } - throw; - } - } - - static constexpr uint32_t SUB_CONTAINER_CAPACITY = 8192; - Arena _arena_pool; - std::vector<char*> _key_containers; - std::vector<AggregateDataPtr> _value_containers; - AggregateDataPtr _current_agg_data = nullptr; - char* _current_keys = nullptr; - size_t _size_of_key {}; - size_t _size_of_aggregate_states {}; - uint32_t _index_in_sub_container {}; - uint32_t _total_count {}; - bool _inited = false; -}; - -} // namespace vectorized - -constexpr auto init_agg_hash_method = - init_hash_method<vectorized::AggregatedDataVariants, vectorized::AggregateDataPtr>; - -} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org