This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6c41b50c8acc4f4cce7c718ba94fe2adfa8d4927
Author: 924060929 <924060...@qq.com>
AuthorDate: Tue Jul 30 20:36:11 2024 +0800

    [fix](nereids) fix insert stmt throw MultiCastDataSink cannot be cast to 
DataStreamSink (#38526)
    
    fix `insert ... with ... select ...`, which not use some cte, and throw
    an exception:
    ```
    errCode = 2, detailMessage = class 
org.apache.doris.planner.MultiCastDataSink cannot be cast to class 
org.apache.doris.planner.DataStreamSink 
(org.apache.doris.planner.MultiCastDataSink and 
org.apache.doris.planner.DataStreamSink are in unnamed module of loader 'app')
    ```
---
 .../plans/commands/insert/OlapInsertExecutor.java  | 24 +++++++++++++++++-
 regression-test/suites/insert_p0/insert.groovy     | 29 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 0153700863d..43a3327a378 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -40,6 +40,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
 import org.apache.doris.planner.DataSink;
 import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.MultiCastDataSink;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.qe.ConnectContext;
@@ -139,7 +140,28 @@ public class OlapInsertExecutor extends 
AbstractInsertExecutor {
             // set schema and partition info for tablet id shuffle exchange
             if (fragment.getPlanRoot() instanceof ExchangeNode
                     && fragment.getDataPartition().getType() == 
TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
-                DataStreamSink dataStreamSink = (DataStreamSink) 
(fragment.getChild(0).getSink());
+                DataSink childFragmentSink = fragment.getChild(0).getSink();
+                DataStreamSink dataStreamSink = null;
+                if (childFragmentSink instanceof MultiCastDataSink) {
+                    MultiCastDataSink multiCastDataSink = (MultiCastDataSink) 
childFragmentSink;
+                    int outputExchangeId = 
(fragment.getPlanRoot()).getId().asInt();
+                    // which DataStreamSink link to the output exchangeNode?
+                    for (DataStreamSink currentDataStreamSink : 
multiCastDataSink.getDataStreamSinks()) {
+                        int sinkExchangeId = 
currentDataStreamSink.getExchNodeId().asInt();
+                        if (outputExchangeId == sinkExchangeId) {
+                            dataStreamSink = currentDataStreamSink;
+                            break;
+                        }
+                    }
+                    if (dataStreamSink == null) {
+                        throw new IllegalStateException("Can not find 
DataStreamSink in the MultiCastDataSink");
+                    }
+                } else if (childFragmentSink instanceof DataStreamSink) {
+                    dataStreamSink = (DataStreamSink) childFragmentSink;
+                } else {
+                    throw new IllegalStateException("Unsupported DataSink: " + 
childFragmentSink);
+                }
+
                 Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), 
ConnectContext.get());
                 
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
                         database.getId(), olapTableSink.getDstTable(), 
analyzer));
diff --git a/regression-test/suites/insert_p0/insert.groovy 
b/regression-test/suites/insert_p0/insert.groovy
index 573d5d8366c..4d1eae21962 100644
--- a/regression-test/suites/insert_p0/insert.groovy
+++ b/regression-test/suites/insert_p0/insert.groovy
@@ -83,4 +83,33 @@ suite("insert") {
     sql "sync"
     qt_insert """ select * from mutable_datatype order by c_bigint, c_double, 
c_string, c_date, c_timestamp, c_boolean, c_short_decimal"""
 
+
+    multi_sql """
+        drop table if exists table_select_test1;
+        CREATE TABLE table_select_test1 (
+          `id` int    
+        )
+        distributed by hash(id)
+        properties('replication_num'='1');
+
+        insert into table_select_test1 values(2);
+
+        drop table if exists table_test_insert1;
+        create table table_test_insert1 (id int)
+        partition by range(id)
+        (
+          partition p1 values[('1'), ('50')),
+          partition p2 values[('50'), ('100'))
+        )
+        distributed by hash(id) buckets 100
+        properties('replication_num'='1')
+        
+        insert into table_test_insert1 values(1), (50);
+        
+        insert into table_test_insert1
+        with
+          a as (select * from table_select_test1),
+          b as (select * from a)
+        select id from a;
+        """
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to