This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4073aba5ad5 [Improvementation](join) empty_block shall be set true when build block only one row (#33977)
4073aba5ad5 is described below

commit 4073aba5ad53fa35f2a0992316d59b101870b23c
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Apr 25 11:40:05 2024 +0800

    [Improvementation](join) empty_block shall be set true when build block only one row (#33977)
    
    empty_block should be set to true when the build block contains only one row, because the build side always inserts one mock row into the block.
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 17 ++---
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   | 80 ++++++++++------------
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  9 +--
 be/src/vec/core/column_with_type_and_name.cpp      | 12 ++--
 .../join/test_half_join_nullable_build_side.out    |  6 ++
 .../join/test_half_join_nullable_build_side.groovy |  4 ++
 6 files changed, 67 insertions(+), 61 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a0d111c63a7..2b2bdad86f7 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -156,21 +156,22 @@ bool HashJoinBuildSinkLocalState::build_unique() const {
 
 void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
+    bool empty_block =
+            !_shared_state->build_block ||
+            !(_shared_state->build_block->rows() > 1); // build size always 
mock a row into block
     _shared_state->short_circuit_for_probe =
             (_shared_state->_has_null_in_build_side &&
              p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!p._is_mark_join) ||
-            (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN 
&&
-             !p._is_mark_join) ||
-            (!_shared_state->build_block && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
-             !p._is_mark_join) ||
-            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_OUTER_JOIN) ||
-            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_SEMI_JOIN) ||
-            (!_shared_state->build_block && p._join_op == 
TJoinOp::RIGHT_ANTI_JOIN);
+            (empty_block && p._join_op == TJoinOp::INNER_JOIN && 
!p._is_mark_join) ||
+            (empty_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN && 
!p._is_mark_join) ||
+            (empty_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+            (empty_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+            (empty_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);
 
     //when build table rows is 0 and not have other_join_conjunct and not 
_is_mark_join and join type is one of 
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
     //we could get the result is probe table + null-column(if need output)
     _shared_state->empty_right_table_need_probe_dispose =
-            (!_shared_state->build_block && !p._have_other_join_conjunct && 
!p._is_mark_join) &&
+            (empty_block && !p._have_other_join_conjunct && !p._is_mark_join) 
&&
             (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN ||
              p._join_op == TJoinOp::LEFT_ANTI_JOIN);
 }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index a58ad62211c..fc6f81f4190 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -247,7 +247,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
     }
 
     //TODO: this short circuit maybe could refactor, no need to check at here.
-    if (local_state._shared_state->empty_right_table_need_probe_dispose) {
+    if (local_state.empty_right_table_shortcut()) {
         // when build table rows is 0 and not have other_join_conjunct and 
join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
         // we could get the result is probe table + null-column(if need output)
         // If we use a short-circuit strategy, should return block directly by 
add additional null data.
@@ -257,12 +257,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
             return Status::OK();
         }
 
-        vectorized::Block temp_block;
-        //get probe side output column
-        for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
-            temp_block.insert(local_state._probe_block.get_by_position(i));
-        }
-
         //create build side null column, if need output
         for (int i = 0;
              (_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < 
_right_output_slot_flags.size(); ++i) {
@@ -273,8 +267,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                     
vectorized::ColumnVector<vectorized::UInt8>::create(block_rows, 1);
             auto nullable_column = 
vectorized::ColumnNullable::create(std::move(column),
                                                                       
std::move(null_map_column));
-            temp_block.insert({std::move(nullable_column), make_nullable(type),
-                               _right_table_column_names[i]});
+            local_state._probe_block.insert({std::move(nullable_column), 
make_nullable(type),
+                                             _right_table_column_names[i]});
         }
         if (_is_outer_join) {
             reinterpret_cast<vectorized::ColumnUInt8*>(
@@ -290,8 +284,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
         /// No need to check the block size in `_filter_data_and_build_output` 
because here dose not
         /// increase the output rows count(just same as `_probe_block`'s rows 
count).
         RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, 
output_block, eos,
-                                                                 &temp_block, 
false));
-        temp_block.clear();
+                                                                 
&local_state._probe_block, false));
         
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
         return Status::OK();
     }
@@ -374,36 +367,52 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
 }
 
 Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
-                                                     
vectorized::ColumnUInt8::MutablePtr& null_map,
-                                                     
vectorized::ColumnRawPtrs& raw_ptrs,
                                                      const std::vector<int>& 
res_col_ids) {
+    if (empty_right_table_shortcut()) {
+        return Status::OK();
+    }
+
+    _probe_columns.resize(_probe_expr_ctxs.size());
+
+    if (!_has_set_need_null_map_for_probe) {
+        _has_set_need_null_map_for_probe = true;
+        _need_null_map_for_probe = _need_probe_null_map(block, res_col_ids);
+    }
+    if (_need_null_map_for_probe) {
+        if (_null_map_column == nullptr) {
+            _null_map_column = vectorized::ColumnUInt8::create();
+        }
+        _null_map_column->get_data().assign(block.rows(), (uint8_t)0);
+    }
+
     auto& shared_state = *_shared_state;
     auto& p = _parent->cast<HashJoinProbeOperatorX>();
     for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
         if (p._should_convert_to_nullable[i]) {
             _key_columns_holder.emplace_back(
                     
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
-            raw_ptrs[i] = _key_columns_holder.back().get();
+            _probe_columns[i] = _key_columns_holder.back().get();
             continue;
         }
 
         if (shared_state.is_null_safe_eq_join[i]) {
-            raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get();
+            _probe_columns[i] = 
block.get_by_position(res_col_ids[i]).column.get();
         } else {
-            auto column = block.get_by_position(res_col_ids[i]).column.get();
-            if (auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
-                auto& col_nested = nullable->get_nested_column();
-                auto& col_nullmap = nullable->get_null_map_data();
-
-                DCHECK(null_map != nullptr);
-                
vectorized::VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
+            const auto* column = 
block.get_by_position(res_col_ids[i]).column.get();
+            if (const auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
+                const auto& col_nested = nullable->get_nested_column();
+                const auto& col_nullmap = nullable->get_null_map_data();
+
+                DCHECK(_null_map_column != nullptr);
+                
vectorized::VectorizedUtils::update_null_map(_null_map_column->get_data(),
+                                                             col_nullmap);
                 if (shared_state.store_null_in_hash_table[i]) {
-                    raw_ptrs[i] = nullable;
+                    _probe_columns[i] = nullable;
                 } else {
-                    raw_ptrs[i] = &col_nested;
+                    _probe_columns[i] = &col_nested;
                 }
             } else {
-                raw_ptrs[i] = column;
+                _probe_columns[i] = column;
             }
         }
     }
@@ -482,10 +491,7 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, 
vectorized::Block* inpu
     local_state._probe_eos = eos;
     if (input_block->rows() > 0) {
         COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows());
-        int probe_expr_ctxs_sz = local_state._probe_expr_ctxs.size();
-        local_state._probe_columns.resize(probe_expr_ctxs_sz);
-
-        std::vector<int> res_col_ids(probe_expr_ctxs_sz);
+        std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size());
         RETURN_IF_ERROR(_do_evaluate(*input_block, 
local_state._probe_expr_ctxs,
                                      *local_state._probe_expr_call_timer, 
res_col_ids));
         if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN) {
@@ -493,22 +499,8 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, 
vectorized::Block* inpu
                     local_state._convert_block_to_null(*input_block);
         }
 
-        // TODO: Now we are not sure whether a column is nullable only by 
ExecNode's `row_desc`
-        //  so we have to initialize this flag by the first probe block.
-        if (!local_state._has_set_need_null_map_for_probe) {
-            local_state._has_set_need_null_map_for_probe = true;
-            local_state._need_null_map_for_probe =
-                    local_state._need_probe_null_map(*input_block, 
res_col_ids);
-        }
-        if (local_state._need_null_map_for_probe) {
-            if (local_state._null_map_column == nullptr) {
-                local_state._null_map_column = 
vectorized::ColumnUInt8::create();
-            }
-            
local_state._null_map_column->get_data().assign(input_block->rows(), 
(uint8_t)0);
-        }
+        RETURN_IF_ERROR(local_state._extract_join_column(*input_block, 
res_col_ids));
 
-        RETURN_IF_ERROR(local_state._extract_join_column(*input_block, 
local_state._null_map_column,
-                                                         
local_state._probe_columns, res_col_ids));
         if (&local_state._probe_block != input_block) {
             input_block->swap(local_state._probe_block);
         }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index b4930307bcc..1b45a2a258e 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -94,15 +94,16 @@ public:
     const std::shared_ptr<vectorized::Block>& build_block() const {
         return _shared_state->build_block;
     }
+    bool empty_right_table_shortcut() const {
+        // !Base::_projections.empty() means nereids planner
+        return _shared_state->empty_right_table_need_probe_dispose && 
!Base::_projections.empty();
+    }
 
 private:
     void _prepare_probe_block();
     bool _need_probe_null_map(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
     std::vector<uint16_t> _convert_block_to_null(vectorized::Block& block);
-    Status _extract_join_column(vectorized::Block& block,
-                                vectorized::ColumnUInt8::MutablePtr& null_map,
-                                vectorized::ColumnRawPtrs& raw_ptrs,
-                                const std::vector<int>& res_col_ids);
+    Status _extract_join_column(vectorized::Block& block, const 
std::vector<int>& res_col_ids);
     friend class HashJoinProbeOperatorX;
     template <int JoinOpType, typename Parent>
     friend struct vectorized::ProcessHashTableProbe;
diff --git a/be/src/vec/core/column_with_type_and_name.cpp 
b/be/src/vec/core/column_with_type_and_name.cpp
index cd0f7194004..e93946804ff 100644
--- a/be/src/vec/core/column_with_type_and_name.cpp
+++ b/be/src/vec/core/column_with_type_and_name.cpp
@@ -62,15 +62,17 @@ void ColumnWithTypeAndName::dump_structure(std::ostream& 
out) const {
         out << name;
     }
 
-    if (type)
+    if (type) {
         out << " " << type->get_name();
-    else
+    } else {
         out << " nullptr";
+    }
 
-    if (column)
-        out << ' ' << column->dump_structure();
-    else
+    if (column) {
+        out << ' ' << column->dump_structure() << "(use_count=" << 
column->use_count() << ')';
+    } else {
         out << " nullptr";
+    }
 }
 
 String ColumnWithTypeAndName::dump_structure() const {
diff --git 
a/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out 
b/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out
index 8404bee641f..56c5f6e2229 100644
--- a/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out
+++ b/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out
@@ -134,3 +134,9 @@
 4      \N      \N      \N      \N      \N
 5      1111    1111    3       1111    1111
 
+-- !shortcut --
+1      11      11
+2      111     111
+3      1111    1111
+4      111     111
+
diff --git 
a/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
 
b/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
index 2bb24309960..230332fdf3b 100644
--- 
a/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
+++ 
b/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy
@@ -286,4 +286,8 @@ suite("test_half_join_nullable_build_side", "query,p0") {
             left join test_half_join_nullable_build_side_l r on  l.v2 <=> r.v2
         order by 1, 2, 3;
     """
+
+    qt_shortcut """
+    select *         from             test_half_join_nullable_build_side_l l 
left anti join test_half_join_nullable_build_side_r r on  l.v2 <=> r.v2 and 
r.k1=5         order by 1, 2, 3;
+    """
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to