This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d26aca4cad [fix](pipeline) sort_merge should throw exception in 
has_next_block if got failed status (#29076)
6d26aca4cad is described below

commit 6d26aca4cad3757bbdb243c0d5f24729ba09c674
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Wed Dec 27 10:06:01 2023 +0800

    [fix](pipeline) sort_merge should throw exception in has_next_block if got 
failed status (#29076)
    
    Test in 
regression-test/suites/datatype_p0/decimalv3/test_decimalv3_overflow.groovy::249
 sometimes failed when there are multiple BEs and FE process report status 
slowly for some reason.
    
    explain select k1, k2, k1 * k2 from test_decimal128_overflow2 order by 1,2,3
    --------------
    
    
+----------------------------------------------------------------------------------------------------------------------------+
    | Explain String(Nereids Planner)                                           
                                                 |
    
+----------------------------------------------------------------------------------------------------------------------------+
    | PLAN FRAGMENT 0                                                           
                                                 |
    |   OUTPUT EXPRS:                                                           
                                                 |
    |     k1[#5]                                                                
                                                 |
    |     k2[#6]                                                                
                                                 |
    |     (k1 * k2)[#7]                                                         
                                                 |
    |   PARTITION: UNPARTITIONED                                                
                                                 |
    |                                                                           
                                                 |
    |   HAS_COLO_PLAN_NODE: false                                               
                                                 |
    |                                                                           
                                                 |
    |   VRESULT SINK                                                            
                                                 |
    |      MYSQL_PROTOCAL                                                       
                                                 |
    |                                                                           
                                                 |
    |   111:VMERGING-EXCHANGE                                                   
                                                 |
    |      offset: 0                                                            
                                                 |
    |                                                                           
                                                 |
    | PLAN FRAGMENT 1                                                           
                                                 |
    |                                                                           
                                                 |
    |   PARTITION: HASH_PARTITIONED: k1[#0], k2[#1]                             
                                                 |
    |                                                                           
                                                 |
    |   HAS_COLO_PLAN_NODE: false                                               
                                                 |
    |                                                                           
                                                 |
    |   STREAM DATA SINK                                                        
                                                 |
    |     EXCHANGE ID: 111                                                      
                                                 |
    |     UNPARTITIONED                                                         
                                                 |
    |                                                                           
                                                 |
    |   108:VSORT                                                               
                                                 |
    |   |  order by: k1[#5] ASC, k2[#6] ASC, (k1 * k2)[#7] ASC                  
                                                 |
    |   |  offset: 0                                                            
                                                 |
    |   |                                                                       
                                                 |
    |   102:VOlapScanNode                                                       
                                                 |
    |      TABLE: 
regression_test_datatype_p0_decimalv3.test_decimal128_overflow2(test_decimal128_overflow2),
 PREAGGREGATION: ON |
    |      partitions=1/1 (test_decimal128_overflow2), tablets=8/8, 
tabletList=22841,22843,22845 ...                             |
    |      cardinality=6, avgRowSize=0.0, numNodes=1                            
                                                 |
    |      pushAggOp=NONE                                                       
                                                 |
    |      projections: k1[#0], k2[#1], (k1[#0] * k2[#1])                       
                                                 |
    |      project output tuple id: 1                                           
                                                 |
    
+----------------------------------------------------------------------------------------------------------------------------+
    36 rows in set (0.03 sec)
    Why failed:
    
    Multiple BEs
    Fragments 0 and 1 are MUST on different BEs
    Pipeline task of VOlapScanNode which executes k1*k2 failed sets query 
status to cancelled
    Pipeline task of VSort call try close, send Cancelled status to 
VMergeExchange
    sort_curso did not throw exception when it meets error
---
 be/src/vec/core/sort_cursor.h             |  2 ++
 be/src/vec/runtime/vsorted_run_merger.cpp | 14 +++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 71db3428541..b43449b50b0 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -244,6 +244,8 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
             }
             MergeSortCursorImpl::reset(_block);
             return status.ok();
+        } else if (!status.ok()) {
+            throw std::runtime_error(status.msg());
         }
         return false;
     }
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp 
b/be/src/vec/runtime/vsorted_run_merger.cpp
index 6d2f68db628..3637bd54aed 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -69,12 +69,16 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) 
{
 }
 
 Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
-    for (const auto& supplier : input_runs) {
-        if (_use_sort_desc) {
-            _cursors.emplace_back(supplier, _desc);
-        } else {
-            _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, 
_nulls_first);
+    try {
+        for (const auto& supplier : input_runs) {
+            if (_use_sort_desc) {
+                _cursors.emplace_back(supplier, _desc);
+            } else {
+                _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, 
_nulls_first);
+            }
         }
+    } catch (const std::exception& e) {
+        return Status::Cancelled(e.what());
     }
 
     for (auto& _cursor : _cursors) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to