This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ea1554374cf [fix](multicast) fix DCHECK failure of block mem reuse for multicast (#26127)
ea1554374cf is described below

commit ea1554374cf759aea1b8204bfa518f18372150f8
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Tue Oct 31 16:35:26 2023 +0800

    [fix](multicast) fix DCHECK failure of block mem reuse for multicast (#26127)
    
    * [fix](multicast) fix DCHECK failure of block mem reuse for multicast
---
 .../exec/multi_cast_data_stream_source.cpp         |  4 +-
 be/src/vec/exprs/vexpr_context.cpp                 | 16 +++-
 be/src/vec/exprs/vexpr_context.h                   |  3 +-
 .../correctness_p0/test_bugfix_block_reuse.out     |  8 ++
 .../correctness_p0/test_bugfix_block_reuse.groovy  | 91 ++++++++++++++++++++++
 5 files changed, 117 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index dd1c1e46d21..c8036f79baf 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -107,7 +107,7 @@ Status 
MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
 
     if (!_output_expr_contexts.empty() && output_block->rows() > 0) {
         
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
-                _output_expr_contexts, *output_block, block));
+                _output_expr_contexts, *output_block, block, true));
         materialize_block_inplace(*block);
     }
     if (eos) {
@@ -171,7 +171,7 @@ Status 
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
 
     if (!local_state._output_expr_contexts.empty() && output_block->rows() > 
0) {
         
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
-                local_state._output_expr_contexts, *output_block, block));
+                local_state._output_expr_contexts, *output_block, block, 
true));
         materialize_block_inplace(*block);
     }
     COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
diff --git a/be/src/vec/exprs/vexpr_context.cpp 
b/be/src/vec/exprs/vexpr_context.cpp
index 155faf5d007..60eb577c650 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -287,15 +287,27 @@ Status 
VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs&
     return Status::OK();
 }
 
+// do_projection: for some queries (e.g. in MultiCastDataStreamerSourceOperator::get_block()),
+// output_vexpr_ctxs may output the same column more than once. If output_block is
+// mem-reused later, Block::clear_column_data then fails DCHECK_EQ(d.column->use_count(), 1)
+// because that column is shared. Set do_projection to true to clone each result column
+// (via clone_resized) so that every output column is uniquely owned.
 Status VExprContext::get_output_block_after_execute_exprs(
-        const VExprContextSPtrs& output_vexpr_ctxs, const Block& input_block, 
Block* output_block) {
+        const VExprContextSPtrs& output_vexpr_ctxs, const Block& input_block, 
Block* output_block,
+        bool do_projection) {
+    auto rows = input_block.rows();
     vectorized::Block tmp_block(input_block.get_columns_with_type_and_name());
     vectorized::ColumnsWithTypeAndName result_columns;
     for (auto& vexpr_ctx : output_vexpr_ctxs) {
         int result_column_id = -1;
         RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id));
         DCHECK(result_column_id != -1);
-        
result_columns.emplace_back(tmp_block.get_by_position(result_column_id));
+        const auto& col = tmp_block.get_by_position(result_column_id);
+        if (do_projection) {
+            result_columns.emplace_back(col.column->clone_resized(rows), 
col.type, col.name);
+        } else {
+            
result_columns.emplace_back(tmp_block.get_by_position(result_column_id));
+        }
     }
     *output_block = {result_columns};
     return Status::OK();
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 7e0f0275cc3..db5c4c87d8d 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -93,7 +93,8 @@ public:
                                                      int column_to_keep, 
IColumn::Filter& filter);
 
     [[nodiscard]] static Status get_output_block_after_execute_exprs(const 
VExprContextSPtrs&,
-                                                                     const 
Block&, Block*);
+                                                                     const 
Block&, Block*,
+                                                                     bool 
do_projection = false);
 
     int get_last_result_column_id() const {
         DCHECK(_last_result_column_id != -1);
diff --git a/regression-test/data/correctness_p0/test_bugfix_block_reuse.out 
b/regression-test/data/correctness_p0/test_bugfix_block_reuse.out
new file mode 100644
index 00000000000..e52ec66614c
--- /dev/null
+++ b/regression-test/data/correctness_p0/test_bugfix_block_reuse.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql_0 --
+
+-- !sql_1 --
+1.100
+2.200
+3.300
+
diff --git 
a/regression-test/suites/correctness_p0/test_bugfix_block_reuse.groovy 
b/regression-test/suites/correctness_p0/test_bugfix_block_reuse.groovy
new file mode 100644
index 00000000000..fb25cd7e687
--- /dev/null
+++ b/regression-test/suites/correctness_p0/test_bugfix_block_reuse.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression test for #26127: DCHECK failure on block mem reuse in multicast sources.
+// CTE `ta` is referenced twice (presumably planned as a multicast). sql_0's right side
+// filters on 'funnel_seq_2' and matches nothing; sql_1's right side matches every row.
+suite("test_bugfix_block_reuse") {
+    sql "drop table if exists test_bugfix_block_reuse;"
+    sql """
+        create table test_bugfix_block_reuse (
+            k1 int, v1 decimal(20,3)
+        ) distributed by hash(k1) properties("replication_num"="1");
+    """
+    sql "insert into test_bugfix_block_reuse values(1, 1.1), (2, 2.2), (3, 3.3);"
+    sql "sync"
+    qt_sql_0 """
+        with ta as (
+          select
+            `v1` as source_,
+            'funnel_seq_1' as funnel_seq_
+          from
+            test_bugfix_block_reuse
+        )
+        select
+          left_.source_ as source_
+        from
+          (
+            select
+              source_ as source_,
+              row_number() over(PARTITION BY source_) as session_id_
+            from
+              ta
+            where
+              funnel_seq_ IN ('funnel_seq_1')
+          ) left_
+          inner join (
+            select
+              source_ as source_
+            from
+              ta
+            where
+              funnel_seq_ IN ('funnel_seq_2')
+          ) right_ on right_.source_ = left_.source_
+          order by 1;
+    """
+
+    qt_sql_1 """
+        with ta as (
+          select
+            `v1` as source_,
+            'funnel_seq_1' as funnel_seq_
+          from
+            test_bugfix_block_reuse
+        )
+        select
+          left_.source_ as source_
+        from
+          (
+            select
+              source_ as source_,
+              row_number() over(PARTITION BY source_) as session_id_
+            from
+              ta
+            where
+              funnel_seq_ IN ('funnel_seq_1')
+          ) left_
+          inner join (
+            select
+              source_ as source_
+            from
+              ta
+            where
+              funnel_seq_ IN ('funnel_seq_1')
+          ) right_ on right_.source_ = left_.source_
+          order by 1;
+    """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to