This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bc22802b6a4 [test](ut) add cases about hash join (#49803)
bc22802b6a4 is described below

commit bc22802b6a43a6d62ac7ad1ed5000e2222976fca
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Tue Apr 15 09:28:13 2025 +0800

    [test](ut) add cases about hash join (#49803)
---
 be/src/pipeline/exec/hashjoin_build_sink.h         |    8 -
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |    5 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |    5 +-
 be/src/runtime/runtime_state.h                     |    2 +-
 be/src/util/stack_util.cpp                         |    2 +-
 .../pipeline/operator/hash_join_test_helper.cpp    |  630 ++++++++++++
 be/test/pipeline/operator/hash_join_test_helper.h  |   56 +
 .../pipeline/operator/hashjoin_build_sink_test.cpp |  362 +++++++
 .../operator/hashjoin_probe_operator_test.cpp      | 1074 ++++++++++++++++++++
 be/test/pipeline/operator/join_test_helper.cpp     |   43 +
 be/test/pipeline/operator/join_test_helper.h       |   53 +
 be/test/testutil/mock/mock_operators.h             |    7 +
 be/test/testutil/mock/mock_runtime_state.h         |    5 +
 be/test/testutil/run_all_tests.cpp                 |   22 +-
 14 files changed, 2258 insertions(+), 16 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index c6b6cdff029..ff477cd1105 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -42,14 +42,6 @@ public:
     void init_short_circuit_for_probe();
 
     bool build_unique() const;
-    std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
-
-    void add_hash_buckets_info(const std::string& info) const {
-        _profile->add_info_string("HashTableBuckets", info);
-    }
-    void add_hash_buckets_filled_info(const std::string& info) const {
-        _profile->add_info_string("HashTableFilledBuckets", info);
-    }
 
     Dependency* finishdependency() override { return _finish_dependency.get(); 
}
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index ab52f01fa5b..6138369fe23 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -529,11 +529,14 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
     }
 
     for (auto conjunct : _other_join_conjuncts) {
-        
conjunct->root()->collect_slot_column_ids(_other_conjunct_refer_column_ids);
+        
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
     }
 
     for (auto& conjunct : _mark_join_conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
+        if (_have_other_join_conjunct) {
+            
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
+        }
     }
 
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, 
_child->row_desc()));
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 3914fc0d58d..d1aa2280454 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -155,7 +155,8 @@ public:
     bool need_finalize_variant_column() const { return 
_need_finalize_variant_column; }
 
     bool is_lazy_materialized_column(int column_id) const {
-        return _have_other_join_conjunct && 
!_other_conjunct_refer_column_ids.contains(column_id);
+        return _have_other_join_conjunct &&
+               !_should_not_lazy_materialized_column_ids.contains(column_id);
     }
 
 private:
@@ -185,7 +186,7 @@ private:
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
     bool _need_finalize_variant_column = false;
-    std::set<int> _other_conjunct_refer_column_ids;
+    std::set<int> _should_not_lazy_materialized_column_ids;
     std::vector<std::string> _right_table_column_names;
     const std::vector<TExpr> _partition_exprs;
 };
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 5af2f643f38..4037c22d31a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -490,7 +490,7 @@ public:
                        : 0;
     }
 
-    bool enable_share_hash_table_for_broadcast_join() const {
+    MOCK_FUNCTION bool enable_share_hash_table_for_broadcast_join() const {
         return 
_query_options.__isset.enable_share_hash_table_for_broadcast_join &&
                _query_options.enable_share_hash_table_for_broadcast_join;
     }
diff --git a/be/src/util/stack_util.cpp b/be/src/util/stack_util.cpp
index d84c4eb4f21..4adeb7b5548 100644
--- a/be/src/util/stack_util.cpp
+++ b/be/src/util/stack_util.cpp
@@ -45,7 +45,7 @@ std::string get_stack_trace(int start_pointers_index, std::string dwarf_location
         dwarf_location_info_mode = config::dwarf_location_info_mode;
     }
 #ifdef BE_TEST
-    auto tool = std::string {"libunwind"};
+    auto tool = std::string {"boost"};
 #else
     auto tool = config::get_stack_trace_tool;
 #endif
diff --git a/be/test/pipeline/operator/hash_join_test_helper.cpp b/be/test/pipeline/operator/hash_join_test_helper.cpp
new file mode 100644
index 00000000000..532e2c55c70
--- /dev/null
+++ b/be/test/pipeline/operator/hash_join_test_helper.cpp
@@ -0,0 +1,630 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hash_join_test_helper.h"
+
+#include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
+#include <cstddef>
+#include <iostream>
+#include <sstream>
+#include <unordered_map>
+#include <vector>
+
+#include "testutil/creators.h"
+#include "testutil/mock/mock_operators.h"
+
+namespace doris::pipeline {
+TPlanNode HashJoinTestHelper::create_test_plan_node(
+        const TJoinOp::type& join_op_type, const 
std::vector<TPrimitiveType::type>& key_types,
+        const std::vector<bool>& left_keys_nullable, const std::vector<bool>& 
right_keys_nullable,
+        const bool is_mark_join, const size_t mark_join_conjuncts_size, const 
bool null_safe_equal,
+        const bool has_other_join_conjuncts) {
+    DCHECK_EQ(key_types.size(), left_keys_nullable.size());
+    DCHECK_EQ(key_types.size(), right_keys_nullable.size());
+    DCHECK_GE(key_types.size(), mark_join_conjuncts_size);
+
+    TPlanNode tnode;
+    tnode.node_id = 0;
+    tnode.node_type = TPlanNodeType::HASH_JOIN_NODE;
+    tnode.num_children = 2;
+    tnode.__set_hash_join_node(THashJoinNode());
+    tnode.hash_join_node.join_op = join_op_type;
+    tnode.limit = -1;
+    tnode.hash_join_node.__set_is_mark(is_mark_join);
+
+    size_t mark_join_conjuncts_count = 0;
+
+    doris::TSlotId col_unique_id = 0;
+    for (size_t i = 0; i != key_types.size(); ++i) {
+        const auto key_type = key_types[i];
+        const auto left_key_nullable = left_keys_nullable[i];
+        const auto right_key_nullable = right_keys_nullable[i];
+        TEqJoinCondition eq_cond;
+        eq_cond.left = TExpr();
+        eq_cond.right = TExpr();
+        if (null_safe_equal) {
+            eq_cond.opcode = TExprOpcode::EQ_FOR_NULL;
+        } else {
+            eq_cond.opcode = TExprOpcode::EQ;
+        }
+        eq_cond.__isset.opcode = true;
+        TTypeNode type_node;
+        type_node.type = TTypeNodeType::SCALAR;
+        type_node.scalar_type.type = key_type;
+        type_node.__isset.scalar_type = true;
+
+        if (key_type == TPrimitiveType::CHAR || key_type == 
TPrimitiveType::VARCHAR ||
+            key_type == TPrimitiveType::STRING) {
+            type_node.scalar_type.__set_len(OLAP_STRING_MAX_LENGTH);
+        } else if (key_type == TPrimitiveType::DECIMAL128I ||
+                   key_type == TPrimitiveType::DECIMAL256 ||
+                   key_type == TPrimitiveType::DECIMAL32 || key_type == 
TPrimitiveType::DECIMAL64 ||
+                   key_type == TPrimitiveType::DECIMALV2) {
+            type_node.scalar_type.__set_precision(18);
+            type_node.scalar_type.__set_scale(18);
+        } else if (key_type == TPrimitiveType::DATETIMEV2) {
+            type_node.scalar_type.__set_scale(6);
+        } else if (key_type == TPrimitiveType::TIMEV2) {
+            type_node.scalar_type.__set_scale(0);
+        }
+
+        eq_cond.left.nodes.emplace_back();
+        eq_cond.left.nodes[0].type.types.emplace_back(type_node);
+        eq_cond.left.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        eq_cond.left.nodes[0].__set_is_nullable(left_key_nullable);
+        eq_cond.left.nodes[0].num_children = 0;
+        eq_cond.left.nodes[0].slot_ref.col_unique_id = col_unique_id++;
+        eq_cond.left.nodes[0].__isset.slot_ref = true;
+
+        eq_cond.right.nodes.emplace_back();
+        eq_cond.right.nodes[0].type.types.emplace_back(type_node);
+        eq_cond.right.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        eq_cond.right.nodes[0].__set_is_nullable(right_key_nullable);
+        eq_cond.right.nodes[0].num_children = 0;
+        eq_cond.right.nodes[0].slot_ref.col_unique_id = col_unique_id++;
+        eq_cond.right.nodes[0].__isset.slot_ref = true;
+
+        if (mark_join_conjuncts_count < mark_join_conjuncts_size) {
+            TExpr mark_join_cond;
+            mark_join_cond.nodes.emplace_back();
+            mark_join_cond.nodes[0].node_type = TExprNodeType::BINARY_PRED;
+            mark_join_cond.nodes[0].opcode = TExprOpcode::EQ;
+            mark_join_cond.nodes[0].type.types.emplace_back();
+            mark_join_cond.nodes[0].type.types[0].scalar_type.type = 
TPrimitiveType::BOOLEAN;
+            mark_join_cond.nodes[0].type.types[0].__isset.scalar_type = true;
+            mark_join_cond.nodes[0].__set_is_nullable(true);
+            mark_join_cond.nodes[0].num_children = 2;
+
+            mark_join_cond.nodes[0].fn.name.function_name = "eq";
+            mark_join_cond.nodes[0].__isset.fn = true;
+
+            mark_join_cond.nodes.emplace_back();
+            mark_join_cond.nodes[1] = eq_cond.left.nodes[0];
+            mark_join_cond.nodes.emplace_back();
+            mark_join_cond.nodes[2] = eq_cond.right.nodes[0];
+            
tnode.hash_join_node.mark_join_conjuncts.emplace_back(mark_join_cond);
+            tnode.hash_join_node.__isset.mark_join_conjuncts = true;
+
+            ++mark_join_conjuncts_count;
+        } else {
+            tnode.hash_join_node.eq_join_conjuncts.push_back(eq_cond);
+        }
+    }
+
+    if (has_other_join_conjuncts) {
+        TExpr other_join_cond;
+        other_join_cond.nodes.emplace_back();
+        other_join_cond.nodes[0].node_type = TExprNodeType::BINARY_PRED;
+        other_join_cond.nodes[0].opcode = TExprOpcode::EQ;
+        other_join_cond.nodes[0].type.types.emplace_back();
+        other_join_cond.nodes[0].type.types[0].scalar_type.type = 
TPrimitiveType::BOOLEAN;
+        other_join_cond.nodes[0].type.types[0].__isset.scalar_type = true;
+        other_join_cond.nodes[0].__set_is_nullable(true);
+        other_join_cond.nodes[0].num_children = 2;
+
+        other_join_cond.nodes[0].fn.name.function_name = "gt";
+        other_join_cond.nodes[0].__isset.fn = true;
+
+        other_join_cond.nodes.emplace_back();
+        other_join_cond.nodes[1].node_type = TExprNodeType::SLOT_REF;
+        other_join_cond.nodes[1].type.types.emplace_back();
+        other_join_cond.nodes[1].type.types[0].scalar_type.type = 
TPrimitiveType::INT;
+        other_join_cond.nodes[1].__set_is_nullable(true);
+        other_join_cond.nodes[1].type.types[0].__isset.scalar_type = true;
+        other_join_cond.nodes[1].num_children = 0;
+        other_join_cond.nodes[1].slot_ref.col_unique_id = col_unique_id++;
+        other_join_cond.nodes[1].__isset.slot_ref = true;
+
+        other_join_cond.nodes.emplace_back();
+        other_join_cond.nodes[2].node_type = TExprNodeType::INT_LITERAL;
+        other_join_cond.nodes[2].type.types.emplace_back();
+        other_join_cond.nodes[2].type.types[0].scalar_type.type = 
TPrimitiveType::INT;
+        other_join_cond.nodes[2].type.types[0].__isset.scalar_type = true;
+        other_join_cond.nodes[2].num_children = 0;
+        other_join_cond.nodes[2].int_literal.value = 100;
+        other_join_cond.nodes[2].__isset.int_literal = true;
+
+        tnode.hash_join_node.other_join_conjuncts.push_back(other_join_cond);
+
+        other_join_cond.nodes[2].int_literal.value = 50;
+        other_join_cond.nodes[1].slot_ref.col_unique_id = col_unique_id++;
+        tnode.hash_join_node.other_join_conjuncts.push_back(other_join_cond);
+        tnode.hash_join_node.__isset.other_join_conjuncts = true;
+    }
+
+    tnode.row_tuples.push_back(0);
+    tnode.row_tuples.push_back(1);
+    tnode.nullable_tuples.push_back(false);
+    tnode.nullable_tuples.push_back(false);
+    tnode.__isset.hash_join_node = true;
+
+    tnode.hash_join_node.vintermediate_tuple_id_list.emplace_back(2);
+    tnode.hash_join_node.__isset.vintermediate_tuple_id_list = true;
+
+    auto desc_table = create_test_table_descriptor(tnode);
+    auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+    DCHECK(!desc_table.slotDescriptors.empty());
+
+    runtime_state->set_desc_tbl(desc_tbl);
+
+    return tnode;
+}
+
+TDescriptorTable HashJoinTestHelper::create_test_table_descriptor(TPlanNode& 
tnode) {
+    TTupleDescriptorBuilder left_tuple_builder, right_tuple_builder, 
intermediate_tuple_builder,
+            output_tuple_builder;
+
+    const auto is_left_half_join =
+            tnode.hash_join_node.join_op == TJoinOp::LEFT_SEMI_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN 
||
+            tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
+    const auto is_right_half_join = tnode.hash_join_node.join_op == 
TJoinOp::RIGHT_ANTI_JOIN ||
+                                    tnode.hash_join_node.join_op == 
TJoinOp::RIGHT_SEMI_JOIN;
+    const auto is_half_join = is_left_half_join || is_right_half_join;
+
+    const auto has_other_join_conjuncts = 
!tnode.hash_join_node.other_join_conjuncts.empty();
+    const auto intermediate_need_output_left_side =
+            !is_half_join || is_left_half_join ||
+            !tnode.hash_join_node.mark_join_conjuncts.empty() || 
has_other_join_conjuncts;
+    const auto need_output_left_side = !is_half_join || is_left_half_join;
+
+    const auto intermediate_need_output_right_side =
+            !is_half_join || is_right_half_join ||
+            !tnode.hash_join_node.mark_join_conjuncts.empty() || 
has_other_join_conjuncts;
+    const auto need_output_right_side = !is_half_join || is_right_half_join;
+
+    const auto other_conjuncts_count = 
tnode.hash_join_node.other_join_conjuncts.empty() ? 0 : 1;
+
+    const auto keys_count = tnode.hash_join_node.eq_join_conjuncts.size();
+    const auto mark_keys_count = 
tnode.hash_join_node.mark_join_conjuncts.size();
+    const auto total_output_slots_count =
+            (need_output_left_side ? keys_count + mark_keys_count + 
other_conjuncts_count : 0) +
+            (need_output_right_side ? keys_count + mark_keys_count + 
other_conjuncts_count : 0);
+    const auto total_intermediate_output_slots_count =
+            (intermediate_need_output_left_side
+                     ? keys_count + mark_keys_count + other_conjuncts_count
+                     : 0) +
+            (intermediate_need_output_right_side
+                     ? keys_count + mark_keys_count + other_conjuncts_count
+                     : 0);
+    std::vector<TSlotDescriptor> 
intermediate_slots(total_intermediate_output_slots_count);
+    std::vector<TSlotDescriptor> output_slots(total_output_slots_count);
+    const auto intermediate_keys_offset =
+            intermediate_need_output_left_side
+                    ? keys_count + mark_keys_count + other_conjuncts_count
+                    : 0;
+    const auto keys_offset =
+            need_output_left_side ? keys_count + mark_keys_count + 
other_conjuncts_count : 0;
+
+    size_t slots_count = 0;
+    doris::TSlotId col_unique_id = 200;
+    std::vector<doris::TSlotId> hash_output_unique_ids;
+    std::vector<TExpr> projection_exprs(total_output_slots_count);
+    for (auto& eq_cond : tnode.hash_join_node.mark_join_conjuncts) {
+        auto left_type = 
thrift_to_type(eq_cond.nodes[1].type.types[0].scalar_type.type);
+        auto slot = TSlotDescriptorBuilder()
+                            .type(left_type)
+                            .nullable(eq_cond.nodes[1].is_nullable)
+                            .build();
+        slot.slotType.types[0].scalar_type = 
eq_cond.nodes[1].type.types[0].scalar_type;
+        slot.col_unique_id = eq_cond.nodes[1].slot_ref.col_unique_id;
+        if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::RIGHT_OUTER_JOIN) {
+            slot.nullIndicatorByte = 0;
+            slot.nullIndicatorBit = 0;
+        }
+
+        intermediate_slots[slots_count] = slot;
+        if (need_output_left_side) {
+            auto& expr = projection_exprs[slots_count];
+            auto& node = expr.nodes.emplace_back();
+            node.node_type = TExprNodeType::SLOT_REF;
+            node.__set_is_nullable(slot.nullIndicatorBit == 0 && 
slot.nullIndicatorByte == 0);
+            node.num_children = 0;
+            node.__isset.slot_ref = true;
+            node.slot_ref.col_unique_id = slot.col_unique_id;
+            node.slot_ref.slot_id = slot.id;
+            node.slot_ref.__isset.col_unique_id = true;
+            node.type.types.emplace_back();
+            node.type.types[0].scalar_type = 
slot.slotType.types[0].scalar_type;
+            node.type.types[0].__isset.scalar_type = true;
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count] = slot;
+        }
+
+        slot.col_unique_id = col_unique_id++;
+        left_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+
+        auto right_type = 
thrift_to_type(eq_cond.nodes[2].type.types[0].scalar_type.type);
+
+        slot = TSlotDescriptorBuilder()
+                       .type(right_type)
+                       .nullable(eq_cond.nodes[2].is_nullable)
+                       .build();
+        slot.slotType.types[0].scalar_type = 
eq_cond.nodes[2].type.types[0].scalar_type;
+        slot.col_unique_id = eq_cond.nodes[2].slot_ref.col_unique_id;
+        if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::LEFT_OUTER_JOIN) {
+            slot.nullIndicatorByte = 0;
+            slot.nullIndicatorBit = 0;
+        }
+        intermediate_slots[slots_count + intermediate_keys_offset] = slot;
+        if (need_output_right_side) {
+            auto& expr = projection_exprs[slots_count + keys_offset];
+            auto& node = expr.nodes.emplace_back();
+            node.node_type = TExprNodeType::SLOT_REF;
+            node.__set_is_nullable(slot.nullIndicatorBit == 0 && 
slot.nullIndicatorByte == 0);
+            node.num_children = 0;
+            node.__isset.slot_ref = true;
+            node.slot_ref.col_unique_id = slot.col_unique_id;
+            node.slot_ref.slot_id = slot.id;
+            node.slot_ref.__isset.col_unique_id = true;
+            node.type.types.emplace_back();
+            node.type.types[0].scalar_type = 
slot.slotType.types[0].scalar_type;
+            node.type.types[0].__isset.scalar_type = true;
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count + keys_offset] = slot;
+        }
+        slot.col_unique_id = col_unique_id++;
+        right_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+        ++slots_count;
+    }
+
+    for (auto& eq_cond : tnode.hash_join_node.eq_join_conjuncts) {
+        auto left_type = 
thrift_to_type(eq_cond.left.nodes[0].type.types[0].scalar_type.type);
+        auto slot = TSlotDescriptorBuilder()
+                            .type(left_type)
+                            .nullable(eq_cond.left.nodes[0].is_nullable)
+                            .build();
+        slot.slotType.types[0].scalar_type = 
eq_cond.left.nodes[0].type.types[0].scalar_type;
+        slot.col_unique_id = eq_cond.left.nodes[0].slot_ref.col_unique_id;
+        left_tuple_builder.add_slot(slot);
+        if (intermediate_need_output_left_side || need_output_left_side) {
+            hash_output_unique_ids.emplace_back(slot.col_unique_id);
+        }
+
+        slot.col_unique_id = col_unique_id++;
+        if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::RIGHT_OUTER_JOIN) {
+            slot.nullIndicatorByte = 0;
+            slot.nullIndicatorBit = 0;
+        }
+
+        if (intermediate_need_output_left_side) {
+            intermediate_slots[slots_count] = slot;
+        }
+
+        if (need_output_left_side) {
+            auto& expr = projection_exprs[slots_count];
+            auto& node = expr.nodes.emplace_back();
+            node.node_type = TExprNodeType::SLOT_REF;
+            node.__set_is_nullable(slot.nullIndicatorBit == 0 && 
slot.nullIndicatorByte == 0);
+            node.num_children = 0;
+            node.__isset.slot_ref = true;
+            node.slot_ref.col_unique_id = slot.col_unique_id;
+            node.slot_ref.slot_id = slot.id;
+            node.slot_ref.__isset.col_unique_id = true;
+            node.type.types.emplace_back();
+            node.type.types[0].scalar_type = 
slot.slotType.types[0].scalar_type;
+            node.type.types[0].__isset.scalar_type = true;
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count] = slot;
+        }
+
+        auto right_type = 
thrift_to_type(eq_cond.right.nodes[0].type.types[0].scalar_type.type);
+
+        slot = TSlotDescriptorBuilder()
+                       .type(right_type)
+                       .nullable(eq_cond.right.nodes[0].is_nullable)
+                       .build();
+        slot.slotType.types[0].scalar_type = 
eq_cond.right.nodes[0].type.types[0].scalar_type;
+        slot.col_unique_id = eq_cond.right.nodes[0].slot_ref.col_unique_id;
+        right_tuple_builder.add_slot(slot);
+        if (need_output_right_side || intermediate_need_output_right_side) {
+            hash_output_unique_ids.emplace_back(slot.col_unique_id);
+        }
+        slot.col_unique_id = col_unique_id++;
+        if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::LEFT_OUTER_JOIN) {
+            slot.nullIndicatorByte = 0;
+            slot.nullIndicatorBit = 0;
+        }
+
+        if (intermediate_need_output_right_side) {
+            intermediate_slots[slots_count + intermediate_keys_offset] = slot;
+        }
+        if (need_output_right_side) {
+            auto& expr = projection_exprs[slots_count + keys_offset];
+            auto& node = expr.nodes.emplace_back();
+            node.node_type = TExprNodeType::SLOT_REF;
+            node.__set_is_nullable(slot.nullIndicatorBit == 0 && 
slot.nullIndicatorByte == 0);
+            node.num_children = 0;
+            node.__isset.slot_ref = true;
+            node.slot_ref.col_unique_id = slot.col_unique_id;
+            node.slot_ref.slot_id = slot.id;
+            node.slot_ref.__isset.col_unique_id = true;
+            node.type.types.emplace_back();
+            node.type.types[0].scalar_type = 
slot.slotType.types[0].scalar_type;
+            node.type.types[0].__isset.scalar_type = true;
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count + keys_offset] = slot;
+        }
+        ++slots_count;
+    }
+
+    if (tnode.hash_join_node.other_join_conjuncts.size() == 2) {
+        auto& left_cond = tnode.hash_join_node.other_join_conjuncts[0];
+        auto& right_cond = tnode.hash_join_node.other_join_conjuncts[1];
+
+        auto left_type = 
thrift_to_type(left_cond.nodes[1].type.types[0].scalar_type.type);
+        auto slot = TSlotDescriptorBuilder()
+                            .type(left_type)
+                            .nullable(left_cond.nodes[1].is_nullable)
+                            .build();
+        slot.slotType.types[0].scalar_type = 
left_cond.nodes[1].type.types[0].scalar_type;
+
+        slot.col_unique_id = col_unique_id++;
+        left_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+
+        slot.col_unique_id = left_cond.nodes[1].slot_ref.col_unique_id;
+        if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::RIGHT_OUTER_JOIN) {
+            slot.nullIndicatorByte = 0;
+            slot.nullIndicatorBit = 0;
+        }
+
+        intermediate_slots[slots_count] = slot;
+
+        if (need_output_left_side) {
+            auto& expr = projection_exprs[slots_count];
+            auto& node = expr.nodes.emplace_back();
+            node.node_type = TExprNodeType::SLOT_REF;
+            node.__set_is_nullable(slot.nullIndicatorBit == 0 && 
slot.nullIndicatorByte == 0);
+            node.num_children = 0;
+            node.__isset.slot_ref = true;
+            node.slot_ref.col_unique_id = slot.col_unique_id;
+            node.slot_ref.slot_id = slot.id;
+            node.slot_ref.__isset.col_unique_id = true;
+            node.type.types.emplace_back();
+            node.type.types[0].scalar_type = 
slot.slotType.types[0].scalar_type;
+            node.type.types[0].__isset.scalar_type = true;
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count] = slot;
+        }
+
+        auto right_type = 
thrift_to_type(right_cond.nodes[1].type.types[0].scalar_type.type);
+        slot = TSlotDescriptorBuilder()
+                       .type(right_type)
+                       .nullable(right_cond.nodes[1].is_nullable)
+                       .build();
+        slot.slotType.types[0].scalar_type = 
right_cond.nodes[1].type.types[0].scalar_type;
+
+        slot.col_unique_id = col_unique_id++;
+        right_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+
+        slot.col_unique_id = right_cond.nodes[1].slot_ref.col_unique_id;
+        if (tnode.hash_join_node.join_op == TJoinOp::FULL_OUTER_JOIN ||
+            tnode.hash_join_node.join_op == TJoinOp::LEFT_OUTER_JOIN) {
+            slot.nullIndicatorByte = 0;
+            slot.nullIndicatorBit = 0;
+        }
+
+        intermediate_slots[slots_count + intermediate_keys_offset] = slot;
+
+        if (need_output_right_side) {
+            auto& expr = projection_exprs[slots_count + keys_offset];
+            auto& node = expr.nodes.emplace_back();
+            node.node_type = TExprNodeType::SLOT_REF;
+            node.__set_is_nullable(slot.nullIndicatorBit == 0 && 
slot.nullIndicatorByte == 0);
+            node.num_children = 0;
+            node.__isset.slot_ref = true;
+            node.slot_ref.col_unique_id = slot.col_unique_id;
+            node.slot_ref.slot_id = slot.id;
+            node.slot_ref.__isset.col_unique_id = true;
+            node.type.types.emplace_back();
+            node.type.types[0].scalar_type = 
slot.slotType.types[0].scalar_type;
+            node.type.types[0].__isset.scalar_type = true;
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count + keys_offset] = slot;
+        }
+
+        ++slots_count;
+    }
+
+    TDescriptorTableBuilder builder;
+
+    left_tuple_builder.build(&builder);
+    right_tuple_builder.build(&builder);
+
+    for (auto& slot : intermediate_slots) {
+        intermediate_tuple_builder.add_slot(slot);
+    }
+
+    for (auto& slot : output_slots) {
+        output_tuple_builder.add_slot(slot);
+    }
+
+    if (tnode.hash_join_node.is_mark) {
+        auto type = thrift_to_type(TPrimitiveType::BOOLEAN);
+        auto slot = TSlotDescriptorBuilder().type(type).nullable(true).build();
+        slot.col_unique_id = col_unique_id++;
+        intermediate_tuple_builder.add_slot(slot);
+
+        auto& expr = projection_exprs.emplace_back();
+        auto& node = expr.nodes.emplace_back();
+        node.node_type = TExprNodeType::SLOT_REF;
+        node.__set_is_nullable(true);
+        node.num_children = 0;
+        node.__isset.slot_ref = true;
+        node.slot_ref.col_unique_id = slot.col_unique_id;
+        node.slot_ref.slot_id = slot.id;
+        node.slot_ref.__isset.col_unique_id = true;
+        node.type.types.emplace_back();
+        node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type;
+        node.type.types[0].__isset.scalar_type = true;
+
+        slot.col_unique_id = col_unique_id++;
+        output_tuple_builder.add_slot(slot);
+    }
+
+    tnode.projections = projection_exprs;
+    tnode.__isset.projections = true;
+
+    intermediate_tuple_builder.build(&builder);
+
+    output_tuple_builder.build(&builder);
+    tnode.__set_output_tuple_id(3);
+    tnode.hash_join_node.__set_voutput_tuple_id(3);
+
+    auto table_desc = builder.desc_tbl();
+
+    std::unordered_map<int32_t, TTupleId> slots_map;
+    for (auto& slot : table_desc.slotDescriptors) {
+        slots_map[slot.col_unique_id] = slot.id;
+    }
+
+    for (auto& eq_cond : tnode.hash_join_node.eq_join_conjuncts) {
+        col_unique_id = eq_cond.left.nodes[0].slot_ref.col_unique_id;
+        eq_cond.left.nodes[0].slot_ref.slot_id = slots_map[col_unique_id];
+
+        col_unique_id = eq_cond.right.nodes[0].slot_ref.col_unique_id;
+        eq_cond.right.nodes[0].slot_ref.slot_id = slots_map[col_unique_id];
+    }
+
+    for (size_t i = 0; i != mark_keys_count; ++i) {
+        auto& slot_ref = 
tnode.hash_join_node.mark_join_conjuncts[i].nodes[1].slot_ref;
+        slot_ref.slot_id = slots_map[slot_ref.col_unique_id];
+
+        auto& slot_ref_right = 
tnode.hash_join_node.mark_join_conjuncts[i].nodes[2].slot_ref;
+        slot_ref_right.slot_id = slots_map[slot_ref_right.col_unique_id];
+    }
+
+    for (auto& expr : tnode.projections) {
+        for (auto& node : expr.nodes) {
+            if (node.node_type == TExprNodeType::SLOT_REF) {
+                auto col_unique_id = node.slot_ref.col_unique_id;
+                node.slot_ref.slot_id = slots_map[col_unique_id];
+            }
+        }
+    }
+
+    if (tnode.hash_join_node.other_join_conjuncts.size() == 2) {
+        auto& left_cond = tnode.hash_join_node.other_join_conjuncts[0];
+        auto& right_cond = tnode.hash_join_node.other_join_conjuncts[1];
+
+        left_cond.nodes[1].slot_ref.slot_id = 
slots_map[left_cond.nodes[1].slot_ref.col_unique_id];
+        right_cond.nodes[1].slot_ref.slot_id =
+                slots_map[right_cond.nodes[1].slot_ref.col_unique_id];
+    }
+
+    for (auto uid : hash_output_unique_ids) {
+        tnode.hash_join_node.hash_output_slot_ids.emplace_back(slots_map[uid]);
+    }
+    tnode.hash_join_node.__isset.hash_output_slot_ids = true;
+
+    return table_desc;
+}
+
+void HashJoinTestHelper::add_mark_join_conjuncts(TPlanNode& join_node,
+                                                 std::vector<TExpr>& 
conjuncts) {
+    EXPECT_TRUE(join_node.__isset.hash_join_node);
+
+    join_node.hash_join_node.__isset.mark_join_conjuncts = true;
+    join_node.hash_join_node.mark_join_conjuncts.insert(
+            join_node.hash_join_node.mark_join_conjuncts.end(), 
conjuncts.begin(), conjuncts.end());
+}
+
+void HashJoinTestHelper::add_other_join_conjuncts(TPlanNode& join_node,
+                                                  std::vector<TExpr>& 
conjuncts) {
+    EXPECT_TRUE(join_node.__isset.hash_join_node);
+
+    join_node.hash_join_node.__isset.other_join_conjuncts = true;
+    join_node.hash_join_node.other_join_conjuncts.insert(
+            join_node.hash_join_node.other_join_conjuncts.end(), 
conjuncts.begin(),
+            conjuncts.end());
+}
+
+std::pair<std::shared_ptr<HashJoinProbeOperatorX>, 
std::shared_ptr<HashJoinBuildSinkOperatorX>>
+HashJoinTestHelper::create_operators(const TPlanNode& tnode) {
+    auto sink_operator = 
std::make_shared<HashJoinBuildSinkOperatorX>(obj_pool.get(), 0, 0, tnode,
+                                                                      
runtime_state->desc_tbl());
+
+    auto probe_operator = 
std::make_shared<HashJoinProbeOperatorX>(obj_pool.get(), tnode, 0,
+                                                                   
runtime_state->desc_tbl());
+
+    auto child_operator = std::make_shared<MockSourceOperator>();
+    auto probe_side_source_operator = std::make_shared<MockSourceOperator>();
+    auto probe_side_sink_operator = std::make_shared<MockSinkOperator>();
+
+    RowDescriptor row_desc(runtime_state->desc_tbl(), {1}, {false});
+    child_operator->_row_descriptor = row_desc;
+
+    RowDescriptor probe_side_row_desc(runtime_state->desc_tbl(), {0}, {false});
+    probe_side_source_operator->_row_descriptor = probe_side_row_desc;
+
+    EXPECT_TRUE(sink_operator->set_child(child_operator));
+    EXPECT_TRUE(probe_operator->set_child(probe_side_source_operator));
+    EXPECT_TRUE(probe_operator->set_child(child_operator));
+
+    // Setup task and state
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    auto [source_pipeline, _] = generate_sort_pipeline(probe_operator, 
probe_side_sink_operator,
+                                                       sink_operator, 
child_operator);
+    pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, 
runtime_state.get(), nullptr,
+                                                   nullptr, shared_state_map, 
0);
+
+    return {std::move(probe_operator), std::move(sink_operator)};
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/hash_join_test_helper.h b/be/test/pipeline/operator/hash_join_test_helper.h
new file mode 100644
index 00000000000..355368a37d0
--- /dev/null
+++ b/be/test/pipeline/operator/hash_join_test_helper.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "join_test_helper.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/hashjoin_probe_operator.h"
+
+namespace doris::pipeline {
+// Helper shared by the hash join build-sink / probe unit tests. It produces the thrift plan
+// node, the descriptor table and the operator pair under test. The fixture state used by the
+// tests (runtime_state, query_ctx, desc_tbl, runtime_profile, pipeline_task, ...) is not
+// declared here, so it comes from JoinTestHelper.
+class HashJoinTestHelper : public JoinTestHelper {
+public:
+    // Builds the TPlanNode describing a hash join of type `join_op_type`.
+    // `key_types`, `left_keys_nullable` and `right_keys_nullable` are passed with one entry
+    // per join key column by the tests in this directory.
+    // NOTE(review): the exact meaning of the optional flags is defined in
+    // hash_join_test_helper.cpp; the names suggest they toggle mark join, the number of mark
+    // join conjuncts, null-safe equality (<=>) and the presence of other join conjuncts.
+    TPlanNode create_test_plan_node(
+            const TJoinOp::type& join_op_type, const std::vector<TPrimitiveType::type>& key_types,
+            const std::vector<bool>& left_keys_nullable,
+            const std::vector<bool>& right_keys_nullable, const bool is_mark_join = false,
+            const size_t mark_join_conjuncts_size = 0, const bool null_safe_equal = false,
+            const bool has_other_join_conjuncts = false);
+
+    // Builds the thrift descriptor table for `tnode`. `tnode` is taken by non-const
+    // reference, so the implementation may modify it.
+    TDescriptorTable create_test_table_descriptor(TPlanNode& tnode);
+
+    // Attach extra conjuncts to an existing join node. Both arguments are non-const
+    // references; see hash_join_test_helper.cpp for what is modified.
+    void add_mark_join_conjuncts(TPlanNode& join_node, std::vector<TExpr>& conjuncts);
+    void add_other_join_conjuncts(TPlanNode& join_node, std::vector<TExpr>& conjuncts);
+
+    // Creates the probe operator and the build-sink operator for `tnode`, attaches mock
+    // child operators to them and creates the helper's pipeline task.
+    // Returns {probe operator, build-sink operator}.
+    std::pair<std::shared_ptr<HashJoinProbeOperatorX>, std::shared_ptr<HashJoinBuildSinkOperatorX>>
+    create_operators(const TPlanNode& tnode);
+
+protected:
+    // Not referenced in this header; filled in by the implementation file.
+    // NOTE(review): presumably the slot-ref expr nodes of the probe (left) keys - confirm.
+    std::vector<TExprNode*> _left_slots;
+    // NOTE(review): presumably the slot-ref expr nodes of the build (right) keys - confirm.
+    std::vector<TExprNode*> _right_slots;
+};
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/hashjoin_build_sink_test.cpp 
b/be/test/pipeline/operator/hashjoin_build_sink_test.cpp
new file mode 100644
index 00000000000..7d60834b138
--- /dev/null
+++ b/be/test/pipeline/operator/hashjoin_build_sink_test.cpp
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/hashjoin_build_sink.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "common/config.h"
+#include "hash_join_test_helper.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::pipeline {
+
+// gtest fixture for the HashJoinBuildSinkOperatorX tests. Plan node and operator construction
+// is delegated to HashJoinTestHelper, whose SetUp()/TearDown() are forwarded from gtest.
+class HashJoinBuildSinkTest : public testing::Test {
+public:
+    void SetUp() override { _helper.SetUp(); }
+    void TearDown() override { _helper.TearDown(); }
+
+    // Invokes `test_block(join_op, key_types, left_nullables, right_nullables)` for every
+    // join op listed below. For each join op and each first key type it runs:
+    //   1. the single-key case with all four (left, right) nullable combinations, then
+    //   2. the two-key case for every second key type, again with all four combinations.
+    template <typename Func>
+    void run_test_block(Func test_block) {
+        const auto join_op_candidates = {TJoinOp::INNER_JOIN,
+                                         TJoinOp::LEFT_OUTER_JOIN,
+                                         TJoinOp::RIGHT_OUTER_JOIN,
+                                         TJoinOp::FULL_OUTER_JOIN,
+                                         TJoinOp::LEFT_SEMI_JOIN,
+                                         TJoinOp::RIGHT_SEMI_JOIN,
+                                         TJoinOp::LEFT_ANTI_JOIN,
+                                         TJoinOp::RIGHT_ANTI_JOIN,
+                                         TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
+                                         TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN};
+        const auto key_type_candidates = {
+                TPrimitiveType::BOOLEAN,     TPrimitiveType::TINYINT,    TPrimitiveType::SMALLINT,
+                TPrimitiveType::INT,         TPrimitiveType::BIGINT,     TPrimitiveType::FLOAT,
+                TPrimitiveType::DOUBLE,      TPrimitiveType::DATE,       TPrimitiveType::DATETIME,
+                TPrimitiveType::BINARY,      TPrimitiveType::CHAR,       TPrimitiveType::LARGEINT,
+                TPrimitiveType::VARCHAR,     TPrimitiveType::DECIMALV2,  TPrimitiveType::TIME,
+                TPrimitiveType::STRING,      TPrimitiveType::DATEV2,     TPrimitiveType::DATETIMEV2,
+                TPrimitiveType::TIMEV2,      TPrimitiveType::DECIMAL32,  TPrimitiveType::DECIMAL64,
+                TPrimitiveType::DECIMAL128I, TPrimitiveType::DECIMAL256, TPrimitiveType::IPV4,
+                TPrimitiveType::IPV6};
+        // Nullable is tried before non-nullable, on both sides.
+        const bool nullable_choices[] = {true, false};
+
+        for (const auto& join_op : join_op_candidates) {
+            for (const auto first_key : key_type_candidates) {
+                // Single join key.
+                for (const bool left_nullable : nullable_choices) {
+                    for (const bool right_nullable : nullable_choices) {
+                        test_block(join_op, {first_key}, {left_nullable}, {right_nullable});
+                    }
+                }
+
+                // Two join keys. The two flags are reused for both sides in swapped order:
+                // left keys get {left, right}, right keys get {right, left}.
+                for (const auto second_key : key_type_candidates) {
+                    for (const bool left_nullable : nullable_choices) {
+                        for (const bool right_nullable : nullable_choices) {
+                            test_block(join_op, {first_key, second_key},
+                                       {left_nullable, right_nullable},
+                                       {right_nullable, left_nullable});
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+protected:
+    HashJoinTestHelper _helper;
+};
+
+// Lifecycle smoke test: for every combination produced by run_test_block(), the build-sink
+// and probe operators are initialised, prepared, given local states, opened and closed.
+// No data is sunk; only the returned Status values and the memory debug string are checked.
+TEST_F(HashJoinBuildSinkTest, Init) {
+    auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types,
+                          const std::vector<bool>& left_nullables,
+                          const std::vector<bool>& right_nullables) {
+        auto tnode =
+                _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables);
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        // A fresh runtime state per combination, so local states do not leak between runs.
+        auto runtime_state = std::make_unique<MockRuntimeState>();
+        runtime_state->_query_ctx = _helper.query_ctx.get();
+        runtime_state->_query_id = _helper.query_ctx->query_id();
+        // NOTE(review): the negative value presumably matches the operator id scheme used by
+        // the runtime state - confirm against RuntimeState.
+        runtime_state->resize_op_id_to_local_state(-100);
+        runtime_state->set_max_operator_id(-100);
+        runtime_state->set_desc_tbl(_helper.desc_tbl);
+
+        auto st = sink_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        // The same shared state object is handed to both the sink and the probe side.
+        auto shared_state = sink_operator->create_shared_state();
+        ASSERT_TRUE(shared_state);
+
+        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                std::vector<std::shared_ptr<Dependency>>>>
+                shared_state_map;
+        LocalSinkStateInfo info {.task_idx = 0,
+                                 .parent_profile = _helper.runtime_profile.get(),
+                                 .sender_id = 0,
+                                 .shared_state = shared_state.get(),
+                                 .shared_state_map = shared_state_map,
+                                 .tsink = TDataSink()};
+        st = sink_operator->setup_local_state(runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        st = probe_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+        st = probe_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        LocalStateInfo info2 {.parent_profile = _helper.runtime_profile.get(),
+                              .scan_ranges = {},
+                              .shared_state = shared_state.get(),
+                              .shared_state_map = shared_state_map,
+                              .task_idx = 0};
+
+        st = probe_operator->setup_local_state(runtime_state.get(), info2);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        auto* probe_local_state = runtime_state->get_local_state(probe_operator->operator_id());
+        ASSERT_TRUE(probe_local_state);
+
+        st = probe_local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        // Checked downcast: yields nullptr (caught by the assertion below) if the sink local
+        // state is missing or is not a HashJoinBuildSinkLocalState.
+        auto* local_state =
+                dynamic_cast<HashJoinBuildSinkLocalState*>(runtime_state->get_sink_local_state());
+        ASSERT_TRUE(local_state);
+
+        st = local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        ASSERT_GT(sink_operator->get_memory_usage_debug_str(runtime_state.get()).size(), 0);
+
+        st = local_state->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = sink_operator->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = probe_operator->close(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = probe_local_state->close(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    };
+
+    run_test_block(test_block);
+}
+
+// Sinks one two-row block and then an empty eos block into the build-sink operator, for
+// every combination produced by run_test_block(), and checks the reported reserve / memory
+// sizes before and after the final sink.
+TEST_F(HashJoinBuildSinkTest, Sink) {
+    auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types,
+                          const std::vector<bool>& left_nullables,
+                          const std::vector<bool>& right_nullables) {
+        auto tnode =
+                _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables);
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        auto runtime_state = std::make_unique<MockRuntimeState>();
+        runtime_state->_query_ctx = _helper.query_ctx.get();
+        runtime_state->_query_id = _helper.query_ctx->query_id();
+        // NOTE(review): the negative value presumably matches the operator id scheme used by
+        // the runtime state - confirm against RuntimeState.
+        runtime_state->resize_op_id_to_local_state(-100);
+        runtime_state->set_max_operator_id(-100);
+        runtime_state->set_desc_tbl(_helper.desc_tbl);
+
+        // The TDataSink overload of init() is expected to be rejected by this operator.
+        auto st = sink_operator->init(TDataSink());
+        ASSERT_FALSE(st.ok());
+
+        st = sink_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        auto shared_state = sink_operator->create_shared_state();
+        ASSERT_TRUE(shared_state);
+
+        shared_state->create_source_dependency(sink_operator->operator_id(), tnode.node_id,
+                                               "HashJoinSinkTestDep");
+
+        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                std::vector<std::shared_ptr<Dependency>>>>
+                shared_state_map;
+        LocalSinkStateInfo info {.task_idx = 0,
+                                 .parent_profile = _helper.runtime_profile.get(),
+                                 .sender_id = 0,
+                                 .shared_state = shared_state.get(),
+                                 .shared_state_map = shared_state_map,
+                                 .tsink = TDataSink()};
+        st = sink_operator->setup_local_state(runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        // Checked downcast: yields nullptr (caught by the assertion below) if the sink local
+        // state is missing or is not a HashJoinBuildSinkLocalState.
+        auto* local_state =
+                dynamic_cast<HashJoinBuildSinkLocalState*>(runtime_state->get_sink_local_state());
+        ASSERT_TRUE(local_state);
+
+        st = local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), false), 0);
+
+        // `block` stays empty (0 rows) and is used below as the eos block; it only carries
+        // the schema of the sink's child row descriptor.
+        const auto& row_desc = sink_operator->child()->row_desc();
+        vectorized::Block block(row_desc.tuple_descriptors()[0]->slots(), 0);
+
+        // Fill every column with two rows: one default value, then either one explicitly
+        // non-null element (nullable columns) or a second default value.
+        auto mutable_block = vectorized::MutableBlock(block.clone_empty());
+        for (auto& col : mutable_block.mutable_columns()) {
+            col->insert_default();
+            if (col->is_nullable()) {
+                auto& nullable_column = assert_cast<vectorized::ColumnNullable&>(*col);
+                nullable_column.insert_not_null_elements(1);
+            } else {
+                col->insert_default();
+            }
+        }
+
+        auto block2 = mutable_block.to_block();
+        st = sink_operator->sink(runtime_state.get(), &block2, false);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        // With data buffered, an eos sink must report a non-zero reservation; once the eos
+        // sink has run, the reservation drops back to zero.
+        ASSERT_GT(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+        st = sink_operator->sink(runtime_state.get(), &block, true);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+
+        ASSERT_GT(sink_operator->get_memory_usage(runtime_state.get()), 0);
+
+        st = local_state->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = sink_operator->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    };
+
+    run_test_block(test_block);
+}
+
+// Same flow as the Sink test, except that terminate() is called between the data sink and
+// the eos sink. The test asserts that terminate() succeeds and that the subsequent sink,
+// size queries and close calls behave exactly as in the non-terminated case.
+TEST_F(HashJoinBuildSinkTest, Terminate) {
+    auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types,
+                          const std::vector<bool>& left_nullables,
+                          const std::vector<bool>& right_nullables) {
+        auto tnode =
+                _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables);
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        auto runtime_state = std::make_unique<MockRuntimeState>();
+        runtime_state->_query_ctx = _helper.query_ctx.get();
+        runtime_state->_query_id = _helper.query_ctx->query_id();
+        // NOTE(review): the negative value presumably matches the operator id scheme used by
+        // the runtime state - confirm against RuntimeState.
+        runtime_state->resize_op_id_to_local_state(-100);
+        runtime_state->set_max_operator_id(-100);
+        runtime_state->set_desc_tbl(_helper.desc_tbl);
+
+        // The TDataSink overload of init() is expected to be rejected by this operator.
+        auto st = sink_operator->init(TDataSink());
+        ASSERT_FALSE(st.ok());
+
+        st = sink_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        auto shared_state = sink_operator->create_shared_state();
+        ASSERT_TRUE(shared_state);
+
+        shared_state->create_source_dependency(sink_operator->operator_id(), tnode.node_id,
+                                               "HashJoinSinkTestDep");
+
+        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                std::vector<std::shared_ptr<Dependency>>>>
+                shared_state_map;
+        LocalSinkStateInfo info {.task_idx = 0,
+                                 .parent_profile = _helper.runtime_profile.get(),
+                                 .sender_id = 0,
+                                 .shared_state = shared_state.get(),
+                                 .shared_state_map = shared_state_map,
+                                 .tsink = TDataSink()};
+        st = sink_operator->setup_local_state(runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        // Checked downcast: yields nullptr (caught by the assertion below) if the sink local
+        // state is missing or is not a HashJoinBuildSinkLocalState.
+        auto* local_state =
+                dynamic_cast<HashJoinBuildSinkLocalState*>(runtime_state->get_sink_local_state());
+        ASSERT_TRUE(local_state);
+
+        st = local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), false), 0);
+
+        // `block` stays empty (0 rows) and is used below as the eos block; it only carries
+        // the schema of the sink's child row descriptor.
+        const auto& row_desc = sink_operator->child()->row_desc();
+        vectorized::Block block(row_desc.tuple_descriptors()[0]->slots(), 0);
+
+        // Fill every column with two rows: one default value, then either one explicitly
+        // non-null element (nullable columns) or a second default value.
+        auto mutable_block = vectorized::MutableBlock(block.clone_empty());
+        for (auto& col : mutable_block.mutable_columns()) {
+            col->insert_default();
+            if (col->is_nullable()) {
+                auto& nullable_column = assert_cast<vectorized::ColumnNullable&>(*col);
+                nullable_column.insert_not_null_elements(1);
+            } else {
+                col->insert_default();
+            }
+        }
+
+        auto block2 = mutable_block.to_block();
+        st = sink_operator->sink(runtime_state.get(), &block2, false);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        // Terminate after data has been buffered but before eos was delivered.
+        st = sink_operator->terminate(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "terminate failed: " << st.to_string();
+
+        ASSERT_GT(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+        st = sink_operator->sink(runtime_state.get(), &block, true);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+
+        ASSERT_GT(sink_operator->get_memory_usage(runtime_state.get()), 0);
+
+        st = local_state->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = sink_operator->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    };
+
+    run_test_block(test_block);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp 
b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
new file mode 100644
index 00000000000..7c5fec3116c
--- /dev/null
+++ b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
@@ -0,0 +1,1074 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <cassert>
+#include <cstddef>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "hash_join_test_helper.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/operator.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_operators.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/field.h"
+#include "vec/core/sort_block.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+
+namespace doris::pipeline {
+
+using namespace vectorized;
+
+// gtest fixture for HashJoinProbeOperatorX. It offers small column/block checking utilities
+// and run_test(), which drives a complete build + probe cycle and returns the joined rows.
+class HashJoinProbeOperatorTest : public testing::Test {
+public:
+    void SetUp() override { _helper.SetUp(); }
+    void TearDown() override { _helper.TearDown(); }
+
+    // Expects that row `index` of `column` is non-null and equal to `value`.
+    // std::string / StringRef values are compared as strings; any other T is compared
+    // bytewise against the column's raw data, so sizeof(T) must match the stored width.
+    template <typename T>
+    void check_column_value(const IColumn& column, const size_t index, const T& value) {
+        StringRef data;
+        if (column.is_nullable()) {
+            const auto& nullable_column = assert_cast<const ColumnNullable&>(column);
+            EXPECT_FALSE(nullable_column.is_null_at(index));
+            auto nested_column = nullable_column.get_nested_column_ptr();
+            data = nested_column->get_data_at(index);
+        } else {
+            data = column.get_data_at(index);
+        }
+
+        if constexpr (std::is_same_v<std::string, T>) {
+            EXPECT_EQ(data.to_string(), value);
+        } else if constexpr (std::is_same_v<StringRef, T>) {
+            EXPECT_EQ(data.to_string(), value.to_string());
+        } else {
+            EXPECT_EQ(sizeof(T), data.size);
+            EXPECT_EQ(memcmp(data.data, &value, sizeof(T)), 0);
+        }
+    }
+
+    // Asserts that the first values.size() rows of `column` equal `values`, comparing both
+    // the Field type and the Field value. `loc` defaults to the call site so that a failure
+    // message points at the test line that called this helper.
+    void check_column_values(const IColumn& column, const std::vector<vectorized::Field>& values,
+                             std::source_location loc = std::source_location::current()) {
+        for (size_t i = 0; i != values.size(); ++i) {
+            vectorized::Field value;
+            column.get(i, value);
+            ASSERT_EQ(value.get_type(), values[i].get_type())
+                    << "row: " << i << " type not match at: " << loc.file_name() << ":"
+                    << loc.line();
+            ASSERT_TRUE(value == values[i])
+                    << "row: " << i << " value not match at: " << loc.file_name() << ":"
+                    << loc.line();
+        }
+    }
+
+    // Returns a sorted copy of `block`. The sort keys are the column positions in
+    // `sort_columns`, or every column in positional order when `sort_columns` is empty.
+    // Each key is added to the sort description with direction 1 and nulls direction 1.
+    Block sort_block_by_columns(Block& block, const std::vector<size_t>& sort_columns = {}) {
+        SortDescription sort_description;
+        for (auto column : sort_columns) {
+            sort_description.emplace_back(column, 1, 1);
+        }
+
+        if (sort_description.empty()) {
+            for (size_t i = 0; i != block.columns(); ++i) {
+                sort_description.emplace_back(i, 1, 1);
+            }
+        }
+
+        auto sorted_block = block.clone_empty();
+        sort_block(block, sorted_block, sort_description);
+        return sorted_block;
+    }
+
+    // Options for run_test(). All fields except is_broadcast_join are forwarded to
+    // HashJoinTestHelper::create_test_plan_node(); is_broadcast_join is applied to the plan
+    // node by run_test() itself.
+    struct JoinParams {
+        TJoinOp::type join_op_type {TJoinOp::INNER_JOIN};
+        bool is_mark_join {false};
+        size_t mark_join_conjuncts_size {0};
+        bool is_broadcast_join {false};
+        bool null_safe_equal {false};
+        bool has_other_join_conjuncts {false};
+    };
+
+    // Runs a full hash join: creates the plan node and operators, sinks `build_blocks`
+    // followed by an empty eos block, then feeds `probe_blocks` one at a time through the
+    // probe operator and drains it.
+    //
+    // `probe_blocks` are moved from. `output_block` is always overwritten with all rows the
+    // probe operator produced, provided no gtest assertion failed on the way (ASSERT_*
+    // returns from this function immediately).
+    //
+    // For broadcast joins a second sink local state is set up on a separate MockRuntimeState
+    // that shares an 8-instance HashJoinSharedState with the primary one.
+    // NOLINTNEXTLINE(readability-function-*)
+    void run_test(const JoinParams& join_params, const std::vector<TPrimitiveType::type>& key_types,
+                  const std::vector<bool>& left_keys_nullable,
+                  const std::vector<bool>& right_keys_nullable, std::vector<Block>& build_blocks,
+                  std::vector<Block>& probe_blocks, Block& output_block) {
+        auto tnode = _helper.create_test_plan_node(
+                join_params.join_op_type, key_types, left_keys_nullable, right_keys_nullable,
+                join_params.is_mark_join, join_params.mark_join_conjuncts_size,
+                join_params.null_safe_equal, join_params.has_other_join_conjuncts);
+
+        // NOTE(review): this flag is true on every path, so the should_dry_run() expectation
+        // below is always `false`. Kept as in the original test.
+        bool should_build_hash_table = true;
+        if (join_params.is_broadcast_join) {
+            tnode.hash_join_node.__isset.is_broadcast_join = true;
+            tnode.hash_join_node.is_broadcast_join = true;
+            should_build_hash_table = true;
+        }
+
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        auto st = probe_operator->init(tnode, _helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = probe_operator->prepare(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        st = sink_operator->init(tnode, _helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        // Only used for broadcast joins, where it hosts the second sink local state.
+        auto shared_runtime_state = std::make_shared<MockRuntimeState>();
+
+        auto shared_state = sink_operator->create_shared_state();
+        if (join_params.is_broadcast_join) {
+            shared_state = HashJoinSharedState::create_shared(8);
+            shared_state->create_source_dependencies(8, sink_operator->operator_id(),
+                                                     sink_operator->node_id(), "HASH_JOIN_PROBE");
+            shared_runtime_state->_query_ctx = _helper.runtime_state->_query_ctx;
+            shared_runtime_state->_query_id = _helper.runtime_state->_query_id;
+            shared_runtime_state->resize_op_id_to_local_state(-100);
+            shared_runtime_state->set_max_operator_id(-100);
+
+            LocalSinkStateInfo sink_local_state_info {
+                    .task_idx = 1,
+                    .parent_profile = _helper.runtime_profile.get(),
+                    .sender_id = 0,
+                    .shared_state = shared_state.get(),
+                    .shared_state_map = {},
+                    .tsink = TDataSink(),
+            };
+            st = sink_operator->setup_local_state(shared_runtime_state.get(),
+                                                  sink_local_state_info);
+            ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+            auto* sink_local_state = assert_cast<HashJoinBuildSinkLocalState*>(
+                    shared_runtime_state->get_sink_local_state());
+            ASSERT_TRUE(sink_local_state);
+
+            st = sink_local_state->open(shared_runtime_state.get());
+            ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+        }
+
+        ASSERT_EQ(probe_operator->is_broadcast_join(), join_params.is_broadcast_join);
+        ASSERT_TRUE(shared_state);
+        LocalSinkStateInfo sink_local_state_info {
+                .task_idx = 0,
+                .parent_profile = _helper.runtime_profile.get(),
+                .sender_id = 0,
+                .shared_state = shared_state.get(),
+                .shared_state_map = {},
+                .tsink = TDataSink(),
+        };
+
+        st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_local_state_info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        ASSERT_EQ(sink_operator->should_dry_run(_helper.runtime_state.get()),
+                  join_params.is_broadcast_join && !should_build_hash_table);
+
+        ASSERT_EQ(sink_operator->require_data_distribution(), false);
+        ASSERT_EQ(probe_operator->require_data_distribution(), false);
+        ASSERT_FALSE(sink_operator->is_shuffled_operator());
+        ASSERT_FALSE(probe_operator->is_shuffled_operator());
+        std::cout << "sink distribution: "
+                  << get_exchange_type_name(
+                             sink_operator->required_data_distribution().distribution_type)
+                  << std::endl;
+        std::cout << "probe distribution: "
+                  << get_exchange_type_name(
+                             probe_operator->required_data_distribution().distribution_type)
+                  << std::endl;
+
+        LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
+                             .scan_ranges = {},
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .task_idx = 0};
+        st = probe_operator->setup_local_state(_helper.runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        auto* sink_local_state = assert_cast<HashJoinBuildSinkLocalState*>(
+                _helper.runtime_state->get_sink_local_state());
+        ASSERT_TRUE(sink_local_state);
+
+        st = sink_local_state->open(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        auto* probe_local_state =
+                _helper.runtime_state->get_local_state(probe_operator->operator_id());
+        ASSERT_TRUE(probe_local_state);
+
+        st = probe_local_state->open(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        std::cout << "probe debug string: " << probe_operator->debug_string(0) << std::endl;
+
+        // ---- Build phase -------------------------------------------------------------
+        for (auto& build_block : build_blocks) {
+            st = sink_operator->sink(_helper.runtime_state.get(), &build_block, false);
+            ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+        }
+        // build_unique() is expected only for left semi / left anti / null-aware left anti
+        // joins that carry no other join conjuncts.
+        ASSERT_EQ(sink_local_state->build_unique(),
+                  tnode.hash_join_node.other_join_conjuncts.empty() &&
+                          (tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN ||
+                           tnode.hash_join_node.join_op == TJoinOp::LEFT_SEMI_JOIN ||
+                           tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
+
+        Block empty_block;
+
+        // For broadcast joins, sinking through the secondary state before the primary one
+        // has finished is expected to fail.
+        if (join_params.is_broadcast_join) {
+            st = sink_operator->sink(shared_runtime_state.get(), &empty_block, false);
+            ASSERT_FALSE(st.ok());
+        }
+
+        st = sink_operator->sink(_helper.runtime_state.get(), &empty_block, true);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        st = sink_operator->close(_helper.runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        if (join_params.is_broadcast_join) {
+            st = sink_operator->sink(shared_runtime_state.get(), &empty_block, true);
+            ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+            st = sink_operator->close(shared_runtime_state.get(), st);
+            ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+        }
+
+        // ---- Probe phase -------------------------------------------------------------
+        auto source_operator =
+                std::dynamic_pointer_cast<MockSourceOperator>(probe_operator->child());
+        ASSERT_TRUE(source_operator);
+
+        bool eos = false;
+        Block tmp_output_block;
+        MutableBlock output_block_mutable;
+        for (auto& probe_block : probe_blocks) {
+            source_operator->set_block(std::move(probe_block));
+            st = probe_operator->get_block_after_projects(_helper.runtime_state.get(),
+                                                          &tmp_output_block, &eos);
+            ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+
+            if (tmp_output_block.empty()) {
+                continue;
+            }
+
+            st = output_block_mutable.merge(std::move(tmp_output_block));
+            tmp_output_block.clear();
+            ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string();
+        }
+
+        // If the probe operator already reported eos while consuming the probe blocks there
+        // is nothing left to drain, but the rows merged so far must still be published
+        // below. Previously this case returned early and left `output_block` untouched.
+        if (!eos) {
+            source_operator->set_eos();
+
+            st = probe_operator->get_block(_helper.runtime_state.get(), &tmp_output_block, &eos);
+            ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+            ASSERT_TRUE(eos);
+
+            if (!tmp_output_block.empty()) {
+                st = output_block_mutable.merge(std::move(tmp_output_block));
+                ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string();
+            }
+        }
+
+        output_block = output_block_mutable.to_block();
+    }
+
+protected:
+    HashJoinTestHelper _helper;
+};
+
+TEST_F(HashJoinProbeOperatorTest, InnerJoin) { // INNER_JOIN of one 5-row build block against one 5-row probe block via run_test.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING}, {true, false},
+             {false, true}, build_blocks, probe_blocks, output_block);
+
+    ASSERT_EQ(output_block.rows(), 2); // only the rows keyed 3 and 4 are expected in the output
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+    check_column_values(*sorted_block.get_by_position(0).column, {3, 4});
+    check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"});
+    check_column_values(*sorted_block.get_by_position(2).column, {3, 4});
+    check_column_values(*sorted_block.get_by_position(3).column, {"c", "d"});
+}
+
+TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyBuildSide) { // INNER_JOIN where the only build block has zero rows.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); // zero-row build column
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({},
 {}));
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING}, {true, false},
+             {false, true}, build_blocks, probe_blocks, output_block);
+
+    ASSERT_EQ(output_block.rows(), 0); // no build rows, so no output rows are expected
+}
+
+TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyProbeSide) { // INNER_JOIN where the only probe block has zero rows.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+    auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({}, 
{});
+    
probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>({}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING}, {true, false},
+             {false, true}, build_blocks, probe_blocks, output_block);
+
+    ASSERT_EQ(output_block.rows(), 0); // no probe rows, so no output rows are expected
+}
+
+TEST_F(HashJoinProbeOperatorTest, InnerJoinOtherConjuncts) { // INNER_JOIN with has_other_join_conjuncts = true; both sides carry a third nullable Int32 column.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1}));
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1})); // extra build-side Int32 column; presumably consumed by the other-join conjunct - TODO confirm in the helper
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 0, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+    
probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1})); // extra probe-side Int32 column
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({.join_op_type = TJoinOp::INNER_JOIN, .has_other_join_conjuncts = 
true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, 
{false, true},
+             build_blocks, probe_blocks, output_block);
+
+    ASSERT_EQ(output_block.rows(), 2); // only the rows keyed 1 and 3 are expected to remain
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+    check_column_values(*sorted_block.get_by_position(0).column, {1, 3});
+    check_column_values(*sorted_block.get_by_position(1).column, {"a", "c"});
+    check_column_values(*sorted_block.get_by_position(2).column, {101, 102}); // probe-side values occupy columns 0-2
+    check_column_values(*sorted_block.get_by_position(3).column, {1, 3});
+    check_column_values(*sorted_block.get_by_position(4).column, {"a", "c"});
+    check_column_values(*sorted_block.get_by_position(5).column, {51, 59}); // build-side values occupy columns 3-5
+}
+
+TEST_F(HashJoinProbeOperatorTest, InnerJoinNullSafeEqual) { // INNER_JOIN with null_safe_equal = true; a row whose string is Null on both sides is expected in the output.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 0});
+    
probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({.join_op_type = TJoinOp::INNER_JOIN, .null_safe_equal = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, true}, 
{false, true},
+             build_blocks, probe_blocks, output_block);
+
+    ASSERT_EQ(output_block.rows(), 3); // rows keyed 3, 4 and 5 are expected; key 5 carries a Null string on both sides
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+    check_column_values(*sorted_block.get_by_position(0).column, {3, 4, 5});
+    check_column_values(*sorted_block.get_by_position(1).column, {"c", "d", 
Null()});
+    check_column_values(*sorted_block.get_by_position(2).column, {3, 4, 5});
+    check_column_values(*sorted_block.get_by_position(3).column, {"c", "d", 
Null()});
+}
+
+TEST_F(HashJoinProbeOperatorTest, CheckSlot) { // Flips the nullability of slot 4 and expects probe_operator->prepare() to fail after init() succeeds.
+    auto tnode = _helper.create_test_plan_node(TJoinOp::INNER_JOIN,
+                                               {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+                                               {true, false}, {false, true}, 
false);
+
+    auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+    ASSERT_TRUE(probe_operator);
+    ASSERT_TRUE(sink_operator);
+
+    auto desc_tbl = _helper.runtime_state->desc_tbl(); // `auto` makes desc_tbl a local object, not a reference
+    desc_tbl._slot_desc_map[4]->_is_nullable = 
!desc_tbl._slot_desc_map[4]->_is_nullable;
+    _helper.runtime_state->set_desc_tbl(&desc_tbl); // NOTE(review): passes the address of a local; verify the runtime state does not use it after this test returns
+
+    auto st = probe_operator->init(tnode, _helper.runtime_state.get());
+    ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+    st = probe_operator->prepare(_helper.runtime_state.get());
+    ASSERT_FALSE(st.ok()); // prepare is expected to reject the modified slot nullability
+}
+
+TEST_F(HashJoinProbeOperatorTest, InnerJoinBroadcast) { // INNER_JOIN with is_broadcast_join = true over one 5-row build block and one 5-row probe block.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({.join_op_type = TJoinOp::INNER_JOIN, .is_broadcast_join = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, 
{false, true},
+             build_blocks, probe_blocks, output_block);
+
+    ASSERT_EQ(output_block.rows(), 2); // only the rows keyed 3 and 4 are expected in the output
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+    check_column_values(*sorted_block.get_by_position(0).column, {3, 4});
+    check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"});
+    check_column_values(*sorted_block.get_by_position(2).column, {3, 4});
+    check_column_values(*sorted_block.get_by_position(3).column, {"c", "d"});
+}
+
+TEST_F(HashJoinProbeOperatorTest, FullOuterJoin) { // FULL_OUTER_JOIN of one 5-row build block against one 5-row probe block; checks all four output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(output_block.rows(), 8); // 8 rows expected; the column checks below show Null padding on the side without a match
+
+    check_column_values(*sorted_block.get_by_position(0).column,
+                        {1, 3, 4, Null(), Null(), Null(), Null(), Null()});
+    check_column_values(*sorted_block.get_by_position(1).column,
+                        {"a", "c", "d", "b", "e", Null(), Null(), Null()});
+    check_column_values(*sorted_block.get_by_position(2).column,
+                        {Null(), 3, 4, Null(), Null(), 1, 2, 5});
+    check_column_values(*sorted_block.get_by_position(3).column,
+                        {Null(), "c", "d", Null(), Null(), Null(), "b", 
Null()});
+}
+
+TEST_F(HashJoinProbeOperatorTest, FullOuterJoinEmptyBuildSide) { // FULL_OUTER_JOIN where the only build block has zero rows; only the row count is checked.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); // zero-row build column
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({},
 {}));
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is only used for the dump below
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(output_block.rows(), 5); // row count equals the 5 probe rows
+}
+
+TEST_F(HashJoinProbeOperatorTest, FullOuterJoinEmptyProbeSide) { // FULL_OUTER_JOIN where the only probe block has zero rows; only the row count is checked.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({}, 
{});
+    
probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>({}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is only used for the dump below
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(output_block.rows(), 5); // row count equals the 5 build rows
+}
+
+TEST_F(HashJoinProbeOperatorTest, LeftOuterJoin) { // LEFT_OUTER_JOIN of one 5-row build block against one 5-row probe block; only the row count is checked.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::LEFT_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the row-count check
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 5); // row count equals the 5 probe rows
+}
+
+TEST_F(HashJoinProbeOperatorTest, LeftOuterJoin2) { // LEFT_OUTER_JOIN with a 7-row probe block that repeats keys 2 and 3; checks all four output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({1, 
2, 3, 4, 5, 2, 3},
+                                                                          {0, 
1, 0, 0, 1, 0, 0});
+    probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e", "b", "c"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::LEFT_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 7); // row count equals the 7 probe rows
+
+    check_column_values(*sorted_block.get_by_position(0).column, {1, 2, 3, 3, 
4, Null(), Null()});
+    check_column_values(*sorted_block.get_by_position(1).column,
+                        {"a", "b", "c", "c", "d", "b", "e"});
+    check_column_values(*sorted_block.get_by_position(2).column,
+                        {Null(), 2, 3, 3, 4, Null(), Null()}); // columns 2-3 are Null where the probe row has no build match
+    check_column_values(*sorted_block.get_by_position(3).column,
+                        {Null(), "b", "c", "c", "d", Null(), Null()});
+}
+
+TEST_F(HashJoinProbeOperatorTest, RightOuterJoin) { // RIGHT_OUTER_JOIN of one 5-row build block against one 5-row probe block; checks all four output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 5); // row count equals the 5 build rows
+
+    check_column_values(*sorted_block.get_by_position(0).column, {3, 4, 
Null(), Null(), Null()});
+    check_column_values(*sorted_block.get_by_position(1).column,
+                        {"c", "d", Null(), Null(), Null()}); // columns 0-1 are Null for build rows without a probe match
+    check_column_values(*sorted_block.get_by_position(2).column, {3, 4, 1, 2, 
5});
+    check_column_values(*sorted_block.get_by_position(3).column, {"c", "d", 
Null(), "b", Null()});
+}
+
+TEST_F(HashJoinProbeOperatorTest, RightOuterJoinEmptyBuildSide) { // RIGHT_OUTER_JOIN where the only build block has zero rows.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); // zero-row build column
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({},
 {}));
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the row-count check
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 0); // no build rows, so no output rows are expected
+}
+
+TEST_F(HashJoinProbeOperatorTest, RightOuterJoin2) { // RIGHT_OUTER_JOIN with a 7-row probe block that repeats keys 2 and 3; checks all four output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({1, 
2, 3, 4, 5, 2, 3},
+                                                                          {0, 
1, 0, 0, 1, 0, 0});
+    probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e", "b", "c"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 6); // 6 rows expected: four matched rows plus build keys 1 and 5 with Null in columns 0-1
+
+    check_column_values(*sorted_block.get_by_position(0).column, {2, 3, 3, 4, 
Null(), Null()});
+    check_column_values(*sorted_block.get_by_position(1).column,
+                        {"b", "c", "c", "d", Null(), Null()});
+    check_column_values(*sorted_block.get_by_position(2).column, {2, 3, 3, 4, 
1, 5});
+    check_column_values(*sorted_block.get_by_position(3).column,
+                        {"b", "c", "c", "d", Null(), Null()});
+}
+
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoin) { // LEFT_ANTI_JOIN of one 5-row build block against one 5-row probe block; checks the two output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::LEFT_ANTI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 3); // the three probe rows other than keys 3 and 4 are expected
+
+    check_column_values(*sorted_block.get_by_position(0).column, {1, Null(), 
Null()});
+    check_column_values(*sorted_block.get_by_position(1).column, {"a", "b", 
"e"});
+}
+
+TEST_F(HashJoinProbeOperatorTest, LeftSemiJoin) { // LEFT_SEMI_JOIN of one 5-row build block against one 5-row probe block; checks the two output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::LEFT_SEMI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 2); // only the probe rows keyed 3 and 4 are expected
+    check_column_values(*sorted_block.get_by_position(0).column, {3, 4});
+    check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"});
+}
+
+TEST_F(HashJoinProbeOperatorTest, LeftSemiJoinEmptyBuildSide) { // LEFT_SEMI_JOIN where the only build block has zero rows.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); // zero-row build column
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({},
 {}));
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::LEFT_SEMI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the row-count check
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 0); // no build rows, so no output rows are expected
+}
+
+TEST_F(HashJoinProbeOperatorTest, RightAntiJoin) { // RIGHT_ANTI_JOIN of one 5-row build block against one 5-row probe block; checks the two output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::RIGHT_ANTI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 3); // the build rows keyed 1, 2 and 5 are expected
+
+    check_column_values(*sorted_block.get_by_position(0).column, {1, 2, 5});
+    check_column_values(*sorted_block.get_by_position(1).column, {Null(), "b", 
Null()});
+}
+
+TEST_F(HashJoinProbeOperatorTest, RightSemiJoin) { // RIGHT_SEMI_JOIN of one 5-row build block against one 5-row probe block; checks the two output columns.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::RIGHT_SEMI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // presumably sorts rows so the positional checks below are order-independent
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 2); // only the build rows keyed 3 and 4 are expected
+
+    check_column_values(*sorted_block.get_by_position(0).column, {3, 4});
+    check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"});
+}
+
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoin) { // NULL_AWARE_LEFT_ANTI_JOIN with a build string column that has Null entries; expects an empty result.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"f", "g", "h", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the row-count check
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 0); // NOTE(review): presumably empty because the build side contains Null keys - confirm against the operator
+}
+
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinEmptyBuildSide) { // NULL_AWARE_LEFT_ANTI_JOIN where the only build block has zero rows.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({}); // zero-row build column
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({},
 {}));
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the row-count check
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 5); // row count equals the 5 probe rows
+}
+
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinOtherConjuncts) { // NULL_AWARE_LEFT_ANTI_JOIN with has_other_join_conjuncts = true; expects an empty result.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1})); // extra build-side Int32 column; presumably consumed by the other-join conjunct - TODO confirm in the helper
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+    
probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1})); // extra probe-side Int32 column
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({.join_op_type = TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, 
.has_other_join_conjuncts = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, 
{false, true},
+             build_blocks, probe_blocks, output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the row-count check
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 0); // no probe row is expected in the output for this data set
+}
+
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoin2) { // NULL_AWARE_LEFT_ANTI_JOIN where the build null map is all zeros; expects exactly one output row.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 0})); // null map is all zeros; presumably no NULL entries - TODO confirm
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, 
TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, 
output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the checks below
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 1); // only the probe row (2, "a") is expected
+    check_column_values(*sorted_block.get_by_position(0).column, {2});
+    check_column_values(*sorted_block.get_by_position(1).column, {"a"});
+}
+
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinOtherConjuncts2) { // NULL_AWARE_LEFT_ANTI_JOIN with has_other_join_conjuncts = true and a probe block whose first key is 2; expects an empty result.
+    auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 
5});
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1})); // second list is the null map; presumably 1 marks NULL - TODO confirm
+    
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1})); // extra build-side Int32 column; presumably consumed by the other-join conjunct - TODO confirm in the helper
+
+    auto probe_block =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 
5}, {0, 1, 0, 0, 1});
+    probe_block.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", 
"c", "d", "e"}));
+    
probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1})); // extra probe-side Int32 column
+
+    Block output_block; // filled by run_test with the merged join output
+    std::vector<Block> build_blocks = {sink_block};
+    std::vector<Block> probe_blocks = {probe_block};
+    run_test({.join_op_type = TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, 
.has_other_join_conjuncts = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, 
{false, true},
+             build_blocks, probe_blocks, output_block);
+
+    auto sorted_block = sort_block_by_columns(output_block); // sorted copy is used for the dump and the row-count check
+    std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+    ASSERT_EQ(sorted_block.rows(), 0); // no probe row is expected in the output for this data set
+}
+
+// LEFT_ANTI_JOIN over one build batch and one probe batch. Three rows are expected in the
+// output, two of which carry NULL in the first column.
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoin2) {
+    // Build input: INT column plus a STRING column made with the nullable helper
+    // (all flags are 0 here).
+    auto build_side = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 0}));
+    std::vector<Block> build_blocks = {build_side};
+
+    // Probe input: nullable INT column plus a plain STRING column.
+    // NOTE(review): the second list looks like a per-row null flag (1 = NULL) -- confirm.
+    auto probe_side =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_side.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    std::vector<Block> probe_blocks = {probe_side};
+
+    Block joined;
+    run_test({TJoinOp::LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, joined);
+
+    // The result is sorted before it is dumped and compared column by column.
+    auto sorted = sort_block_by_columns(joined);
+    std::cout << "Output block: " << sorted.dump_data() << std::endl;
+    ASSERT_EQ(sorted.rows(), 3);
+    check_column_values(*sorted.get_by_position(0).column, {2, Null(), Null()});
+    check_column_values(*sorted.get_by_position(1).column, {"a", "b", "e"});
+}
+
+// NULL_AWARE_LEFT_ANTI_JOIN with the second JoinParams field set to true. All five probe
+// rows are expected back, together with a third output column that is checked last
+// (presumably the mark column -- confirm against run_test()).
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinMark) {
+    // Build input: INT column plus a STRING column made with the nullable helper.
+    // NOTE(review): the second list looks like a per-row null flag (1 = NULL) -- confirm.
+    auto build_side = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+    std::vector<Block> build_blocks = {build_side};
+
+    // Probe input: nullable INT column plus a plain STRING column.
+    auto probe_side =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_side.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    std::vector<Block> probe_blocks = {probe_side};
+
+    Block joined;
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_blocks, probe_blocks, joined);
+
+    // The row count is asserted on the raw output; values are compared after sorting.
+    ASSERT_EQ(joined.rows(), 5);
+
+    Block sorted = sort_block_by_columns(joined);
+    std::cout << "Output block: " << sorted.dump_data() << std::endl;
+    check_column_values(*sorted.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*sorted.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*sorted.get_by_position(2).column, {Null(), 0, 0, Null(), Null()});
+}
+
+// NULL_AWARE_LEFT_SEMI_JOIN with the second JoinParams field set to true. All five probe
+// rows are expected back, together with a third output column that is checked last
+// (presumably the mark column -- confirm against run_test()).
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftSemiJoinMark) {
+    // Build input: INT column plus a STRING column made with the nullable helper.
+    // NOTE(review): the second list looks like a per-row null flag (1 = NULL) -- confirm.
+    auto build_side = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+    std::vector<Block> build_blocks = {build_side};
+
+    // Probe input: nullable INT column plus a plain STRING column.
+    auto probe_side =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_side.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    std::vector<Block> probe_blocks = {probe_side};
+
+    Block joined;
+    run_test({TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN, true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_blocks, probe_blocks, joined);
+
+    // The row count is asserted on the raw output; values are compared after sorting.
+    ASSERT_EQ(joined.rows(), 5);
+
+    Block sorted = sort_block_by_columns(joined);
+    std::cout << "Output block: " << sorted.dump_data() << std::endl;
+
+    check_column_values(*sorted.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*sorted.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*sorted.get_by_position(2).column, {Null(), 1, 1, Null(), Null()});
+}
+
+// LEFT_SEMI_JOIN with JoinParams given positionally as {op, true, 1}. All five probe rows
+// are expected back, together with a third output column that is checked last
+// (presumably the mark column -- confirm against run_test()).
+TEST_F(HashJoinProbeOperatorTest, LeftSemiJoinMark) {
+    // Build input: INT column plus a STRING column made with the nullable helper.
+    // NOTE(review): the second list looks like a per-row null flag (1 = NULL) -- confirm.
+    auto build_side = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+    std::vector<Block> build_blocks = {build_side};
+
+    // Probe input: nullable INT column plus a plain STRING column.
+    auto probe_side =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_side.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    std::vector<Block> probe_blocks = {probe_side};
+
+    Block joined;
+    run_test({TJoinOp::LEFT_SEMI_JOIN, true, 1}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, joined);
+
+    // The row count is asserted on the raw output; values are compared after sorting.
+    ASSERT_EQ(joined.rows(), 5);
+
+    Block sorted = sort_block_by_columns(joined);
+    std::cout << "Output block: " << sorted.dump_data(0, 100, true) << std::endl;
+
+    check_column_values(*sorted.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*sorted.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*sorted.get_by_position(2).column, {0, 1, 1, Null(), 0});
+}
+
+// LEFT_ANTI_JOIN with JoinParams given positionally as {op, true, 1}. All five probe rows
+// are expected back, together with a third output column that is checked last
+// (presumably the mark column -- confirm against run_test()).
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMark) {
+    // Build input: INT column plus a STRING column made with the nullable helper.
+    // NOTE(review): the second list looks like a per-row null flag (1 = NULL) -- confirm.
+    auto build_side = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+    std::vector<Block> build_blocks = {build_side};
+
+    // Probe input: nullable INT column plus a plain STRING column.
+    auto probe_side =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_side.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    std::vector<Block> probe_blocks = {probe_side};
+
+    Block joined;
+    run_test({TJoinOp::LEFT_ANTI_JOIN, true, 1}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_blocks, probe_blocks, joined);
+
+    // The row count is asserted on the raw output; values are compared after sorting.
+    ASSERT_EQ(joined.rows(), 5);
+
+    Block sorted = sort_block_by_columns(joined);
+    std::cout << "Output block: " << sorted.dump_data(0, 100, true) << std::endl;
+
+    check_column_values(*sorted.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*sorted.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*sorted.get_by_position(2).column, {1, 0, 0, Null(), 1});
+}
+
+// LEFT_ANTI_JOIN configured as a mark join (one mark-join conjunct) with
+// has_other_join_conjuncts enabled. All five probe rows are expected back with their three
+// input columns plus a fourth output column that is checked last (presumably the mark
+// column -- confirm against run_test()).
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMarkOtherConjuncts) {
+    // Build input: INT column followed by two columns made with the nullable helper.
+    // NOTE(review): the second list looks like a per-row null flag (1 = NULL) -- confirm.
+    auto build_side = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1}));
+    build_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1}));
+    std::vector<Block> build_blocks = {build_side};
+
+    // Probe input: nullable INT column, plain STRING column, nullable INT column.
+    auto probe_side =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 0, 0, 0, 1});
+    probe_side.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    probe_side.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1}));
+    std::vector<Block> probe_blocks = {probe_side};
+
+    Block joined;
+    run_test({.join_op_type = TJoinOp::LEFT_ANTI_JOIN,
+              .is_mark_join = true,
+              .mark_join_conjuncts_size = 1,
+              .has_other_join_conjuncts = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_blocks, probe_blocks, joined);
+
+    // The row count is asserted on the raw output; values are compared after sorting.
+    ASSERT_EQ(joined.rows(), 5);
+
+    Block sorted = sort_block_by_columns(joined);
+    std::cout << "Output block: " << sorted.dump_data(0, 100, true) << std::endl;
+
+    check_column_values(*sorted.get_by_position(0).column, {1, 2, 3, 4, Null()});
+    check_column_values(*sorted.get_by_position(1).column, {"a", "b", "c", "d", "e"});
+    check_column_values(*sorted.get_by_position(2).column, {101, 100, 102, 99, Null()});
+    check_column_values(*sorted.get_by_position(3).column, {0, 1, 0, 1, 1});
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/join_test_helper.cpp 
b/be/test/pipeline/operator/join_test_helper.cpp
new file mode 100644
index 00000000000..d6828621590
--- /dev/null
+++ b/be/test/pipeline/operator/join_test_helper.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "join_test_helper.h"
+
+#include "testutil/creators.h"
+
+namespace doris::pipeline {
+
+// Creates the objects a join test needs: a MockRuntimeState, an ObjectPool, a
+// RuntimeProfile named "test" and a query context from generate_one_query(). The runtime
+// state is then pointed at that query context, and the "ExecTime" timer and "MemoryUsed"
+// high-water-mark counter are registered on the profile.
+void JoinTestHelper::SetUp() {
+    runtime_state = std::make_unique<MockRuntimeState>();
+    obj_pool = std::make_unique<ObjectPool>();
+    runtime_profile = std::make_shared<RuntimeProfile>("test");
+    query_ctx = generate_one_query();
+
+    // Bind the mock state to the query that was just generated.
+    auto& state = *runtime_state;
+    state._query_ctx = query_ctx.get();
+    state._query_id = query_ctx->query_id();
+    state.resize_op_id_to_local_state(-100);
+    state.set_max_operator_id(-100);
+
+    // Counters registered up front on the test profile.
+    auto* prof = runtime_profile.get();
+    ADD_TIMER(prof, "ExecTime");
+    prof->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 0);
+}
+
+// Counterpart of SetUp(); currently performs no work.
+void JoinTestHelper::TearDown() {
+    // Intentionally empty.
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/join_test_helper.h 
b/be/test/pipeline/operator/join_test_helper.h
new file mode 100644
index 00000000000..b94b99c8aa2
--- /dev/null
+++ b/be/test/pipeline/operator/join_test_helper.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+// Shared state holder for join operator unit tests.
+//
+// SetUp() populates runtime_state, obj_pool, query_ctx and runtime_profile.
+// pipeline_task and desc_tbl are not touched by SetUp().
+class JoinTestHelper {
+public:
+    virtual ~JoinTestHelper() = default;
+    void SetUp();
+    void TearDown();
+
+    // virtual TPlanNode create_test_plan_node() = 0;
+    // virtual TDescriptorTable create_test_table_descriptor(bool nullable) = 0;
+
+    std::unique_ptr<MockRuntimeState> runtime_state;
+    std::unique_ptr<ObjectPool> obj_pool;
+    std::shared_ptr<QueryContext> query_ctx;
+    std::shared_ptr<RuntimeProfile> runtime_profile;
+    std::shared_ptr<PipelineTask> pipeline_task;
+    // Raw, non-owning pointer. Default-initialised to nullptr so it never holds an
+    // indeterminate value before a test assigns it.
+    DescriptorTbl* desc_tbl = nullptr;
+};
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/testutil/mock/mock_operators.h 
b/be/test/testutil/mock/mock_operators.h
index 58b1a738926..0c6136880ad 100644
--- a/be/test/testutil/mock/mock_operators.h
+++ b/be/test/testutil/mock/mock_operators.h
@@ -49,6 +49,13 @@ private:
     bool _eos = false;
 };
 
+// MockChildOperator variant that reports itself as a source operator and whose close()
+// does nothing but return Status::OK().
+class MockSourceOperator : public MockChildOperator {
+public:
+    // No-op close: `state` is ignored and the call always succeeds.
+    Status close(RuntimeState* state) override {
+        return Status::OK();
+    }
+
+    // Always true for this mock.
+    bool is_source() const override {
+        return true;
+    }
+};
+
 class MockSinkOperator final : public DataSinkOperatorXBase {
 public:
     Status sink(RuntimeState* state, vectorized::Block* block, bool eos) 
override {
diff --git a/be/test/testutil/mock/mock_runtime_state.h 
b/be/test/testutil/mock/mock_runtime_state.h
index 2b3888bf518..0831ba80404 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_runtime_state.h
@@ -41,12 +41,17 @@ public:
         return _enable_shared_exchange_sink_buffer;
     }
 
+    bool enable_share_hash_table_for_broadcast_join() const override {
+        return _enable_share_hash_table_for_broadcast_join; // mock member; defaults to true
+    }
+
     bool enable_local_exchange() const override { return true; } // unconditionally true in this mock
     WorkloadGroupPtr workload_group() override { return _workload_group; } // member defaults to nullptr
 
     // default batch size
     int batsh_size = 4096;
     bool _enable_shared_exchange_sink_buffer = true;
+    bool _enable_share_hash_table_for_broadcast_join = true;
     std::shared_ptr<MockContext> _mock_context = 
std::make_shared<MockContext>();
     std::shared_ptr<MockQueryContext> _query_ctx_uptr = 
std::make_shared<MockQueryContext>();
     WorkloadGroupPtr _workload_group = nullptr;
diff --git a/be/test/testutil/run_all_tests.cpp 
b/be/test/testutil/run_all_tests.cpp
index ac43c7ace14..a9688a3656d 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -20,6 +20,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "common/phdr_cache.h"
 #include "common/status.h"
 #include "gtest/gtest.h"
 #include "olap/page_cache.h"
@@ -97,8 +98,23 @@ int main(int argc, char** argv) {
     doris::ExecEnv::GetInstance()->set_tracking_memory(false);
 
     google::ParseCommandLineFlags(&argc, &argv, false);
-    int res = RUN_ALL_TESTS();
 
-    doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr);
-    return res;
+    updatePHDRCache();
+    try {
+        int res = RUN_ALL_TESTS();
+        
doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr);
+        return res;
+    } catch (doris::Exception& e) {
+        LOG(FATAL) << "Exception: " << e.what();
+    } catch (...) {
+        auto eptr = std::current_exception();
+        try {
+            std::rethrow_exception(eptr);
+        } catch (const std::exception& e) {
+            LOG(FATAL) << "Unknown exception: " << e.what();
+        } catch (...) {
+            LOG(FATAL) << "Unknown exception";
+        }
+        return -1;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to