This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bc22802b6a4 [test](ut) add cases about hash join (#49803) bc22802b6a4 is described below commit bc22802b6a43a6d62ac7ad1ed5000e2222976fca Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Tue Apr 15 09:28:13 2025 +0800 [test](ut) add cases about hash join (#49803) --- be/src/pipeline/exec/hashjoin_build_sink.h | 8 - be/src/pipeline/exec/hashjoin_probe_operator.cpp | 5 +- be/src/pipeline/exec/hashjoin_probe_operator.h | 5 +- be/src/runtime/runtime_state.h | 2 +- be/src/util/stack_util.cpp | 2 +- .../pipeline/operator/hash_join_test_helper.cpp | 630 ++++++++++++ be/test/pipeline/operator/hash_join_test_helper.h | 56 + .../pipeline/operator/hashjoin_build_sink_test.cpp | 362 +++++++ .../operator/hashjoin_probe_operator_test.cpp | 1074 ++++++++++++++++++++ be/test/pipeline/operator/join_test_helper.cpp | 43 + be/test/pipeline/operator/join_test_helper.h | 53 + be/test/testutil/mock/mock_operators.h | 7 + be/test/testutil/mock/mock_runtime_state.h | 5 + be/test/testutil/run_all_tests.cpp | 22 +- 14 files changed, 2258 insertions(+), 16 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index c6b6cdff029..ff477cd1105 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -42,14 +42,6 @@ public: void init_short_circuit_for_probe(); bool build_unique() const; - std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; } - - void add_hash_buckets_info(const std::string& info) const { - _profile->add_info_string("HashTableBuckets", info); - } - void add_hash_buckets_filled_info(const std::string& info) const { - _profile->add_info_string("HashTableFilledBuckets", info); - } Dependency* finishdependency() override { return _finish_dependency.get(); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index ab52f01fa5b..6138369fe23 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -529,11 +529,14 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { } for (auto conjunct : _other_join_conjuncts) { - conjunct->root()->collect_slot_column_ids(_other_conjunct_refer_column_ids); + conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids); } for (auto& conjunct : _mark_join_conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc)); + if (_have_other_join_conjunct) { + conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids); + } } RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc())); diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 3914fc0d58d..d1aa2280454 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -155,7 +155,8 @@ public: bool need_finalize_variant_column() const { return _need_finalize_variant_column; } bool is_lazy_materialized_column(int column_id) const { - return _have_other_join_conjunct && !_other_conjunct_refer_column_ids.contains(column_id); + return _have_other_join_conjunct && + !_should_not_lazy_materialized_column_ids.contains(column_id); } private: @@ -185,7 +186,7 @@ private: std::vector<bool> _left_output_slot_flags; std::vector<bool> _right_output_slot_flags; bool _need_finalize_variant_column = false; - std::set<int> _other_conjunct_refer_column_ids; + std::set<int> _should_not_lazy_materialized_column_ids; std::vector<std::string> _right_table_column_names; const std::vector<TExpr> _partition_exprs; }; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 5af2f643f38..4037c22d31a 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -490,7 +490,7 @@ public: : 0; } - bool enable_share_hash_table_for_broadcast_join() const { + MOCK_FUNCTION bool enable_share_hash_table_for_broadcast_join() const { return _query_options.__isset.enable_share_hash_table_for_broadcast_join && _query_options.enable_share_hash_table_for_broadcast_join; } diff --git a/be/src/util/stack_util.cpp b/be/src/util/stack_util.cpp index d84c4eb4f21..4adeb7b5548 100644 --- a/be/src/util/stack_util.cpp +++ b/be/src/util/stack_util.cpp @@ -45,7 +45,7 @@ std::string get_stack_trace(int start_pointers_index, std::string dwarf_location dwarf_location_info_mode = config::dwarf_location_info_mode; } #ifdef BE_TEST - auto tool = std::string {"libunwind"}; + auto tool = std::string {"boost"}; #else auto tool = config::get_stack_trace_tool; #endif diff --git a/be/test/pipeline/operator/hash_join_test_helper.cpp b/be/test/pipeline/operator/hash_join_test_helper.cpp new file mode 100644 index 00000000000..532e2c55c70 --- /dev/null +++ b/be/test/pipeline/operator/hash_join_test_helper.cpp @@ -0,0 +1,630 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "hash_join_test_helper.h" + +#include <gen_cpp/Exprs_types.h> +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <glog/logging.h> + +#include <cstddef> +#include <iostream> +#include <sstream> +#include <unordered_map> +#include <vector> + +#include "testutil/creators.h" +#include "testutil/mock/mock_operators.h" + +namespace doris::pipeline { +TPlanNode HashJoinTestHelper::create_test_plan_node( + const TJoinOp::type& join_op_type, const std::vector<TPrimitiveType::type>& key_types, + const std::vector<bool>& left_keys_nullable, const std::vector<bool>& right_keys_nullable, + const bool is_mark_join, const size_t mark_join_conjuncts_size, const bool null_safe_equal, + const bool has_other_join_conjuncts) { + DCHECK_EQ(key_types.size(), left_keys_nullable.size()); + DCHECK_EQ(key_types.size(), right_keys_nullable.size()); + DCHECK_GE(key_types.size(), mark_join_conjuncts_size); + + TPlanNode tnode; + tnode.node_id = 0; + tnode.node_type = TPlanNodeType::HASH_JOIN_NODE; + tnode.num_children = 2; + tnode.__set_hash_join_node(THashJoinNode()); + tnode.hash_join_node.join_op = join_op_type; + tnode.limit = -1; + tnode.hash_join_node.__set_is_mark(is_mark_join); + + size_t mark_join_conjuncts_count = 0; + + doris::TSlotId col_unique_id = 0; + for (size_t i = 0; i != key_types.size(); ++i) { + const auto key_type = key_types[i]; + const auto left_key_nullable = left_keys_nullable[i]; + const auto right_key_nullable = right_keys_nullable[i]; + TEqJoinCondition eq_cond; + eq_cond.left = TExpr(); + eq_cond.right = TExpr(); + if (null_safe_equal) { + eq_cond.opcode = TExprOpcode::EQ_FOR_NULL; + } else { + eq_cond.opcode = TExprOpcode::EQ; + } + eq_cond.__isset.opcode = true; + TTypeNode type_node; + type_node.type = TTypeNodeType::SCALAR; + type_node.scalar_type.type = key_type; + type_node.__isset.scalar_type = true; + + if (key_type == TPrimitiveType::CHAR || key_type == TPrimitiveType::VARCHAR || + key_type == TPrimitiveType::STRING) { + type_node.scalar_type.__set_len(OLAP_STRING_MAX_LENGTH); + } else if (key_type == TPrimitiveType::DECIMAL128I || + key_type == TPrimitiveType::DECIMAL256 || + key_type == TPrimitiveType::DECIMAL32 || key_type == TPrimitiveType::DECIMAL64 || + key_type == TPrimitiveType::DECIMALV2) { + type_node.scalar_type.__set_precision(18); + type_node.scalar_type.__set_scale(18); + } else if (key_type == TPrimitiveType::DATETIMEV2) { + type_node.scalar_type.__set_scale(6); + } else if (key_type == TPrimitiveType::TIMEV2) { + type_node.scalar_type.__set_scale(0); + } + + eq_cond.left.nodes.emplace_back(); + eq_cond.left.nodes[0].type.types.emplace_back(type_node); + eq_cond.left.nodes[0].node_type = TExprNodeType::SLOT_REF; + eq_cond.left.nodes[0].__set_is_nullable(left_key_nullable); + eq_cond.left.nodes[0].num_children = 0; + eq_cond.left.nodes[0].slot_ref.col_unique_id = col_unique_id++; + eq_cond.left.nodes[0].__isset.slot_ref = true; + + eq_cond.right.nodes.emplace_back(); + eq_cond.right.nodes[0].type.types.emplace_back(type_node); + eq_cond.right.nodes[0].node_type = TExprNodeType::SLOT_REF; + eq_cond.right.nodes[0].__set_is_nullable(right_key_nullable); + eq_cond.right.nodes[0].num_children = 0; + eq_cond.right.nodes[0].slot_ref.col_unique_id = col_unique_id++; + eq_cond.right.nodes[0].__isset.slot_ref = true; + + if (mark_join_conjuncts_count < mark_join_conjuncts_size) { + TExpr mark_join_cond; + mark_join_cond.nodes.emplace_back(); + mark_join_cond.nodes[0].node_type = TExprNodeType::BINARY_PRED; + mark_join_cond.nodes[0].opcode = TExprOpcode::EQ; + mark_join_cond.nodes[0].type.types.emplace_back(); + mark_join_cond.nodes[0].type.types[0].scalar_type.type = TPrimitiveType::BOOLEAN; + mark_join_cond.nodes[0].type.types[0].__isset.scalar_type = true; + mark_join_cond.nodes[0].__set_is_nullable(true); + mark_join_cond.nodes[0].num_children = 2; + + mark_join_cond.nodes[0].fn.name.function_name = "eq"; + mark_join_cond.nodes[0].__isset.fn = true; + + mark_join_cond.nodes.emplace_back(); + mark_join_cond.nodes[1] = eq_cond.left.nodes[0]; + mark_join_cond.nodes.emplace_back(); + mark_join_cond.nodes[2] = eq_cond.right.nodes[0]; + tnode.hash_join_node.mark_join_conjuncts.emplace_back(mark_join_cond); + tnode.hash_join_node.__isset.mark_join_conjuncts = true; + + ++mark_join_conjuncts_count; + } else { + tnode.hash_join_node.eq_join_conjuncts.push_back(eq_cond); + } + } + + if (has_other_join_conjuncts) { + TExpr other_join_cond; + other_join_cond.nodes.emplace_back(); + other_join_cond.nodes[0].node_type = TExprNodeType::BINARY_PRED; + other_join_cond.nodes[0].opcode = TExprOpcode::EQ; + other_join_cond.nodes[0].type.types.emplace_back(); + other_join_cond.nodes[0].type.types[0].scalar_type.type = TPrimitiveType::BOOLEAN; + other_join_cond.nodes[0].type.types[0].__isset.scalar_type = true; + other_join_cond.nodes[0].__set_is_nullable(true); + other_join_cond.nodes[0].num_children = 2; + + other_join_cond.nodes[0].fn.name.function_name = "gt"; + other_join_cond.nodes[0].__isset.fn = true; + + other_join_cond.nodes.emplace_back(); + other_join_cond.nodes[1].node_type = TExprNodeType::SLOT_REF; + other_join_cond.nodes[1].type.types.emplace_back(); + other_join_cond.nodes[1].type.types[0].scalar_type.type = TPrimitiveType::INT; + other_join_cond.nodes[1].__set_is_nullable(true); + other_join_cond.nodes[1].type.types[0].__isset.scalar_type = true; + other_join_cond.nodes[1].num_children = 0; + other_join_cond.nodes[1].slot_ref.col_unique_id = col_unique_id++; + other_join_cond.nodes[1].__isset.slot_ref = true; + + other_join_cond.nodes.emplace_back(); + other_join_cond.nodes[2].node_type = TExprNodeType::INT_LITERAL; + other_join_cond.nodes[2].type.types.emplace_back(); + other_join_cond.nodes[2].type.types[0].scalar_type.type = TPrimitiveType::INT; + other_join_cond.nodes[2].type.types[0].__isset.scalar_type = true; + other_join_cond.nodes[2].num_children = 0; + other_join_cond.nodes[2].int_literal.value = 100; + other_join_cond.nodes[2].__isset.int_literal = true; + + tnode.hash_join_node.other_join_conjuncts.push_back(other_join_cond); + + other_join_cond.nodes[2].int_literal.value = 50; + other_join_cond.nodes[1].slot_ref.col_unique_id = col_unique_id++; + tnode.hash_join_node.other_join_conjuncts.push_back(other_join_cond); + tnode.hash_join_node.__isset.other_join_conjuncts = true; + } + + tnode.row_tuples.push_back(0); + tnode.row_tuples.push_back(1); + tnode.nullable_tuples.push_back(false); + tnode.nullable_tuples.push_back(false); + tnode.__isset.hash_join_node = true; + + tnode.hash_join_node.vintermediate_tuple_id_list.emplace_back(2); + tnode.hash_join_node.__isset.vintermediate_tuple_id_list = true; + + auto desc_table = create_test_table_descriptor(tnode); + auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl); + DCHECK(!desc_table.slotDescriptors.empty()); + + runtime_state->set_desc_tbl(desc_tbl); + + return tnode; +} + +TDescriptorTable HashJoinTestHelper::create_test_table_descriptor(TPlanNode& tnode) { + TTupleDescriptorBuilder left_tuple_builder, right_tuple_builder, intermediate_tuple_builder, + output_tuple_builder; + + const auto is_left_half_join = + tnode.hash_join_node.join_op == TJoinOp::LEFT_SEMI_JOIN || + tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN || + tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN; + const auto is_right_half_join = tnode.hash_join_node.join_op == TJoinOp::RIGHT_ANTI_JOIN || + tnode.hash_join_node.join_op == TJoinOp::RIGHT_SEMI_JOIN; + const auto is_half_join = is_left_half_join || is_right_half_join; + + const auto has_other_join_conjuncts = !tnode.hash_join_node.other_join_conjuncts.empty(); + const auto intermediate_need_output_left_side = + !is_half_join || is_left_half_join || + !tnode.hash_join_node.mark_join_conjuncts.empty() || has_other_join_conjuncts; + const auto need_output_left_side = !is_half_join || is_left_half_join; + + const auto intermediate_need_output_right_side = + !is_half_join || is_right_half_join || + !tnode.hash_join_node.mark_join_conjuncts.empty() || has_other_join_conjuncts; + const auto need_output_right_side = !is_half_join || is_right_half_join; + + const auto other_conjuncts_count = tnode.hash_join_node.other_join_conjuncts.empty() ? 0 : 1; + + const auto keys_count = tnode.hash_join_node.eq_join_conjuncts.size(); + const auto mark_keys_count = tnode.hash_join_node.mark_join_conjuncts.size(); + const auto total_output_slots_count = + (need_output_left_side ? keys_count + mark_keys_count + other_conjuncts_count : 0) + + (need_output_right_side ? keys_count + mark_keys_count + other_conjuncts_count : 0); + const auto total_intermediate_output_slots_count = + (intermediate_need_output_left_side + ? keys_count + mark_keys_count + other_conjuncts_count + : 0) + + (intermediate_need_output_right_side + ? keys_count + mark_keys_count + other_conjuncts_count + : 0); + std::vector<TSlotDescriptor> intermediate_slots(total_intermediate_output_slots_count); + std::vector<TSlotDescriptor> output_slots(total_output_slots_count); + const auto intermediate_keys_offset = + intermediate_need_output_left_side + ? keys_count + mark_keys_count + other_conjuncts_count + : 0; + const auto keys_offset = + need_output_left_side ? keys_count + mark_keys_count + other_conjuncts_count : 0; + + size_t slots_count = 0; + doris::TSlotId col_unique_id = 200; + std::vector<doris::TSlotId> hash_output_unique_ids; + std::vector<TExpr> projection_exprs(total_output_slots_count); + for (auto& eq_cond : tnode.hash_join_node.mark_join_conjuncts) { + auto left_type = thrift_to_type(eq_cond.nodes[1].type.types[0].scalar_type.type); + auto slot = TSlotDescriptorBuilder() + .type(left_type) + .nullable(eq_cond.nodes[1].is_nullable) + .build(); + slot.slotType.types[0].scalar_type = eq_cond.nodes[1].type.types[0].scalar_type; + slot.col_unique_id = eq_cond.nodes[1].slot_ref.col_unique_id; + if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN || + tnode.hash_join_node.join_op == TJoinOp::RIGHT_OUTER_JOIN) { + slot.nullIndicatorByte = 0; + slot.nullIndicatorBit = 0; + } + + intermediate_slots[slots_count] = slot; + if (need_output_left_side) { + auto& expr = projection_exprs[slots_count]; + auto& node = expr.nodes.emplace_back(); + node.node_type = TExprNodeType::SLOT_REF; + node.__set_is_nullable(slot.nullIndicatorBit == 0 && slot.nullIndicatorByte == 0); + node.num_children = 0; + node.__isset.slot_ref = true; + node.slot_ref.col_unique_id = slot.col_unique_id; + node.slot_ref.slot_id = slot.id; + node.slot_ref.__isset.col_unique_id = true; + node.type.types.emplace_back(); + node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type; + node.type.types[0].__isset.scalar_type = true; + + slot.col_unique_id = col_unique_id++; + output_slots[slots_count] = slot; + } + + slot.col_unique_id = col_unique_id++; + left_tuple_builder.add_slot(slot); + hash_output_unique_ids.emplace_back(slot.col_unique_id); + + auto right_type = thrift_to_type(eq_cond.nodes[2].type.types[0].scalar_type.type); + + slot = TSlotDescriptorBuilder() + .type(right_type) + .nullable(eq_cond.nodes[2].is_nullable) + .build(); + slot.slotType.types[0].scalar_type = eq_cond.nodes[2].type.types[0].scalar_type; + slot.col_unique_id = eq_cond.nodes[2].slot_ref.col_unique_id; + if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN || + tnode.hash_join_node.join_op == TJoinOp::LEFT_OUTER_JOIN) { + slot.nullIndicatorByte = 0; + slot.nullIndicatorBit = 0; + } + intermediate_slots[slots_count + intermediate_keys_offset] = slot; + if (need_output_right_side) { + auto& expr = projection_exprs[slots_count + keys_offset]; + auto& node = expr.nodes.emplace_back(); + node.node_type = TExprNodeType::SLOT_REF; + node.__set_is_nullable(slot.nullIndicatorBit == 0 && slot.nullIndicatorByte == 0); + node.num_children = 0; + node.__isset.slot_ref = true; + node.slot_ref.col_unique_id = slot.col_unique_id; + node.slot_ref.slot_id = slot.id; + node.slot_ref.__isset.col_unique_id = true; + node.type.types.emplace_back(); + node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type; + node.type.types[0].__isset.scalar_type = true; + + slot.col_unique_id = col_unique_id++; + output_slots[slots_count + keys_offset] = slot; + } + slot.col_unique_id = col_unique_id++; + right_tuple_builder.add_slot(slot); + hash_output_unique_ids.emplace_back(slot.col_unique_id); + ++slots_count; + } + + for (auto& eq_cond : tnode.hash_join_node.eq_join_conjuncts) { + auto left_type = thrift_to_type(eq_cond.left.nodes[0].type.types[0].scalar_type.type); + auto slot = TSlotDescriptorBuilder() + .type(left_type) + .nullable(eq_cond.left.nodes[0].is_nullable) + .build(); + slot.slotType.types[0].scalar_type = eq_cond.left.nodes[0].type.types[0].scalar_type; + slot.col_unique_id = eq_cond.left.nodes[0].slot_ref.col_unique_id; + left_tuple_builder.add_slot(slot); + if (intermediate_need_output_left_side || need_output_left_side) { + hash_output_unique_ids.emplace_back(slot.col_unique_id); + } + + slot.col_unique_id = col_unique_id++; + if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN || + tnode.hash_join_node.join_op == TJoinOp::RIGHT_OUTER_JOIN) { + slot.nullIndicatorByte = 0; + slot.nullIndicatorBit = 0; + } + + if (intermediate_need_output_left_side) { + intermediate_slots[slots_count] = slot; + } + + if (need_output_left_side) { + auto& expr = projection_exprs[slots_count]; + auto& node = expr.nodes.emplace_back(); + node.node_type = TExprNodeType::SLOT_REF; + node.__set_is_nullable(slot.nullIndicatorBit == 0 && slot.nullIndicatorByte == 0); + node.num_children = 0; + node.__isset.slot_ref = true; + node.slot_ref.col_unique_id = slot.col_unique_id; + node.slot_ref.slot_id = slot.id; + node.slot_ref.__isset.col_unique_id = true; + node.type.types.emplace_back(); + node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type; + node.type.types[0].__isset.scalar_type = true; + + slot.col_unique_id = col_unique_id++; + output_slots[slots_count] = slot; + } + + auto right_type = thrift_to_type(eq_cond.right.nodes[0].type.types[0].scalar_type.type); + + slot = TSlotDescriptorBuilder() + .type(right_type) + .nullable(eq_cond.right.nodes[0].is_nullable) + .build(); + slot.slotType.types[0].scalar_type = eq_cond.right.nodes[0].type.types[0].scalar_type; + slot.col_unique_id = eq_cond.right.nodes[0].slot_ref.col_unique_id; + right_tuple_builder.add_slot(slot); + if (need_output_right_side || intermediate_need_output_right_side) { + hash_output_unique_ids.emplace_back(slot.col_unique_id); + } + slot.col_unique_id = col_unique_id++; + if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN || + tnode.hash_join_node.join_op == TJoinOp::LEFT_OUTER_JOIN) { + slot.nullIndicatorByte = 0; + slot.nullIndicatorBit = 0; + } + + if (intermediate_need_output_right_side) { + intermediate_slots[slots_count + intermediate_keys_offset] = slot; + } + if (need_output_right_side) { + auto& expr = projection_exprs[slots_count + keys_offset]; + auto& node = expr.nodes.emplace_back(); + node.node_type = TExprNodeType::SLOT_REF; + node.__set_is_nullable(slot.nullIndicatorBit == 0 && slot.nullIndicatorByte == 0); + node.num_children = 0; + node.__isset.slot_ref = true; + node.slot_ref.col_unique_id = slot.col_unique_id; + node.slot_ref.slot_id = slot.id; + node.slot_ref.__isset.col_unique_id = true; + node.type.types.emplace_back(); + node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type; + node.type.types[0].__isset.scalar_type = true; + + slot.col_unique_id = col_unique_id++; + output_slots[slots_count + keys_offset] = slot; + } + ++slots_count; + } + + if (tnode.hash_join_node.other_join_conjuncts.size() == 2) { + auto& left_cond = tnode.hash_join_node.other_join_conjuncts[0]; + auto& right_cond = tnode.hash_join_node.other_join_conjuncts[1]; + + auto left_type = thrift_to_type(left_cond.nodes[1].type.types[0].scalar_type.type); + auto slot = TSlotDescriptorBuilder() + .type(left_type) + .nullable(left_cond.nodes[1].is_nullable) + .build(); + slot.slotType.types[0].scalar_type = left_cond.nodes[1].type.types[0].scalar_type; + + slot.col_unique_id = col_unique_id++; + left_tuple_builder.add_slot(slot); + hash_output_unique_ids.emplace_back(slot.col_unique_id); + + slot.col_unique_id = left_cond.nodes[1].slot_ref.col_unique_id; + if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN || + tnode.hash_join_node.join_op == TJoinOp::RIGHT_OUTER_JOIN) { + slot.nullIndicatorByte = 0; + slot.nullIndicatorBit = 0; + } + + intermediate_slots[slots_count] = slot; + + if (need_output_left_side) { + auto& expr = projection_exprs[slots_count]; + auto& node = expr.nodes.emplace_back(); + node.node_type = TExprNodeType::SLOT_REF; + node.__set_is_nullable(slot.nullIndicatorBit == 0 && slot.nullIndicatorByte == 0); + node.num_children = 0; + node.__isset.slot_ref = true; + node.slot_ref.col_unique_id = slot.col_unique_id; + node.slot_ref.slot_id = slot.id; + node.slot_ref.__isset.col_unique_id = true; + node.type.types.emplace_back(); + node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type; + node.type.types[0].__isset.scalar_type = true; + + slot.col_unique_id = col_unique_id++; + output_slots[slots_count] = slot; + } + + auto right_type = thrift_to_type(right_cond.nodes[1].type.types[0].scalar_type.type); + slot = TSlotDescriptorBuilder() + .type(right_type) + .nullable(right_cond.nodes[1].is_nullable) + .build(); + slot.slotType.types[0].scalar_type = right_cond.nodes[1].type.types[0].scalar_type; + + slot.col_unique_id = col_unique_id++; + right_tuple_builder.add_slot(slot); + hash_output_unique_ids.emplace_back(slot.col_unique_id); + + slot.col_unique_id = right_cond.nodes[1].slot_ref.col_unique_id; + if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN || + tnode.hash_join_node.join_op == TJoinOp::LEFT_OUTER_JOIN) { + slot.nullIndicatorByte = 0; + slot.nullIndicatorBit = 0; + } + + intermediate_slots[slots_count + intermediate_keys_offset] = slot; + + if (need_output_right_side) { + auto& expr = projection_exprs[slots_count + keys_offset]; + auto& node = expr.nodes.emplace_back(); + node.node_type = TExprNodeType::SLOT_REF; + node.__set_is_nullable(slot.nullIndicatorBit == 0 && slot.nullIndicatorByte == 0); + node.num_children = 0; + node.__isset.slot_ref = true; + node.slot_ref.col_unique_id = slot.col_unique_id; + node.slot_ref.slot_id = slot.id; + node.slot_ref.__isset.col_unique_id = true; + node.type.types.emplace_back(); + node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type; + node.type.types[0].__isset.scalar_type = true; + + slot.col_unique_id = col_unique_id++; + output_slots[slots_count + keys_offset] = slot; + } + + ++slots_count; + } + + TDescriptorTableBuilder builder; + + left_tuple_builder.build(&builder); + right_tuple_builder.build(&builder); + + for (auto& slot : intermediate_slots) { + intermediate_tuple_builder.add_slot(slot); + } + + for (auto& slot : output_slots) { + output_tuple_builder.add_slot(slot); + } + + if (tnode.hash_join_node.is_mark) { + auto type = thrift_to_type(TPrimitiveType::BOOLEAN); + auto slot = TSlotDescriptorBuilder().type(type).nullable(true).build(); + slot.col_unique_id = col_unique_id++; + intermediate_tuple_builder.add_slot(slot); + + auto& expr = projection_exprs.emplace_back(); + auto& node = expr.nodes.emplace_back(); + node.node_type = TExprNodeType::SLOT_REF; + node.__set_is_nullable(true); + node.num_children = 0; + node.__isset.slot_ref = true; + node.slot_ref.col_unique_id = slot.col_unique_id; + node.slot_ref.slot_id = slot.id; + node.slot_ref.__isset.col_unique_id = true; + node.type.types.emplace_back(); + node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type; + node.type.types[0].__isset.scalar_type = true; + + slot.col_unique_id = col_unique_id++; + output_tuple_builder.add_slot(slot); + } + + tnode.projections = projection_exprs; + tnode.__isset.projections = true; + + intermediate_tuple_builder.build(&builder); + + output_tuple_builder.build(&builder); + tnode.__set_output_tuple_id(3); + tnode.hash_join_node.__set_voutput_tuple_id(3); + + auto table_desc = builder.desc_tbl(); + + std::unordered_map<int32_t, TTupleId> slots_map; + for (auto& slot : table_desc.slotDescriptors) { + slots_map[slot.col_unique_id] = slot.id; + } + + for (auto& eq_cond : tnode.hash_join_node.eq_join_conjuncts) { + col_unique_id = eq_cond.left.nodes[0].slot_ref.col_unique_id; + eq_cond.left.nodes[0].slot_ref.slot_id = slots_map[col_unique_id]; + + col_unique_id = eq_cond.right.nodes[0].slot_ref.col_unique_id; + eq_cond.right.nodes[0].slot_ref.slot_id = slots_map[col_unique_id]; + } + + for (size_t i = 0; i != mark_keys_count; ++i) { + auto& slot_ref = tnode.hash_join_node.mark_join_conjuncts[i].nodes[1].slot_ref; + slot_ref.slot_id = slots_map[slot_ref.col_unique_id]; + + auto& slot_ref_right = tnode.hash_join_node.mark_join_conjuncts[i].nodes[2].slot_ref; + slot_ref_right.slot_id = slots_map[slot_ref_right.col_unique_id]; + } + + for (auto& expr : tnode.projections) { + for (auto& node : expr.nodes) { + if (node.node_type == TExprNodeType::SLOT_REF) { + auto col_unique_id = node.slot_ref.col_unique_id; + node.slot_ref.slot_id = slots_map[col_unique_id]; + } + } + } + + if (tnode.hash_join_node.other_join_conjuncts.size() == 2) { + auto& left_cond = tnode.hash_join_node.other_join_conjuncts[0]; + auto& right_cond = tnode.hash_join_node.other_join_conjuncts[1]; + + left_cond.nodes[1].slot_ref.slot_id = slots_map[left_cond.nodes[1].slot_ref.col_unique_id]; + right_cond.nodes[1].slot_ref.slot_id = + slots_map[right_cond.nodes[1].slot_ref.col_unique_id]; + } + + for (auto uid : hash_output_unique_ids) { + tnode.hash_join_node.hash_output_slot_ids.emplace_back(slots_map[uid]); + } + tnode.hash_join_node.__isset.hash_output_slot_ids = true; + + return table_desc; +} + +void HashJoinTestHelper::add_mark_join_conjuncts(TPlanNode& join_node, + std::vector<TExpr>& conjuncts) { + EXPECT_TRUE(join_node.__isset.hash_join_node); + + join_node.hash_join_node.__isset.mark_join_conjuncts = true; + join_node.hash_join_node.mark_join_conjuncts.insert( + join_node.hash_join_node.mark_join_conjuncts.end(), conjuncts.begin(), conjuncts.end()); +} + +void HashJoinTestHelper::add_other_join_conjuncts(TPlanNode& join_node, + std::vector<TExpr>& conjuncts) { + EXPECT_TRUE(join_node.__isset.hash_join_node); + + join_node.hash_join_node.__isset.other_join_conjuncts = true; + join_node.hash_join_node.other_join_conjuncts.insert( + join_node.hash_join_node.other_join_conjuncts.end(), conjuncts.begin(), + conjuncts.end()); +} + +std::pair<std::shared_ptr<HashJoinProbeOperatorX>, std::shared_ptr<HashJoinBuildSinkOperatorX>> +HashJoinTestHelper::create_operators(const TPlanNode& tnode) { + auto sink_operator = std::make_shared<HashJoinBuildSinkOperatorX>(obj_pool.get(), 0, 0, tnode, + runtime_state->desc_tbl()); + + auto probe_operator = std::make_shared<HashJoinProbeOperatorX>(obj_pool.get(), tnode, 0, + runtime_state->desc_tbl()); + + auto child_operator = std::make_shared<MockSourceOperator>(); + auto probe_side_source_operator = std::make_shared<MockSourceOperator>(); + auto probe_side_sink_operator = std::make_shared<MockSinkOperator>(); + + RowDescriptor row_desc(runtime_state->desc_tbl(), {1}, {false}); + child_operator->_row_descriptor = row_desc; + + RowDescriptor probe_side_row_desc(runtime_state->desc_tbl(), {0}, {false}); + probe_side_source_operator->_row_descriptor = probe_side_row_desc; + + EXPECT_TRUE(sink_operator->set_child(child_operator)); + EXPECT_TRUE(probe_operator->set_child(probe_side_source_operator)); + EXPECT_TRUE(probe_operator->set_child(child_operator)); + + // Setup task and state + std::map<int, + std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + auto [source_pipeline, _] = generate_sort_pipeline(probe_operator, probe_side_sink_operator, + sink_operator, child_operator); + pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(), nullptr, + nullptr, shared_state_map, 0); + + return {std::move(probe_operator), std::move(sink_operator)}; +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/hash_join_test_helper.h b/be/test/pipeline/operator/hash_join_test_helper.h new file mode 100644 index 00000000000..355368a37d0 --- /dev/null +++ b/be/test/pipeline/operator/hash_join_test_helper.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gen_cpp/Exprs_types.h> +#include <gen_cpp/PlanNodes_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include "join_test_helper.h" +#include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/hashjoin_probe_operator.h" + +namespace doris::pipeline { +class HashJoinTestHelper : public JoinTestHelper { +public: + TPlanNode create_test_plan_node(const TJoinOp::type& join_op_type, + const std::vector<TPrimitiveType::type>& key_types, + const std::vector<bool>& left_keys_nullable, + const std::vector<bool>& right_keys_nullable, + const bool is_mark_join = false, + const size_t mark_join_conjuncts_size = 0, + const bool null_safe_equal = false, + const bool has_other_join_conjuncts = false); + + TDescriptorTable create_test_table_descriptor(TPlanNode& tnode); + + void add_mark_join_conjuncts(TPlanNode& join_node, std::vector<TExpr>& conjuncts); + void add_other_join_conjuncts(TPlanNode& join_node, std::vector<TExpr>& conjuncts); + + std::pair<std::shared_ptr<HashJoinProbeOperatorX>, std::shared_ptr<HashJoinBuildSinkOperatorX>> + create_operators(const TPlanNode& tnode); + +protected: + std::vector<TExprNode*> _left_slots, _right_slots; +}; + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/hashjoin_build_sink_test.cpp b/be/test/pipeline/operator/hashjoin_build_sink_test.cpp new file mode 100644 index 00000000000..7d60834b138 --- /dev/null +++ b/be/test/pipeline/operator/hashjoin_build_sink_test.cpp @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/hashjoin_build_sink.h" + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <memory> +#include <vector> + +#include "common/config.h" +#include "hash_join_test_helper.h" +#include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "testutil/mock/mock_runtime_state.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr_context.h" + +namespace doris::pipeline { + +class HashJoinBuildSinkTest : public testing::Test { +public: + void SetUp() override { _helper.SetUp(); } + void TearDown() override { _helper.TearDown(); } + + template <typename Func> + void run_test_block(Func test_block) { + auto testing_join_ops = {TJoinOp::INNER_JOIN, + TJoinOp::LEFT_OUTER_JOIN, + TJoinOp::RIGHT_OUTER_JOIN, + TJoinOp::FULL_OUTER_JOIN, + TJoinOp::LEFT_SEMI_JOIN, + TJoinOp::RIGHT_SEMI_JOIN, + TJoinOp::LEFT_ANTI_JOIN, + TJoinOp::RIGHT_ANTI_JOIN, + TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, + TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN}; + auto testing_key_types = { + TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT, TPrimitiveType::SMALLINT, + TPrimitiveType::INT, TPrimitiveType::BIGINT, TPrimitiveType::FLOAT, + TPrimitiveType::DOUBLE, TPrimitiveType::DATE, TPrimitiveType::DATETIME, + TPrimitiveType::BINARY, TPrimitiveType::CHAR, TPrimitiveType::LARGEINT, + TPrimitiveType::VARCHAR, TPrimitiveType::DECIMALV2, TPrimitiveType::TIME, + TPrimitiveType::STRING, TPrimitiveType::DATEV2, TPrimitiveType::DATETIMEV2, + TPrimitiveType::TIMEV2, TPrimitiveType::DECIMAL32, TPrimitiveType::DECIMAL64, + TPrimitiveType::DECIMAL128I, TPrimitiveType::DECIMAL256, TPrimitiveType::IPV4, + TPrimitiveType::IPV6}; + + for (const auto& op_type : testing_join_ops) { + for (const auto key_type : testing_key_types) { + for (auto left_nullable : {true, false}) { + for (auto right_nullable : {true, false}) { + test_block(op_type, {key_type}, {left_nullable}, {right_nullable}); + } + } + + for (const auto key_type2 : testing_key_types) { + for (auto left_nullable : {true, false}) { + for (auto right_nullable : {true, false}) { + test_block(op_type, {key_type, key_type2}, + {left_nullable, right_nullable}, + {right_nullable, left_nullable}); + } + } + } + } + } + } + +protected: + HashJoinTestHelper _helper; +}; + +TEST_F(HashJoinBuildSinkTest, Init) { + auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types, + const std::vector<bool>& left_nullables, + const std::vector<bool>& right_nullables) { + auto tnode = + _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables); + auto [probe_operator, sink_operator] = _helper.create_operators(tnode); + ASSERT_TRUE(probe_operator); + ASSERT_TRUE(sink_operator); + + auto runtime_state = std::make_unique<MockRuntimeState>(); + runtime_state->_query_ctx = _helper.query_ctx.get(); + runtime_state->_query_id = _helper.query_ctx->query_id(); + runtime_state->resize_op_id_to_local_state(-100); + runtime_state->set_max_operator_id(-100); + runtime_state->set_desc_tbl(_helper.desc_tbl); + + auto st = sink_operator->init(tnode, runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + ASSERT_TRUE(shared_state); + + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = shared_state_map, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + st = probe_operator->init(tnode, runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + st = probe_operator->prepare(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + LocalStateInfo info2 {.parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .shared_state_map = shared_state_map, + .task_idx = 0}; + + st = probe_operator->setup_local_state(runtime_state.get(), info2); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* probe_local_state = runtime_state->get_local_state(probe_operator->operator_id()); + ASSERT_TRUE(probe_local_state); + + st = probe_local_state->open(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<HashJoinBuildSinkLocalState*>( + runtime_state->get_sink_local_state()); + ASSERT_TRUE(local_state); + + st = local_state->open(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + ASSERT_GT(sink_operator->get_memory_usage_debug_str(runtime_state.get()).size(), 0); + + st = local_state->close(runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = sink_operator->close(runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = probe_operator->close(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = probe_local_state->close(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + }; + + run_test_block(test_block); +} + +TEST_F(HashJoinBuildSinkTest, Sink) { + auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types, + const std::vector<bool>& left_nullables, + const std::vector<bool>& right_nullables) { + auto tnode = + _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables); + auto [probe_operator, sink_operator] = _helper.create_operators(tnode); + ASSERT_TRUE(probe_operator); + ASSERT_TRUE(sink_operator); + + auto runtime_state = std::make_unique<MockRuntimeState>(); + runtime_state->_query_ctx = _helper.query_ctx.get(); + runtime_state->_query_id = _helper.query_ctx->query_id(); + runtime_state->resize_op_id_to_local_state(-100); + runtime_state->set_max_operator_id(-100); + runtime_state->set_desc_tbl(_helper.desc_tbl); + + auto st = sink_operator->init(TDataSink()); + ASSERT_FALSE(st.ok()); + + st = sink_operator->init(tnode, runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + ASSERT_TRUE(shared_state); + + shared_state->create_source_dependency(sink_operator->operator_id(), tnode.node_id, + "HashJoinSinkTestDep"); + + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = shared_state_map, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<HashJoinBuildSinkLocalState*>( + runtime_state->get_sink_local_state()); + ASSERT_TRUE(local_state); + + st = local_state->open(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), false), 0); + + const auto& row_desc = sink_operator->child()->row_desc(); + vectorized::Block block(row_desc.tuple_descriptors()[0]->slots(), 0); + + auto mutable_block = vectorized::MutableBlock(block.clone_empty()); + for (auto& col : mutable_block.mutable_columns()) { + col->insert_default(); + if (col->is_nullable()) { + auto& nullable_column = assert_cast<vectorized::ColumnNullable&>(*col); + nullable_column.insert_not_null_elements(1); + } else { + col->insert_default(); + } + } + + auto block2 = mutable_block.to_block(); + st = sink_operator->sink(runtime_state.get(), &block2, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_GT(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0); + st = sink_operator->sink(runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0); + + ASSERT_GT(sink_operator->get_memory_usage(runtime_state.get()), 0); + + st = local_state->close(runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = sink_operator->close(runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + }; + + run_test_block(test_block); +} + +TEST_F(HashJoinBuildSinkTest, Terminate) { + auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types, + const std::vector<bool>& left_nullables, + const std::vector<bool>& right_nullables) { + auto tnode = + _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables); + auto [probe_operator, sink_operator] = _helper.create_operators(tnode); + ASSERT_TRUE(probe_operator); + ASSERT_TRUE(sink_operator); + + auto runtime_state = std::make_unique<MockRuntimeState>(); + runtime_state->_query_ctx = _helper.query_ctx.get(); + runtime_state->_query_id = _helper.query_ctx->query_id(); + runtime_state->resize_op_id_to_local_state(-100); + runtime_state->set_max_operator_id(-100); + runtime_state->set_desc_tbl(_helper.desc_tbl); + + auto st = sink_operator->init(TDataSink()); + ASSERT_FALSE(st.ok()); + + st = sink_operator->init(tnode, runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_state = sink_operator->create_shared_state(); + ASSERT_TRUE(shared_state); + + shared_state->create_source_dependency(sink_operator->operator_id(), tnode.node_id, + "HashJoinSinkTestDep"); + + std::map<int, std::pair<std::shared_ptr<BasicSharedState>, + std::vector<std::shared_ptr<Dependency>>>> + shared_state_map; + LocalSinkStateInfo info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = shared_state_map, + .tsink = TDataSink()}; + st = sink_operator->setup_local_state(runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* local_state = reinterpret_cast<HashJoinBuildSinkLocalState*>( + runtime_state->get_sink_local_state()); + ASSERT_TRUE(local_state); + + st = local_state->open(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), false), 0); + + const auto& row_desc = sink_operator->child()->row_desc(); + vectorized::Block block(row_desc.tuple_descriptors()[0]->slots(), 0); + + auto mutable_block = vectorized::MutableBlock(block.clone_empty()); + for (auto& col : mutable_block.mutable_columns()) { + col->insert_default(); + if (col->is_nullable()) { + auto& nullable_column = assert_cast<vectorized::ColumnNullable&>(*col); + nullable_column.insert_not_null_elements(1); + } else { + col->insert_default(); + } + } + + auto block2 = mutable_block.to_block(); + st = sink_operator->sink(runtime_state.get(), &block2, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + st = sink_operator->terminate(runtime_state.get()); + ASSERT_TRUE(st.ok()) << "terminate failed: " << st.to_string(); + + ASSERT_GT(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0); + st = sink_operator->sink(runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0); + + ASSERT_GT(sink_operator->get_memory_usage(runtime_state.get()), 0); + + st = local_state->close(runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + st = sink_operator->close(runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + }; + + run_test_block(test_block); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp new file mode 100644 index 00000000000..7c5fec3116c --- /dev/null +++ b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp @@ -0,0 +1,1074 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Types_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <cassert> +#include <cstddef> +#include <memory> +#include <type_traits> +#include <utility> +#include <vector> + +#include "hash_join_test_helper.h" +#include "pipeline/dependency.h" +#include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/operator.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_operators.h" +#include "testutil/mock/mock_runtime_state.h" +#include "vec/common/assert_cast.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/core/field.h" +#include "vec/core/sort_block.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" + +namespace doris::pipeline { + +using namespace vectorized; + +class HashJoinProbeOperatorTest : public testing::Test { +public: + void SetUp() override { _helper.SetUp(); } + void TearDown() override { _helper.TearDown(); } + + template <typename T> + void check_column_value(const IColumn& column, const size_t index, const T& value) { + StringRef data; + if (column.is_nullable()) { + const auto& nullable_column = assert_cast<const ColumnNullable&>(column); + EXPECT_FALSE(nullable_column.is_null_at(index)); + auto nested_column = nullable_column.get_nested_column_ptr(); + data = nested_column->get_data_at(index); + } else { + data = column.get_data_at(index); + } + + if constexpr (std::is_same_v<std::string, T>) { + EXPECT_EQ(data.to_string(), value); + } else if constexpr (std::is_same_v<StringRef, T>) { + EXPECT_EQ(data.to_string(), value.to_string()); + } else { + EXPECT_EQ(sizeof(T), data.size); + EXPECT_EQ(memcmp(data.data, &value, sizeof(T)), 0); + } + } + + void check_column_values(const IColumn& column, const std::vector<vectorized::Field>& values, + std::source_location loc = std::source_location::current()) { + for (size_t i = 0; i != values.size(); ++i) { + vectorized::Field value; + column.get(i, value); + ASSERT_EQ(value.get_type(), values[i].get_type()) + << "row: " << i << " type not match at: " << loc.file_name() << ":" + << loc.line(); + ASSERT_TRUE(value == values[i]) + << "row: " << i << " value not match at: " << loc.file_name() << ":" + << loc.line(); + } + } + + Block sort_block_by_columns(Block& block, const std::vector<size_t>& sort_columns = {}) { + SortDescription sort_description; + for (auto column : sort_columns) { + sort_description.emplace_back(column, 1, 1); + } + + if (sort_description.empty()) { + for (size_t i = 0; i != block.columns(); ++i) { + sort_description.emplace_back(i, 1, 1); + } + } + + auto sorted_block = block.clone_empty(); + sort_block(block, sorted_block, sort_description); + return sorted_block; + } + + struct JoinParams { + TJoinOp::type join_op_type {TJoinOp::INNER_JOIN}; + bool is_mark_join {false}; + size_t mark_join_conjuncts_size {0}; + bool is_broadcast_join {false}; + bool null_safe_equal {false}; + bool has_other_join_conjuncts {false}; + }; + + // NOLINTNEXTLINE(readability-function-*) + void run_test(const JoinParams& join_params, const std::vector<TPrimitiveType::type>& key_types, + const std::vector<bool>& left_keys_nullable, + const std::vector<bool>& right_keys_nullable, std::vector<Block>& build_blocks, + std::vector<Block>& probe_blocks, Block& output_block) { + auto tnode = _helper.create_test_plan_node( + join_params.join_op_type, key_types, left_keys_nullable, right_keys_nullable, + join_params.is_mark_join, join_params.mark_join_conjuncts_size, + join_params.null_safe_equal, join_params.has_other_join_conjuncts); + + bool should_build_hash_table = true; + if (join_params.is_broadcast_join) { + tnode.hash_join_node.__isset.is_broadcast_join = true; + tnode.hash_join_node.is_broadcast_join = true; + should_build_hash_table = true; + } + + auto [probe_operator, sink_operator] = _helper.create_operators(tnode); + ASSERT_TRUE(probe_operator); + ASSERT_TRUE(sink_operator); + + auto st = probe_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = probe_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + st = sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string(); + + auto shared_runtime_state = std::make_shared<MockRuntimeState>(); + + auto shared_state = sink_operator->create_shared_state(); + if (join_params.is_broadcast_join) { + shared_state = HashJoinSharedState::create_shared(8); + shared_state->create_source_dependencies(8, sink_operator->operator_id(), + sink_operator->node_id(), "HASH_JOIN_PROBE"); + shared_runtime_state->_query_ctx = _helper.runtime_state->_query_ctx; + shared_runtime_state->_query_id = _helper.runtime_state->_query_id; + shared_runtime_state->resize_op_id_to_local_state(-100); + shared_runtime_state->set_max_operator_id(-100); + + LocalSinkStateInfo sink_local_state_info { + .task_idx = 1, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .tsink = TDataSink(), + }; + st = sink_operator->setup_local_state(shared_runtime_state.get(), + sink_local_state_info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = assert_cast<HashJoinBuildSinkLocalState*>( + shared_runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state); + + st = sink_local_state->open(shared_runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + } + + ASSERT_EQ(probe_operator->is_broadcast_join(), join_params.is_broadcast_join); + ASSERT_TRUE(shared_state); + LocalSinkStateInfo sink_local_state_info { + .task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .sender_id = 0, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .tsink = TDataSink(), + }; + + st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_local_state_info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + ASSERT_EQ(sink_operator->should_dry_run(_helper.runtime_state.get()), + join_params.is_broadcast_join && !should_build_hash_table); + + ASSERT_EQ(sink_operator->require_data_distribution(), false); + ASSERT_EQ(probe_operator->require_data_distribution(), false); + ASSERT_FALSE(sink_operator->is_shuffled_operator()); + ASSERT_FALSE(probe_operator->is_shuffled_operator()); + std::cout << "sink distribution: " + << get_exchange_type_name( + sink_operator->required_data_distribution().distribution_type) + << std::endl; + std::cout << "probe distribution: " + << get_exchange_type_name( + probe_operator->required_data_distribution().distribution_type) + << std::endl; + + LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .shared_state_map = {}, + .task_idx = 0}; + st = probe_operator->setup_local_state(_helper.runtime_state.get(), info); + ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string(); + + auto* sink_local_state = assert_cast<HashJoinBuildSinkLocalState*>( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state); + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + auto* probe_local_state = + _helper.runtime_state->get_local_state(probe_operator->operator_id()); + ASSERT_TRUE(probe_local_state); + + st = probe_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string(); + + std::cout << "probe debug string: " << probe_operator->debug_string(0) << std::endl; + + for (auto& build_block : build_blocks) { + st = sink_operator->sink(_helper.runtime_state.get(), &build_block, false); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + } + ASSERT_EQ(sink_local_state->build_unique(), + tnode.hash_join_node.other_join_conjuncts.empty() && + (tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN || + tnode.hash_join_node.join_op == TJoinOp::LEFT_SEMI_JOIN || + tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN)); + + Block empty_block; + + if (join_params.is_broadcast_join) { + st = sink_operator->sink(shared_runtime_state.get(), &empty_block, false); + ASSERT_FALSE(st.ok()); + } + + st = sink_operator->sink(_helper.runtime_state.get(), &empty_block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + st = sink_operator->close(_helper.runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + + if (join_params.is_broadcast_join) { + st = sink_operator->sink(shared_runtime_state.get(), &empty_block, true); + ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string(); + + st = sink_operator->close(shared_runtime_state.get(), st); + ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string(); + } + + auto source_operator = + std::dynamic_pointer_cast<MockSourceOperator>(probe_operator->child()); + ASSERT_TRUE(source_operator); + + bool eos = false; + Block tmp_output_block; + MutableBlock output_block_mutable; + for (auto& probe_block : probe_blocks) { + source_operator->set_block(std::move(probe_block)); + st = probe_operator->get_block_after_projects(_helper.runtime_state.get(), + &tmp_output_block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + + if (tmp_output_block.empty()) { + continue; + } + + st = output_block_mutable.merge(std::move(tmp_output_block)); + tmp_output_block.clear(); + ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string(); + } + + if (eos) { + return; + } + + source_operator->set_eos(); + + st = probe_operator->get_block(_helper.runtime_state.get(), &tmp_output_block, &eos); + ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string(); + ASSERT_TRUE(eos); + + if (!tmp_output_block.empty()) { + st = output_block_mutable.merge(std::move(tmp_output_block)); + ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string(); + } + + output_block = output_block_mutable.to_block(); + } + +protected: + HashJoinTestHelper _helper; +}; + +TEST_F(HashJoinProbeOperatorTest, InnerJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, + {false, true}, build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 2); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {3, 4}); + check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"}); + check_column_values(*sorted_block.get_by_position(2).column, {3, 4}); + check_column_values(*sorted_block.get_by_position(3).column, {"c", "d"}); +} + +TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyBuildSide) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, + {false, true}, build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 0); +} + +TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyProbeSide) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({}, {}); + probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>({})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, + {false, true}, build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 0); +} + +TEST_F(HashJoinProbeOperatorTest, InnerJoinOtherConjuncts) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1})); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 0, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::INNER_JOIN, .has_other_join_conjuncts = true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 2); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {1, 3}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "c"}); + check_column_values(*sorted_block.get_by_position(2).column, {101, 102}); + check_column_values(*sorted_block.get_by_position(3).column, {1, 3}); + check_column_values(*sorted_block.get_by_position(4).column, {"a", "c"}); + check_column_values(*sorted_block.get_by_position(5).column, {51, 59}); +} + +TEST_F(HashJoinProbeOperatorTest, InnerJoinNullSafeEqual) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 0}); + probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::INNER_JOIN, .null_safe_equal = true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, true}, {false, true}, + build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 3); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {3, 4, 5}); + check_column_values(*sorted_block.get_by_position(1).column, {"c", "d", Null()}); + check_column_values(*sorted_block.get_by_position(2).column, {3, 4, 5}); + check_column_values(*sorted_block.get_by_position(3).column, {"c", "d", Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, CheckSlot) { + auto tnode = _helper.create_test_plan_node(TJoinOp::INNER_JOIN, + {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, false); + + auto [probe_operator, sink_operator] = _helper.create_operators(tnode); + ASSERT_TRUE(probe_operator); + ASSERT_TRUE(sink_operator); + + auto desc_tbl = _helper.runtime_state->desc_tbl(); + desc_tbl._slot_desc_map[4]->_is_nullable = !desc_tbl._slot_desc_map[4]->_is_nullable; + _helper.runtime_state->set_desc_tbl(&desc_tbl); + + auto st = probe_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string(); + + st = probe_operator->prepare(_helper.runtime_state.get()); + ASSERT_FALSE(st.ok()); +} + +TEST_F(HashJoinProbeOperatorTest, InnerJoinBroadcast) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::INNER_JOIN, .is_broadcast_join = true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 2); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {3, 4}); + check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"}); + check_column_values(*sorted_block.get_by_position(2).column, {3, 4}); + check_column_values(*sorted_block.get_by_position(3).column, {"c", "d"}); +} + +TEST_F(HashJoinProbeOperatorTest, FullOuterJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(output_block.rows(), 8); + + check_column_values(*sorted_block.get_by_position(0).column, + {1, 3, 4, Null(), Null(), Null(), Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, + {"a", "c", "d", "b", "e", Null(), Null(), Null()}); + check_column_values(*sorted_block.get_by_position(2).column, + {Null(), 3, 4, Null(), Null(), 1, 2, 5}); + check_column_values(*sorted_block.get_by_position(3).column, + {Null(), "c", "d", Null(), Null(), Null(), "b", Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, FullOuterJoinEmptyBuildSide) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(output_block.rows(), 5); +} + +TEST_F(HashJoinProbeOperatorTest, FullOuterJoinEmptyProbeSide) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({}, {}); + probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>({})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(output_block.rows(), 5); +} + +TEST_F(HashJoinProbeOperatorTest, LeftOuterJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 5); +} + +TEST_F(HashJoinProbeOperatorTest, LeftOuterJoin2) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5, 2, 3}, + {0, 1, 0, 0, 1, 0, 0}); + probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e", "b", "c"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 7); + + check_column_values(*sorted_block.get_by_position(0).column, {1, 2, 3, 3, 4, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, + {"a", "b", "c", "c", "d", "b", "e"}); + check_column_values(*sorted_block.get_by_position(2).column, + {Null(), 2, 3, 3, 4, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(3).column, + {Null(), "b", "c", "c", "d", Null(), Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, RightOuterJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 5); + + check_column_values(*sorted_block.get_by_position(0).column, {3, 4, Null(), Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, + {"c", "d", Null(), Null(), Null()}); + check_column_values(*sorted_block.get_by_position(2).column, {3, 4, 1, 2, 5}); + check_column_values(*sorted_block.get_by_position(3).column, {"c", "d", Null(), "b", Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, RightOuterJoinEmptyBuildSide) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 0); +} + +TEST_F(HashJoinProbeOperatorTest, RightOuterJoin2) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5, 2, 3}, + {0, 1, 0, 0, 1, 0, 0}); + probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e", "b", "c"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 6); + + check_column_values(*sorted_block.get_by_position(0).column, {2, 3, 3, 4, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, + {"b", "c", "c", "d", Null(), Null()}); + check_column_values(*sorted_block.get_by_position(2).column, {2, 3, 3, 4, 1, 5}); + check_column_values(*sorted_block.get_by_position(3).column, + {"b", "c", "c", "d", Null(), Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, LeftAntiJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 3); + + check_column_values(*sorted_block.get_by_position(0).column, {1, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "b", "e"}); +} + +TEST_F(HashJoinProbeOperatorTest, LeftSemiJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 2); + check_column_values(*sorted_block.get_by_position(0).column, {3, 4}); + check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"}); +} + +TEST_F(HashJoinProbeOperatorTest, LeftSemiJoinEmptyBuildSide) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 0); +} + +TEST_F(HashJoinProbeOperatorTest, RightAntiJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::RIGHT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 3); + + check_column_values(*sorted_block.get_by_position(0).column, {1, 2, 5}); + check_column_values(*sorted_block.get_by_position(1).column, {Null(), "b", Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, RightSemiJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::RIGHT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 2); + + check_column_values(*sorted_block.get_by_position(0).column, {3, 4}); + check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"}); +} + +TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoin) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"f", "g", "h", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 0); +} + +TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinEmptyBuildSide) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 5); +} + +TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinOtherConjuncts) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, .has_other_join_conjuncts = true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 0); +} + +TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoin2) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 0})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 1); + check_column_values(*sorted_block.get_by_position(0).column, {2}); + check_column_values(*sorted_block.get_by_position(1).column, {"a"}); +} + +TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinOtherConjuncts2) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1})); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, .has_other_join_conjuncts = true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 0); +} + +TEST_F(HashJoinProbeOperatorTest, LeftAntiJoin2) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 0})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + auto sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + ASSERT_EQ(sorted_block.rows(), 3); + check_column_values(*sorted_block.get_by_position(0).column, {2, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "b", "e"}); +} + +TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinMark) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 5); + + Block sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + check_column_values(*sorted_block.get_by_position(0).column, {1, 3, 4, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "c", "d", "b", "e"}); + check_column_values(*sorted_block.get_by_position(2).column, {Null(), 0, 0, Null(), Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, NullAwareLeftSemiJoinMark) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN, true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 5); + + Block sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data() << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {1, 3, 4, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "c", "d", "b", "e"}); + check_column_values(*sorted_block.get_by_position(2).column, {Null(), 1, 1, Null(), Null()}); +} + +TEST_F(HashJoinProbeOperatorTest, LeftSemiJoinMark) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_SEMI_JOIN, true, 1}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 5); + + Block sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data(0, 100, true) << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {1, 3, 4, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "c", "d", "b", "e"}); + check_column_values(*sorted_block.get_by_position(2).column, {0, 1, 1, Null(), 0}); +} + +TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMark) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({TJoinOp::LEFT_ANTI_JOIN, true, 1}, {TPrimitiveType::INT, TPrimitiveType::STRING}, + {true, false}, {false, true}, build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 5); + + Block sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data(0, 100, true) << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {1, 3, 4, Null(), Null()}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "c", "d", "b", "e"}); + check_column_values(*sorted_block.get_by_position(2).column, {1, 0, 0, Null(), 1}); +} + +TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMarkOtherConjuncts) { + auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5}); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>( + {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1})); + sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1})); + + auto probe_block = + ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 0, 0, 0, 1}); + probe_block.insert( + ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"})); + probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>( + {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1})); + + Block output_block; + std::vector<Block> build_blocks = {sink_block}; + std::vector<Block> probe_blocks = {probe_block}; + run_test({.join_op_type = TJoinOp::LEFT_ANTI_JOIN, + .is_mark_join = true, + .mark_join_conjuncts_size = 1, + .has_other_join_conjuncts = true}, + {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true}, + build_blocks, probe_blocks, output_block); + + ASSERT_EQ(output_block.rows(), 5); + + Block sorted_block = sort_block_by_columns(output_block); + std::cout << "Output block: " << sorted_block.dump_data(0, 100, true) << std::endl; + + check_column_values(*sorted_block.get_by_position(0).column, {1, 2, 3, 4, Null()}); + check_column_values(*sorted_block.get_by_position(1).column, {"a", "b", "c", "d", "e"}); + check_column_values(*sorted_block.get_by_position(2).column, {101, 100, 102, 99, Null()}); + check_column_values(*sorted_block.get_by_position(3).column, {0, 1, 0, 1, 1}); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/join_test_helper.cpp b/be/test/pipeline/operator/join_test_helper.cpp new file mode 100644 index 00000000000..d6828621590 --- /dev/null +++ b/be/test/pipeline/operator/join_test_helper.cpp @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "join_test_helper.h" + +#include "testutil/creators.h" + +namespace doris::pipeline { + +void JoinTestHelper::SetUp() { + runtime_state = std::make_unique<MockRuntimeState>(); + obj_pool = std::make_unique<ObjectPool>(); + + runtime_profile = std::make_shared<RuntimeProfile>("test"); + + query_ctx = generate_one_query(); + + runtime_state->_query_ctx = query_ctx.get(); + runtime_state->_query_id = query_ctx->query_id(); + runtime_state->resize_op_id_to_local_state(-100); + runtime_state->set_max_operator_id(-100); + + ADD_TIMER(runtime_profile.get(), "ExecTime"); + runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 0); +} + +void JoinTestHelper::TearDown() {} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/join_test_helper.h b/be/test/pipeline/operator/join_test_helper.h new file mode 100644 index 00000000000..b94b99c8aa2 --- /dev/null +++ b/be/test/pipeline/operator/join_test_helper.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Descriptors_types.h> +#include <gmock/gmock-actions.h> +#include <gmock/gmock-function-mocker.h> +#include <gmock/gmock-spec-builders.h> +#include <gmock/gmock.h> +#include <gtest/gtest.h> + +#include <memory> + +#include "common/object_pool.h" +#include "pipeline/pipeline_task.h" +#include "runtime/fragment_mgr.h" +#include "testutil/mock/mock_runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::pipeline { +class JoinTestHelper { +public: + virtual ~JoinTestHelper() = default; + void SetUp(); + void TearDown(); + + // virtual TPlanNode create_test_plan_node() = 0; + // virtual TDescriptorTable create_test_table_descriptor(bool nullable) = 0; + + std::unique_ptr<MockRuntimeState> runtime_state; + std::unique_ptr<ObjectPool> obj_pool; + std::shared_ptr<QueryContext> query_ctx; + std::shared_ptr<RuntimeProfile> runtime_profile; + std::shared_ptr<PipelineTask> pipeline_task; + DescriptorTbl* desc_tbl; +}; + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/testutil/mock/mock_operators.h b/be/test/testutil/mock/mock_operators.h index 58b1a738926..0c6136880ad 100644 --- a/be/test/testutil/mock/mock_operators.h +++ b/be/test/testutil/mock/mock_operators.h @@ -49,6 +49,13 @@ private: bool _eos = false; }; +class MockSourceOperator : public MockChildOperator { +public: + bool is_source() const override { return true; } + + Status close(RuntimeState* state) override { return Status::OK(); } +}; + class MockSinkOperator final : public DataSinkOperatorXBase { public: Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override { diff --git a/be/test/testutil/mock/mock_runtime_state.h b/be/test/testutil/mock/mock_runtime_state.h index 2b3888bf518..0831ba80404 100644 --- a/be/test/testutil/mock/mock_runtime_state.h +++ b/be/test/testutil/mock/mock_runtime_state.h @@ -41,12 +41,17 @@ public: return _enable_shared_exchange_sink_buffer; } + bool enable_share_hash_table_for_broadcast_join() const override { + return _enable_share_hash_table_for_broadcast_join; + } + bool enable_local_exchange() const override { return true; } WorkloadGroupPtr workload_group() override { return _workload_group; } // default batch size int batsh_size = 4096; bool _enable_shared_exchange_sink_buffer = true; + bool _enable_share_hash_table_for_broadcast_join = true; std::shared_ptr<MockContext> _mock_context = std::make_shared<MockContext>(); std::shared_ptr<MockQueryContext> _query_ctx_uptr = std::make_shared<MockQueryContext>(); WorkloadGroupPtr _workload_group = nullptr; diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index ac43c7ace14..a9688a3656d 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -20,6 +20,7 @@ #include "common/config.h" #include "common/logging.h" +#include "common/phdr_cache.h" #include "common/status.h" #include "gtest/gtest.h" #include "olap/page_cache.h" @@ -97,8 +98,23 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_tracking_memory(false); google::ParseCommandLineFlags(&argc, &argv, false); - int res = RUN_ALL_TESTS(); - doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr); - return res; + updatePHDRCache(); + try { + int res = RUN_ALL_TESTS(); + doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr); + return res; + } catch (doris::Exception& e) { + LOG(FATAL) << "Exception: " << e.what(); + } catch (...) { + auto eptr = std::current_exception(); + try { + std::rethrow_exception(eptr); + } catch (const std::exception& e) { + LOG(FATAL) << "Unknown exception: " << e.what(); + } catch (...) { + LOG(FATAL) << "Unknown exception"; + } + return -1; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org