This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b104e334a [Bug](load) fix missing nullable info in stream load 
(#12302)
3b104e334a is described below

commit 3b104e334a9cb3e8f940343bab3fc53232993816
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Sep 5 13:41:28 2022 +0800

    [Bug](load) fix missing nullable info in stream load (#12302)
---
 .../apache/doris/planner/StreamLoadPlanner.java    | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 0c0a41be24..8b407fdc6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -20,6 +20,7 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -84,6 +85,7 @@ public class StreamLoadPlanner {
 
     private StreamLoadScanNode scanNode;
     private TupleDescriptor tupleDesc;
+    private TupleDescriptor scanTupleDesc;
 
     public StreamLoadPlanner(Database db, OlapTable destTable, LoadTaskInfo 
taskInfo) {
         this.db = db;
@@ -123,7 +125,10 @@ public class StreamLoadPlanner {
             throw new UserException("There is no sequence column in the table 
" + destTable.getName());
         }
         resetAnalyzer();
-        // construct tuple descriptor, used for scanNode and dataSink
+        // note: we use two tuples separately for Scan and Sink here to avoid 
wrong nullable info.
+        // construct tuple descriptor, used for scanNode
+        scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+        // construct tuple descriptor, used for dataSink
         tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
         boolean negative = taskInfo.getNegative();
         // here we should be full schema to fill the descriptor table
@@ -132,13 +137,30 @@ public class StreamLoadPlanner {
             slotDesc.setIsMaterialized(true);
             slotDesc.setColumn(col);
             slotDesc.setIsNullable(col.isAllowNull());
+
+            SlotDescriptor scanSlotDesc = 
descTable.addSlotDescriptor(scanTupleDesc);
+            scanSlotDesc.setIsMaterialized(true);
+            scanSlotDesc.setColumn(col);
+            scanSlotDesc.setIsNullable(col.isAllowNull());
+            for (ImportColumnDesc importColumnDesc : 
taskInfo.getColumnExprDescs().descs) {
+                try {
+                    if (!importColumnDesc.isColumn() && 
importColumnDesc.getColumnName() != null
+                            && 
importColumnDesc.getColumnName().equals(col.getName())) {
+                        
scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+                        break;
+                    }
+                } catch (Exception e) {
+                    // An exception may be thrown here because the 
`importColumnDesc.getExpr()` is not analyzed now.
+                    // We just skip this case here.
+                }
+            }
             if (negative && !col.isKey() && col.getAggregationType() != 
AggregateType.SUM) {
                 throw new DdlException("Column is not SUM AggregateType. 
column:" + col.getName());
             }
         }
 
         // create scan node
-        scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), 
tupleDesc, destTable, taskInfo);
+        scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), 
scanTupleDesc, destTable, taskInfo);
         scanNode.init(analyzer);
         descTable.computeStatAndMemLayout();
         scanNode.finalize(analyzer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to