This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0d2d9357109 (join) fix broadcast join running when hash table build 
not finished
0d2d9357109 is described below

commit 0d2d9357109e2ca108201782f90119b8e6313d8c
Author: zhangstar333 <2561612...@qq.com>
AuthorDate: Thu Jul 11 11:23:09 2024 +0800

    (join) fix broadcast join running when hash table build not finished
---
 be/src/pipeline/exec/analytic_source_operator.cpp  |   5 +
 be/src/pipeline/exec/operator.cpp                  |   5 -
 regression-test/data/query_p0/join/test_join6.out  |   5 +
 .../suites/query_p0/join/test_join6.groovy         | 291 +++++++++++++++++++++
 4 files changed, 301 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index bc8f3279f92..2619bf4799a 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -559,6 +559,11 @@ Status AnalyticLocalState::close(RuntimeState* state) {
 
     std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
     _result_window_columns.swap(tmp_result_window_columns);
+    // Some kinds of source operators have a 1-1 relationship with a sink 
operator (such as AnalyticOperator).
+    // We must ensure AnalyticSinkOperator will not be blocked if 
AnalyticSourceOperator is already closed.
+    if (_shared_state && _shared_state->sink_deps.size() == 1) {
+        _shared_state->sink_deps.front()->set_always_ready();
+    }
     return PipelineXLocalState<AnalyticSharedState>::close(state);
 }
 
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 141eb1eaf59..b944fcfe21a 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -498,11 +498,6 @@ Status 
PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     }
     _closed = true;
-    // Some kinds of source operators has a 1-1 relationship with a sink 
operator (such as AnalyticOperator).
-    // We must ensure AnalyticSinkOperator will not be blocked if 
AnalyticSourceOperator already closed.
-    if (_shared_state && _shared_state->sink_deps.size() == 1) {
-        _shared_state->sink_deps.front()->set_always_ready();
-    }
     return Status::OK();
 }
 
diff --git a/regression-test/data/query_p0/join/test_join6.out 
b/regression-test/data/query_p0/join/test_join6.out
new file mode 100644
index 00000000000..eca6185e834
--- /dev/null
+++ b/regression-test/data/query_p0/join/test_join6.out
@@ -0,0 +1,5 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_hash_join --
+
+-- !select_hash_join2 --
+
diff --git a/regression-test/suites/query_p0/join/test_join6.groovy 
b/regression-test/suites/query_p0/join/test_join6.groovy
new file mode 100644
index 00000000000..612ea373793
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_join6.groovy
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_join6", "query,p0") {
+    def DBname = "regression_test_join6"
+    sql "DROP DATABASE IF EXISTS ${DBname}"
+    sql "CREATE DATABASE IF NOT EXISTS ${DBname}"
+    sql "use ${DBname}"
+
+    def tbName1 = 
"table_20_undef_partitions2_keys3_properties4_distributed_by5"
+    def tbName2 = 
"table_20_undef_partitions2_keys3_properties4_distributed_by52"
+    def tbName3 = 
"table_30_undef_partitions2_keys3_properties4_distributed_by5"
+    def tbName4 = 
"table_30_undef_partitions2_keys3_properties4_distributed_by52"
+    def tbName5 = 
"table_50_undef_partitions2_keys3_properties4_distributed_by5"
+    def tbName6 = 
"table_50_undef_partitions2_keys3_properties4_distributed_by52"
+    def tbName7 = 
"table_100_undef_partitions2_keys3_properties4_distributed_by5"
+    def tbName8 = 
"table_100_undef_partitions2_keys3_properties4_distributed_by52"
+    def tbName9 = 
"table_200_undef_partitions2_keys3_properties4_distributed_by5"
+    def tbName10 = 
"table_200_undef_partitions2_keys3_properties4_distributed_by52"
+
+    sql "DROP TABLE IF EXISTS ${tbName1};"
+    sql "DROP TABLE IF EXISTS ${tbName2};"
+    sql "DROP TABLE IF EXISTS ${tbName3};"
+    sql "DROP TABLE IF EXISTS ${tbName4};"
+    sql "DROP TABLE IF EXISTS ${tbName5};"
+    sql "DROP TABLE IF EXISTS ${tbName6};"
+    sql "DROP TABLE IF EXISTS ${tbName7};"
+    sql "DROP TABLE IF EXISTS ${tbName8};"
+    sql "DROP TABLE IF EXISTS ${tbName9};"
+    sql "DROP TABLE IF EXISTS ${tbName10};"
+
+    sql """
+        create table 
table_20_undef_partitions2_keys3_properties4_distributed_by5 (
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed2)
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_20_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,6,4,-3343259,7),(1,null,2,-5659896,0),(2,2,2369913,-5247778,-4711382),(3,6545002,3,2,4),(4,9,3,4,5),(5,4,5,4,1),(6,4,-4704791,null,6),(7,null,3,null,9),(8,-1012411,4,null,-1244656),(9,1,8,9,-5175872),(10,8,0,-4239951,2),(11,8,-2231762,4817469,2),(12,9,9,5,-427963),(13,4,0,null,-5587539),(14,-5949786,2,2,34322
 [...]
+    """
+
+    sql """
+        create table 
table_20_undef_partitions2_keys3_properties4_distributed_by52 (
+        col_int_undef_signed int    ,
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+        PARTITION BY             RANGE(col_int_undef_signed) (
+                        PARTITION p0 VALUES LESS THAN ('4'),
+                        PARTITION p1 VALUES LESS THAN ('6'),
+                        PARTITION p2 VALUES LESS THAN ('7'),
+                        PARTITION p3 VALUES LESS THAN ('8'),
+                        PARTITION p4 VALUES LESS THAN ('10'),
+                        PARTITION p5 VALUES LESS THAN ('83647'),
+                        PARTITION p100 VALUES LESS THAN ('2147483647')
+                    )
+                
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_20_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,6,-179064,5213411,5),(1,3,5,2,6),(2,4226261,7,null,3),(3,9,null,4,4),(4,-1003770,2,1,1),(5,8,7,null,8176864),(6,3388266,5,8,8),(7,5,1,2,null),(8,9,2064412,0,null),(9,1489553,8,-446412,6),(10,1,3,0,1),(11,null,3,4621304,null),(12,null,-3058026,-262645,9),(13,null,null,9,3),(14,null,null,5037128,7),(15,299896,
 [...]
+    """
+
+    sql """
+        create table 
table_30_undef_partitions2_keys3_properties4_distributed_by5 (
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed2)
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_30_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,2,null,0,null),(1,-242819,2983243,7071252,3),(2,1,-2342407,-1423905,8),(3,null,null,7,4),(4,-1494065,3,7,2),(5,5,0,-595225,5),(6,5,-3324113,0,5),(7,6829192,3527453,6,5436506),(8,1,-3189592,2,9),(9,null,2,6,2),(10,-4070807,null,-3324205,7),(11,8,-5293967,1,-5040205),(12,6,7440524,null,null),(13,null,2,9,5),(14
 [...]
+    """
+
+    sql """
+        create table 
table_30_undef_partitions2_keys3_properties4_distributed_by52 (
+        col_int_undef_signed int    ,
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+        PARTITION BY             RANGE(col_int_undef_signed) (
+                        PARTITION p0 VALUES LESS THAN ('4'),
+                        PARTITION p1 VALUES LESS THAN ('6'),
+                        PARTITION p2 VALUES LESS THAN ('7'),
+                        PARTITION p3 VALUES LESS THAN ('8'),
+                        PARTITION p4 VALUES LESS THAN ('10'),
+                        PARTITION p5 VALUES LESS THAN ('83647'),
+                        PARTITION p100 VALUES LESS THAN ('2147483647')
+                    )
+                
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_30_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,9,9,null,1),(1,6821639,9,null,-5431086),(2,8,4,6,7701043),(3,2,-6700938,1425835,7),(4,null,1,3,4),(5,8,8,-714745,null),(6,7,3,4447765,null),(7,1,-2101501,0,5),(8,7,0,9,6),(9,4696294,3,2,-3197661),(10,8,4600901,8,1),(11,-1042936,null,-2187191,0),(12,5116430,0,2687672,9),(13,3,3,8,1287742),(14,-3829647,3,4,751
 [...]
+    """
+
+    sql """
+        create table 
table_50_undef_partitions2_keys3_properties4_distributed_by5 (
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed2)
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_50_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,8,0,3,7),(1,6,227612,4,8),(2,-590975,9,-4411568,6),(3,-7241036,null,3,5),(4,1,7,null,8),(5,2509741,5,5,1),(6,2,9,null,4817793),(7,6,8,3,0),(8,null,1,4,null),(9,711269,null,-613109,null),(10,null,7,0,7),(11,null,-5534845,0,4),(12,5,2,9,6850777),(13,-5789051,8,6,2463068),(14,2,5,953451,1),(15,-6229147,-6738861,
 [...]
+    """
+
+    sql """
+        create table 
table_50_undef_partitions2_keys3_properties4_distributed_by52 (
+        col_int_undef_signed int    ,
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+        PARTITION BY             RANGE(col_int_undef_signed) (
+                        PARTITION p0 VALUES LESS THAN ('4'),
+                        PARTITION p1 VALUES LESS THAN ('6'),
+                        PARTITION p2 VALUES LESS THAN ('7'),
+                        PARTITION p3 VALUES LESS THAN ('8'),
+                        PARTITION p4 VALUES LESS THAN ('10'),
+                        PARTITION p5 VALUES LESS THAN ('83647'),
+                        PARTITION p100 VALUES LESS THAN ('2147483647')
+                    )
+                
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_50_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,-7314662,null,0,4373927),(1,0,9,2,null),(2,5,2151343,-1467194,null),(3,null,null,-6124108,null),(4,5795207,4306466,4,7),(5,6,8,3,9),(6,null,8,-7232808,9),(7,9,6,9,6),(8,4637962,-1241311,2,8),(9,1,2,3,null),(10,0,-1652390,1,3),(11,0,9,6,2),(12,-8342795,0,5539034,-4960208),(13,2768087,7,-6242297,4996873),(14,1
 [...]
+    """
+
+    sql """
+        create table 
table_100_undef_partitions2_keys3_properties4_distributed_by5 (
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed2)
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_100_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,3,7164641,5,8),(1,null,3916062,5,6),(2,1,5533498,0,9),(3,7,2,null,7057679),(4,1,0,7,7),(5,null,4,2448564,1),(6,7531976,7324373,9,7),(7,3,1,1,3),(8,6,8131576,9,-1793807),(9,9,2,4214547,9),(10,-7299852,5,1,3),(11,7,3,-1036551,5),(12,-6108579,84823,4,1229534),(13,-1065629,5,4,null),(14,null,8072633,3328285,2),(
 [...]
+    """
+
+
+    sql """
+        create table 
table_100_undef_partitions2_keys3_properties4_distributed_by52 (
+        col_int_undef_signed int    ,
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+        PARTITION BY             RANGE(col_int_undef_signed) (
+                        PARTITION p0 VALUES LESS THAN ('4'),
+                        PARTITION p1 VALUES LESS THAN ('6'),
+                        PARTITION p2 VALUES LESS THAN ('7'),
+                        PARTITION p3 VALUES LESS THAN ('8'),
+                        PARTITION p4 VALUES LESS THAN ('10'),
+                        PARTITION p5 VALUES LESS THAN ('83647'),
+                        PARTITION p100 VALUES LESS THAN ('2147483647')
+                    )
+                
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_100_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,7865838,-348902,null,8),(1,-9434,9,8,0),(2,1845860,6675073,-7931956,-66007),(3,-7523286,210291,3,4),(4,null,-1341350,-5318642,1),(5,-6634226,2179558,2,7),(6,2,7,2,3),(7,9,2,3,-7773846),(8,0,8,6,2407384),(9,0,1,7,7),(10,5,5,null,8),(11,9,null,8283010,6),(12,7359987,5145929,2,5),(13,0,5225949,0,6770846),(14,1
 [...]
+    """
+
+    sql """
+        create table 
table_200_undef_partitions2_keys3_properties4_distributed_by5 (
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed2)
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_200_undef_partitions2_keys3_properties4_distributed_by5(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,null,7,3,9),(1,6970022,9,6,2),(2,null,0,null,7262031),(3,4,6,null,7236151),(4,789682,7324018,5,5),(5,-2056178,9,0,0),(6,-7081969,-2103366,0,1),(7,3,5,3,3),(8,3175437,4,6,-2017026),(9,3,null,null,7),(10,-5725039,5,2,3),(11,8,9,2,5),(12,-6487649,1,5,-2847073),(13,3415118,null,4,-6786736),(14,null,4,7,1),(15,99
 [...]
+    """
+
+    sql """
+        create table 
table_200_undef_partitions2_keys3_properties4_distributed_by52 (
+        col_int_undef_signed int    ,
+        col_int_undef_signed2 int    ,
+        col_int_undef_signed3 int    ,
+        col_int_undef_signed4 int    ,
+        pk int
+        ) engine=olap
+        DUPLICATE KEY(col_int_undef_signed, col_int_undef_signed2)
+        PARTITION BY             RANGE(col_int_undef_signed) (
+                        PARTITION p0 VALUES LESS THAN ('4'),
+                        PARTITION p1 VALUES LESS THAN ('6'),
+                        PARTITION p2 VALUES LESS THAN ('7'),
+                        PARTITION p3 VALUES LESS THAN ('8'),
+                        PARTITION p4 VALUES LESS THAN ('10'),
+                        PARTITION p5 VALUES LESS THAN ('83647'),
+                        PARTITION p100 VALUES LESS THAN ('2147483647')
+                    )
+                
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+    sql """
+        insert into 
table_200_undef_partitions2_keys3_properties4_distributed_by52(pk,col_int_undef_signed,col_int_undef_signed2,col_int_undef_signed3,col_int_undef_signed4)
 values 
(0,null,6178782,4,-1498997),(1,null,null,2,4),(2,8,6,6114625,6840353),(3,6,-3487226,4,-18364),(4,6647558,0,7,4),(5,5,1,3,3991803),(6,null,3,3,6),(7,-1597140,3,3,2),(8,6415967,null,9,null),(9,0,2,-1569216,8263281),(10,2546741,4,-4334118,8),(11,2375117,5,null,-3767162),(12,4,290235,null,6),(13,5569849,8,6,null),
 [...]
+    """
+
+    qt_select_hash_join """
+        SELECT /*+   leading( tbl5 { tbl3 tbl4 } tbl1 tbl2    ) */ tbl4 . 
col_int_undef_signed4 AS col_int_undef_signed , 2 AS col_int_undef_signed2 , 
tbl3 . col_int_undef_signed2 AS col_int_undef_signed3 , tbl1 . 
col_int_undef_signed AS col_int_undef_signed4 FROM 
table_50_undef_partitions2_keys3_properties4_distributed_by5 AS tbl1  INNER 
JOIN table_20_undef_partitions2_keys3_properties4_distributed_by5 AS tbl2 ON 
tbl2 . col_int_undef_signed = tbl1 . col_int_undef_signed2 AND tbl2 . col_ [...]
+    """
+
+    qt_select_hash_join2 """
+        SELECT
+            /*+  ORDERED leading( { tbl3 tbl4 tbl1 tbl2  }      ) */
+            3
+        FROM
+            table_30_undef_partitions2_keys3_properties4_distributed_by52 AS 
tbl1
+        WHERE
+            (
+                tbl1.col_int_undef_signed2 IN (
+                    SELECT
+                        0
+                    FROM
+                        (
+                            SELECT
+                                1
+                            FROM
+                                
table_20_undef_partitions2_keys3_properties4_distributed_by52 AS tbl1
+                                JOIN 
table_30_undef_partitions2_keys3_properties4_distributed_by52 AS tbl2 ON 
tbl2.col_int_undef_signed4 = tbl1.col_int_undef_signed2
+                        ) AS tbl1
+                )
+                AND (
+                    tbl1.col_int_undef_signed2 NOT IN (
+                        SELECT
+                            tbl1.col_int_undef_signed AS col_int_undef_signed
+                        FROM
+                            
table_20_undef_partitions2_keys3_properties4_distributed_by52 AS tbl1
+                    )
+                )
+            );
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to