This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
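In short, this change adds a partition sort node for partition top-n queries, i.e. a window function such as row_number() OVER (PARTITION BY k1 ORDER BY k2) filtered by a small constant. Input rows are hashed into per-partition block lists, each partition is sorted independently, and at most partition_inner_limit rows per partition are emitted, counted according to the window function. Below is a minimal sketch of that counting rule, with invented names and a plain vector standing in for one sorted partition; the committed logic lives in partition_sorter.cpp further down:

    // rows: one partition, already sorted by the ORDER BY columns; adjacent
    // equal values stand in for ties under the sort columns.
    #include <cstddef>
    #include <string>
    #include <vector>

    enum class TopNAlgorithm { ROW_NUMBER, RANK, DENSE_RANK };

    std::vector<std::string> emit_top_n(const std::vector<std::string>& rows,
                                        TopNAlgorithm algo, size_t limit) {
        std::vector<std::string> out;
        size_t rank = 0; // RANK: ties share a rank, the next rank jumps
        size_t keys = 0; // DENSE_RANK: distinct keys seen so far
        for (size_t i = 0; i < rows.size(); ++i) {
            if (i == 0 || rows[i] != rows[i - 1]) {
                ++keys;
                rank = i + 1;
            }
            size_t value = algo == TopNAlgorithm::ROW_NUMBER ? i + 1
                         : algo == TopNAlgorithm::RANK       ? rank
                                                             : keys;
            // The input is sorted, so the first row past the limit ends the scan.
            if (value > limit) {
                break;
            }
            out.push_back(rows[i]);
        }
        return out;
    }

The committed PartitionSorter reaches the same decision while merging pre-sorted blocks through a priority queue, using SortCursorCmp::compare_two_rows to detect ties instead of comparing whole rows.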
The following commit(s) were added to refs/heads/master by this push: new 53ae24912f [vectorized](feature) support partition sort node (#19708) 53ae24912f is described below commit 53ae24912f18a73cea4d6d7fd2bef3586cb57d60 Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Thu May 25 11:22:02 2023 +0800 [vectorized](feature) support partition sort node (#19708) --- be/src/exec/exec_node.cpp | 5 + .../pipeline/exec/partition_sort_sink_operator.h | 54 +++ .../pipeline/exec/partition_sort_source_operator.h | 56 +++ be/src/pipeline/pipeline_fragment_context.cpp | 16 + be/src/vec/CMakeLists.txt | 2 + be/src/vec/common/columns_hashing.h | 32 ++ be/src/vec/common/sort/partition_sorter.cpp | 203 +++++++++ be/src/vec/common/sort/partition_sorter.h | 108 +++++ be/src/vec/common/sort/sorter.h | 17 +- be/src/vec/exec/vpartition_sort_node.cpp | 454 +++++++++++++++++++++ be/src/vec/exec/vpartition_sort_node.h | 386 ++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 15 + 12 files changed, 1342 insertions(+), 6 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 62a63afd98..b74fbd23b1 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -62,6 +62,7 @@ #include "vec/exec/vempty_set_node.h" #include "vec/exec/vexchange_node.h" #include "vec/exec/vmysql_scan_node.h" // IWYU pragma: keep +#include "vec/exec/vpartition_sort_node.h" #include "vec/exec/vrepeat_node.h" #include "vec/exec/vschema_scan_node.h" #include "vec/exec/vselect_node.h" @@ -318,6 +319,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::FILE_SCAN_NODE: case TPlanNodeType::JDBC_SCAN_NODE: case TPlanNodeType::META_SCAN_NODE: + case TPlanNodeType::PARTITION_SORT_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -438,6 +440,9 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN *node = pool->add(new vectorized::VDataGenFunctionScanNode(pool, tnode, descs)); return Status::OK(); + case TPlanNodeType::PARTITION_SORT_NODE: + *node = pool->add(new vectorized::VPartitionSortNode(pool, tnode, descs)); + return Status::OK(); default: std::map<int, const char*>::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h new file mode 100644 index 0000000000..ddcbebbb9d --- /dev/null +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
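+
+// Sink-side pipeline adapter: PartitionSortSinkOperator streams child blocks
+// into the shared VPartitionSortNode, whose sink() builds the partition hash
+// table; the matching source operator in partition_sort_source_operator.h
+// reads the sorted output back.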
+ +#pragma once + +#include <stdint.h> + +#include "operator.h" +#include "vec/exec/vpartition_sort_node.h" + +namespace doris { +class ExecNode; + +namespace pipeline { + +class PartitionSortSinkOperatorBuilder final + : public OperatorBuilder<vectorized::VPartitionSortNode> { +public: + PartitionSortSinkOperatorBuilder(int32_t id, ExecNode* sort_node) + : OperatorBuilder(id, "PartitionSortSinkOperator", sort_node) {} + + bool is_sink() const override { return true; } + + OperatorPtr build_operator() override; +}; + +class PartitionSortSinkOperator final : public StreamingOperator<PartitionSortSinkOperatorBuilder> { +public: + PartitionSortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node) + : StreamingOperator(operator_builder, sort_node) {}; + + bool can_write() override { return true; } +}; + +OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() { + return std::make_shared<PartitionSortSinkOperator>(this, _node); +} + +} // namespace pipeline +} // namespace doris diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h new file mode 100644 index 0000000000..bd55c42e4b --- /dev/null +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
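+
+// Source-side pipeline adapter: once VPartitionSortNode::sink() has consumed
+// eos and flagged the node readable, PartitionSortSourceOperator drains the
+// sorted blocks partition by partition through the node's pull().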
+ +#pragma once + +#include <stdint.h> + +#include "common/status.h" +#include "operator.h" +#include "vec/exec/vpartition_sort_node.h" + +namespace doris { +class ExecNode; +class RuntimeState; + +namespace pipeline { + +class PartitionSortSourceOperatorBuilder final + : public OperatorBuilder<vectorized::VPartitionSortNode> { +public: + PartitionSortSourceOperatorBuilder(int32_t id, ExecNode* sort_node) + : OperatorBuilder(id, "PartitionSortSourceOperator", sort_node) {} + + bool is_source() const override { return true; } + + OperatorPtr build_operator() override; +}; + +class PartitionSortSourceOperator final + : public SourceOperator<PartitionSortSourceOperatorBuilder> { +public: + PartitionSortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node) + : SourceOperator(operator_builder, sort_node) {} + Status open(RuntimeState*) override { return Status::OK(); } +}; + +OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() { + return std::make_shared<PartitionSortSourceOperator>(this, _node); +} + +} // namespace pipeline +} // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 923558b333..eea8d34b08 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -61,6 +61,8 @@ #include "pipeline/exec/nested_loop_join_probe_operator.h" #include "pipeline/exec/olap_table_sink_operator.h" #include "pipeline/exec/operator.h" +#include "pipeline/exec/partition_sort_sink_operator.h" +#include "pipeline/exec/partition_sort_source_operator.h" #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" @@ -532,6 +534,20 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur RETURN_IF_ERROR(cur_pipe->add_operator(sort_source)); break; } + case TPlanNodeType::PARTITION_SORT_NODE: { + auto new_pipeline = add_pipeline(); + RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); + + OperatorBuilderPtr partition_sort_sink = std::make_shared<PartitionSortSinkOperatorBuilder>( + next_operator_builder_id(), node); + RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink)); + + OperatorBuilderPtr partition_sort_source = + std::make_shared<PartitionSortSourceOperatorBuilder>(next_operator_builder_id(), + node); + RETURN_IF_ERROR(cur_pipe->add_operator(partition_sort_source)); + break; + } case TPlanNodeType::ANALYTIC_EVAL_NODE: { auto new_pipeline = add_pipeline(); RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline)); diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 2d5bc7f177..97e77cd4d0 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -71,6 +71,7 @@ set(VEC_FILES common/sort/sorter.cpp common/sort/topn_sorter.cpp common/sort/vsort_exec_exprs.cpp + common/sort/partition_sorter.cpp common/string_utils/string_utils.cpp common/hex.cpp common/allocator.cpp @@ -136,6 +137,7 @@ set(VEC_FILES exec/vrepeat_node.cpp exec/vtable_function_node.cpp exec/vjdbc_connector.cpp + exec/vpartition_sort_node.cpp exec/join/vhash_join_node.cpp exec/join/vjoin_node_base.cpp exec/join/vnested_loop_join_node.cpp diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h index 06f0773fc8..64fe7d87b5 100644 --- a/be/src/vec/common/columns_hashing.h +++ b/be/src/vec/common/columns_hashing.h @@ -258,6 +258,38 @@ struct 
HashMethodSingleLowNullableColumn : public SingleColumnMethod { return EmplaceResult(inserted); } + template <typename Data> + ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row, + Arena& pool) { + if (key_column->is_null_at(row)) { + bool has_null_key = data.has_null_key_data(); + data.has_null_key_data() = true; + + if constexpr (has_mapped) { + return EmplaceResult(data.get_null_key_data(), data.get_null_key_data(), + !has_null_key); + } else { + return EmplaceResult(!has_null_key); + } + } + + auto key_holder = Base::get_key_holder(row, pool); + + bool inserted = false; + typename Data::LookupResult it; + data.emplace(key_holder, it, hash_value, inserted); + + if constexpr (has_mapped) { + auto& mapped = *lookup_result_get_mapped(it); + if (inserted) { + new (&mapped) Mapped(); + } + return EmplaceResult(mapped, mapped, inserted); + } else { + return EmplaceResult(inserted); + } + } + template <typename Data, typename Func, typename CreatorForNull> ALWAYS_INLINE typename std::enable_if_t<has_mapped, Mapped>& lazy_emplace_key( Data& data, size_t row, Arena& pool, Func&& f, CreatorForNull&& null_creator) { diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp new file mode 100644 index 0000000000..ca29d62eb4 --- /dev/null +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
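+
+// Per-partition sorter: append_block() partially sorts each incoming block,
+// prepare_for_read() builds merge cursors plus a priority queue over the
+// sorted blocks, and partition_sort_read() merges rows out while applying
+// the row_number/rank/dense_rank limit rules.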
+ +#include "vec/common/sort/partition_sorter.h" + +#include <glog/logging.h> + +#include <algorithm> +#include <queue> + +#include "common/object_pool.h" +#include "vec/core/block.h" +#include "vec/core/sort_cursor.h" +#include "vec/functions/function_binary_arithmetic.h" +#include "vec/utils/util.hpp" + +namespace doris { +class RowDescriptor; +class RuntimeProfile; +class RuntimeState; + +namespace vectorized { +class VSortExecExprs; +} // namespace vectorized +} // namespace doris + +namespace doris::vectorized { + +PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, + ObjectPool* pool, std::vector<bool>& is_asc_order, + std::vector<bool>& nulls_first, const RowDescriptor& row_desc, + RuntimeState* state, RuntimeProfile* profile, + bool has_global_limit, int partition_inner_limit, + TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row) + : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first), + _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)), + _row_desc(row_desc), + _has_global_limit(has_global_limit), + _partition_inner_limit(partition_inner_limit), + _top_n_algorithm(top_n_algorithm), + _previous_row(previous_row) {} + +Status PartitionSorter::append_block(Block* input_block) { + Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); + DCHECK(input_block->columns() == sorted_block.columns()); + RETURN_IF_ERROR(partial_sort(*input_block, sorted_block)); + RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); + return Status::OK(); +} + +Status PartitionSorter::prepare_for_read() { + auto& cursors = _state->get_cursors(); + auto& blocks = _state->get_sorted_block(); + auto& priority_queue = _state->get_priority_queue(); + for (const auto& block : blocks) { + cursors.emplace_back(block, _sort_description); + } + for (auto& cursor : cursors) { + priority_queue.push(MergeSortCursor(&cursor)); + } + return Status::OK(); +} + +Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { + if (_state->get_sorted_block().empty()) { + *eos = true; + } else { + if (_state->get_sorted_block().size() == 1 && _has_global_limit) { + auto& sorted_block = _state->get_sorted_block()[0]; + block->swap(sorted_block); + block->set_num_rows(_partition_inner_limit); + *eos = true; + } else { + RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); + } + } + return Status::OK(); +} + +Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { + const auto& sorted_block = _state->get_sorted_block()[0]; + size_t num_columns = sorted_block.columns(); + bool mem_reuse = output_block->mem_reuse(); + MutableColumns merged_columns = + mem_reuse ? 
output_block->mutate_columns() : sorted_block.clone_empty_columns(); + + size_t current_output_rows = 0; + auto& priority_queue = _state->get_priority_queue(); + + bool get_enough_data = false; + bool first_compare_row = false; + while (!priority_queue.empty()) { + auto current = priority_queue.top(); + priority_queue.pop(); + if (UNLIKELY(_previous_row->impl == nullptr)) { + first_compare_row = true; + *_previous_row = current; + } + + switch (_top_n_algorithm) { + case TopNAlgorithm::ROW_NUMBER: { + //1 row_number no need to check distinct, just output partition_inner_limit row + if ((current_output_rows + _output_total_rows) < _partition_inner_limit) { + for (size_t i = 0; i < num_columns; ++i) { + merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + } + } else { + //rows has get enough + get_enough_data = true; + } + current_output_rows++; + break; + } + case TopNAlgorithm::DENSE_RANK: { + //3 dense_rank() maybe need distinct rows of partition_inner_limit + if ((current_output_rows + _output_total_rows) < _partition_inner_limit) { + for (size_t i = 0; i < num_columns; ++i) { + merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + } + } else { + get_enough_data = true; + } + if (_has_global_limit) { + current_output_rows++; + } else { + //when it's first comes, the rows are same no need compare + if (first_compare_row) { + current_output_rows++; + first_compare_row = false; + } else { + // not the first comes, so need compare those, when is distinct row + // so could current_output_rows++ + bool cmp_res = _previous_row->compare_two_rows(current); + if (cmp_res == false) { // distinct row + current_output_rows++; + *_previous_row = current; + } + } + } + break; + } + case TopNAlgorithm::RANK: { + if (_has_global_limit && + (current_output_rows + _output_total_rows) >= _partition_inner_limit) { + get_enough_data = true; + break; + } + bool cmp_res = _previous_row->compare_two_rows(current); + //get a distinct row + if (cmp_res == false) { + //here must be check distinct of two rows, and then check nums of row + if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) { + get_enough_data = true; + break; + } + *_previous_row = current; + } + for (size_t i = 0; i < num_columns; ++i) { + merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + } + current_output_rows++; + break; + } + default: + break; + } + + if (!current->isLast()) { + current->next(); + priority_queue.push(current); + } + + if (current_output_rows == batch_size || get_enough_data == true) { + break; + } + } + + if (!mem_reuse) { + Block merge_block = sorted_block.clone_with_columns(std::move(merged_columns)); + merge_block.swap(*output_block); + } + _output_total_rows += output_block->rows(); + if (current_output_rows == 0 || get_enough_data == true) { + *eos = true; + } + return Status::OK(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h new file mode 100644 index 0000000000..ff17ac2115 --- /dev/null +++ b/be/src/vec/common/sort/partition_sorter.h @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include <gen_cpp/PlanNodes_types.h> +#include <stddef.h> +#include <stdint.h> + +#include <cstdint> +#include <memory> +#include <vector> + +#include "common/status.h" +#include "vec/common/sort/sorter.h" + +namespace doris { +class ObjectPool; +class RowDescriptor; +class RuntimeProfile; +class RuntimeState; + +namespace vectorized { +class Block; +class VSortExecExprs; +} // namespace vectorized +} // namespace doris + +namespace doris::vectorized { + +struct SortCursorCmp { +public: + SortCursorCmp() { + impl = nullptr; + row = 0; + } + SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {} + + void reset() { + impl = nullptr; + row = 0; + } + bool compare_two_rows(const MergeSortCursor& rhs) const { + for (size_t i = 0; i < impl->sort_columns_size; ++i) { + int direction = impl->desc[i].direction; + int nulls_direction = impl->desc[i].nulls_direction; + int res = direction * impl->sort_columns[i]->compare_at(row, rhs.impl->pos, + *(rhs.impl->sort_columns[i]), + nulls_direction); + if (res != 0) { + return false; + } + } + return true; + } + int row = 0; + MergeSortCursorImpl* impl; +}; + +class PartitionSorter final : public Sorter { + ENABLE_FACTORY_CREATOR(PartitionSorter); + +public: + PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool, + std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first, + const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile, + bool has_global_limit, int partition_inner_limit, + TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row); + + ~PartitionSorter() override = default; + + Status append_block(Block* block) override; + + Status prepare_for_read() override; + + Status get_next(RuntimeState* state, Block* block, bool* eos) override; + + size_t data_size() const override { return _state->data_size(); } + + bool is_spilled() const override { return false; } + + Status partition_sort_read(Block* block, bool* eos, int batch_size); + int64 get_output_rows() const { return _output_total_rows; } + +private: + std::unique_ptr<MergeSorterState> _state; + const RowDescriptor& _row_desc; + int64 _output_total_rows = 0; + bool _has_global_limit = false; + int _partition_inner_limit = 0; + TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER; + SortCursorCmp* _previous_row; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 48ea77852a..f5a12db3c0 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -60,13 +60,14 @@ public: limit_(limit), profile_(profile) { external_sort_bytes_threshold_ = state->external_sort_bytes_threshold(); + if (profile != nullptr) { + block_spill_profile_ = profile->create_child("BlockSpill", true, true); + profile->add_child(block_spill_profile_, false, nullptr); - block_spill_profile_ = profile->create_child("BlockSpill", true, true); - profile->add_child(block_spill_profile_, false, nullptr); - - spilled_block_count_ = ADD_COUNTER(block_spill_profile_, "BlockCount", 
TUnit::UNIT); - spilled_original_block_size_ = - ADD_COUNTER(block_spill_profile_, "BlockBytes", TUnit::BYTES); + spilled_block_count_ = ADD_COUNTER(block_spill_profile_, "BlockCount", TUnit::UNIT); + spilled_original_block_size_ = + ADD_COUNTER(block_spill_profile_, "BlockBytes", TUnit::BYTES); + } } ~MergeSorterState() = default; @@ -91,6 +92,10 @@ public: const Block& last_sorted_block() const { return sorted_blocks_.back(); } + std::vector<Block>& get_sorted_block() { return sorted_blocks_; } + std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; } + std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; } + std::unique_ptr<Block> unsorted_block_; private: diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp new file mode 100644 index 0000000000..8f3f50b9d4 --- /dev/null +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vpartition_sort_node.h" + +#include <glog/logging.h> + +#include <cstddef> +#include <cstdint> +#include <memory> +#include <sstream> +#include <string> + +#include "common/logging.h" +#include "common/object_pool.h" +#include "runtime/runtime_state.h" +#include "vec/common/hash_table/hash_set.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris::vectorized { +// Here is an empirical value. 
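+// Prefetching the bucket for the key this many rows ahead of the current
+// emplace hides hash-table cache misses; the distance was tuned, not derived.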
+static constexpr size_t HASH_MAP_PREFETCH_DIST = 16; +VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode, + const DescriptorTbl& descs) + : ExecNode(pool, tnode, descs), _hash_table_size_counter(nullptr) { + _partitioned_data = std::make_unique<PartitionedHashMapVariants>(); + _agg_arena_pool = std::make_unique<Arena>(); + _previous_row = std::make_unique<SortCursorCmp>(); +} + +Status VPartitionSortNode::init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::init(tnode, state)); + + //order by key + if (tnode.partition_sort_node.__isset.sort_info) { + RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.partition_sort_node.sort_info, _pool)); + _is_asc_order = tnode.partition_sort_node.sort_info.is_asc_order; + _nulls_first = tnode.partition_sort_node.sort_info.nulls_first; + } + //partition by key + if (tnode.partition_sort_node.__isset.partition_exprs) { + RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, tnode.partition_sort_node.partition_exprs, + &_partition_expr_ctxs)); + _partition_exprs_num = _partition_expr_ctxs.size(); + _partition_columns.resize(_partition_exprs_num); + } + if (_partition_exprs_num == 0) { + _value_places.push_back(_pool->add(new PartitionBlocks())); + } + + _has_global_limit = tnode.partition_sort_node.has_global_limit; + _top_n_algorithm = tnode.partition_sort_node.top_n_algorithm; + _partition_inner_limit = tnode.partition_sort_node.partition_inner_limit; + return Status::OK(); +} + +Status VPartitionSortNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VPartitionSortNode::prepare"; + SCOPED_TIMER(_runtime_profile->total_time_counter()); + _hash_table_size_counter = ADD_COUNTER(_runtime_profile, "HashTableSize", TUnit::UNIT); + _build_timer = ADD_TIMER(runtime_profile(), "HashTableBuildTime"); + _partition_sort_timer = ADD_TIMER(runtime_profile(), "PartitionSortTime"); + _get_sorted_timer = ADD_TIMER(runtime_profile(), "GetSortedTime"); + _selector_block_timer = ADD_TIMER(runtime_profile(), "SelectorBlockTime"); + _emplace_key_timer = ADD_TIMER(runtime_profile(), "EmplaceKeyTime"); + + RETURN_IF_ERROR(ExecNode::prepare(state)); + RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); + RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, child(0)->row_desc())); + _init_hash_method(); + + return Status::OK(); +} + +Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* input_block, + int batch_size) { + for (int i = 0; i < _partition_exprs_num; ++i) { + int result_column_id = -1; + RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id)); + DCHECK(result_column_id != -1); + _partition_columns[i] = input_block->get_by_position(result_column_id).column.get(); + } + _emplace_into_hash_table(_partition_columns, input_block, batch_size); + return Status::OK(); +} + +void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_columns, + const vectorized::Block* input_block, + int batch_size) { + std::visit( + [&](auto&& agg_method) -> void { + SCOPED_TIMER(_build_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using HashTableType = std::decay_t<decltype(agg_method.data)>; + using AggState = typename HashMethodType::State; + + AggState state(key_columns, _partition_key_sz, nullptr); + size_t num_rows = input_block->rows(); + _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows); + + //PHHashMap + if constexpr (HashTableTraits<HashTableType>::is_phmap) { + if (_hash_values.size() < 
num_rows) { + _hash_values.resize(num_rows); + } + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + AggState>::value) { + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = agg_method.data.hash(agg_method.keys[i]); + } + } else { + for (size_t i = 0; i < num_rows; ++i) { + _hash_values[i] = + agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool)); + } + } + } + + for (size_t row = 0; row < num_rows; ++row) { + SCOPED_TIMER(_emplace_key_timer); + PartitionDataPtr aggregate_data = nullptr; + auto emplace_result = [&]() { + if constexpr (HashTableTraits<HashTableType>::is_phmap) { + if (LIKELY(row + HASH_MAP_PREFETCH_DIST < num_rows)) { + agg_method.data.prefetch_by_hash( + _hash_values[row + HASH_MAP_PREFETCH_DIST]); + } + return state.emplace_key(agg_method.data, _hash_values[row], row, + *_agg_arena_pool); + } else { + return state.emplace_key(agg_method.data, row, *_agg_arena_pool); + } + }(); + + /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. + if (emplace_result.is_inserted()) { + /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. + emplace_result.set_mapped(nullptr); + aggregate_data = _pool->add(new PartitionBlocks()); + emplace_result.set_mapped(aggregate_data); + _value_places.push_back(aggregate_data); + _num_partition++; + } else { + aggregate_data = emplace_result.get_mapped(); + } + assert(aggregate_data != nullptr); + aggregate_data->add_row_idx(row); + } + for (auto place : _value_places) { + SCOPED_TIMER(_selector_block_timer); + place->append_block_by_selector(input_block, child(0)->row_desc(), + _has_global_limit, _partition_inner_limit, + batch_size); + } + }, + _partitioned_data->_partition_method_variant); +} + +Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { + auto current_rows = input_block->rows(); + if (current_rows > 0) { + child_input_rows = child_input_rows + current_rows; + if (UNLIKELY(_partition_exprs_num == 0)) { + //no partition key + _value_places[0]->append_whole_block(input_block, child(0)->row_desc()); + } else { + //just simply use partition num to check + //TODO: here could set can read to true directly. need mutex + if (_num_partition > 512 && child_input_rows < 10000 * _num_partition) { + _blocks_buffer.push(std::move(*input_block)); + } else { + RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size())); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR( + state->check_query_state("VPartitionSortNode, while split input block.")); + input_block->clear_column_data(); + } + } + } + + if (eos) { + //seems could free for hashtable + _agg_arena_pool.reset(nullptr); + _partitioned_data.reset(nullptr); + SCOPED_TIMER(_partition_sort_timer); + for (int i = 0; i < _value_places.size(); ++i) { + auto sorter = PartitionSorter::create_unique( + _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, + child(0)->row_desc(), state, i == 0 ? _runtime_profile.get() : nullptr, + _has_global_limit, _partition_inner_limit, _top_n_algorithm, + _previous_row.get()); + + DCHECK(child(0)->row_desc().num_materialized_slots() == + _value_places[i]->blocks.back()->columns()); + //get blocks from every partition, and sorter get those data. 
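+            // Replay this partition's buffered blocks into its sorter; the
+            // prepare_for_read() below then sets up the merge before output.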
+ for (const auto& block : _value_places[i]->blocks) { + RETURN_IF_ERROR(sorter->append_block(block.get())); + } + sorter->init_profile(_runtime_profile.get()); + RETURN_IF_ERROR(sorter->prepare_for_read()); + _partition_sorts.push_back(std::move(sorter)); + } + if (state->enable_profile()) { + debug_profile(); + } + COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition)); + _can_read = true; + } + return Status::OK(); +} + +Status VPartitionSortNode::open(RuntimeState* state) { + VLOG_CRITICAL << "VPartitionSortNode::open"; + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + RETURN_IF_ERROR(child(0)->open(state)); + + bool eos = false; + std::unique_ptr<Block> input_block = Block::create_unique(); + do { + RETURN_IF_ERROR(child(0)->get_next_after_projects( + state, input_block.get(), &eos, + std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) & + ExecNode::get_next, + _children[0], std::placeholders::_1, std::placeholders::_2, + std::placeholders::_3))); + RETURN_IF_ERROR(sink(state, input_block.get(), eos)); + } while (!eos); + + child(0)->close(state); + + return Status::OK(); +} + +Status VPartitionSortNode::alloc_resource(RuntimeState* state) { + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::alloc_resource(state)); + RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state)); + RETURN_IF_ERROR(_vsort_exec_exprs.open(state)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(state->check_query_state("VPartitionSortNode, while open.")); + return Status::OK(); +} + +Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, + bool* eos) { + RETURN_IF_CANCELLED(state); + output_block->clear_column_data(); + bool current_eos = false; + RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos)); + if (_sort_idx >= _partition_sorts.size() && output_block->rows() == 0) { + if (_blocks_buffer.empty() == false) { + _blocks_buffer.front().swap(*output_block); + _blocks_buffer.pop(); + } else { + *eos = true; + } + } + return Status::OK(); +} + +Status VPartitionSortNode::get_next(RuntimeState* state, Block* output_block, bool* eos) { + if (state == nullptr || output_block == nullptr || eos == nullptr) { + return Status::InternalError("input is nullptr"); + } + VLOG_CRITICAL << "VPartitionSortNode::get_next"; + SCOPED_TIMER(_runtime_profile->total_time_counter()); + + return pull(state, output_block, eos); +} + +Status VPartitionSortNode::get_sorted_block(RuntimeState* state, Block* output_block, + bool* current_eos) { + SCOPED_TIMER(_get_sorted_timer); + //sorter output data one by one + if (_sort_idx < _partition_sorts.size()) { + RETURN_IF_ERROR(_partition_sorts[_sort_idx]->get_next(state, output_block, current_eos)); + } + if (*current_eos) { + //current sort have eos, so get next idx + _previous_row->reset(); + auto rows = _partition_sorts[_sort_idx]->get_output_rows(); + partition_profile_output_rows.push_back(rows); + _num_rows_returned += rows; + _partition_sorts[_sort_idx].reset(nullptr); + _sort_idx++; + } + + return Status::OK(); +} + +Status VPartitionSortNode::close(RuntimeState* state) { + VLOG_CRITICAL << "VPartitionSortNode::close"; + if (is_closed()) { + return Status::OK(); + } + return ExecNode::close(state); +} + +void VPartitionSortNode::release_resource(RuntimeState* state) { + VExpr::close(_partition_expr_ctxs, state); + _vsort_exec_exprs.close(state); + ExecNode::release_resource(state); +} + +void 
VPartitionSortNode::_init_hash_method() { + if (_partition_exprs_num == 0) { + return; + } else if (_partition_exprs_num == 1) { + auto is_nullable = _partition_expr_ctxs[0]->root()->is_nullable(); + switch (_partition_expr_ctxs[0]->root()->result_type()) { + case TYPE_TINYINT: + case TYPE_BOOLEAN: + _partitioned_data->init(PartitionedHashMapVariants::Type::int8_key, is_nullable); + return; + case TYPE_SMALLINT: + _partitioned_data->init(PartitionedHashMapVariants::Type::int16_key, is_nullable); + return; + case TYPE_INT: + case TYPE_FLOAT: + case TYPE_DATEV2: + _partitioned_data->init(PartitionedHashMapVariants::Type::int32_key, is_nullable); + return; + case TYPE_BIGINT: + case TYPE_DOUBLE: + case TYPE_DATE: + case TYPE_DATETIME: + case TYPE_DATETIMEV2: + _partitioned_data->init(PartitionedHashMapVariants::Type::int64_key, is_nullable); + return; + case TYPE_LARGEINT: { + _partitioned_data->init(PartitionedHashMapVariants::Type::int128_key, is_nullable); + return; + } + case TYPE_DECIMALV2: + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: { + DataTypePtr& type_ptr = _partition_expr_ctxs[0]->root()->data_type(); + TypeIndex idx = is_nullable ? assert_cast<const DataTypeNullable&>(*type_ptr) + .get_nested_type() + ->get_type_id() + : type_ptr->get_type_id(); + WhichDataType which(idx); + if (which.is_decimal32()) { + _partitioned_data->init(PartitionedHashMapVariants::Type::int32_key, is_nullable); + } else if (which.is_decimal64()) { + _partitioned_data->init(PartitionedHashMapVariants::Type::int64_key, is_nullable); + } else { + _partitioned_data->init(PartitionedHashMapVariants::Type::int128_key, is_nullable); + } + return; + } + case TYPE_CHAR: + case TYPE_VARCHAR: + case TYPE_STRING: { + _partitioned_data->init(PartitionedHashMapVariants::Type::string_key, is_nullable); + break; + } + default: + _partitioned_data->init(PartitionedHashMapVariants::Type::serialized); + } + } else { + bool use_fixed_key = true; + bool has_null = false; + int key_byte_size = 0; + + _partition_key_sz.resize(_partition_exprs_num); + for (int i = 0; i < _partition_exprs_num; ++i) { + const auto& data_type = _partition_expr_ctxs[i]->root()->data_type(); + + if (!data_type->have_maximum_size_of_value()) { + use_fixed_key = false; + break; + } + + auto is_null = data_type->is_nullable(); + has_null |= is_null; + _partition_key_sz[i] = + data_type->get_maximum_size_of_value_in_memory() - (is_null ? 
1 : 0); + key_byte_size += _partition_key_sz[i]; + } + + if (std::tuple_size<KeysNullMap<UInt256>>::value + key_byte_size > sizeof(UInt256)) { + use_fixed_key = false; + } + + if (use_fixed_key) { + if (has_null) { + if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <= sizeof(UInt64)) { + _partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null); + } else if (std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <= + sizeof(UInt128)) { + _partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys, + has_null); + } else { + _partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys, + has_null); + } + } else { + if (key_byte_size <= sizeof(UInt64)) { + _partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null); + } else if (key_byte_size <= sizeof(UInt128)) { + _partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys, + has_null); + } else { + _partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys, + has_null); + } + } + } else { + _partitioned_data->init(PartitionedHashMapVariants::Type::serialized); + } + } +} + +void VPartitionSortNode::debug_profile() { + fmt::memory_buffer partition_rows_read, partition_blocks_read; + fmt::format_to(partition_rows_read, "["); + fmt::format_to(partition_blocks_read, "["); + for (auto place : _value_places) { + fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows()); + fmt::format_to(partition_rows_read, "{}, ", place->blocks.size()); + } + fmt::format_to(partition_rows_read, "]"); + fmt::format_to(partition_blocks_read, "]"); + + runtime_profile()->add_info_string("PerPartitionBlocksRead", partition_blocks_read.data()); + runtime_profile()->add_info_string("PerPartitionRowsRead", partition_rows_read.data()); + fmt::memory_buffer partition_output_rows; + fmt::format_to(partition_output_rows, "["); + for (auto row : partition_profile_output_rows) { + fmt::format_to(partition_output_rows, "{}, ", row); + } + fmt::format_to(partition_output_rows, "]"); + runtime_profile()->add_info_string("PerPartitionOutputRows", partition_output_rows.data()); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h new file mode 100644 index 0000000000..4143b19dc9 --- /dev/null +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -0,0 +1,386 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
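+
+// VPartitionSortNode: hashes input rows by the PARTITION BY expressions into
+// per-partition block lists (PartitionBlocks), then sorts every partition
+// with its own PartitionSorter. The hash-method templates below follow the
+// same key-dispatch pattern as the aggregation code, specialized here for
+// partition keys.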
+ +#pragma once + +#include <glog/logging.h> + +#include <cstdint> +#include <memory> + +#include "exec/exec_node.h" +#include "vec/columns/column.h" +#include "vec/common/columns_hashing.h" +#include "vec/common/hash_table/hash.h" +#include "vec/common/hash_table/ph_hash_map.h" +#include "vec/common/hash_table/string_hash_map.h" +#include "vec/common/sort/partition_sorter.h" +#include "vec/common/sort/vsort_exec_exprs.h" +#include "vec/core/block.h" + +namespace doris { +namespace vectorized { +static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20; + +struct PartitionBlocks { +public: + PartitionBlocks() = default; + ~PartitionBlocks() = default; + + void add_row_idx(size_t row) { selector.push_back(row); } + + void append_block_by_selector(const vectorized::Block* input_block, + const RowDescriptor& row_desc, bool is_limit, + int64_t partition_inner_limit, int batch_size) { + if (blocks.empty() || reach_limit()) { + init_rows = batch_size; + blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc))); + } + auto columns = input_block->get_columns(); + auto mutable_columns = blocks.back()->mutate_columns(); + DCHECK(columns.size() == mutable_columns.size()); + for (int i = 0; i < mutable_columns.size(); ++i) { + columns[i]->append_data_by_selector(mutable_columns[i], selector); + } + init_rows = init_rows - selector.size(); + total_rows = total_rows + selector.size(); + selector.clear(); + } + + void append_whole_block(vectorized::Block* input_block, const RowDescriptor& row_desc) { + auto empty_block = Block::create_unique(VectorizedUtils::create_empty_block(row_desc)); + empty_block->swap(*input_block); + blocks.emplace_back(std::move(empty_block)); + } + + bool reach_limit() { + return init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES; + } + + size_t get_total_rows() const { return total_rows; } + + IColumn::Selector selector; + std::vector<std::unique_ptr<Block>> blocks; + size_t total_rows = 0; + int init_rows = 4096; +}; + +using PartitionDataPtr = PartitionBlocks*; +using PartitionDataWithStringKey = PHHashMap<StringRef, PartitionDataPtr, DefaultHash<StringRef>>; +using PartitionDataWithShortStringKey = StringHashMap<PartitionDataPtr>; +using PartitionDataWithUInt32Key = PHHashMap<UInt32, PartitionDataPtr, HashCRC32<UInt32>>; + +template <typename TData> +struct PartitionMethodSerialized { + using Data = TData; + using Key = typename Data::key_type; + using Mapped = typename Data::mapped_type; + using Iterator = typename Data::iterator; + + Data data; + Iterator iterator; + bool inited = false; + std::vector<StringRef> keys; + size_t keys_memory_usage = 0; + PartitionMethodSerialized() : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr) {} + + using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>; + + template <typename Other> + explicit PartitionMethodSerialized(const Other& other) : data(other.data) {} + + size_t serialize_keys(const ColumnRawPtrs& key_columns, size_t num_rows) { + if (keys.size() < num_rows) { + keys.resize(num_rows); + } + + size_t max_one_row_byte_size = 0; + for (const auto& column : key_columns) { + max_one_row_byte_size += column->get_max_row_byte_size(); + } + size_t total_bytes = max_one_row_byte_size * num_rows; + + if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) { + // reach mem limit, don't serialize in batch + // for simplicity, we just create a new arena here. 
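+            // Dropping the old arena releases the previously serialized keys;
+            // each row is then serialized separately into the fresh arena.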
+ _arena.reset(new Arena()); + size_t keys_size = key_columns.size(); + for (size_t i = 0; i < num_rows; ++i) { + keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena); + } + keys_memory_usage = _arena->size(); + } else { + _arena.reset(); + if (total_bytes > _serialized_key_buffer_size) { + _serialized_key_buffer_size = total_bytes; + _serialize_key_arena.reset(new Arena()); + _serialized_key_buffer = reinterpret_cast<uint8_t*>( + _serialize_key_arena->alloc(_serialized_key_buffer_size)); + } + + for (size_t i = 0; i < num_rows; ++i) { + keys[i].data = + reinterpret_cast<char*>(_serialized_key_buffer + i * max_one_row_byte_size); + keys[i].size = 0; + } + + for (const auto& column : key_columns) { + column->serialize_vec(keys, num_rows, max_one_row_byte_size); + } + keys_memory_usage = _serialized_key_buffer_size; + } + return max_one_row_byte_size; + } + +private: + size_t _serialized_key_buffer_size; + uint8_t* _serialized_key_buffer; + std::unique_ptr<Arena> _serialize_key_arena; + std::unique_ptr<Arena> _arena; + static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 * 1024; // 16M +}; + +//for string +template <typename TData> +struct PartitionMethodStringNoCache { + using Data = TData; + using Key = typename Data::key_type; + using Mapped = typename Data::mapped_type; + using Iterator = typename Data::iterator; + + Data data; + Iterator iterator; + bool inited = false; + + PartitionMethodStringNoCache() = default; + + explicit PartitionMethodStringNoCache(size_t size_hint) : data(size_hint) {} + + template <typename Other> + explicit PartitionMethodStringNoCache(const Other& other) : data(other.data) {} + + using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, true, false>; + + static const bool low_cardinality_optimization = false; +}; + +/// For the case where there is one numeric key. +/// FieldType is UInt8/16/32/64 for any type with corresponding bit width. +template <typename FieldType, typename TData, bool consecutive_keys_optimization = false> +struct PartitionMethodOneNumber { + using Data = TData; + using Key = typename Data::key_type; + using Mapped = typename Data::mapped_type; + using Iterator = typename Data::iterator; + + Data data; + Iterator iterator; + bool inited = false; + + PartitionMethodOneNumber() = default; + + template <typename Other> + PartitionMethodOneNumber(const Other& other) : data(other.data) {} + + /// To use one `Method` in different threads, use different `State`. + using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, Mapped, FieldType, + consecutive_keys_optimization>; +}; + +template <typename Base> +struct PartitionDataWithNullKey : public Base { + using Base::Base; + + bool& has_null_key_data() { return has_null_key; } + PartitionDataPtr& get_null_key_data() { return null_key_data; } + bool has_null_key_data() const { return has_null_key; } + PartitionDataPtr get_null_key_data() const { return null_key_data; } + size_t size() const { return Base::size() + (has_null_key ? 
1 : 0); } + bool empty() const { return Base::empty() && !has_null_key; } + + void clear() { + Base::clear(); + has_null_key = false; + } + + void clear_and_shrink() { + Base::clear_and_shrink(); + has_null_key = false; + } + +private: + bool has_null_key = false; + PartitionDataPtr null_key_data = nullptr; +}; + +template <typename SingleColumnMethod> +struct PartitionMethodSingleNullableColumn : public SingleColumnMethod { + using Base = SingleColumnMethod; + using BaseState = typename Base::State; + + using Data = typename Base::Data; + using Key = typename Base::Key; + using Mapped = typename Base::Mapped; + + using Base::data; + + PartitionMethodSingleNullableColumn() = default; + + template <typename Other> + explicit PartitionMethodSingleNullableColumn(const Other& other) : Base(other) {} + + using State = ColumnsHashing::HashMethodSingleLowNullableColumn<BaseState, Mapped, true>; +}; + +using PartitionedMethodVariants = + std::variant<PartitionMethodSerialized<PartitionDataWithStringKey>, + PartitionMethodOneNumber<UInt32, PartitionDataWithUInt32Key>, + PartitionMethodSingleNullableColumn<PartitionMethodOneNumber< + UInt32, PartitionDataWithNullKey<PartitionDataWithUInt32Key>>>, + PartitionMethodStringNoCache<PartitionDataWithShortStringKey>, + PartitionMethodSingleNullableColumn<PartitionMethodStringNoCache< + PartitionDataWithNullKey<PartitionDataWithShortStringKey>>>>; + +struct PartitionedHashMapVariants { + PartitionedHashMapVariants() = default; + PartitionedHashMapVariants(const PartitionedHashMapVariants&) = delete; + PartitionedHashMapVariants& operator=(const PartitionedHashMapVariants&) = delete; + PartitionedMethodVariants _partition_method_variant; + + enum class Type { + EMPTY = 0, + serialized, + int8_key, + int16_key, + int32_key, + int64_key, + int128_key, + int64_keys, + int128_keys, + int256_keys, + string_key, + }; + + Type _type = Type::EMPTY; + + void init(Type type, bool is_nullable = false) { + _type = type; + switch (_type) { + case Type::serialized: + _partition_method_variant + .emplace<PartitionMethodSerialized<PartitionDataWithStringKey>>(); + break; + case Type::int32_key: + if (is_nullable) { + _partition_method_variant + .emplace<PartitionMethodSingleNullableColumn<PartitionMethodOneNumber< + UInt32, PartitionDataWithNullKey<PartitionDataWithUInt32Key>>>>(); + } else { + _partition_method_variant + .emplace<PartitionMethodOneNumber<UInt32, PartitionDataWithUInt32Key>>(); + } + break; + case Type::string_key: + if (is_nullable) { + _partition_method_variant + .emplace<PartitionMethodSingleNullableColumn<PartitionMethodStringNoCache< + PartitionDataWithNullKey<PartitionDataWithShortStringKey>>>>(); + } else { + _partition_method_variant + .emplace<PartitionMethodStringNoCache<PartitionDataWithShortStringKey>>(); + } + break; + default: + DCHECK(false) << "Do not have a rigth partition by data type"; + } + } +}; + +class VExprContext; + +class VPartitionSortNode : public ExecNode { +public: + VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~VPartitionSortNode() override = default; + + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status prepare(RuntimeState* state) override; + Status alloc_resource(RuntimeState* state) override; + Status open(RuntimeState* state) override; + void release_resource(RuntimeState* state) override; + Status get_next(RuntimeState* state, Block* block, bool* eos) override; + Status close(RuntimeState* state) override; + + Status pull(RuntimeState* 
state, vectorized::Block* output_block, bool* eos) override; + Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; + + void debug_profile(); + +private: + template <typename AggState, typename AggMethod> + void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method, + const ColumnRawPtrs& key_columns, const size_t num_rows) { + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value) { + (agg_method.serialize_keys(key_columns, num_rows)); + state.set_serialized_keys(agg_method.keys.data()); + } + } + + void _init_hash_method(); + Status _split_block_by_partition(vectorized::Block* input_block, int batch_size); + void _emplace_into_hash_table(const ColumnRawPtrs& key_columns, + const vectorized::Block* input_block, int batch_size); + Status get_sorted_block(RuntimeState* state, Block* output_block, bool* eos); + + // hash table + std::unique_ptr<PartitionedHashMapVariants> _partitioned_data; + std::unique_ptr<Arena> _agg_arena_pool; + // partition by k1,k2 + int _partition_exprs_num = 0; + std::vector<VExprContext*> _partition_expr_ctxs; + std::vector<const IColumn*> _partition_columns; + std::vector<size_t> _partition_key_sz; + std::vector<size_t> _hash_values; + + std::vector<std::unique_ptr<PartitionSorter>> _partition_sorts; + std::vector<PartitionDataPtr> _value_places; + // Expressions and parameters used for build _sort_description + VSortExecExprs _vsort_exec_exprs; + std::vector<bool> _is_asc_order; + std::vector<bool> _nulls_first; + TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER; + bool _has_global_limit = false; + int _num_partition = 0; + int64_t _partition_inner_limit = 0; + int _sort_idx = 0; + std::unique_ptr<SortCursorCmp> _previous_row = nullptr; + std::queue<Block> _blocks_buffer; + int64_t child_input_rows = 0; + + RuntimeProfile::Counter* _build_timer; + RuntimeProfile::Counter* _emplace_key_timer; + RuntimeProfile::Counter* _partition_sort_timer; + RuntimeProfile::Counter* _get_sorted_timer; + RuntimeProfile::Counter* _selector_block_timer; + + RuntimeProfile::Counter* _hash_table_size_counter; + //only for profile record + std::vector<int> partition_profile_output_rows; +}; + +} // namespace vectorized +} // namespace doris diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 8afc3fde4c..ed54a8c3d2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -57,6 +57,7 @@ enum TPlanNodeType { FILE_SCAN_NODE, JDBC_SCAN_NODE, TEST_EXTERNAL_SCAN_NODE, + PARTITION_SORT_NODE, } // phases of an execution node @@ -765,6 +766,19 @@ struct TSortNode { 7: optional bool use_topn_opt } +enum TopNAlgorithm { + RANK, + DENSE_RANK, + ROW_NUMBER + } + + struct TPartitionSortNode { + 1: optional list<Exprs.TExpr> partition_exprs + 2: optional TSortInfo sort_info + 3: optional bool has_global_limit + 4: optional TopNAlgorithm top_n_algorithm + 5: optional i64 partition_inner_limit + } enum TAnalyticWindowType { // Specifies the window as a logical offset RANGE, @@ -1072,6 +1086,7 @@ struct TPlanNode { 101: optional list<Exprs.TExpr> projections 102: optional Types.TTupleId output_tuple_id + 103: optional TPartitionSortNode partition_sort_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org