This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 65a32f76b05 [Feature](join) Support lazy materialization of columns that are not used in join other conjunct (#49073)
65a32f76b05 is described below

commit 65a32f76b053f1f345f8e53e413152728ac1eb90
Author: Pxl <x...@selectdb.com>
AuthorDate: Tue Mar 25 11:57:59 2025 +0800

    [Feature](join) Support lazy materialization of columns that are not used in join other conjunct (#49073)
    
    ### What problem does this PR solve?
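    1. Support lazy materialization of probe-side and build-side output columns
    that are not referenced by the join's other conjuncts: such columns receive
    placeholder (default) values while probing and are filled in only for the
    rows that pass the other-conjunct filter.
    2. Simplify some code: ProcessHashTableProbe now reads the join flags and
    output-slot flags directly from HashJoinProbeOperatorX, and the unused
    serialized-key buffer and arena are removed.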
    
    Hash join operator profile before this change:
    ```
    HASH_JOIN_OPERATOR  (id=6  ,  nereids_id=1089):(ExecTime:  10sec315ms)
                              -  BlocksProduced:  90.294K  (90294)
                              -  ExecTime:  10sec315ms
                              -  NonEqualJoinConjunctEvaluationTime:  2sec454ms
                              -  ProbeRows:  1.0M  (1000000)
                              -  ProbeWhenBuildSideOutputTime:  2sec223ms
                              -  ProbeWhenProbeSideOutputTime:  3sec33ms
                              -  ProbeWhenSearchHashTableTime:  1sec877ms
                              -  ProjectionTime:  155.383ms
                              -  RowsProduced:  1.0M  (1000000)
    ```
    Hash join operator profile after this change:
    ```
    HASH_JOIN_OPERATOR  (id=6  ,  nereids_id=1108):(ExecTime:  5sec669ms)
                            -  BlocksProduced:  90.294K  (90294)
                            -  ExecTime:  5sec669ms
                            -  NonEqualJoinConjunctEvaluationTime:  2sec111ms
                            -  ProbeRows:  1.0M  (1000000)
                            -  ProbeWhenBuildSideOutputTime:  689.966ms
                            -  ProbeWhenProbeSideOutputTime:  350.553ms
                            -  ProbeWhenSearchHashTableTime:  1sec883ms
                            -  ProjectionTime:  150.629ms
                            -  RowsProduced:  1.0M  (1000000)
    ```
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [x] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [x] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [x] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [x] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  40 +---
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  16 +-
 .../pipeline/exec/join/process_hash_table_probe.h  |  41 ++--
 .../exec/join/process_hash_table_probe_impl.h      | 250 +++++++++++++--------
 be/src/vec/common/hash_table/join_hash_table.h     |   4 +-
 be/src/vec/exprs/vexpr.h                           |   6 +
 be/src/vec/exprs/vslot_ref.h                       |   4 +
 7 files changed, 196 insertions(+), 165 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 7f273680f6d..a432b59cfa2 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -93,34 +93,6 @@ void HashJoinProbeLocalState::prepare_for_next() {
     _prepare_probe_block();
 }
 
-bool HashJoinProbeLocalState::have_other_join_conjunct() const {
-    return _parent->cast<HashJoinProbeOperatorX>()._have_other_join_conjunct;
-}
-
-bool HashJoinProbeLocalState::is_right_semi_anti() const {
-    return _parent->cast<HashJoinProbeOperatorX>()._is_right_semi_anti;
-}
-
-bool HashJoinProbeLocalState::is_outer_join() const {
-    return _parent->cast<HashJoinProbeOperatorX>()._is_outer_join;
-}
-
-std::vector<bool>* HashJoinProbeLocalState::left_output_slot_flags() {
-    return &_parent->cast<HashJoinProbeOperatorX>()._left_output_slot_flags;
-}
-
-std::vector<bool>* HashJoinProbeLocalState::right_output_slot_flags() {
-    return &_parent->cast<HashJoinProbeOperatorX>()._right_output_slot_flags;
-}
-
-vectorized::DataTypes HashJoinProbeLocalState::right_table_data_types() {
-    return _parent->cast<HashJoinProbeOperatorX>()._right_table_data_types;
-}
-
-vectorized::DataTypes HashJoinProbeLocalState::left_table_data_types() {
-    return _parent->cast<HashJoinProbeOperatorX>()._left_table_data_types;
-}
-
 Status HashJoinProbeLocalState::close(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_close_timer);
@@ -133,12 +105,6 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) 
{
                                              if (process_hashtable_ctx._arena) 
{
                                                  
process_hashtable_ctx._arena.reset();
                                              }
-
-                                             if 
(process_hashtable_ctx._serialize_key_arena) {
-                                                 
process_hashtable_ctx._serialize_key_arena.reset();
-                                                 
process_hashtable_ctx._serialized_key_buffer_size =
-                                                         0;
-                                             }
                                          }},
                    *_process_hashtable_ctx_variants);
     }
@@ -259,7 +225,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                                             : nullptr,
                                     mutable_join_block, &temp_block,
                                     
cast_set<uint32_t>(local_state._probe_block.rows()),
-                                    _is_mark_join, _have_other_join_conjunct);
+                                    _is_mark_join);
                         } else {
                             st = Status::InternalError("uninited hash table");
                         }
@@ -522,6 +488,10 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* 
state) {
         RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
     }
 
+    for (auto conjunct : _other_join_conjuncts) {
+        
conjunct->root()->collect_slot_column_ids(_other_conjunct_refer_column_ids);
+    }
+
     for (auto& conjunct : _mark_join_conjuncts) {
         RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
     }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index f3f1983975e..6fdf77e250d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -61,14 +61,7 @@ public:
                                         bool* eos, vectorized::Block* 
temp_block,
                                         bool check_rows_count = true);
 
-    bool have_other_join_conjunct() const;
-    bool is_right_semi_anti() const;
-    bool is_outer_join() const;
-    std::vector<bool>* left_output_slot_flags();
-    std::vector<bool>* right_output_slot_flags();
-    vectorized::DataTypes right_table_data_types();
-    vectorized::DataTypes left_table_data_types();
-    bool* has_null_in_build_side() { return 
&_shared_state->_has_null_in_build_side; }
+    bool has_null_in_build_side() { return 
_shared_state->_has_null_in_build_side; }
     const std::shared_ptr<vectorized::Block>& build_block() const {
         return _shared_state->build_block;
     }
@@ -161,11 +154,17 @@ public:
 
     bool need_finalize_variant_column() const { return 
_need_finalize_variant_column; }
 
+    bool is_lazy_materialized_column(int column_id) const {
+        return _have_other_join_conjunct && 
!_other_conjunct_refer_column_ids.contains(column_id);
+    }
+
 private:
     Status _do_evaluate(vectorized::Block& block, 
vectorized::VExprContextSPtrs& exprs,
                         RuntimeProfile::Counter& expr_call_timer,
                         std::vector<int>& res_col_ids) const;
     friend class HashJoinProbeLocalState;
+    template <int JoinOpType>
+    friend struct ProcessHashTableProbe;
 
     const TJoinDistributionType::type _join_distribution;
 
@@ -184,6 +183,7 @@ private:
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
     bool _need_finalize_variant_column = false;
+    std::set<int> _other_conjunct_refer_column_ids;
     std::vector<std::string> _right_table_column_names;
     const std::vector<TExpr> _partition_exprs;
 };
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h 
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 8cfcfc75e1b..0019b180de3 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -22,7 +22,6 @@
 #include "vec/columns/column.h"
 #include "vec/columns/columns_number.h"
 #include "vec/common/arena.h"
-#include "vec/common/hash_table/join_hash_table.h"
 
 namespace doris {
 namespace vectorized {
@@ -33,12 +32,13 @@ struct HashJoinProbeContext;
 namespace pipeline {
 
 class HashJoinProbeLocalState;
+class HashJoinProbeOperatorX;
 
 using MutableColumnPtr = vectorized::IColumn::MutablePtr;
 using MutableColumns = std::vector<vectorized::MutableColumnPtr>;
 
 using NullMap = vectorized::ColumnUInt8::Container;
-using ConstNullMapPtr = const vectorized::NullMap*;
+using ConstNullMapPtr = const NullMap*;
 
 template <int JoinOpType>
 struct ProcessHashTableProbe {
@@ -46,24 +46,19 @@ struct ProcessHashTableProbe {
     ~ProcessHashTableProbe() = default;
 
     // output build side result column
-    void build_side_output_column(vectorized::MutableColumns& mcol,
-                                  const std::vector<bool>& output_slot_flags, 
int size,
-                                  bool have_other_join_conjunct, bool 
is_mark_join);
+    void build_side_output_column(vectorized::MutableColumns& mcol, int size, 
bool is_mark_join);
 
-    void probe_side_output_column(vectorized::MutableColumns& mcol,
-                                  const std::vector<bool>& output_slot_flags, 
int size,
-                                  bool all_match_one, bool 
have_other_join_conjunct);
+    void probe_side_output_column(vectorized::MutableColumns& mcol, int size, 
bool all_match_one);
 
     template <typename HashTableType>
     Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
                    vectorized::MutableBlock& mutable_block, vectorized::Block* 
output_block,
-                   uint32_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);
+                   uint32_t probe_rows, bool is_mark_join);
 
     // Only process the join with no other join conjunct, because of no other 
join conjunt
     // the output block struct is same with mutable block. we can do more opt 
on it and simplify
     // the logic of probe
-    // TODO: opt the visited here to reduce the size of hash table
-    template <typename HashTableType, bool with_other_conjuncts, bool 
is_mark_join>
+    template <typename HashTableType, bool is_mark_join>
     Status do_process(HashTableType& hash_table_ctx, const uint8_t* null_map,
                       vectorized::MutableBlock& mutable_block, 
vectorized::Block* output_block,
                       uint32_t probe_rows);
@@ -74,12 +69,13 @@ struct ProcessHashTableProbe {
     Status do_other_join_conjuncts(vectorized::Block* output_block, 
DorisVector<uint8_t>& visited,
                                    bool has_null_in_build_side);
 
-    template <bool with_other_conjuncts>
     Status do_mark_join_conjuncts(vectorized::Block* output_block, const 
uint8_t* null_map);
 
+    Status finalize_block_with_filter(vectorized::Block* output_block, size_t 
filter_column_id,
+                                      size_t column_to_keep);
+
     template <typename HashTableType>
     typename HashTableType::State _init_probe_side(HashTableType& 
hash_table_ctx, size_t probe_rows,
-                                                   bool 
with_other_join_conjuncts,
                                                    const uint8_t* null_map);
 
     // Process full outer join/ right join / right semi/anti join to output 
the join result
@@ -93,15 +89,17 @@ struct ProcessHashTableProbe {
     uint32_t _process_probe_null_key(uint32_t probe_idx);
 
     pipeline::HashJoinProbeLocalState* _parent = nullptr;
+    pipeline::HashJoinProbeOperatorX* _parent_operator = nullptr;
+
     const int _batch_size;
     const std::shared_ptr<vectorized::Block>& _build_block;
     std::unique_ptr<vectorized::Arena> _arena;
-    std::vector<StringRef> _probe_keys;
 
-    std::vector<uint32_t> _probe_indexs;
+    vectorized::ColumnVector<uint32_t> _probe_indexs;
+    vectorized::ColumnVector<uint32_t> _output_row_indexs;
     bool _probe_visited = false;
     bool _picking_null_keys = false;
-    std::vector<uint32_t> _build_indexs;
+    vectorized::ColumnVector<uint32_t> _build_indexs;
     std::vector<uint8_t> _null_flags;
 
     /// If the probe key of one row on left side is null,
@@ -111,19 +109,14 @@ struct ProcessHashTableProbe {
 
     std::vector<int> _build_blocks_locs;
 
-    size_t _serialized_key_buffer_size {0};
-    uint8_t* _serialized_key_buffer = nullptr;
-    std::unique_ptr<vectorized::Arena> _serialize_key_arena;
     std::vector<char> _probe_side_find_result;
 
-    bool _have_other_join_conjunct;
-    bool _is_right_semi_anti;
-    std::vector<bool>* _left_output_slot_flags = nullptr;
-    std::vector<bool>* _right_output_slot_flags = nullptr;
+    const bool _have_other_join_conjunct;
+    const std::vector<bool>& _left_output_slot_flags;
+    const std::vector<bool>& _right_output_slot_flags;
     // nullable column but not has null except first row
     std::vector<bool> _build_column_has_null;
     bool _need_calculate_build_index_has_zero = true;
-    bool* _has_null_in_build_side;
 
     RuntimeProfile::Counter* _search_hashtable_timer = nullptr;
     RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index b06e7c7c388..4c46d581bc4 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -25,10 +25,10 @@
 #include "process_hash_table_probe.h"
 #include "runtime/thread_context.h" // IWYU pragma: keep
 #include "util/simd/bits.h"
+#include "vec/columns/column_const.h"
 #include "vec/columns/column_filter_helper.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/columns_number.h"
-#include "vec/common/hash_table/join_hash_table.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::pipeline {
@@ -37,44 +37,42 @@ template <int JoinOpType>
 
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState*
 parent,
                                                          int batch_size)
         : _parent(parent),
+          _parent_operator(&parent->_parent->template 
cast<HashJoinProbeOperatorX>()),
           _batch_size(batch_size),
           _build_block(parent->build_block()),
-          _have_other_join_conjunct(parent->have_other_join_conjunct()),
-          _is_right_semi_anti(parent->is_right_semi_anti()),
-          _left_output_slot_flags(parent->left_output_slot_flags()),
-          _right_output_slot_flags(parent->right_output_slot_flags()),
-          _has_null_in_build_side(parent->has_null_in_build_side()),
+          
_have_other_join_conjunct(_parent_operator->_have_other_join_conjunct),
+          _left_output_slot_flags(_parent_operator->_left_output_slot_flags),
+          _right_output_slot_flags(_parent_operator->_right_output_slot_flags),
           _search_hashtable_timer(parent->_search_hashtable_timer),
           _init_probe_side_timer(parent->_init_probe_side_timer),
           _build_side_output_timer(parent->_build_side_output_timer),
           _probe_side_output_timer(parent->_probe_side_output_timer),
           _finish_probe_phase_timer(parent->_finish_probe_phase_timer),
-          _right_col_idx((_is_right_semi_anti && !_have_other_join_conjunct)
+          _right_col_idx((_parent_operator->_is_right_semi_anti && 
!_have_other_join_conjunct)
                                  ? 0
-                                 : _parent->left_table_data_types().size()),
-          _right_col_len(_parent->right_table_data_types().size()) {}
+                                 : 
_parent_operator->_left_table_data_types.size()),
+          _right_col_len(_parent_operator->_right_table_data_types.size()) {}
 
 template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
-        vectorized::MutableColumns& mcol, const std::vector<bool>& 
output_slot_flags, int size,
-        bool have_other_join_conjunct, bool is_mark_join) {
+void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(vectorized::MutableColumns&
 mcol,
+                                                                 int size, 
bool is_mark_join) {
     SCOPED_TIMER(_build_side_output_timer);
 
     // indicates whether build_indexs contain 0
     bool build_index_has_zero =
             (JoinOpType != TJoinOp::INNER_JOIN && JoinOpType != 
TJoinOp::RIGHT_OUTER_JOIN) ||
-            have_other_join_conjunct || is_mark_join;
+            _have_other_join_conjunct || is_mark_join;
     if (!size) {
         return;
     }
 
     if (!build_index_has_zero && _build_column_has_null.empty()) {
         _need_calculate_build_index_has_zero = false;
-        _build_column_has_null.resize(output_slot_flags.size());
+        _build_column_has_null.resize(_right_output_slot_flags.size());
         for (int i = 0; i < _right_col_len; i++) {
             const auto& column = *_build_block->safe_get_by_position(i).column;
             _build_column_has_null[i] = false;
-            if (output_slot_flags[i] && column.is_nullable()) {
+            if (_right_output_slot_flags[i] && column.is_nullable()) {
                 const auto& nullable = assert_cast<const 
vectorized::ColumnNullable&>(column);
                 _build_column_has_null[i] = !simd::contain_byte(
                         nullable.get_null_map_data().data() + 1, 
nullable.size() - 1, 1);
@@ -83,16 +81,18 @@ void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
         }
     }
 
-    for (size_t i = 0; i < _right_col_len && i + _right_col_idx < mcol.size(); 
i++) {
+    for (int i = 0; i < _right_col_len && i + _right_col_idx < mcol.size(); 
i++) {
         const auto& column = *_build_block->safe_get_by_position(i).column;
-        if (output_slot_flags[i]) {
+        if (_right_output_slot_flags[i] &&
+            !_parent_operator->is_lazy_materialized_column(i + 
(int)_right_col_idx)) {
             if (!build_index_has_zero && _build_column_has_null[i]) {
                 assert_cast<vectorized::ColumnNullable*>(mcol[i + 
_right_col_idx].get())
-                        ->insert_indices_from_not_has_null(column, 
_build_indexs.data(),
-                                                           
_build_indexs.data() + size);
+                        ->insert_indices_from_not_has_null(column, 
_build_indexs.get_data().data(),
+                                                           
_build_indexs.get_data().data() + size);
             } else {
-                mcol[i + _right_col_idx]->insert_indices_from(column, 
_build_indexs.data(),
-                                                              
_build_indexs.data() + size);
+                mcol[i + _right_col_idx]->insert_indices_from(
+                        column, _build_indexs.get_data().data(),
+                        _build_indexs.get_data().data() + size);
             }
         } else if (i + _right_col_idx != _parent->_mark_column_id) {
             mcol[i + _right_col_idx]->insert_default();
@@ -115,23 +115,25 @@ void 
ProcessHashTableProbe<JoinOpType>::build_side_output_column(
 }
 
 template <int JoinOpType>
-void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
-        vectorized::MutableColumns& mcol, const std::vector<bool>& 
output_slot_flags, int size,
-        bool all_match_one, bool have_other_join_conjunct) {
+void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(vectorized::MutableColumns&
 mcol,
+                                                                 int size, 
bool all_match_one) {
     SCOPED_TIMER(_probe_side_output_timer);
     auto& probe_block = _parent->_probe_block;
-    for (int i = 0; i < output_slot_flags.size(); ++i) {
-        if (output_slot_flags[i]) {
-            if (auto& p = _parent->parent()->cast<HashJoinProbeOperatorX>();
-                p.need_finalize_variant_column()) {
+
+    for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
+        if (_left_output_slot_flags[i]) {
+            if (_parent_operator->need_finalize_variant_column()) {
                 
std::move(*probe_block.get_by_position(i).column).mutate()->finalize();
             }
+        }
+
+        if (_left_output_slot_flags[i] && 
!_parent_operator->is_lazy_materialized_column(i)) {
             auto& column = probe_block.get_by_position(i).column;
             if (all_match_one) {
-                mcol[i]->insert_range_from(*column, _probe_indexs[0], size);
+                mcol[i]->insert_range_from(*column, 
_probe_indexs.get_element(0), size);
             } else {
-                mcol[i]->insert_indices_from(*column, _probe_indexs.data(),
-                                             _probe_indexs.data() + size);
+                mcol[i]->insert_indices_from(*column, 
_probe_indexs.get_data().data(),
+                                             _probe_indexs.get_data().data() + 
size);
             }
         } else {
             mcol[i]->insert_default();
@@ -143,14 +145,13 @@ void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
 template <int JoinOpType>
 template <typename HashTableType>
 typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_side(
-        HashTableType& hash_table_ctx, size_t probe_rows, bool 
with_other_join_conjuncts,
-        const uint8_t* null_map) {
+        HashTableType& hash_table_ctx, size_t probe_rows, const uint8_t* 
null_map) {
     // may over batch size 1 for some outer join case
     _probe_indexs.resize(_batch_size + 1);
     _build_indexs.resize(_batch_size + 1);
     if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
          JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
-        with_other_join_conjuncts) {
+        _have_other_join_conjunct) {
         _null_flags.resize(_batch_size + 1);
     }
 
@@ -174,7 +175,7 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
 }
 
 template <int JoinOpType>
-template <typename HashTableType, bool with_other_conjuncts, bool is_mark_join>
+template <typename HashTableType, bool is_mark_join>
 Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& 
hash_table_ctx,
                                                      const uint8_t* null_map,
                                                      vectorized::MutableBlock& 
mutable_block,
@@ -188,7 +189,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     auto& build_index = _parent->_build_index;
     {
         SCOPED_TIMER(_init_probe_side_timer);
-        _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, 
with_other_conjuncts, null_map);
+        _init_probe_side<HashTableType>(hash_table_ctx, probe_rows, null_map);
     }
 
     auto& mcol = mutable_block.mutable_columns();
@@ -196,7 +197,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
     uint32_t current_offset = 0;
     if ((JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
          JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
-        with_other_conjuncts) {
+        _have_other_join_conjunct) {
         SCOPED_TIMER(_search_hashtable_timer);
 
         /// If `_build_index_for_null_probe_key` is not zero, it means we are 
in progress of handling probe null key.
@@ -214,8 +215,9 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
             auto [new_probe_idx, new_build_idx, new_current_offset, 
picking_null_keys] =
                     
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
                             hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
-                            build_index, probe_rows, _probe_indexs.data(), 
_build_indexs.data(),
-                            _null_flags.data(), _picking_null_keys, null_map);
+                            build_index, probe_rows, 
_probe_indexs.get_data().data(),
+                            _build_indexs.get_data().data(), 
_null_flags.data(), _picking_null_keys,
+                            null_map);
             probe_index = new_probe_idx;
             build_index = new_build_idx;
             current_offset = new_current_offset;
@@ -237,20 +239,20 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
         auto [new_probe_idx, new_build_idx, new_current_offset] =
                 hash_table_ctx.hash_table->template find_batch<JoinOpType>(
                         hash_table_ctx.keys, 
hash_table_ctx.bucket_nums.data(), probe_index,
-                        build_index, cast_set<int32_t>(probe_rows), 
_probe_indexs.data(),
-                        _probe_visited, _build_indexs.data(), null_map, 
with_other_conjuncts,
-                        is_mark_join, !_parent->_mark_join_conjuncts.empty());
+                        build_index, cast_set<int32_t>(probe_rows), 
_probe_indexs.get_data().data(),
+                        _probe_visited, _build_indexs.get_data().data(), 
null_map,
+                        _have_other_join_conjunct, is_mark_join,
+                        !_parent->_mark_join_conjuncts.empty());
         probe_index = new_probe_idx;
         build_index = new_build_idx;
         current_offset = new_current_offset;
     }
 
-    build_side_output_column(mcol, *_right_output_slot_flags, current_offset, 
with_other_conjuncts,
-                             is_mark_join);
+    build_side_output_column(mcol, current_offset, is_mark_join);
 
-    if constexpr (with_other_conjuncts || (JoinOpType != 
TJoinOp::RIGHT_SEMI_JOIN &&
-                                           JoinOpType != 
TJoinOp::RIGHT_ANTI_JOIN)) {
-        auto check_all_match_one = [](const std::vector<uint32_t>& vecs, int 
size) {
+    if (_have_other_join_conjunct ||
+        (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != 
TJoinOp::RIGHT_ANTI_JOIN)) {
+        auto check_all_match_one = [](const auto& vecs, int size) {
             if (!size || vecs[size - 1] != vecs[0] + size - 1) {
                 return false;
             }
@@ -262,9 +264,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
             return true;
         };
 
-        probe_side_output_column(mcol, *_left_output_slot_flags, 
current_offset,
-                                 check_all_match_one(_probe_indexs, 
current_offset),
-                                 with_other_conjuncts);
+        probe_side_output_column(mcol, current_offset,
+                                 check_all_match_one(_probe_indexs.get_data(), 
current_offset));
     }
 
     output_block->swap(mutable_block.to_block());
@@ -275,9 +276,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
                  JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
                 hash_table_ctx.hash_table
                         ->empty_build_side(); // empty build side will return 
false to instead null
-        return do_mark_join_conjuncts<with_other_conjuncts>(output_block,
-                                                            ignore_null_map ? 
nullptr : null_map);
-    } else if constexpr (with_other_conjuncts) {
+        return do_mark_join_conjuncts(output_block, ignore_null_map ? nullptr 
: null_map);
+    } else if (_have_other_join_conjunct) {
         return do_other_join_conjuncts(output_block, 
hash_table_ctx.hash_table->get_visited(),
                                        
hash_table_ctx.hash_table->has_null_key());
     }
@@ -293,15 +293,15 @@ uint32_t 
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
     DCHECK_LT(0, _build_index_for_null_probe_key);
     uint32_t matched_cnt = 0;
     for (; _build_index_for_null_probe_key < rows && matched_cnt < 
_batch_size; ++matched_cnt) {
-        _probe_indexs[matched_cnt] = probe_index;
-        _build_indexs[matched_cnt] = _build_index_for_null_probe_key++;
+        _probe_indexs.get_element(matched_cnt) = probe_index;
+        _build_indexs.get_element(matched_cnt) = 
_build_index_for_null_probe_key++;
         _null_flags[matched_cnt] = 1;
     }
 
     if (_build_index_for_null_probe_key == rows) {
         _build_index_for_null_probe_key = 0;
-        _probe_indexs[matched_cnt] = probe_index;
-        _build_indexs[matched_cnt] = 0;
+        _probe_indexs.get_element(matched_cnt) = probe_index;
+        _build_indexs.get_element(matched_cnt) = 0;
         _null_flags[matched_cnt] = 0;
         matched_cnt++;
     }
@@ -309,6 +309,65 @@ uint32_t 
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
     return matched_cnt;
 }
 
+template <int JoinOpType>
+Status ProcessHashTableProbe<JoinOpType>::finalize_block_with_filter(
+        vectorized::Block* output_block, size_t filter_column_id, size_t 
column_to_keep) {
+    vectorized::ColumnPtr filter_ptr = 
output_block->get_by_position(filter_column_id).column;
+    RETURN_IF_ERROR(
+            vectorized::Block::filter_block(output_block, filter_column_id, 
column_to_keep));
+
+    auto do_lazy_materialize = [&](const std::vector<bool>& output_slot_flags,
+                                   vectorized::ColumnVector<unsigned int>& 
row_indexs,
+                                   int column_offset, vectorized::Block* 
source_block) {
+        if (!_have_other_join_conjunct) {
+            return;
+        }
+        std::vector<int> column_ids;
+        for (int i = 0; i < output_slot_flags.size(); ++i) {
+            if (output_slot_flags[i] &&
+                _parent_operator->is_lazy_materialized_column(i + 
column_offset)) {
+                column_ids.push_back(i);
+            }
+        }
+        if (column_ids.empty()) {
+            return;
+        }
+        size_t row_count = filter_ptr->size();
+        // input row_indexs's size may bigger than row_count coz 
_init_probe_side
+        row_indexs.resize(row_count);
+
+        bool need_filter =
+                simd::count_zero_num(
+                        (int8_t*)assert_cast<const 
vectorized::ColumnUInt8*>(filter_ptr.get())
+                                ->get_data()
+                                .data(),
+                        row_count) != 0;
+        if (need_filter) {
+            const auto& column_filter =
+                    assert_cast<const 
vectorized::ColumnUInt8*>(filter_ptr.get())->get_data();
+            row_indexs.filter(column_filter);
+        }
+
+        const auto& container = row_indexs.get_data();
+        for (int column_id : column_ids) {
+            int output_column_id = column_id + column_offset;
+            output_block->get_by_position(output_column_id).column =
+                    assert_cast<const vectorized::ColumnConst*>(
+                            
output_block->get_by_position(output_column_id).column.get())
+                            ->get_data_column_ptr();
+
+            auto& src = source_block->get_by_position(column_id).column;
+            auto dst = 
output_block->get_by_position(output_column_id).column->assume_mutable();
+            dst->clear();
+            dst->insert_indices_from(*src, container.data(), container.data() 
+ container.size());
+        }
+    };
+    do_lazy_materialize(_right_output_slot_flags, _build_indexs, 
(int)_right_col_idx,
+                        _build_block.get());
+    do_lazy_materialize(_left_output_slot_flags, _probe_indexs, 0, 
&_parent->_probe_block);
+    return Status::OK();
+}
+
 /**
      * Mark join: there is a column named mark column which stores the result 
of mark join conjunct.
      * For example:
@@ -343,7 +402,6 @@ uint32_t 
ProcessHashTableProbe<JoinOpType>::_process_probe_null_key(uint32_t pro
      * So this query will be a "null aware left anti join", which means the 
equal conjunct's result should be nullable.
      */
 template <int JoinOpType>
-template <bool with_other_conjuncts>
 Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Block* 
output_block,
                                                                  const 
uint8_t* null_map) {
     DCHECK(JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
@@ -377,23 +435,23 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
         // select 4 not in (2, 3, null) => null, select 4 not in (2, 3) => true
         // select 4 in (2, 3, null) => null, select 4 in (2, 3) => false
         for (size_t i = 0; i != row_count; ++i) {
-            mark_filter_data[i] = _build_indexs[i] != 0;
+            mark_filter_data[i] = _build_indexs.get_element(i) != 0;
         }
 
-        if constexpr (with_other_conjuncts) {
+        if (_have_other_join_conjunct) {
             // _null_flags is true means build or probe side of the row is null
             memcpy(mark_null_map, _null_flags.data(), row_count);
         } else {
             if (null_map) {
                 // probe side of the row is null, so the mark sign should also 
be null.
                 for (size_t i = 0; i != row_count; ++i) {
-                    mark_null_map[i] |= null_map[_probe_indexs[i]];
+                    mark_null_map[i] |= null_map[_probe_indexs.get_element(i)];
                 }
             }
-            if (!with_other_conjuncts && *_has_null_in_build_side) {
+            if (!_have_other_join_conjunct && 
_parent->has_null_in_build_side()) {
                 // _has_null_in_build_side will change false to null when row 
not matched
                 for (size_t i = 0; i != row_count; ++i) {
-                    mark_null_map[i] |= _build_indexs[i] == 0;
+                    mark_null_map[i] |= _build_indexs.get_element(i) == 0;
                 }
             }
         }
@@ -401,11 +459,11 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
         // for non null aware join, build_indexs is 0 which means there is no 
match
         // sometimes null will be returned in conjunct, but it should not 
actually be null.
         for (size_t i = 0; i != row_count; ++i) {
-            mark_null_map[i] &= _build_indexs[i] != 0;
+            mark_null_map[i] &= _build_indexs.get_element(i) != 0;
         }
     }
 
-    if constexpr (with_other_conjuncts) {
+    if (_have_other_join_conjunct) {
         vectorized::IColumn::Filter other_conjunct_filter(row_count, 1);
         {
             bool can_be_filter_all = false;
@@ -428,19 +486,20 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
     auto filter_column = vectorized::ColumnUInt8::create(row_count, 0);
     auto* __restrict filter_map = filter_column->get_data().data();
     for (size_t i = 0; i != row_count; ++i) {
-        if (_parent->_last_probe_match == _probe_indexs[i]) {
+        if (_parent->_last_probe_match == _probe_indexs.get_element(i)) {
             continue;
         }
-        if (_build_indexs[i] == 0) {
-            bool has_null_mark_value = _parent->_last_probe_null_mark == 
_probe_indexs[i];
+        if (_build_indexs.get_element(i) == 0) {
+            bool has_null_mark_value =
+                    _parent->_last_probe_null_mark == 
_probe_indexs.get_element(i);
             filter_map[i] = true;
             mark_filter_data[i] = false;
             mark_null_map[i] |= has_null_mark_value;
         } else if (mark_null_map[i]) {
-            _parent->_last_probe_null_mark = _probe_indexs[i];
+            _parent->_last_probe_null_mark = _probe_indexs.get_element(i);
         } else if (mark_filter_data[i]) {
             filter_map[i] = true;
-            _parent->_last_probe_match = _probe_indexs[i];
+            _parent->_last_probe_match = _probe_indexs.get_element(i);
         }
     }
 
@@ -454,7 +513,7 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_mark_join_conjuncts(vectorized::Blo
     auto result_column_id = output_block->columns();
     output_block->insert(
             {std::move(filter_column), 
std::make_shared<vectorized::DataTypeUInt8>(), ""});
-    return vectorized::Block::filter_block(output_block, result_column_id, 
result_column_id);
+    return finalize_block_with_filter(output_block, result_column_id, 
result_column_id);
 }
 
 template <int JoinOpType>
@@ -495,24 +554,24 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
 
         // process equal-conjuncts-matched tuples that are newly generated
         // in this run if there are any.
-        for (int i = 0; i < row_count; ++i) {
-            bool join_hit = _build_indexs[i];
+        for (size_t i = 0; i < row_count; ++i) {
+            bool join_hit = _build_indexs.get_element(i);
             bool other_hit = filter_column_ptr[i];
 
             if (!join_hit) {
-                filter_map[i] = _parent->_last_probe_match != _probe_indexs[i];
+                filter_map[i] = _parent->_last_probe_match != 
_probe_indexs.get_element(i);
             } else {
                 filter_map[i] = other_hit;
             }
             if (filter_map[i]) {
-                _parent->_last_probe_match = _probe_indexs[i];
+                _parent->_last_probe_match = _probe_indexs.get_element(i);
             }
         }
 
         for (size_t i = 0; i < row_count; ++i) {
             if (filter_map[i]) {
                 if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
-                    visited[_build_indexs[i]] = 1;
+                    visited[_build_indexs.get_element(i)] = 1;
                 }
             }
         }
@@ -524,24 +583,24 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
         auto* __restrict filter_map = new_filter_column->get_data().data();
 
         for (size_t i = 0; i < row_count; ++i) {
-            bool not_matched_before = _parent->_last_probe_match != 
_probe_indexs[i];
+            bool not_matched_before = _parent->_last_probe_match != 
_probe_indexs.get_element(i);
 
             if constexpr (JoinOpType == TJoinOp::LEFT_SEMI_JOIN) {
-                if (_build_indexs[i] == 0) {
+                if (_build_indexs.get_element(i) == 0) {
                     filter_map[i] = false;
                 } else if (filter_column_ptr[i]) {
                     filter_map[i] = not_matched_before;
-                    _parent->_last_probe_match = _probe_indexs[i];
+                    _parent->_last_probe_match = _probe_indexs.get_element(i);
                 } else {
                     filter_map[i] = false;
                 }
             } else {
-                if (_build_indexs[i] == 0) {
+                if (_build_indexs.get_element(i) == 0) {
                     filter_map[i] = not_matched_before;
                 } else {
                     filter_map[i] = false;
                     if (filter_column_ptr[i]) {
-                        _parent->_last_probe_match = _probe_indexs[i];
+                        _parent->_last_probe_match = 
_probe_indexs.get_element(i);
                     }
                 }
             }
@@ -551,11 +610,11 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
     } else if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN ||
                          JoinOpType == TJoinOp::RIGHT_ANTI_JOIN) {
         for (int i = 0; i < row_count; ++i) {
-            visited[_build_indexs[i]] |= filter_column_ptr[i];
+            visited[_build_indexs.get_element(i)] |= filter_column_ptr[i];
         }
     } else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
         for (int i = 0; i < row_count; ++i) {
-            visited[_build_indexs[i]] |= filter_column_ptr[i];
+            visited[_build_indexs.get_element(i)] |= filter_column_ptr[i];
         }
     }
 
@@ -568,8 +627,8 @@ Status 
ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
                       JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             orig_columns = _right_col_idx;
         }
-        RETURN_IF_ERROR(
-                vectorized::Block::filter_block(output_block, 
result_column_id, orig_columns));
+
+        return finalize_block_with_filter(output_block, result_column_id, 
orig_columns);
     }
 
     return Status::OK();
@@ -602,17 +661,18 @@ Status 
ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab
                     mcol.size(), _right_col_len, _right_col_idx);
         }
         for (size_t j = 0; j < _right_col_len; ++j) {
-            if (_right_output_slot_flags->at(j)) {
+            if (_right_output_slot_flags[j]) {
                 const auto& column = 
*_build_block->safe_get_by_position(j).column;
-                mcol[j + _right_col_idx]->insert_indices_from(column, 
_build_indexs.data(),
-                                                              
_build_indexs.data() + block_size);
+                mcol[j + _right_col_idx]->insert_indices_from(
+                        column, _build_indexs.get_data().data(),
+                        _build_indexs.get_data().data() + block_size);
             } else {
                 mcol[j + _right_col_idx]->resize(block_size);
             }
         }
 
         // just resize the left table column in case with other conjunct to 
make block size is not zero
-        if (_is_right_semi_anti && _have_other_join_conjunct) {
+        if (_parent_operator->_is_right_semi_anti && 
_have_other_join_conjunct) {
             for (int i = 0; i < _right_col_idx; ++i) {
                 mcol[i]->resize(block_size);
             }
@@ -638,17 +698,15 @@ Status 
ProcessHashTableProbe<JoinOpType>::process(HashTableType& hash_table_ctx,
                                                   vectorized::ConstNullMapPtr 
null_map,
                                                   vectorized::MutableBlock& 
mutable_block,
                                                   vectorized::Block* 
output_block,
-                                                  uint32_t probe_rows, bool 
is_mark_join,
-                                                  bool 
have_other_join_conjunct) {
+                                                  uint32_t probe_rows, bool 
is_mark_join) {
     Status res;
     std::visit(
-            [&](auto is_mark_join, auto have_other_join_conjunct) {
-                res = do_process<HashTableType, have_other_join_conjunct, 
is_mark_join>(
+            [&](auto is_mark_join) {
+                res = do_process<HashTableType, is_mark_join>(
                         hash_table_ctx, null_map ? null_map->data() : nullptr, 
mutable_block,
                         output_block, probe_rows);
             },
-            vectorized::make_bool_variant(is_mark_join),
-            vectorized::make_bool_variant(have_other_join_conjunct));
+            vectorized::make_bool_variant(is_mark_join));
     return res;
 }
 
@@ -664,7 +722,7 @@ struct ExtractType<T(U)> {
     template Status 
ProcessHashTableProbe<JoinOpType>::process<ExtractType<void(T)>::Type>(        \
             ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::ConstNullMapPtr null_map,     \
             vectorized::MutableBlock & mutable_block, vectorized::Block * 
output_block,            \
-            uint32_t probe_rows, bool is_mark_join, bool 
have_other_join_conjunct);                \
+            uint32_t probe_rows, bool is_mark_join);                           
                    \
     template Status 
ProcessHashTableProbe<JoinOpType>::finish_probing<ExtractType<void(T)>::Type>( \
             ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::MutableBlock & mutable_block, \
             vectorized::Block * output_block, bool* eos, bool is_mark_join);
diff --git a/be/src/vec/common/hash_table/join_hash_table.h 
b/be/src/vec/common/hash_table/join_hash_table.h
index faccb4136d3..96d2ab2fcb9 100644
--- a/be/src/vec/common/hash_table/join_hash_table.h
+++ b/be/src/vec/common/hash_table/join_hash_table.h
@@ -179,7 +179,7 @@ public:
     }
 
     template <int JoinOpType, bool is_mark_join>
-    bool iterate_map(std::vector<uint32_t>& build_idxs,
+    bool iterate_map(vectorized::ColumnVector<uint32_t>& build_idxs,
                      vectorized::ColumnFilterHelper* mark_column_helper) const 
{
         const auto batch_size = max_batch_size;
         const auto elem_num = visited.size();
@@ -188,7 +188,7 @@ public:
 
         while (count < batch_size && iter_idx < elem_num) {
             const auto matched = visited[iter_idx];
-            build_idxs[count] = iter_idx;
+            build_idxs.get_element(count) = iter_idx;
             if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
                 if constexpr (is_mark_join) {
                     mark_column_helper->insert_value(matched);
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index ea4575b7e61..3b42b4bf250 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -262,6 +262,12 @@ public:
     void set_index_unique_id(uint32_t index_unique_id) { _index_unique_id = 
index_unique_id; }
     uint32_t index_unique_id() const { return _index_unique_id; }
 
+    virtual void collect_slot_column_ids(std::set<int>& column_ids) const {
+        for (const auto& child : _children) { // default: recurse, no per-child copy
+            child->collect_slot_column_ids(column_ids);
+        }
+    }
+
 protected:
     /// Simple debug string that provides no expr subclass-specific information
     std::string debug_string(const std::string& expr_name) const {
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index 48f08d1d5e0..145428a1cf3 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -59,6 +59,10 @@ public:
 
     size_t estimate_memory(const size_t rows) override { return 0; }
 
+    void collect_slot_column_ids(std::set<int>& column_ids) const override {
+        column_ids.emplace(_column_id); // leaf: record this slot's own column id
+    }
+
 private:
     int _slot_id;
     int _column_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to