This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cdf5f0fe687 [fix](pipelineX) mark join column should be nullable (#25275)
cdf5f0fe687 is described below

commit cdf5f0fe687091087402a4e36c253548e21ac541
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Wed Oct 11 11:35:43 2023 +0800

    [fix](pipelineX) mark join column should be nullable (#25275)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 11 +++++----
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   | 24 +++++++++++--------
 be/src/pipeline/exec/join_build_sink_operator.h    |  2 --
 be/src/pipeline/exec/join_probe_operator.cpp       |  8 +++----
 be/src/pipeline/exec/join_probe_operator.h         |  1 +
 .../exec/nested_loop_join_probe_operator.cpp       | 27 ++++++++--------------
 be/src/pipeline/pipeline_x/dependency.h            |  1 +
 be/src/vec/exec/join/vhash_join_node.cpp           |  3 ++-
 8 files changed, 39 insertions(+), 38 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8dd84dfd27b..3b0342a926b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -139,7 +139,8 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
 void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
     auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
     _shared_state->short_circuit_for_probe =
-            (_has_null_in_build_side && p._join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
+            (_shared_state->_has_null_in_build_side &&
+             p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!p._is_mark_join) ||
             (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::INNER_JOIN &&
              !p._is_mark_join) ||
             (_shared_state->build_blocks->empty() && p._join_op == 
TJoinOp::LEFT_SEMI_JOIN &&
@@ -203,7 +204,7 @@ Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
                                         has_null_value || 
short_circuit_for_null_in_build_side
                                                 ? &null_map_val->get_data()
                                                 : nullptr,
-                                        &_has_null_in_build_side);
+                                        
&_shared_state->_has_null_in_build_side);
                     }},
             *_shared_state->hash_table_variants,
             vectorized::make_bool_variant(_build_side_ignore_null),
@@ -452,7 +453,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     // make one block for each 4 gigabytes
     constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
 
-    if (local_state._has_null_in_build_side) {
+    if (local_state._shared_state->_has_null_in_build_side) {
         // TODO: if _has_null_in_build_side is true we should finish current 
pipeline task.
         DCHECK(state->enable_pipeline_exec());
         return Status::OK();
@@ -538,7 +539,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
             _shared_hash_table_context->hash_table_variants =
                     local_state._shared_state->hash_table_variants;
             _shared_hash_table_context->short_circuit_for_null_in_probe_side =
-                    local_state._has_null_in_build_side;
+                    local_state._shared_state->_has_null_in_build_side;
             if (local_state._runtime_filter_slots) {
                 local_state._runtime_filter_slots->copy_to_shared_context(
                         _shared_hash_table_context);
@@ -556,7 +557,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
         local_state.profile()->add_info_string(
                 "SharedHashTableFrom",
                 
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
-        local_state._has_null_in_build_side =
+        local_state._shared_state->_has_null_in_build_side =
                 
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
         local_state._shared_state->hash_table_variants =
                 std::static_pointer_cast<vectorized::HashTableVariants>(
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 0a4b528be38..a4a66507085 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -19,6 +19,7 @@
 
 #include <string>
 
+#include "common/logging.h"
 #include "pipeline/exec/operator.h"
 
 namespace doris {
@@ -184,9 +185,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
     local_state.init_for_probe(state);
     SCOPED_TIMER(local_state._probe_timer);
     if (local_state._shared_state->short_circuit_for_probe) {
-        /// If `_short_circuit_for_probe` is true, this indicates no rows
-        /// match the join condition, and this is 'mark join', so we need to 
create a column as mark
-        /// with all rows set to 0.
+        // If we use a short-circuit strategy, should return empty block 
directly.
+        source_state = SourceState::FINISHED;
+        return Status::OK();
+    }
+    if (local_state._shared_state->_has_null_in_build_side &&
+        _short_circuit_for_null_in_build_side) {
+        /// `_has_null_in_build_side` means have null value in build side.
+        /// `_short_circuit_for_null_in_build_side` means short circuit if has 
null in build side(e.g. null aware left anti join).
+        /// We need to create a column as mark with all rows set to NULL.
         if (_is_mark_join) {
             auto block_rows = local_state._probe_block.rows();
             if (block_rows == 0) {
@@ -203,9 +210,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
                     
temp_block.insert(local_state._probe_block.get_by_position(i));
                 }
             }
-            auto mark_column = vectorized::ColumnUInt8::create(block_rows, 0);
-            temp_block.insert(
-                    {std::move(mark_column), 
std::make_shared<vectorized::DataTypeUInt8>(), ""});
+            auto mark_column = vectorized::ColumnNullable::create(
+                    vectorized::ColumnUInt8::create(block_rows, 0),
+                    vectorized::ColumnUInt8::create(block_rows, 1));
+            temp_block.insert({std::move(mark_column),
+                               
make_nullable(std::make_shared<vectorized::DataTypeUInt8>()), ""});
 
             {
                 SCOPED_TIMER(local_state._join_filter_timer);
@@ -220,9 +229,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
             local_state.reached_limit(output_block, source_state);
             return Status::OK();
         }
-        // If we use a short-circuit strategy, should return empty block 
directly.
-        source_state = SourceState::FINISHED;
-        return Status::OK();
     }
     local_state._join_block.clear_column_data();
 
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h b/be/src/pipeline/exec/join_build_sink_operator.h
index 2f7a3ec03ea..5f440836874 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -40,8 +40,6 @@ protected:
     template <typename LocalStateType>
     friend class JoinBuildSinkOperatorX;
 
-    bool _has_null_in_build_side = false;
-
     RuntimeProfile::Counter* _build_rows_counter;
     RuntimeProfile::Counter* _push_down_timer;
     RuntimeProfile::Counter* _push_compute_timer;
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp
index c4776afe019..63074bed70c 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -66,9 +66,8 @@ void JoinProbeLocalState<DependencyType, Derived>::_construct_mutable_join_block
         }
     }
     if (p._is_mark_join) {
-        _join_block.replace_by_position(
-                _join_block.columns() - 1,
-                
remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column));
+        DCHECK(!p._is_mark_join ||
+               _join_block.get_by_position(_join_block.columns() - 
1).column->is_nullable());
     }
 }
 
@@ -183,7 +182,8 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
                                            ? 
tnode.nested_loop_join_node.is_mark
                                            : false)
                         : tnode.hash_join_node.__isset.is_mark ? 
tnode.hash_join_node.is_mark
-                                                               : false) {
+                                                               : false),
+          _short_circuit_for_null_in_build_side(_join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     if (tnode.__isset.hash_join_node) {
         _intermediate_row_desc.reset(new RowDescriptor(
                 descs, tnode.hash_join_node.vintermediate_tuple_id_list,
diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h
index 863160b83f6..5727318a4f4 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -113,6 +113,7 @@ protected:
     // output expr
     vectorized::VExprContextSPtrs _output_expr_ctxs;
     OperatorXPtr _build_side_child;
+    const bool _short_circuit_for_null_in_build_side;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index ecac7c94dd1..14e19dd352f 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -22,6 +22,7 @@
 #include "pipeline/exec/operator.h"
 #include "vec/core/block.h"
 #include "vec/exec/join/vnested_loop_join_node.h"
+#include "vec/columns/column_filter_helper.h"
 
 namespace doris {
 class RuntimeState;
@@ -102,12 +103,9 @@ void NestedLoopJoinProbeLocalState::_update_additional_flags(vectorized::Block*
         }
     }
     if (p._is_mark_join) {
-        vectorized::IColumn::Filter& mark_data =
-                
assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>(
-                        *block->get_by_position(block->columns() - 
1).column->assume_mutable())
-                        .get_data();
-        if (mark_data.size() < block->rows()) {
-            mark_data.resize_fill(block->rows(), 1);
+        auto mark_column = block->get_by_position(block->columns() - 
1).column->assume_mutable();
+        if (mark_column->size() < block->rows()) {
+            
vectorized::ColumnFilterHelper(*mark_column).resize_fill(block->rows(), 1);
         }
     }
 }
@@ -343,15 +341,12 @@ void NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB
                 _resize_fill_tuple_is_null_column(new_size, 0, 1);
             }
         } else {
-            vectorized::IColumn::Filter& mark_data =
-                    
assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>(
-                            *dst_columns[dst_columns.size() - 1])
-                            .get_data();
-            mark_data.reserve(mark_data.size() + _left_side_process_count);
+            vectorized::ColumnFilterHelper 
mark_column(*dst_columns[dst_columns.size() - 1]);
+            mark_column.reserve(mark_column.size() + _left_side_process_count);
             DCHECK_LE(_left_block_start_pos + _left_side_process_count, 
_child_block->rows());
             for (int j = _left_block_start_pos;
                  j < _left_block_start_pos + _left_side_process_count; ++j) {
-                mark_data.emplace_back(IsSemi == 
_cur_probe_row_visited_flags[j]);
+                mark_column.insert_value(IsSemi == 
_cur_probe_row_visited_flags[j]);
             }
             for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
                 const vectorized::ColumnWithTypeAndName src_column =
@@ -396,11 +391,9 @@ void NestedLoopJoinProbeLocalState::_append_left_data_with_null(
     for (size_t i = 0; i < p._num_build_side_columns; ++i) {
         dst_columns[p._num_probe_side_columns + 
i]->insert_many_defaults(_left_side_process_count);
     }
-    vectorized::IColumn::Filter& mark_data =
-            assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>(
-                    *dst_columns[dst_columns.size() - 1])
-                    .get_data();
-    mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0);
+    auto& mark_column = *dst_columns[dst_columns.size() - 1];
+    vectorized::ColumnFilterHelper(mark_column)
+            .resize_fill(mark_column.size() + _left_side_process_count, 0);
 }
 
 void NestedLoopJoinProbeLocalState::_process_left_child_block(
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 5c9ebb452e1..11e6a0b5c93 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -548,6 +548,7 @@ struct JoinSharedState {
     // For some join case, we can apply a short circuit strategy
     // 1. _has_null_in_build_side = true
     // 2. build side rows is empty, Join op is: inner join/right outer 
join/left semi/right semi/right anti
+    bool _has_null_in_build_side = false;
     bool short_circuit_for_probe = false;
     vectorized::JoinOpVariants join_op_variants;
 };
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 5f769e4cafe..3ec7d364d56 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -159,7 +159,8 @@ HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* lo
           _probe_key_sz(local_state->_shared_state->probe_key_sz),
           
_left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags),
           
_right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags),
-          
_is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output) {}
+          
_is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output),
+          
_has_null_value_in_build_side(local_state->_shared_state->_has_null_in_build_side)
 {}
 
 HashJoinBuildContext::HashJoinBuildContext(HashJoinNode* join_node)
         : _hash_table_memory_usage(join_node->_hash_table_memory_usage),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to