This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b5da3f74f5 [improvement](join) avoid unnecessary copying in 
_build_output_block (#21360)
b5da3f74f5 is described below

commit b5da3f74f507752fd6327e7e9af14f7e8052bfcb
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Tue Jul 4 12:13:49 2023 +0800

    [improvement](join) avoid unnecessary copying in _build_output_block 
(#21360)
    
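    If the source columns in a temporary block are exclusively owned (that is,
    no other holder references them), there is no need to duplicate the data:
    the columns can be handed over to the output block instead of being copied.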
---
 be/src/vec/exec/join/vhash_join_node.cpp        |  7 +++-
 be/src/vec/exec/join/vjoin_node_base.cpp        | 44 ++++++++++++++++++-------
 be/src/vec/exec/join/vjoin_node_base.h          |  2 +-
 be/src/vec/exec/join/vnested_loop_join_node.cpp |  6 +++-
 4 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 678d2f6483..074e210bf5 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -623,7 +623,12 @@ Status HashJoinNode::pull(doris::RuntimeState* state, 
vectorized::Block* output_
         SCOPED_TIMER(_join_filter_timer);
         RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, &temp_block, 
temp_block.columns()));
     }
-    RETURN_IF_ERROR(_build_output_block(&temp_block, output_block));
+
+    // Here make _join_block release the columns' ptr
+    _join_block.set_columns(_join_block.clone_empty_columns());
+    mutable_join_block.clear();
+
+    RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
     _reset_tuple_is_null_column();
     reached_limit(output_block, eos);
     return Status::OK();
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp 
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 57870a0ac8..a4e1493d58 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -33,6 +33,7 @@
 #include "util/telemetry/telemetry.h"
 #include "util/threadpool.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/columns_number.h"
@@ -149,7 +150,8 @@ void VJoinNodeBase::_construct_mutable_join_block() {
     }
 }
 
-Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* 
output_block) {
+Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* 
output_block,
+                                          bool keep_origin) {
     SCOPED_TIMER(_build_output_block_timer);
     auto is_mem_reuse = output_block->mem_reuse();
     MutableBlock mutable_block =
@@ -160,13 +162,21 @@ Status VJoinNodeBase::_build_output_block(Block* 
origin_block, Block* output_blo
     // TODO: After FE plan support same nullable of output expr and origin 
block and mutable column
     // we should replace `insert_column_datas` by `insert_range_from`
 
-    auto insert_column_datas = [](auto& to, const auto& from, size_t rows) {
-        if (to->is_nullable() && !from.is_nullable()) {
-            auto& null_column = reinterpret_cast<ColumnNullable&>(*to);
-            null_column.get_nested_column().insert_range_from(from, 0, rows);
-            null_column.get_null_map_column().get_data().resize_fill(rows, 0);
+    auto insert_column_datas = [keep_origin](auto& to, ColumnPtr& from, size_t 
rows) {
+        if (to->is_nullable() && !from->is_nullable()) {
+            if (keep_origin || !from->is_exclusive()) {
+                auto& null_column = reinterpret_cast<ColumnNullable&>(*to);
+                null_column.get_nested_column().insert_range_from(*from, 0, 
rows);
+                null_column.get_null_map_column().get_data().resize_fill(rows, 
0);
+            } else {
+                to = make_nullable(from, false)->assume_mutable();
+            }
         } else {
-            to->insert_range_from(from, 0, rows);
+            if (keep_origin || !from->is_exclusive()) {
+                to->insert_range_from(*from, 0, rows);
+            } else {
+                to = from->assume_mutable();
+            }
         }
     };
     if (rows != 0) {
@@ -174,7 +184,7 @@ Status VJoinNodeBase::_build_output_block(Block* 
origin_block, Block* output_blo
         if (_output_expr_ctxs.empty()) {
             DCHECK(mutable_columns.size() == 
row_desc().num_materialized_slots());
             for (int i = 0; i < mutable_columns.size(); ++i) {
-                insert_column_datas(mutable_columns[i], 
*origin_block->get_by_position(i).column,
+                insert_column_datas(mutable_columns[i], 
origin_block->get_by_position(i).column,
                                     rows);
             }
         } else {
@@ -183,13 +193,23 @@ Status VJoinNodeBase::_build_output_block(Block* 
origin_block, Block* output_blo
             for (int i = 0; i < mutable_columns.size(); ++i) {
                 auto result_column_id = -1;
                 RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, 
&result_column_id));
-                auto column_ptr = 
origin_block->get_by_position(result_column_id)
-                                          
.column->convert_to_full_column_if_const();
-                insert_column_datas(mutable_columns[i], *column_ptr, rows);
+                auto& origin_column = 
origin_block->get_by_position(result_column_id).column;
+
+                /// `convert_to_full_column_if_const` will create a pointer to 
the origin column if
+                /// the origin column is not ColumnConst/ColumnArray, this 
make the column be not
+                /// exclusive.
+                /// TODO: maybe need a method to check if a column need to be 
converted to full
+                /// column.
+                if (is_column_const(*origin_column) || 
check_column<ColumnArray>(origin_column)) {
+                    auto column_ptr = 
origin_column->convert_to_full_column_if_const();
+                    insert_column_datas(mutable_columns[i], column_ptr, rows);
+                } else {
+                    insert_column_datas(mutable_columns[i], origin_column, 
rows);
+                }
             }
         }
 
-        if (!is_mem_reuse) {
+        if (!is_mem_reuse || !keep_origin) {
             output_block->swap(mutable_block.to_block());
         }
         DCHECK(output_block->rows() == rows);
diff --git a/be/src/vec/exec/join/vjoin_node_base.h 
b/be/src/vec/exec/join/vjoin_node_base.h
index f29897719d..120e77785e 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -80,7 +80,7 @@ protected:
     // Convert the intermediate blocks to the final result. For example, if 
the block from probe
     // side is non-nullable and the join op is righter outer join, we need to 
convert the non-nullable
     // columns from probe side to a nullable column.
-    Status _build_output_block(Block* origin_block, Block* output_block);
+    Status _build_output_block(Block* origin_block, Block* output_block, bool 
keep_origin = true);
     // Open probe side asynchronously.
     void _probe_side_open_thread(RuntimeState* state, std::promise<Status>* 
status);
 
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 0b50860874..a3b6c30c9f 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -672,13 +672,17 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, 
vectorized::Block* block,
 
         {
             Block tmp_block = _join_block;
+
+            // Here make _join_block release the columns' ptr
+            _join_block.set_columns(_join_block.clone_empty_columns());
+
             _add_tuple_is_null_column(&tmp_block);
             {
                 SCOPED_TIMER(_join_filter_timer);
                 RETURN_IF_ERROR(
                         VExprContext::filter_block(_conjuncts, &tmp_block, 
tmp_block.columns()));
             }
-            RETURN_IF_ERROR(_build_output_block(&tmp_block, block));
+            RETURN_IF_ERROR(_build_output_block(&tmp_block, block, false));
             _reset_tuple_is_null_column();
         }
         _join_block.clear_column_data();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to