This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 7bb3792d51 [chore](build) Split the compliation units to build them in parallel (#14232) 7bb3792d51 is described below commit 7bb3792d519cd6432ae5d6bc6e9511f7dc55ed7e Author: Adonis Ling <adonis0...@gmail.com> AuthorDate: Mon Nov 14 10:57:10 2022 +0800 [chore](build) Split the compliation units to build them in parallel (#14232) --- be/src/vec/CMakeLists.txt | 11 + be/src/vec/exec/join/cross_join_impl.cpp | 25 + be/src/vec/exec/join/full_outer_join_impl.cpp | 25 + be/src/vec/exec/join/inner_join_impl.cpp | 25 + be/src/vec/exec/join/left_anti_join_impl.cpp | 25 + be/src/vec/exec/join/left_outer_join_impl.cpp | 25 + be/src/vec/exec/join/left_semi_join_impl.cpp | 25 + .../exec/join/null_aware_left_anti_join_impl.cpp | 25 + be/src/vec/exec/join/process_hash_table_probe.h | 96 +++ .../vec/exec/join/process_hash_table_probe_impl.h | 785 +++++++++++++++++++++ be/src/vec/exec/join/right_anti_join_impl.cpp | 25 + be/src/vec/exec/join/right_outer_join_impl.cpp | 25 + be/src/vec/exec/join/right_semi_join_impl.cpp | 25 + be/src/vec/exec/join/vhash_join_node.cpp | 670 +----------------- be/src/vec/exec/join/vhash_join_node.h | 62 +- .../function_date_or_datetime_computation.cpp | 173 +---- ...> function_date_or_datetime_computation_v2.cpp} | 170 +---- be/src/vec/functions/simple_function_factory.h | 2 + 18 files changed, 1153 insertions(+), 1066 deletions(-) diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index b40f7c1955..b97378e32c 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -112,6 +112,16 @@ set(VEC_FILES exec/join/vhash_join_node.cpp exec/join/vjoin_node_base.cpp exec/join/vnested_loop_join_node.cpp + exec/join/inner_join_impl.cpp + exec/join/left_semi_join_impl.cpp + exec/join/left_anti_join_impl.cpp + exec/join/left_outer_join_impl.cpp + exec/join/full_outer_join_impl.cpp + exec/join/right_outer_join_impl.cpp + exec/join/cross_join_impl.cpp + exec/join/right_semi_join_impl.cpp + exec/join/right_anti_join_impl.cpp + exec/join/null_aware_left_anti_join_impl.cpp exec/data_gen_functions/vnumbers_tvf.cpp exec/vdata_gen_scan_node.cpp exprs/vectorized_agg_fn.cpp @@ -200,6 +210,7 @@ set(VEC_FILES functions/uuid.cpp functions/function_coalesce.cpp functions/function_date_or_datetime_computation.cpp + functions/function_date_or_datetime_computation_v2.cpp functions/function_date_or_datetime_to_string.cpp functions/function_datetime_string_to_string.cpp functions/function_grouping.cpp diff --git a/be/src/vec/exec/join/cross_join_impl.cpp b/be/src/vec/exec/join/cross_join_impl.cpp new file mode 100644 index 0000000000..ce79087a7a --- /dev/null +++ b/be/src/vec/exec/join/cross_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::CROSS_JOIN); + +} diff --git a/be/src/vec/exec/join/full_outer_join_impl.cpp b/be/src/vec/exec/join/full_outer_join_impl.cpp new file mode 100644 index 0000000000..11486c3d0a --- /dev/null +++ b/be/src/vec/exec/join/full_outer_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::FULL_OUTER_JOIN); + +} diff --git a/be/src/vec/exec/join/inner_join_impl.cpp b/be/src/vec/exec/join/inner_join_impl.cpp new file mode 100644 index 0000000000..cde5f141a6 --- /dev/null +++ b/be/src/vec/exec/join/inner_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::INNER_JOIN); + +} diff --git a/be/src/vec/exec/join/left_anti_join_impl.cpp b/be/src/vec/exec/join/left_anti_join_impl.cpp new file mode 100644 index 0000000000..762117e7c3 --- /dev/null +++ b/be/src/vec/exec/join/left_anti_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::LEFT_ANTI_JOIN); + +} diff --git a/be/src/vec/exec/join/left_outer_join_impl.cpp b/be/src/vec/exec/join/left_outer_join_impl.cpp new file mode 100644 index 0000000000..c663f64a94 --- /dev/null +++ b/be/src/vec/exec/join/left_outer_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::LEFT_OUTER_JOIN); + +} diff --git a/be/src/vec/exec/join/left_semi_join_impl.cpp b/be/src/vec/exec/join/left_semi_join_impl.cpp new file mode 100644 index 0000000000..f62e6d5cd0 --- /dev/null +++ b/be/src/vec/exec/join/left_semi_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::LEFT_SEMI_JOIN); + +} diff --git a/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp b/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp new file mode 100644 index 0000000000..7432e9ef1f --- /dev/null +++ b/be/src/vec/exec/join/null_aware_left_anti_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); + +} diff --git a/be/src/vec/exec/join/process_hash_table_probe.h b/be/src/vec/exec/join/process_hash_table_probe.h new file mode 100644 index 0000000000..5e715415a8 --- /dev/null +++ b/be/src/vec/exec/join/process_hash_table_probe.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "vec/columns/column.h" +#include "vec/columns/columns_number.h" +#include "vec/common/arena.h" + +namespace doris { +namespace vectorized { + +class Block; +class MutableBlock; +class HashJoinNode; + +using MutableColumnPtr = IColumn::MutablePtr; +using MutableColumns = std::vector<MutableColumnPtr>; + +using NullMap = ColumnUInt8::Container; +using ConstNullMapPtr = const NullMap*; + +template <int JoinOpType> +struct ProcessHashTableProbe { + ProcessHashTableProbe(HashJoinNode* join_node, int batch_size); + + // output build side result column + template <bool have_other_join_conjunct = false> + void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, + const std::vector<bool>& output_slot_flags, int size); + + void probe_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags, + int size, int last_probe_index, size_t probe_size, + bool all_match_one, bool have_other_join_conjunct); + // Only process the join with no other join conjunct, because of no other join conjunt + // the output block struct is same with mutable block. we can do more opt on it and simplify + // the logic of probe + // TODO: opt the visited here to reduce the size of hash table + template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> + Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, + MutableBlock& mutable_block, Block* output_block, size_t probe_rows); + // In the presence of other join conjunct, the process of join become more complicated. + // each matching join column need to be processed by other join conjunct. so the struct of mutable block + // and output block may be different + // The output result is determined by the other join conjunct result and same_to_prev struct + template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> + Status do_process_with_other_join_conjuncts(HashTableType& hash_table_ctx, + ConstNullMapPtr null_map, + MutableBlock& mutable_block, Block* output_block, + size_t probe_rows); + + // Process full outer join/ right join / right semi/anti join to output the join result + // in hash table + template <typename HashTableType> + Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block, + Block* output_block, bool* eos); + + vectorized::HashJoinNode* _join_node; + const int _batch_size; + const std::vector<Block>& _build_blocks; + Arena _arena; + + std::vector<uint32_t> _items_counts; + std::vector<int8_t> _build_block_offsets; + std::vector<int> _build_block_rows; + // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN + ColumnUInt8::Container* _tuple_is_null_left_flags; + // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN + ColumnUInt8::Container* _tuple_is_null_right_flags; + + RuntimeProfile::Counter* _rows_returned_counter; + RuntimeProfile::Counter* _search_hashtable_timer; + RuntimeProfile::Counter* _build_side_output_timer; + RuntimeProfile::Counter* _probe_side_output_timer; + + static constexpr int PROBE_SIDE_EXPLODE_RATE = 3; +}; + +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h new file mode 100644 index 0000000000..7da2100784 --- /dev/null +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -0,0 +1,785 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "process_hash_table_probe.h" +#include "vhash_join_node.h" + +namespace doris::vectorized { + +static constexpr int PREFETCH_STEP = HashJoinNode::PREFETCH_STEP; + +template <int JoinOpType> +ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinNode* join_node, int batch_size) + : _join_node(join_node), + _batch_size(batch_size), + _build_blocks(join_node->_build_blocks), + _tuple_is_null_left_flags(join_node->_is_outer_join + ? &(reinterpret_cast<ColumnUInt8&>( + *join_node->_tuple_is_null_left_flag_column) + .get_data()) + : nullptr), + _tuple_is_null_right_flags( + join_node->_is_outer_join + ? &(reinterpret_cast<ColumnUInt8&>( + *join_node->_tuple_is_null_right_flag_column) + .get_data()) + : nullptr), + _rows_returned_counter(join_node->_rows_returned_counter), + _search_hashtable_timer(join_node->_search_hashtable_timer), + _build_side_output_timer(join_node->_build_side_output_timer), + _probe_side_output_timer(join_node->_probe_side_output_timer) {} + +template <int JoinOpType> +template <bool have_other_join_conjunct> +void ProcessHashTableProbe<JoinOpType>::build_side_output_column( + MutableColumns& mcol, int column_offset, int column_length, + const std::vector<bool>& output_slot_flags, int size) { + constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || + JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::LEFT_SEMI_JOIN; + + constexpr auto probe_all = + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; + + if constexpr (!is_semi_anti_join || have_other_join_conjunct) { + if (_build_blocks.size() == 1) { + for (int i = 0; i < column_length; i++) { + auto& column = *_build_blocks[0].get_by_position(i).column; + if (output_slot_flags[i]) { + mcol[i + column_offset]->insert_indices_from(column, _build_block_rows.data(), + _build_block_rows.data() + size); + } else { + mcol[i + column_offset]->insert_many_defaults(size); + } + } + } else { + for (int i = 0; i < column_length; i++) { + if (output_slot_flags[i]) { + for (int j = 0; j < size; j++) { + if constexpr (probe_all) { + if (_build_block_offsets[j] == -1) { + DCHECK(mcol[i + column_offset]->is_nullable()); + assert_cast<ColumnNullable*>(mcol[i + column_offset].get()) + ->insert_default(); + } else { + auto& column = *_build_blocks[_build_block_offsets[j]] + .get_by_position(i) + .column; + mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + } + } else { + if (_build_block_offsets[j] == -1) { + // the only case to reach here: + // 1. left anti join with other conjuncts, and + // 2. equal conjuncts does not match + // since nullptr is emplaced back to visited_map, + // the output value of the build side does not matter, + // just insert default value + mcol[i + column_offset]->insert_default(); + } else { + auto& column = *_build_blocks[_build_block_offsets[j]] + .get_by_position(i) + .column; + mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); + } + } + } + } else { + mcol[i + column_offset]->insert_many_defaults(size); + } + } + } + } + + // Dispose right tuple is null flags columns + if constexpr (probe_all && !have_other_join_conjunct) { + _tuple_is_null_right_flags->resize(size); + auto* __restrict null_data = _tuple_is_null_right_flags->data(); + for (int i = 0; i < size; ++i) { + null_data[i] = _build_block_rows[i] == -1; + } + } +} + +template <int JoinOpType> +void ProcessHashTableProbe<JoinOpType>::probe_side_output_column( + MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, + int last_probe_index, size_t probe_size, bool all_match_one, + bool have_other_join_conjunct) { + auto& probe_block = _join_node->_probe_block; + for (int i = 0; i < output_slot_flags.size(); ++i) { + if (output_slot_flags[i]) { + auto& column = probe_block.get_by_position(i).column; + if (all_match_one) { + DCHECK_EQ(probe_size, column->size() - last_probe_index); + mcol[i]->insert_range_from(*column, last_probe_index, probe_size); + } else { + DCHECK_GE(_items_counts.size(), last_probe_index + probe_size); + column->replicate(&_items_counts[0], size, *mcol[i], last_probe_index, probe_size); + } + } else { + mcol[i]->insert_many_defaults(size); + } + } + + if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { + if (!have_other_join_conjunct) { + _tuple_is_null_left_flags->resize_fill(size, 0); + } + } +} + +template <int JoinOpType> +template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> +Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_ctx, + ConstNullMapPtr null_map, + MutableBlock& mutable_block, + Block* output_block, size_t probe_rows) { + auto& probe_index = _join_node->_probe_index; + auto& probe_raw_ptrs = _join_node->_probe_columns; + if (probe_index == 0 && _items_counts.size() < probe_rows) { + _items_counts.resize(probe_rows); + } + + if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { + _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + } + using KeyGetter = typename HashTableType::State; + using Mapped = typename HashTableType::Mapped; + + int right_col_idx = + _join_node->_is_right_semi_anti ? 0 : _join_node->_left_table_data_types.size(); + int right_col_len = _join_node->_right_table_data_types.size(); + + KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); + auto& mcol = mutable_block.mutable_columns(); + int current_offset = 0; + + constexpr auto is_right_semi_anti_join = + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN; + + constexpr auto probe_all = + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; + + bool all_match_one = true; + int last_probe_index = probe_index; + { + SCOPED_TIMER(_search_hashtable_timer); + while (probe_index < probe_rows) { + if constexpr (ignore_null && need_null_map_for_probe) { + if ((*null_map)[probe_index]) { + if constexpr (probe_all) { + _items_counts[probe_index++] = (uint32_t)1; + // only full outer / left outer need insert the data of right table + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } else { + _items_counts[probe_index++] = (uint32_t)0; + } + all_match_one = false; + continue; + } + } + int last_offset = current_offset; + auto find_result = + !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, + probe_index, _arena) + : (*null_map)[probe_index] + ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, + probe_index, _arena)) {nullptr, false} + : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, + _arena); + if (probe_index + PREFETCH_STEP < probe_rows) + key_getter.template prefetch<true>(*hash_table_ctx.hash_table_ptr, + probe_index + PREFETCH_STEP, _arena); + + if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + if (!find_result.is_found()) { + ++current_offset; + } + } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { + if (find_result.is_found()) { + ++current_offset; + } + } else { + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + mapped.visited = true; + } + + if constexpr (!is_right_semi_anti_join) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; + } else { + _build_block_offsets.emplace_back(mapped.block_offset); + _build_block_rows.emplace_back(mapped.row_num); + } + ++current_offset; + } + } else { + for (auto it = mapped.begin(); it.ok(); ++it) { + if constexpr (!is_right_semi_anti_join) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + } + } + if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + mapped.visited = true; + } + } + } else { + if constexpr (probe_all) { + // only full outer / left outer need insert the data of right table + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } + } + } + + uint32_t count = (uint32_t)(current_offset - last_offset); + _items_counts[probe_index++] = count; + all_match_one &= (count == 1); + if (current_offset >= _batch_size && !all_match_one) { + break; + } + } + } + + { + SCOPED_TIMER(_build_side_output_timer); + build_side_output_column(mcol, right_col_idx, right_col_len, + _join_node->_right_output_slot_flags, current_offset); + } + + if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && + JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { + SCOPED_TIMER(_probe_side_output_timer); + probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, + last_probe_index, probe_index - last_probe_index, all_match_one, + false); + } + + output_block->swap(mutable_block.to_block()); + + return Status::OK(); +} + +template <int JoinOpType> +template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> +Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( + HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, + Block* output_block, size_t probe_rows) { + auto& probe_index = _join_node->_probe_index; + auto& probe_raw_ptrs = _join_node->_probe_columns; + if (probe_index == 0 && _items_counts.size() < probe_rows) { + _items_counts.resize(probe_rows); + } + if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { + _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); + } + + using KeyGetter = typename HashTableType::State; + using Mapped = typename HashTableType::Mapped; + if constexpr (std::is_same_v<Mapped, RowRefListWithFlags>) { + constexpr auto probe_all = + JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; + KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); + + int right_col_idx = _join_node->_left_table_data_types.size(); + int right_col_len = _join_node->_right_table_data_types.size(); + + auto& mcol = mutable_block.mutable_columns(); + // use in right join to change visited state after + // exec the vother join conjunct + std::vector<bool*> visited_map; + visited_map.reserve(1.2 * _batch_size); + + std::vector<bool> same_to_prev; + same_to_prev.reserve(1.2 * _batch_size); + + int current_offset = 0; + + bool all_match_one = true; + int last_probe_index = probe_index; + while (probe_index < probe_rows) { + // ignore null rows + if constexpr (ignore_null && need_null_map_for_probe) { + if ((*null_map)[probe_index]) { + if constexpr (probe_all) { + _items_counts[probe_index++] = (uint32_t)1; + same_to_prev.emplace_back(false); + visited_map.emplace_back(nullptr); + // only full outer / left outer need insert the data of right table + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } else { + _items_counts[probe_index++] = (uint32_t)0; + } + all_match_one = false; + continue; + } + } + + auto last_offset = current_offset; + auto find_result = + !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, + probe_index, _arena) + : (*null_map)[probe_index] + ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, + probe_index, _arena)) {nullptr, false} + : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, + _arena); + if (probe_index + PREFETCH_STEP < probe_rows) + key_getter.template prefetch<true>(*hash_table_ctx.hash_table_ptr, + probe_index + PREFETCH_STEP, _arena); + if (find_result.is_found()) { + auto& mapped = find_result.get_mapped(); + auto origin_offset = current_offset; + // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. + // We should rethink whether to use this iterator mode in the future. Now just opt the one row case + if (mapped.get_row_count() == 1) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = mapped.block_offset; + _build_block_rows[current_offset] = mapped.row_num; + } else { + _build_block_offsets.emplace_back(mapped.block_offset); + _build_block_rows.emplace_back(mapped.row_num); + } + ++current_offset; + visited_map.emplace_back(&mapped.visited); + } else { + for (auto it = mapped.begin(); it.ok(); ++it) { + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = it->block_offset; + _build_block_rows[current_offset] = it->row_num; + } else { + _build_block_offsets.emplace_back(it->block_offset); + _build_block_rows.emplace_back(it->row_num); + } + ++current_offset; + visited_map.emplace_back(&it->visited); + } + } + same_to_prev.emplace_back(false); + for (int i = 0; i < current_offset - origin_offset - 1; ++i) { + same_to_prev.emplace_back(true); + } + } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + same_to_prev.emplace_back(false); + visited_map.emplace_back(nullptr); + // only full outer / left outer need insert the data of right table + // left anti use -1 use a default value + if (LIKELY(current_offset < _build_block_rows.size())) { + _build_block_offsets[current_offset] = -1; + _build_block_rows[current_offset] = -1; + } else { + _build_block_offsets.emplace_back(-1); + _build_block_rows.emplace_back(-1); + } + ++current_offset; + } else { + // other join, no nothing + } + uint32_t count = (uint32_t)(current_offset - last_offset); + _items_counts[probe_index++] = count; + all_match_one &= (count == 1); + if (current_offset >= _batch_size && !all_match_one) { + break; + } + } + + { + SCOPED_TIMER(_build_side_output_timer); + build_side_output_column<true>(mcol, right_col_idx, right_col_len, + _join_node->_right_output_slot_flags, current_offset); + } + { + SCOPED_TIMER(_probe_side_output_timer); + probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, + last_probe_index, probe_index - last_probe_index, + all_match_one, true); + } + output_block->swap(mutable_block.to_block()); + + // dispose the other join conjunct exec + if (output_block->rows()) { + int result_column_id = -1; + int orig_columns = output_block->columns(); + (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id); + + auto column = output_block->get_by_position(result_column_id).column; + if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { + auto new_filter_column = ColumnVector<UInt8>::create(); + auto& filter_map = new_filter_column->get_data(); + + auto null_map_column = ColumnVector<UInt8>::create(column->size(), 0); + auto* __restrict null_map_data = null_map_column->get_data().data(); + + for (int i = 0; i < column->size(); ++i) { + auto join_hit = visited_map[i] != nullptr; + auto other_hit = column->get_bool(i); + + if (!other_hit) { + for (size_t j = 0; j < right_col_len; ++j) { + typeid_cast<ColumnNullable*>( + std::move(*output_block->get_by_position(j + right_col_idx) + .column) + .assume_mutable() + .get()) + ->get_null_map_data()[i] = true; + } + } + null_map_data[i] = !join_hit || !other_hit; + + if (join_hit) { + *visited_map[i] |= other_hit; + filter_map.push_back(other_hit || !same_to_prev[i] || + (!column->get_bool(i - 1) && filter_map.back())); + // Here to keep only hit join conjunct and other join conjunt is true need to be output. + // if not, only some key must keep one row will output will null right table column + if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1)) { + filter_map[i - 1] = false; + } + } else { + filter_map.push_back(true); + } + } + + for (int i = 0; i < column->size(); ++i) { + if (filter_map[i]) { + _tuple_is_null_right_flags->emplace_back(null_map_data[i]); + } + } + output_block->get_by_position(result_column_id).column = + std::move(new_filter_column); + } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { + auto new_filter_column = ColumnVector<UInt8>::create(); + auto& filter_map = new_filter_column->get_data(); + + if (!column->empty()) { + filter_map.emplace_back(column->get_bool(0)); + } + for (int i = 1; i < column->size(); ++i) { + if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) { + // Only last same element is true, output last one + filter_map.push_back(true); + filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; + } else { + filter_map.push_back(false); + } + } + + output_block->get_by_position(result_column_id).column = + std::move(new_filter_column); + } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + auto new_filter_column = ColumnVector<UInt8>::create(); + auto& filter_map = new_filter_column->get_data(); + + if (!column->empty()) { + // Both equal conjuncts and other conjuncts are true + filter_map.emplace_back(column->get_bool(0) && visited_map[0]); + } + for (int i = 1; i < column->size(); ++i) { + if ((visited_map[i] && column->get_bool(i)) || + (same_to_prev[i] && filter_map[i - 1])) { + // When either of two conditions is meet: + // 1. Both equal conjuncts and other conjuncts are true or same_to_prev + // 2. This row is joined from the same build side row as the previous row + // Set filter_map[i] to true and filter_map[i - 1] to false if same_to_prev[i] + // is true. + filter_map.push_back(true); + filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; + } else { + filter_map.push_back(false); + } + } + + // Same to the semi join, but change the last value to opposite value + for (int i = 1; i < same_to_prev.size(); ++i) { + if (!same_to_prev[i]) { + filter_map[i - 1] = !filter_map[i - 1]; + } + } + filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1]; + + output_block->get_by_position(result_column_id).column = + std::move(new_filter_column); + } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { + for (int i = 0; i < column->size(); ++i) { + DCHECK(visited_map[i]); + *visited_map[i] |= column->get_bool(i); + } + } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { + auto filter_size = 0; + for (int i = 0; i < column->size(); ++i) { + DCHECK(visited_map[i]); + auto result = column->get_bool(i); + *visited_map[i] |= result; + filter_size += result; + } + _tuple_is_null_left_flags->resize_fill(filter_size, 0); + } else { + // inner join do nothing + } + + if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || + JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { + output_block->clear(); + } else { + if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN || + JoinOpType == TJoinOp::LEFT_ANTI_JOIN || + JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { + orig_columns = right_col_idx; + } + Block::filter_block(output_block, result_column_id, orig_columns); + } + } + + return Status::OK(); + } else { + LOG(FATAL) << "Invalid RowRefList"; + return Status::InvalidArgument("Invalid RowRefList"); + } +} + +template <int JoinOpType> +template <typename HashTableType> +Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableType& hash_table_ctx, + MutableBlock& mutable_block, + Block* output_block, + bool* eos) { + using Mapped = typename HashTableType::Mapped; + if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> || + std::is_same_v<Mapped, RowRefListWithFlags>) { + hash_table_ctx.init_once(); + auto& mcol = mutable_block.mutable_columns(); + + bool right_semi_anti_without_other = + _join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct; + int right_col_idx = + right_semi_anti_without_other ? 0 : _join_node->_left_table_data_types.size(); + int right_col_len = _join_node->_right_table_data_types.size(); + + auto& iter = hash_table_ctx.iter; + auto block_size = 0; + + auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) { + block_size++; + for (size_t j = 0; j < right_col_len; ++j) { + auto& column = *_build_blocks[offset].get_by_position(j).column; + mcol[j + right_col_idx]->insert_from(column, row_num); + } + }; + + for (; iter != hash_table_ctx.hash_table_ptr->end() && block_size < _batch_size; ++iter) { + auto& mapped = iter->get_second(); + if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { + if (mapped.visited) { + for (auto it = mapped.begin(); it.ok(); ++it) { + if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + insert_from_hash_table(it->block_offset, it->row_num); + } + } + } else { + for (auto it = mapped.begin(); it.ok(); ++it) { + if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { + insert_from_hash_table(it->block_offset, it->row_num); + } + } + } + } else { + for (auto it = mapped.begin(); it.ok(); ++it) { + if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { + if (it->visited) { + insert_from_hash_table(it->block_offset, it->row_num); + } + } else { + if (!it->visited) { + insert_from_hash_table(it->block_offset, it->row_num); + } + } + } + } + } + + // just resize the left table column in case with other conjunct to make block size is not zero + if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) { + auto target_size = mcol[right_col_idx]->size(); + for (int i = 0; i < right_col_idx; ++i) { + mcol[i]->resize(target_size); + } + } + + // right outer join / full join need insert data of left table + if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || + JoinOpType == TJoinOp::FULL_OUTER_JOIN) { + for (int i = 0; i < right_col_idx; ++i) { + assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size); + } + _tuple_is_null_left_flags->resize_fill(block_size, 1); + } + *eos = iter == hash_table_ctx.hash_table_ptr->end(); + output_block->swap( + mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); + return Status::OK(); + } else { + LOG(FATAL) << "Invalid RowRefList"; + return Status::InvalidArgument("Invalid RowRefList"); + } +} + +template <typename T> +struct ExtractType; + +template <typename T, typename U> +struct ExtractType<T(U)> { + using Type = U; +}; + +#define INSTANTIATION(JoinOpType, T) \ + template Status \ + ProcessHashTableProbe<JoinOpType>::do_process<false, false, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + template Status \ + ProcessHashTableProbe<JoinOpType>::do_process<false, true, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + template Status \ + ProcessHashTableProbe<JoinOpType>::do_process<true, false, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + template Status \ + ProcessHashTableProbe<JoinOpType>::do_process<true, true, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + \ + template Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts< \ + false, false, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + template Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts< \ + false, true, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + template Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts< \ + true, false, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + template Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts< \ + true, true, ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, ConstNullMapPtr null_map, \ + MutableBlock & mutable_block, Block * output_block, size_t probe_rows); \ + \ + template Status \ + ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable<ExtractType<void(T)>::Type>( \ + ExtractType<void(T)>::Type & hash_table_ctx, MutableBlock & mutable_block, \ + Block * output_block, bool* eos) + +#define INSTANTIATION_FOR(JoinOpType) \ + template struct ProcessHashTableProbe<JoinOpType>; \ + \ + template void ProcessHashTableProbe<JoinOpType>::build_side_output_column<false>( \ + MutableColumns & mcol, int column_offset, int column_length, \ + const std::vector<bool>& output_slot_flags, int size); \ + template void ProcessHashTableProbe<JoinOpType>::build_side_output_column<true>( \ + MutableColumns & mcol, int column_offset, int column_length, \ + const std::vector<bool>& output_slot_flags, int size); \ + \ + INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefList>)); \ + INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefList>)); \ + INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefList>)); \ + INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefList>)); \ + INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefList>)); \ + INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefList>)); \ + INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefList>)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, RowRefList>)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, RowRefList>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, RowRefList>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, RowRefList>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, RowRefList>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, RowRefList>)); \ + INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, RowRefListWithFlag>)); \ + INSTANTIATION(JoinOpType, (SerializedHashTableContext<RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I8HashTableContext<RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I16HashTableContext<RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I32HashTableContext<RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I64HashTableContext<RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I128HashTableContext<RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I256HashTableContext<RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<true, RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I64FixedKeyHashTableContext<false, RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<true, RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I128FixedKeyHashTableContext<false, RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<true, RowRefListWithFlags>)); \ + INSTANTIATION(JoinOpType, (I256FixedKeyHashTableContext<false, RowRefListWithFlags>)) + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/join/right_anti_join_impl.cpp b/be/src/vec/exec/join/right_anti_join_impl.cpp new file mode 100644 index 0000000000..79d7ab7a49 --- /dev/null +++ b/be/src/vec/exec/join/right_anti_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::RIGHT_ANTI_JOIN); + +} diff --git a/be/src/vec/exec/join/right_outer_join_impl.cpp b/be/src/vec/exec/join/right_outer_join_impl.cpp new file mode 100644 index 0000000000..eea4bda8a1 --- /dev/null +++ b/be/src/vec/exec/join/right_outer_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::RIGHT_OUTER_JOIN); + +} diff --git a/be/src/vec/exec/join/right_semi_join_impl.cpp b/be/src/vec/exec/join/right_semi_join_impl.cpp new file mode 100644 index 0000000000..354b3018b9 --- /dev/null +++ b/be/src/vec/exec/join/right_semi_join_impl.cpp @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gen_cpp/PlanNodes_types.h" +#include "process_hash_table_probe_impl.h" + +namespace doris::vectorized { + +INSTANTIATION_FOR(TJoinOp::RIGHT_SEMI_JOIN); + +} diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 561f9ca3f2..a761ca2419 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -30,22 +30,7 @@ namespace doris::vectorized { -#define DISABLE_OPTIMIZATION - -#ifndef NDEBUG - -#undef DISABLE_OPTIMIZATION -#if defined(__GNUC__) -#define DISABLE_OPTIMIZATION __attribute__((optimize("O0"))) -#elif defined(__clang__) -#define DISABLE_OPTIMIZATION __attribute__((optnone)) -#endif - -#endif - -// TODO: Best prefetch step is decided by machine. We should also provide a -// SQL hint to allow users to tune by hand. -static constexpr int PREFETCH_STEP = 64; +static constexpr int PREFETCH_STEP = HashJoinNode::PREFETCH_STEP; template Status HashJoinNode::_extract_join_column<true>( Block&, COW<IColumn>::mutable_ptr<ColumnVector<unsigned char>>&, @@ -236,657 +221,6 @@ private: HashJoinNode* _join_node; }; -template <int JoinOpType> -ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinNode* join_node, int batch_size) - : _join_node(join_node), - _batch_size(batch_size), - _build_blocks(join_node->_build_blocks), - _tuple_is_null_left_flags(join_node->_is_outer_join - ? &(reinterpret_cast<ColumnUInt8&>( - *join_node->_tuple_is_null_left_flag_column) - .get_data()) - : nullptr), - _tuple_is_null_right_flags( - join_node->_is_outer_join - ? &(reinterpret_cast<ColumnUInt8&>( - *join_node->_tuple_is_null_right_flag_column) - .get_data()) - : nullptr), - _rows_returned_counter(join_node->_rows_returned_counter), - _search_hashtable_timer(join_node->_search_hashtable_timer), - _build_side_output_timer(join_node->_build_side_output_timer), - _probe_side_output_timer(join_node->_probe_side_output_timer) {} - -template <int JoinOpType> -template <bool have_other_join_conjunct> -void ProcessHashTableProbe<JoinOpType>::build_side_output_column( - MutableColumns& mcol, int column_offset, int column_length, - const std::vector<bool>& output_slot_flags, int size) { - constexpr auto is_semi_anti_join = JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || - JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::LEFT_SEMI_JOIN; - - constexpr auto probe_all = - JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - - if constexpr (!is_semi_anti_join || have_other_join_conjunct) { - if (_build_blocks.size() == 1) { - for (int i = 0; i < column_length; i++) { - auto& column = *_build_blocks[0].get_by_position(i).column; - if (output_slot_flags[i]) { - mcol[i + column_offset]->insert_indices_from(column, _build_block_rows.data(), - _build_block_rows.data() + size); - } else { - mcol[i + column_offset]->insert_many_defaults(size); - } - } - } else { - for (int i = 0; i < column_length; i++) { - if (output_slot_flags[i]) { - for (int j = 0; j < size; j++) { - if constexpr (probe_all) { - if (_build_block_offsets[j] == -1) { - DCHECK(mcol[i + column_offset]->is_nullable()); - assert_cast<ColumnNullable*>(mcol[i + column_offset].get()) - ->insert_default(); - } else { - auto& column = *_build_blocks[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); - } - } else { - if (_build_block_offsets[j] == -1) { - // the only case to reach here: - // 1. left anti join with other conjuncts, and - // 2. equal conjuncts does not match - // since nullptr is emplaced back to visited_map, - // the output value of the build side does not matter, - // just insert default value - mcol[i + column_offset]->insert_default(); - } else { - auto& column = *_build_blocks[_build_block_offsets[j]] - .get_by_position(i) - .column; - mcol[i + column_offset]->insert_from(column, _build_block_rows[j]); - } - } - } - } else { - mcol[i + column_offset]->insert_many_defaults(size); - } - } - } - } - - // Dispose right tuple is null flags columns - if constexpr (probe_all && !have_other_join_conjunct) { - _tuple_is_null_right_flags->resize(size); - auto* __restrict null_data = _tuple_is_null_right_flags->data(); - for (int i = 0; i < size; ++i) { - null_data[i] = _build_block_rows[i] == -1; - } - } -} - -template <int JoinOpType> -void ProcessHashTableProbe<JoinOpType>::probe_side_output_column( - MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size, - int last_probe_index, size_t probe_size, bool all_match_one, - bool have_other_join_conjunct) { - auto& probe_block = _join_node->_probe_block; - for (int i = 0; i < output_slot_flags.size(); ++i) { - if (output_slot_flags[i]) { - auto& column = probe_block.get_by_position(i).column; - if (all_match_one) { - DCHECK_EQ(probe_size, column->size() - last_probe_index); - mcol[i]->insert_range_from(*column, last_probe_index, probe_size); - } else { - DCHECK_GE(_items_counts.size(), last_probe_index + probe_size); - column->replicate(&_items_counts[0], size, *mcol[i], last_probe_index, probe_size); - } - } else { - mcol[i]->insert_many_defaults(size); - } - } - - if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { - if (!have_other_join_conjunct) { - _tuple_is_null_left_flags->resize_fill(size, 0); - } - } -} - -template <int JoinOpType> -template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> -DISABLE_OPTIMIZATION Status ProcessHashTableProbe<JoinOpType>::do_process( - HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, - Block* output_block, size_t probe_rows) { - auto& probe_index = _join_node->_probe_index; - auto& probe_raw_ptrs = _join_node->_probe_columns; - if (probe_index == 0 && _items_counts.size() < probe_rows) { - _items_counts.resize(probe_rows); - } - - if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { - _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - } - using KeyGetter = typename HashTableType::State; - using Mapped = typename HashTableType::Mapped; - - int right_col_idx = - _join_node->_is_right_semi_anti ? 0 : _join_node->_left_table_data_types.size(); - int right_col_len = _join_node->_right_table_data_types.size(); - - KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); - auto& mcol = mutable_block.mutable_columns(); - int current_offset = 0; - - constexpr auto is_right_semi_anti_join = - JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType == TJoinOp::RIGHT_SEMI_JOIN; - - constexpr auto probe_all = - JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - - bool all_match_one = true; - int last_probe_index = probe_index; - { - SCOPED_TIMER(_search_hashtable_timer); - while (probe_index < probe_rows) { - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - _items_counts[probe_index++] = (uint32_t)1; - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } else { - _items_counts[probe_index++] = (uint32_t)0; - } - all_match_one = false; - continue; - } - } - int last_offset = current_offset; - auto find_result = - !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena) - : (*null_map)[probe_index] - ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena)) {nullptr, false} - : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, - _arena); - if (probe_index + PREFETCH_STEP < probe_rows) - key_getter.template prefetch<true>(*hash_table_ctx.hash_table_ptr, - probe_index + PREFETCH_STEP, _arena); - - if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - if (!find_result.is_found()) { - ++current_offset; - } - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - if (find_result.is_found()) { - ++current_offset; - } - } else { - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.get_row_count() == 1) { - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - mapped.visited = true; - } - - if constexpr (!is_right_semi_anti_join) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - } else { - _build_block_offsets.emplace_back(mapped.block_offset); - _build_block_rows.emplace_back(mapped.row_num); - } - ++current_offset; - } - } else { - for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (!is_right_semi_anti_join) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - } - } - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - mapped.visited = true; - } - } - } else { - if constexpr (probe_all) { - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } - } - } - - uint32_t count = (uint32_t)(current_offset - last_offset); - _items_counts[probe_index++] = count; - all_match_one &= (count == 1); - if (current_offset >= _batch_size && !all_match_one) { - break; - } - } - } - - { - SCOPED_TIMER(_build_side_output_timer); - build_side_output_column(mcol, right_col_idx, right_col_len, - _join_node->_right_output_slot_flags, current_offset); - } - - if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && - JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { - SCOPED_TIMER(_probe_side_output_timer); - probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, - last_probe_index, probe_index - last_probe_index, all_match_one, - false); - } - - output_block->swap(mutable_block.to_block()); - - return Status::OK(); -} - -template <int JoinOpType> -template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> -DISABLE_OPTIMIZATION Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( - HashTableType& hash_table_ctx, ConstNullMapPtr null_map, MutableBlock& mutable_block, - Block* output_block, size_t probe_rows) { - auto& probe_index = _join_node->_probe_index; - auto& probe_raw_ptrs = _join_node->_probe_columns; - if (probe_index == 0 && _items_counts.size() < probe_rows) { - _items_counts.resize(probe_rows); - } - if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) { - _build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - _build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE); - } - - using KeyGetter = typename HashTableType::State; - using Mapped = typename HashTableType::Mapped; - if constexpr (std::is_same_v<Mapped, RowRefListWithFlags>) { - constexpr auto probe_all = - JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN; - KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr); - - int right_col_idx = _join_node->_left_table_data_types.size(); - int right_col_len = _join_node->_right_table_data_types.size(); - - auto& mcol = mutable_block.mutable_columns(); - // use in right join to change visited state after - // exec the vother join conjunct - std::vector<bool*> visited_map; - visited_map.reserve(1.2 * _batch_size); - - std::vector<bool> same_to_prev; - same_to_prev.reserve(1.2 * _batch_size); - - int current_offset = 0; - - bool all_match_one = true; - int last_probe_index = probe_index; - while (probe_index < probe_rows) { - // ignore null rows - if constexpr (ignore_null && need_null_map_for_probe) { - if ((*null_map)[probe_index]) { - if constexpr (probe_all) { - _items_counts[probe_index++] = (uint32_t)1; - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - // only full outer / left outer need insert the data of right table - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } else { - _items_counts[probe_index++] = (uint32_t)0; - } - all_match_one = false; - continue; - } - } - - auto last_offset = current_offset; - auto find_result = - !need_null_map_for_probe ? key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena) - : (*null_map)[probe_index] - ? decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr, - probe_index, _arena)) {nullptr, false} - : key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index, - _arena); - if (probe_index + PREFETCH_STEP < probe_rows) - key_getter.template prefetch<true>(*hash_table_ctx.hash_table_ptr, - probe_index + PREFETCH_STEP, _arena); - if (find_result.is_found()) { - auto& mapped = find_result.get_mapped(); - auto origin_offset = current_offset; - // TODO: Iterators are currently considered to be a heavy operation and have a certain impact on performance. - // We should rethink whether to use this iterator mode in the future. Now just opt the one row case - if (mapped.get_row_count() == 1) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = mapped.block_offset; - _build_block_rows[current_offset] = mapped.row_num; - } else { - _build_block_offsets.emplace_back(mapped.block_offset); - _build_block_rows.emplace_back(mapped.row_num); - } - ++current_offset; - visited_map.emplace_back(&mapped.visited); - } else { - for (auto it = mapped.begin(); it.ok(); ++it) { - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = it->block_offset; - _build_block_rows[current_offset] = it->row_num; - } else { - _build_block_offsets.emplace_back(it->block_offset); - _build_block_rows.emplace_back(it->row_num); - } - ++current_offset; - visited_map.emplace_back(&it->visited); - } - } - same_to_prev.emplace_back(false); - for (int i = 0; i < current_offset - origin_offset - 1; ++i) { - same_to_prev.emplace_back(true); - } - } else if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == TJoinOp::FULL_OUTER_JOIN || - JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - same_to_prev.emplace_back(false); - visited_map.emplace_back(nullptr); - // only full outer / left outer need insert the data of right table - // left anti use -1 use a default value - if (LIKELY(current_offset < _build_block_rows.size())) { - _build_block_offsets[current_offset] = -1; - _build_block_rows[current_offset] = -1; - } else { - _build_block_offsets.emplace_back(-1); - _build_block_rows.emplace_back(-1); - } - ++current_offset; - } else { - // other join, no nothing - } - uint32_t count = (uint32_t)(current_offset - last_offset); - _items_counts[probe_index++] = count; - all_match_one &= (count == 1); - if (current_offset >= _batch_size && !all_match_one) { - break; - } - } - - { - SCOPED_TIMER(_build_side_output_timer); - build_side_output_column<true>(mcol, right_col_idx, right_col_len, - _join_node->_right_output_slot_flags, current_offset); - } - { - SCOPED_TIMER(_probe_side_output_timer); - probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, - last_probe_index, probe_index - last_probe_index, - all_match_one, true); - } - output_block->swap(mutable_block.to_block()); - - // dispose the other join conjunct exec - if (output_block->rows()) { - int result_column_id = -1; - int orig_columns = output_block->columns(); - (*_join_node->_vother_join_conjunct_ptr)->execute(output_block, &result_column_id); - - auto column = output_block->get_by_position(result_column_id).column; - if constexpr (JoinOpType == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType == TJoinOp::FULL_OUTER_JOIN) { - auto new_filter_column = ColumnVector<UInt8>::create(); - auto& filter_map = new_filter_column->get_data(); - - auto null_map_column = ColumnVector<UInt8>::create(column->size(), 0); - auto* __restrict null_map_data = null_map_column->get_data().data(); - - for (int i = 0; i < column->size(); ++i) { - auto join_hit = visited_map[i] != nullptr; - auto other_hit = column->get_bool(i); - - if (!other_hit) { - for (size_t j = 0; j < right_col_len; ++j) { - typeid_cast<ColumnNullable*>( - std::move(*output_block->get_by_position(j + right_col_idx) - .column) - .assume_mutable() - .get()) - ->get_null_map_data()[i] = true; - } - } - null_map_data[i] = !join_hit || !other_hit; - - if (join_hit) { - *visited_map[i] |= other_hit; - filter_map.push_back(other_hit || !same_to_prev[i] || - (!column->get_bool(i - 1) && filter_map.back())); - // Here to keep only hit join conjunct and other join conjunt is true need to be output. - // if not, only some key must keep one row will output will null right table column - if (same_to_prev[i] && filter_map.back() && !column->get_bool(i - 1)) - filter_map[i - 1] = false; - } else { - filter_map.push_back(true); - } - } - - for (int i = 0; i < column->size(); ++i) { - if (filter_map[i]) { - _tuple_is_null_right_flags->emplace_back(null_map_data[i]); - } - } - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) { - auto new_filter_column = ColumnVector<UInt8>::create(); - auto& filter_map = new_filter_column->get_data(); - - if (!column->empty()) { - filter_map.emplace_back(column->get_bool(0)); - } - for (int i = 1; i < column->size(); ++i) { - if (column->get_bool(i) || (same_to_prev[i] && filter_map[i - 1])) { - // Only last same element is true, output last one - filter_map.push_back(true); - filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; - } else { - filter_map.push_back(false); - } - } - - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - auto new_filter_column = ColumnVector<UInt8>::create(); - auto& filter_map = new_filter_column->get_data(); - - if (!column->empty()) { - // Both equal conjuncts and other conjuncts are true - filter_map.emplace_back(column->get_bool(0) && visited_map[0]); - } - for (int i = 1; i < column->size(); ++i) { - if ((visited_map[i] && column->get_bool(i)) || - (same_to_prev[i] && filter_map[i - 1])) { - // When either of two conditions is meet: - // 1. Both equal conjuncts and other conjuncts are true or same_to_prev - // 2. This row is joined from the same build side row as the previous row - // Set filter_map[i] to true and filter_map[i - 1] to false if same_to_prev[i] - // is true. - filter_map.push_back(true); - filter_map[i - 1] = !same_to_prev[i] && filter_map[i - 1]; - } else { - filter_map.push_back(false); - } - } - - // Same to the semi join, but change the last value to opposite value - for (int i = 1; i < same_to_prev.size(); ++i) { - if (!same_to_prev[i]) filter_map[i - 1] = !filter_map[i - 1]; - } - filter_map[same_to_prev.size() - 1] = !filter_map[same_to_prev.size() - 1]; - - output_block->get_by_position(result_column_id).column = - std::move(new_filter_column); - } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { - for (int i = 0; i < column->size(); ++i) { - DCHECK(visited_map[i]); - *visited_map[i] |= column->get_bool(i); - } - } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) { - auto filter_size = 0; - for (int i = 0; i < column->size(); ++i) { - DCHECK(visited_map[i]); - auto result = column->get_bool(i); - *visited_map[i] |= result; - filter_size += result; - } - _tuple_is_null_left_flags->resize_fill(filter_size, 0); - } else { - // inner join do nothing - } - - if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN || - JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) { - output_block->clear(); - } else { - if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN || - JoinOpType == TJoinOp::LEFT_ANTI_JOIN || - JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - orig_columns = right_col_idx; - } - Block::filter_block(output_block, result_column_id, orig_columns); - } - } - - return Status::OK(); - } else { - LOG(FATAL) << "Invalid RowRefList"; - return Status::InvalidArgument("Invalid RowRefList"); - } -} - -template <int JoinOpType> -template <typename HashTableType> -DISABLE_OPTIMIZATION Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable( - HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block* output_block, - bool* eos) { - using Mapped = typename HashTableType::Mapped; - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> || - std::is_same_v<Mapped, RowRefListWithFlags>) { - hash_table_ctx.init_once(); - auto& mcol = mutable_block.mutable_columns(); - - bool right_semi_anti_without_other = - _join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct; - int right_col_idx = - right_semi_anti_without_other ? 0 : _join_node->_left_table_data_types.size(); - int right_col_len = _join_node->_right_table_data_types.size(); - - auto& iter = hash_table_ctx.iter; - auto block_size = 0; - - auto insert_from_hash_table = [&](uint8_t offset, uint32_t row_num) { - block_size++; - for (size_t j = 0; j < right_col_len; ++j) { - auto& column = *_build_blocks[offset].get_by_position(j).column; - mcol[j + right_col_idx]->insert_from(column, row_num); - } - }; - - for (; iter != hash_table_ctx.hash_table_ptr->end() && block_size < _batch_size; ++iter) { - auto& mapped = iter->get_second(); - if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) { - if (mapped.visited) { - for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - insert_from_hash_table(it->block_offset, it->row_num); - } - } - } else { - for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) { - insert_from_hash_table(it->block_offset, it->row_num); - } - } - } - } else { - for (auto it = mapped.begin(); it.ok(); ++it) { - if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) { - if (it->visited) insert_from_hash_table(it->block_offset, it->row_num); - } else { - if (!it->visited) insert_from_hash_table(it->block_offset, it->row_num); - } - } - } - } - - // just resize the left table column in case with other conjunct to make block size is not zero - if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) { - auto target_size = mcol[right_col_idx]->size(); - for (int i = 0; i < right_col_idx; ++i) { - mcol[i]->resize(target_size); - } - } - - // right outer join / full join need insert data of left table - if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || - JoinOpType == TJoinOp::FULL_OUTER_JOIN) { - for (int i = 0; i < right_col_idx; ++i) { - assert_cast<ColumnNullable*>(mcol[i].get())->insert_many_defaults(block_size); - } - _tuple_is_null_left_flags->resize_fill(block_size, 1); - } - *eos = iter == hash_table_ctx.hash_table_ptr->end(); - output_block->swap( - mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); - return Status::OK(); - } else { - LOG(FATAL) << "Invalid RowRefList"; - return Status::InvalidArgument("Invalid RowRefList"); - } -} - HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : VJoinNodeBase(pool, tnode, descs), _mem_used(0), @@ -904,8 +238,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _build_blocks.reserve(_MAX_BUILD_BLOCK_COUNT); } -HashJoinNode::~HashJoinNode() = default; - Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(VJoinNodeBase::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 25a40159bc..f2774979c3 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -22,6 +22,7 @@ #include "exprs/runtime_filter_slots.h" #include "join_op.h" +#include "process_hash_table_probe.h" #include "vec/common/columns_hashing.h" #include "vec/common/hash_table/hash_map.h" #include "vjoin_node_base.h" @@ -165,62 +166,6 @@ using HashTableVariants = std::variant< class VExprContext; class HashJoinNode; -template <int JoinOpType> -struct ProcessHashTableProbe { - ProcessHashTableProbe(HashJoinNode* join_node, int batch_size); - - // output build side result column - template <bool have_other_join_conjunct = false> - void build_side_output_column(MutableColumns& mcol, int column_offset, int column_length, - const std::vector<bool>& output_slot_flags, int size); - - void probe_side_output_column(MutableColumns& mcol, const std::vector<bool>& output_slot_flags, - int size, int last_probe_index, size_t probe_size, - bool all_match_one, bool have_other_join_conjunct); - // Only process the join with no other join conjunct, because of no other join conjunt - // the output block struct is same with mutable block. we can do more opt on it and simplify - // the logic of probe - // TODO: opt the visited here to reduce the size of hash table - template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> - Status do_process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, size_t probe_rows); - // In the presence of other join conjunct, the process of join become more complicated. - // each matching join column need to be processed by other join conjunct. so the struct of mutable block - // and output block may be different - // The output result is determined by the other join conjunct result and same_to_prev struct - template <bool need_null_map_for_probe, bool ignore_null, typename HashTableType> - Status do_process_with_other_join_conjuncts(HashTableType& hash_table_ctx, - ConstNullMapPtr null_map, - MutableBlock& mutable_block, Block* output_block, - size_t probe_rows); - - // Process full outer join/ right join / right semi/anti join to output the join result - // in hash table - template <typename HashTableType> - Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block, - Block* output_block, bool* eos); - - vectorized::HashJoinNode* _join_node; - const int _batch_size; - const std::vector<Block>& _build_blocks; - Arena _arena; - - std::vector<uint32_t> _items_counts; - std::vector<int8_t> _build_block_offsets; - std::vector<int> _build_block_rows; - // only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN - ColumnUInt8::Container* _tuple_is_null_left_flags; - // only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN - ColumnUInt8::Container* _tuple_is_null_right_flags; - - RuntimeProfile::Counter* _rows_returned_counter; - RuntimeProfile::Counter* _search_hashtable_timer; - RuntimeProfile::Counter* _build_side_output_timer; - RuntimeProfile::Counter* _probe_side_output_timer; - - static constexpr int PROBE_SIDE_EXPLODE_RATE = 3; -}; - using HashTableCtxVariants = std::variant<std::monostate, ProcessHashTableProbe<TJoinOp::INNER_JOIN>, ProcessHashTableProbe<TJoinOp::LEFT_SEMI_JOIN>, @@ -235,8 +180,11 @@ using HashTableCtxVariants = class HashJoinNode final : public VJoinNodeBase { public: + // TODO: Best prefetch step is decided by machine. We should also provide a + // SQL hint to allow users to tune by hand. + static constexpr int PREFETCH_STEP = 64; + HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~HashJoinNode() override; Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; Status prepare(RuntimeState* state) override; diff --git a/be/src/vec/functions/function_date_or_datetime_computation.cpp b/be/src/vec/functions/function_date_or_datetime_computation.cpp index f63b3d8be3..02dd86b58e 100644 --- a/be/src/vec/functions/function_date_or_datetime_computation.cpp +++ b/be/src/vec/functions/function_date_or_datetime_computation.cpp @@ -23,8 +23,6 @@ namespace doris::vectorized { using FunctionAddSeconds = FunctionDateOrDateTimeComputation< AddSecondsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddSecondsV2 = FunctionDateOrDateTimeComputation< - AddSecondsImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; using FunctionAddMinutes = FunctionDateOrDateTimeComputation< AddMinutesImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionAddHours = @@ -35,24 +33,10 @@ using FunctionAddWeeks = FunctionDateOrDateTimeComputation<AddWeeksImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionAddMonths = FunctionDateOrDateTimeComputation<AddMonthsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddMinutesV2 = FunctionDateOrDateTimeComputation< - AddMinutesImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; -using FunctionAddHoursV2 = - FunctionDateOrDateTimeComputation<AddHoursImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; -using FunctionAddDaysV2 = - FunctionDateOrDateTimeComputation<AddDaysImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionAddWeeksV2 = - FunctionDateOrDateTimeComputation<AddWeeksImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionAddMonthsV2 = - FunctionDateOrDateTimeComputation<AddMonthsImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; using FunctionAddQuarters = FunctionDateOrDateTimeComputation< AddQuartersImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddQuartersV2 = - FunctionDateOrDateTimeComputation<AddQuartersImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; using FunctionAddYears = FunctionDateOrDateTimeComputation<AddYearsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddYearsV2 = - FunctionDateOrDateTimeComputation<AddYearsImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; using FunctionSubSeconds = FunctionDateOrDateTimeComputation< SubtractSecondsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; @@ -70,22 +54,6 @@ using FunctionSubQuarters = FunctionDateOrDateTimeComputation< SubtractQuartersImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionSubYears = FunctionDateOrDateTimeComputation< SubtractYearsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubSecondsV2 = FunctionDateOrDateTimeComputation< - SubtractSecondsImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; -using FunctionSubMinutesV2 = FunctionDateOrDateTimeComputation< - SubtractMinutesImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; -using FunctionSubHoursV2 = FunctionDateOrDateTimeComputation< - SubtractHoursImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; -using FunctionSubDaysV2 = - FunctionDateOrDateTimeComputation<SubtractDaysImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionSubWeeksV2 = FunctionDateOrDateTimeComputation< - SubtractWeeksImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionSubMonthsV2 = FunctionDateOrDateTimeComputation< - SubtractMonthsImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionSubQuartersV2 = FunctionDateOrDateTimeComputation< - SubtractQuartersImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionSubYearsV2 = FunctionDateOrDateTimeComputation< - SubtractYearsImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; using FunctionDateDiff = FunctionDateOrDateTimeComputation<DateDiffImpl< VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; @@ -108,86 +76,8 @@ using FunctionSecondsDiff = FunctionDateOrDateTimeComputation<SecondsDiffImpl< using FunctionToYearWeekTwoArgs = FunctionDateOrDateTimeComputation< ToYearWeekTwoArgsImpl<VecDateTimeValue, DataTypeDateTime, Int64>>; -using FunctionToYearWeekTwoArgsV2 = FunctionDateOrDateTimeComputation< - ToYearWeekTwoArgsImpl<DateV2Value<DateV2ValueType>, DataTypeDateV2, UInt32>>; using FunctionToWeekTwoArgs = FunctionDateOrDateTimeComputation< ToWeekTwoArgsImpl<VecDateTimeValue, DataTypeDateTime, Int64>>; -using FunctionToWeekTwoArgsV2 = FunctionDateOrDateTimeComputation< - ToWeekTwoArgsImpl<DateV2Value<DateV2ValueType>, DataTypeDateV2, UInt32>>; - -using FunctionDatetimeV2AddSeconds = FunctionDateOrDateTimeComputation< - AddSecondsImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2AddMinutes = FunctionDateOrDateTimeComputation< - AddMinutesImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2AddHours = FunctionDateOrDateTimeComputation< - AddHoursImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2AddDays = FunctionDateOrDateTimeComputation< - AddDaysImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2AddWeeks = FunctionDateOrDateTimeComputation< - AddWeeksImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2AddMonths = FunctionDateOrDateTimeComputation< - AddMonthsImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; - -using FunctionDatetimeV2AddQuarters = FunctionDateOrDateTimeComputation< - AddQuartersImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2AddYears = FunctionDateOrDateTimeComputation< - AddYearsImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; - -using FunctionDatetimeV2SubSeconds = FunctionDateOrDateTimeComputation< - SubtractSecondsImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2SubMinutes = FunctionDateOrDateTimeComputation< - SubtractMinutesImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2SubHours = FunctionDateOrDateTimeComputation< - SubtractHoursImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2SubDays = FunctionDateOrDateTimeComputation< - SubtractDaysImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2SubWeeks = FunctionDateOrDateTimeComputation< - SubtractWeeksImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2SubMonths = FunctionDateOrDateTimeComputation< - SubtractMonthsImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2SubQuarters = FunctionDateOrDateTimeComputation< - SubtractQuartersImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; -using FunctionDatetimeV2SubYears = FunctionDateOrDateTimeComputation< - SubtractYearsImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; - -#define FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, TYPE1, TYPE2, ARG1, ARG2, DATE_VALUE1, \ - DATE_VALUE2) \ - using NAME##_##TYPE1##_##TYPE2 = FunctionDateOrDateTimeComputation< \ - IMPL<DATE_VALUE1, DATE_VALUE2, TYPE1, TYPE2, ARG1, ARG2>>; - -#define ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateTimeV2, DataTypeDateTimeV2, UInt64, \ - UInt64, DateV2Value<DateTimeV2ValueType>, \ - DateV2Value<DateTimeV2ValueType>) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateTimeV2, DataTypeDateV2, UInt64, UInt32, \ - DateV2Value<DateTimeV2ValueType>, DateV2Value<DateV2ValueType>) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateV2, DataTypeDateTimeV2, UInt32, UInt64, \ - DateV2Value<DateV2ValueType>, DateV2Value<DateTimeV2ValueType>) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateTimeV2, DataTypeDateTime, UInt64, Int64, \ - DateV2Value<DateTimeV2ValueType>, VecDateTimeValue) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateTime, DataTypeDateTimeV2, Int64, UInt64, \ - VecDateTimeValue, DateV2Value<DateTimeV2ValueType>) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateTime, DataTypeDateV2, Int64, UInt32, \ - VecDateTimeValue, DateV2Value<DateV2ValueType>) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateV2, DataTypeDateTime, UInt32, Int64, \ - DateV2Value<DateV2ValueType>, VecDateTimeValue) \ - FUNCTION_DATEV2_WITH_TWO_ARGS(NAME, IMPL, DataTypeDateV2, DataTypeDateV2, UInt32, UInt32, \ - DateV2Value<DateV2ValueType>, DateV2Value<DateV2ValueType>) - -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2DateDiff, DateDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2TimeDiff, TimeDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2YearsDiff, YearsDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2MonthsDiff, MonthsDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2WeeksDiff, WeeksDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2HoursDiff, HoursDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2MinutesDiff, MintueSDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2SecondsDiff, SecondsDiffImpl) -ALL_FUNCTION_DATEV2_WITH_TWO_ARGS(FunctionDatetimeV2DaysDiff, DaysDiffImpl) - -using FunctionDatetimeV2ToYearWeekTwoArgs = FunctionDateOrDateTimeComputation< - ToYearWeekTwoArgsImpl<DateV2Value<DateTimeV2ValueType>, DataTypeDateTimeV2, UInt64>>; -using FunctionDatetimeV2ToWeekTwoArgs = FunctionDateOrDateTimeComputation< - ToWeekTwoArgsImpl<DateV2Value<DateTimeV2ValueType>, DataTypeDateTimeV2, UInt64>>; struct NowFunctionName { static constexpr auto name = "now"; @@ -253,55 +143,21 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory) { factory.register_function<FunctionAddMinutes>(); factory.register_function<FunctionAddHours>(); factory.register_function<FunctionAddDays>(); - factory.register_function<FunctionAddSecondsV2>(); - factory.register_function<FunctionAddMinutesV2>(); - factory.register_function<FunctionAddHoursV2>(); - factory.register_function<FunctionAddDaysV2>(); factory.register_function<FunctionAddWeeks>(); factory.register_function<FunctionAddMonths>(); factory.register_function<FunctionAddYears>(); factory.register_function<FunctionAddQuarters>(); - factory.register_function<FunctionAddWeeksV2>(); - factory.register_function<FunctionAddMonthsV2>(); - factory.register_function<FunctionAddYearsV2>(); - factory.register_function<FunctionAddQuartersV2>(); - - factory.register_function<FunctionDatetimeV2AddSeconds>(); - factory.register_function<FunctionDatetimeV2AddMinutes>(); - factory.register_function<FunctionDatetimeV2AddHours>(); - factory.register_function<FunctionDatetimeV2AddDays>(); - factory.register_function<FunctionDatetimeV2AddWeeks>(); - factory.register_function<FunctionDatetimeV2AddMonths>(); - factory.register_function<FunctionDatetimeV2AddYears>(); - factory.register_function<FunctionDatetimeV2AddQuarters>(); factory.register_function<FunctionSubSeconds>(); factory.register_function<FunctionSubMinutes>(); factory.register_function<FunctionSubHours>(); factory.register_function<FunctionSubDays>(); - factory.register_function<FunctionSubSecondsV2>(); - factory.register_function<FunctionSubMinutesV2>(); - factory.register_function<FunctionSubHoursV2>(); - factory.register_function<FunctionSubDaysV2>(); factory.register_alias("days_sub", "date_sub"); factory.register_alias("days_sub", "subdate"); factory.register_function<FunctionSubMonths>(); factory.register_function<FunctionSubYears>(); factory.register_function<FunctionSubQuarters>(); factory.register_function<FunctionSubWeeks>(); - factory.register_function<FunctionSubMonthsV2>(); - factory.register_function<FunctionSubYearsV2>(); - factory.register_function<FunctionSubQuartersV2>(); - factory.register_function<FunctionSubWeeksV2>(); - - factory.register_function<FunctionDatetimeV2SubSeconds>(); - factory.register_function<FunctionDatetimeV2SubMinutes>(); - factory.register_function<FunctionDatetimeV2SubHours>(); - factory.register_function<FunctionDatetimeV2SubDays>(); - factory.register_function<FunctionDatetimeV2SubMonths>(); - factory.register_function<FunctionDatetimeV2SubYears>(); - factory.register_function<FunctionDatetimeV2SubQuarters>(); - factory.register_function<FunctionDatetimeV2SubWeeks>(); factory.register_function<FunctionDateDiff>(); factory.register_function<FunctionTimeDiff>(); @@ -313,35 +169,8 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory) { factory.register_function<FunctionMinutesDiff>(); factory.register_function<FunctionSecondsDiff>(); -#define REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, TYPE1, TYPE2) \ - factory.register_function<NAME##_##TYPE1##_##TYPE2>(); - -#define REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateTimeV2, DataTypeDateTimeV2) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateTimeV2, DataTypeDateV2) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateV2, DataTypeDateTimeV2) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateTimeV2, DataTypeDateTime) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateV2, DataTypeDateTime) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateV2, DataTypeDateV2) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateTime, DataTypeDateV2) \ - REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, DataTypeDateTime, DataTypeDateTimeV2) - - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2DateDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2TimeDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2YearsDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2MonthsDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2WeeksDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2HoursDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2MinutesDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2SecondsDiff) - REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2DaysDiff) - factory.register_function<FunctionToYearWeekTwoArgs>(); factory.register_function<FunctionToWeekTwoArgs>(); - factory.register_function<FunctionToYearWeekTwoArgsV2>(); - factory.register_function<FunctionToWeekTwoArgsV2>(); - factory.register_function<FunctionDatetimeV2ToYearWeekTwoArgs>(); - factory.register_function<FunctionDatetimeV2ToWeekTwoArgs>(); factory.register_function<FunctionNow>(); factory.register_function<FunctionCurrentTimestamp>(); @@ -364,4 +193,4 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory) { factory.register_alias("months_add", "add_months"); } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/functions/function_date_or_datetime_computation.cpp b/be/src/vec/functions/function_date_or_datetime_computation_v2.cpp similarity index 58% copy from be/src/vec/functions/function_date_or_datetime_computation.cpp copy to be/src/vec/functions/function_date_or_datetime_computation_v2.cpp index f63b3d8be3..ced2b2d70c 100644 --- a/be/src/vec/functions/function_date_or_datetime_computation.cpp +++ b/be/src/vec/functions/function_date_or_datetime_computation_v2.cpp @@ -16,25 +16,12 @@ // under the License. #include "vec/functions/function_date_or_datetime_computation.h" - #include "vec/functions/simple_function_factory.h" namespace doris::vectorized { -using FunctionAddSeconds = FunctionDateOrDateTimeComputation< - AddSecondsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionAddSecondsV2 = FunctionDateOrDateTimeComputation< AddSecondsImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; -using FunctionAddMinutes = FunctionDateOrDateTimeComputation< - AddMinutesImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddHours = - FunctionDateOrDateTimeComputation<AddHoursImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddDays = - FunctionDateOrDateTimeComputation<AddDaysImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddWeeks = - FunctionDateOrDateTimeComputation<AddWeeksImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionAddMonths = - FunctionDateOrDateTimeComputation<AddMonthsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionAddMinutesV2 = FunctionDateOrDateTimeComputation< AddMinutesImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; using FunctionAddHoursV2 = @@ -45,31 +32,11 @@ using FunctionAddWeeksV2 = FunctionDateOrDateTimeComputation<AddWeeksImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; using FunctionAddMonthsV2 = FunctionDateOrDateTimeComputation<AddMonthsImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionAddQuarters = FunctionDateOrDateTimeComputation< - AddQuartersImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionAddQuartersV2 = FunctionDateOrDateTimeComputation<AddQuartersImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionAddYears = - FunctionDateOrDateTimeComputation<AddYearsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionAddYearsV2 = FunctionDateOrDateTimeComputation<AddYearsImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionSubSeconds = FunctionDateOrDateTimeComputation< - SubtractSecondsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubMinutes = FunctionDateOrDateTimeComputation< - SubtractMinutesImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubHours = FunctionDateOrDateTimeComputation< - SubtractHoursImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubDays = FunctionDateOrDateTimeComputation< - SubtractDaysImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubWeeks = FunctionDateOrDateTimeComputation< - SubtractWeeksImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubMonths = FunctionDateOrDateTimeComputation< - SubtractMonthsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubQuarters = FunctionDateOrDateTimeComputation< - SubtractQuartersImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; -using FunctionSubYears = FunctionDateOrDateTimeComputation< - SubtractYearsImpl<DataTypeDateTime, Int64, DataTypeDateTime>>; using FunctionSubSecondsV2 = FunctionDateOrDateTimeComputation< SubtractSecondsImpl<DataTypeDateV2, UInt32, DataTypeDateTimeV2>>; using FunctionSubMinutesV2 = FunctionDateOrDateTimeComputation< @@ -87,31 +54,8 @@ using FunctionSubQuartersV2 = FunctionDateOrDateTimeComputation< using FunctionSubYearsV2 = FunctionDateOrDateTimeComputation< SubtractYearsImpl<DataTypeDateV2, UInt32, DataTypeDateV2>>; -using FunctionDateDiff = FunctionDateOrDateTimeComputation<DateDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionTimeDiff = FunctionDateOrDateTimeComputation<TimeDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionYearsDiff = FunctionDateOrDateTimeComputation<YearsDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionMonthsDiff = FunctionDateOrDateTimeComputation<MonthsDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionDaysDiff = FunctionDateOrDateTimeComputation<DaysDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionWeeksDiff = FunctionDateOrDateTimeComputation<WeeksDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionHoursDiff = FunctionDateOrDateTimeComputation<HoursDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionMinutesDiff = FunctionDateOrDateTimeComputation<MintueSDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; -using FunctionSecondsDiff = FunctionDateOrDateTimeComputation<SecondsDiffImpl< - VecDateTimeValue, VecDateTimeValue, DataTypeDateTime, DataTypeDateTime, Int64, Int64>>; - -using FunctionToYearWeekTwoArgs = FunctionDateOrDateTimeComputation< - ToYearWeekTwoArgsImpl<VecDateTimeValue, DataTypeDateTime, Int64>>; using FunctionToYearWeekTwoArgsV2 = FunctionDateOrDateTimeComputation< ToYearWeekTwoArgsImpl<DateV2Value<DateV2ValueType>, DataTypeDateV2, UInt32>>; -using FunctionToWeekTwoArgs = FunctionDateOrDateTimeComputation< - ToWeekTwoArgsImpl<VecDateTimeValue, DataTypeDateTime, Int64>>; using FunctionToWeekTwoArgsV2 = FunctionDateOrDateTimeComputation< ToWeekTwoArgsImpl<DateV2Value<DateV2ValueType>, DataTypeDateV2, UInt32>>; @@ -127,7 +71,6 @@ using FunctionDatetimeV2AddWeeks = FunctionDateOrDateTimeComputation< AddWeeksImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; using FunctionDatetimeV2AddMonths = FunctionDateOrDateTimeComputation< AddMonthsImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; - using FunctionDatetimeV2AddQuarters = FunctionDateOrDateTimeComputation< AddQuartersImpl<DataTypeDateTimeV2, UInt64, DataTypeDateTimeV2>>; using FunctionDatetimeV2AddYears = FunctionDateOrDateTimeComputation< @@ -189,78 +132,11 @@ using FunctionDatetimeV2ToYearWeekTwoArgs = FunctionDateOrDateTimeComputation< using FunctionDatetimeV2ToWeekTwoArgs = FunctionDateOrDateTimeComputation< ToWeekTwoArgsImpl<DateV2Value<DateTimeV2ValueType>, DataTypeDateTimeV2, UInt64>>; -struct NowFunctionName { - static constexpr auto name = "now"; -}; - -struct CurrentTimestampFunctionName { - static constexpr auto name = "current_timestamp"; -}; - -struct LocalTimeFunctionName { - static constexpr auto name = "localtime"; -}; - -struct LocalTimestampFunctionName { - static constexpr auto name = "localtimestamp"; -}; - -using FunctionNow = FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<NowFunctionName, false>>; -using FunctionCurrentTimestamp = - FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<CurrentTimestampFunctionName, false>>; -using FunctionLocalTime = - FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<LocalTimeFunctionName, false>>; -using FunctionLocalTimestamp = - FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<LocalTimestampFunctionName, false>>; - -using FunctionNowWithPrecision = - FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<NowFunctionName, true>>; -using FunctionCurrentTimestampWithPrecision = - FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<CurrentTimestampFunctionName, true>>; -using FunctionLocalTimeWithPrecision = - FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<LocalTimeFunctionName, true>>; -using FunctionLocalTimestampWithPrecision = - FunctionCurrentDateOrDateTime<CurrentDateTimeImpl<LocalTimestampFunctionName, true>>; - -struct CurDateFunctionName { - static constexpr auto name = "curdate"; -}; - -struct CurrentDateFunctionName { - static constexpr auto name = "current_date"; -}; - -FunctionBuilderPtr createCurrentDateFunctionBuilderFunction() { - return std::make_shared<CurrentDateFunctionBuilder<CurrentDateFunctionName>>(); -} -FunctionBuilderPtr createCurDateFunctionBuilderFunction() { - return std::make_shared<CurrentDateFunctionBuilder<CurDateFunctionName>>(); -} - -struct CurTimeFunctionName { - static constexpr auto name = "curtime"; -}; -struct CurrentTimeFunctionName { - static constexpr auto name = "current_time"; -}; - -using FunctionCurTime = FunctionCurrentDateOrDateTime<CurrentTimeImpl<CurTimeFunctionName>>; -using FunctionCurrentTime = FunctionCurrentDateOrDateTime<CurrentTimeImpl<CurrentTimeFunctionName>>; -using FunctionUtcTimeStamp = FunctionCurrentDateOrDateTime<UtcTimestampImpl>; - -void register_function_date_time_computation(SimpleFunctionFactory& factory) { - factory.register_function<FunctionAddSeconds>(); - factory.register_function<FunctionAddMinutes>(); - factory.register_function<FunctionAddHours>(); - factory.register_function<FunctionAddDays>(); +void register_function_date_time_computation_v2(SimpleFunctionFactory& factory) { factory.register_function<FunctionAddSecondsV2>(); factory.register_function<FunctionAddMinutesV2>(); factory.register_function<FunctionAddHoursV2>(); factory.register_function<FunctionAddDaysV2>(); - factory.register_function<FunctionAddWeeks>(); - factory.register_function<FunctionAddMonths>(); - factory.register_function<FunctionAddYears>(); - factory.register_function<FunctionAddQuarters>(); factory.register_function<FunctionAddWeeksV2>(); factory.register_function<FunctionAddMonthsV2>(); factory.register_function<FunctionAddYearsV2>(); @@ -275,20 +151,10 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory) { factory.register_function<FunctionDatetimeV2AddYears>(); factory.register_function<FunctionDatetimeV2AddQuarters>(); - factory.register_function<FunctionSubSeconds>(); - factory.register_function<FunctionSubMinutes>(); - factory.register_function<FunctionSubHours>(); - factory.register_function<FunctionSubDays>(); factory.register_function<FunctionSubSecondsV2>(); factory.register_function<FunctionSubMinutesV2>(); factory.register_function<FunctionSubHoursV2>(); factory.register_function<FunctionSubDaysV2>(); - factory.register_alias("days_sub", "date_sub"); - factory.register_alias("days_sub", "subdate"); - factory.register_function<FunctionSubMonths>(); - factory.register_function<FunctionSubYears>(); - factory.register_function<FunctionSubQuarters>(); - factory.register_function<FunctionSubWeeks>(); factory.register_function<FunctionSubMonthsV2>(); factory.register_function<FunctionSubYearsV2>(); factory.register_function<FunctionSubQuartersV2>(); @@ -303,16 +169,6 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory) { factory.register_function<FunctionDatetimeV2SubQuarters>(); factory.register_function<FunctionDatetimeV2SubWeeks>(); - factory.register_function<FunctionDateDiff>(); - factory.register_function<FunctionTimeDiff>(); - factory.register_function<FunctionYearsDiff>(); - factory.register_function<FunctionMonthsDiff>(); - factory.register_function<FunctionWeeksDiff>(); - factory.register_function<FunctionDaysDiff>(); - factory.register_function<FunctionHoursDiff>(); - factory.register_function<FunctionMinutesDiff>(); - factory.register_function<FunctionSecondsDiff>(); - #define REGISTER_DATEV2_FUNCTIONS_WITH_TWO_ARGS(NAME, TYPE1, TYPE2) \ factory.register_function<NAME##_##TYPE1##_##TYPE2>(); @@ -336,32 +192,10 @@ void register_function_date_time_computation(SimpleFunctionFactory& factory) { REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2SecondsDiff) REGISTER_ALL_DATEV2_FUNCTIONS_WITH_TWO_ARGS(FunctionDatetimeV2DaysDiff) - factory.register_function<FunctionToYearWeekTwoArgs>(); - factory.register_function<FunctionToWeekTwoArgs>(); factory.register_function<FunctionToYearWeekTwoArgsV2>(); factory.register_function<FunctionToWeekTwoArgsV2>(); factory.register_function<FunctionDatetimeV2ToYearWeekTwoArgs>(); factory.register_function<FunctionDatetimeV2ToWeekTwoArgs>(); - - factory.register_function<FunctionNow>(); - factory.register_function<FunctionCurrentTimestamp>(); - factory.register_function<FunctionLocalTime>(); - factory.register_function<FunctionLocalTimestamp>(); - factory.register_function<FunctionNowWithPrecision>(); - factory.register_function<FunctionCurrentTimestampWithPrecision>(); - factory.register_function<FunctionLocalTimeWithPrecision>(); - factory.register_function<FunctionLocalTimestampWithPrecision>(); - factory.register_function(CurrentDateFunctionName::name, - &createCurrentDateFunctionBuilderFunction); - factory.register_function(CurDateFunctionName::name, &createCurDateFunctionBuilderFunction); - factory.register_function<FunctionCurTime>(); - factory.register_function<FunctionCurrentTime>(); - factory.register_function<FunctionUtcTimeStamp>(); - - // alias - factory.register_alias("days_add", "date_add"); - factory.register_alias("days_add", "adddate"); - factory.register_alias("months_add", "add_months"); } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h index 110d410fae..0839870a67 100644 --- a/be/src/vec/functions/simple_function_factory.h +++ b/be/src/vec/functions/simple_function_factory.h @@ -61,6 +61,7 @@ void register_function_in(SimpleFunctionFactory& factory); void register_function_if(SimpleFunctionFactory& factory); void register_function_nullif(SimpleFunctionFactory& factory); void register_function_date_time_computation(SimpleFunctionFactory& factory); +void register_function_date_time_computation_v2(SimpleFunctionFactory& factory); void register_function_timestamp(SimpleFunctionFactory& factory); void register_function_utility(SimpleFunctionFactory& factory); void register_function_json(SimpleFunctionFactory& factory); @@ -199,6 +200,7 @@ public: register_function_if(instance); register_function_nullif(instance); register_function_date_time_computation(instance); + register_function_date_time_computation_v2(instance); register_function_timestamp(instance); register_function_utility(instance); register_function_date_time_to_string(instance); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org