This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 3b104e334a [Bug](load) fix missing nullable info in stream load (#12302)
3b104e334a is described below

commit 3b104e334a9cb3e8f940343bab3fc53232993816
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Sep 5 13:41:28 2022 +0800

    [Bug](load) fix missing nullable info in stream load (#12302)
---
 .../apache/doris/planner/StreamLoadPlanner.java   | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 0c0a41be24..8b407fdc6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -20,6 +20,7 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -84,6 +85,7 @@ public class StreamLoadPlanner {
 
     private StreamLoadScanNode scanNode;
     private TupleDescriptor tupleDesc;
+    private TupleDescriptor scanTupleDesc;
 
     public StreamLoadPlanner(Database db, OlapTable destTable, LoadTaskInfo taskInfo) {
         this.db = db;
@@ -123,7 +125,10 @@ public class StreamLoadPlanner {
             throw new UserException("There is no sequence column in the table " + destTable.getName());
         }
         resetAnalyzer();
-        // construct tuple descriptor, used for scanNode and dataSink
+        // note: we use two tuples separately for Scan and Sink here to avoid wrong nullable info.
+        // construct tuple descriptor, used for scanNode
+        scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
+        // construct tuple descriptor, used for dataSink
         tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
         boolean negative = taskInfo.getNegative();
         // here we should be full schema to fill the descriptor table
@@ -132,13 +137,30 @@ public class StreamLoadPlanner {
             slotDesc.setIsMaterialized(true);
             slotDesc.setColumn(col);
             slotDesc.setIsNullable(col.isAllowNull());
+
+            SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
+            scanSlotDesc.setIsMaterialized(true);
+            scanSlotDesc.setColumn(col);
+            scanSlotDesc.setIsNullable(col.isAllowNull());
+            for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
+                try {
+                    if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
+                            && importColumnDesc.getColumnName().equals(col.getName())) {
+                        scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
+                        break;
+                    }
+                } catch (Exception e) {
+                    // An exception may be thrown here because the `importColumnDesc.getExpr()` is not analyzed now.
+                    // We just skip this case here.
+                }
+            }
             if (negative && !col.isKey() && col.getAggregationType() != AggregateType.SUM) {
                 throw new DdlException("Column is not SUM AggregateType. column:" + col.getName());
             }
         }
 
         // create scan node
-        scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, taskInfo);
+        scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), scanTupleDesc, destTable, taskInfo);
         scanNode.init(analyzer);
         descTable.computeStatAndMemLayout();
         scanNode.finalize(analyzer);
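For context, here is a minimal standalone sketch of the idea the patch implements (illustrative only; NullableInfoSketch and its helper methods are hypothetical names, not Doris code): the sink-side slot keeps the destination column's nullability from the table schema, while the scan-side slot takes its nullability from the column mapping expression when the column is computed by one.

// Illustrative sketch only -- not Doris code; the class and method names are made up.
public class NullableInfoSketch {

    // Sink side: the slot written to the destination table keeps the schema's nullability.
    static boolean sinkSlotNullable(boolean columnAllowsNull) {
        return columnAllowsNull;
    }

    // Scan side: if the column is produced by a mapping expression, follow the expression's
    // nullability; otherwise fall back to the column definition.
    static boolean scanSlotNullable(boolean columnAllowsNull, Boolean mappingExprNullable) {
        return mappingExprNullable != null ? mappingExprNullable : columnAllowsNull;
    }

    public static void main(String[] args) {
        // Example: the destination column is NOT NULL, but the mapping expression that
        // computes it can yield NULL for some input rows, so the scan slot must be
        // nullable even though the sink slot is not -- the "missing nullable info" case.
        System.out.println("sink slot nullable: " + sinkSlotNullable(false));       // false
        System.out.println("scan slot nullable: " + scanSlotNullable(false, true)); // true
    }
}

This mirrors what the new scanTupleDesc/scanSlotDesc path in the diff does: the scan slot defaults to col.isAllowNull() and is overridden by importColumnDesc.getExpr().isNullable() when a matching non-column mapping expression is found.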
column:" + col.getName()); } } // create scan node - scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, taskInfo); + scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), scanTupleDesc, destTable, taskInfo); scanNode.init(analyzer); descTable.computeStatAndMemLayout(); scanNode.finalize(analyzer); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org