This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 40767003c62 [Fix](ScanNode) Move the finalize phase of ScanNode to 
after the end of the Physical Translate phase (#38604)
40767003c62 is described below

commit 40767003c627d4adf9d3b7e059ca9a1ee722015e
Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com>
AuthorDate: Mon Aug 5 08:58:59 2024 +0800

    [Fix](ScanNode) Move the finalize phase of ScanNode to after the end of the 
Physical Translate phase (#38604)
    
    bp: #37565
    
    Currently, Doris first obtains splits and then performs projection.
    After column pruning, it calls `updateRequiredSlots` to update the
    scanRange information. However, the Trino connector's column pruning
    pushdown needs to be completed before obtaining splits.
    
    Therefore, we move the finalize phase of `ScanNode` to after the end of
    the `Physical Translate` phase, so that `createScanRangeLocations` can
    use the final columns that remain after pruning.
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 .../apache/doris/datasource/FileQueryScanNode.java | 13 --------
 .../doris/datasource/jdbc/source/JdbcScanNode.java |  9 ------
 .../doris/datasource/odbc/source/OdbcScanNode.java |  9 ------
 .../datasource/paimon/source/PaimonScanNode.java   | 19 ------------
 .../glue/translator/PhysicalPlanTranslator.java    | 36 ++++++++--------------
 .../org/apache/doris/planner/OlapScanNode.java     |  6 +++-
 .../java/org/apache/doris/planner/ScanNode.java    | 10 ------
 7 files changed, 17 insertions(+), 85 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 9822855aa72..517ba8be5f8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -19,7 +19,6 @@ package org.apache.doris.datasource;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TableSample;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
@@ -40,7 +39,6 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hive.source.HiveSplit;
 import org.apache.doris.datasource.iceberg.source.IcebergSplit;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
@@ -80,7 +78,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * FileQueryScanNode for querying the file access type of catalog, now only 
support
@@ -182,16 +179,6 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         params.setSrcTupleId(-1);
     }
 
-    /**
-     * Reset required_slots in contexts. This is called after Nereids planner 
do the projection.
-     * In the projection process, some slots may be removed. So call this to 
update the slots info.
-     */
-    @Override
-    public void updateRequiredSlots(PlanTranslatorContext 
planTranslatorContext,
-            Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
-        updateRequiredSlots();
-    }
-
     private void updateRequiredSlots() throws UserException {
         params.unsetRequiredSlots();
         for (SlotDescriptor slot : desc.getSlots()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 0d292100fe0..a85dd4aaafb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -27,7 +27,6 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
@@ -39,7 +38,6 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
@@ -59,7 +57,6 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 public class JdbcScanNode extends ExternalScanNode {
     private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class);
@@ -252,12 +249,6 @@ public class JdbcScanNode extends ExternalScanNode {
         createScanRangeLocations();
     }
 
-    @Override
-    public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> 
requiredByProjectSlotIdSet)
-            throws UserException {
-        createJdbcColumns();
-    }
-
     @Override
     protected void createScanRangeLocations() throws UserException {
         scanRangeLocations = 
Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
index 9b597dddb54..2f9aa4a8334 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ExprSubstitutionMap;
 import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
@@ -33,7 +32,6 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
@@ -53,7 +51,6 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Full scan of an ODBC table.
@@ -117,12 +114,6 @@ public class OdbcScanNode extends ExternalScanNode {
         createScanRangeLocations();
     }
 
-    @Override
-    public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> 
requiredByProjectSlotIdSet)
-            throws UserException {
-        createOdbcColumns();
-    }
-
     @Override
     protected void createScanRangeLocations() throws UserException {
         scanRangeLocations = 
Lists.newArrayList(createSingleScanRangeLocations(backendPolicy));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index c248bd0635a..6eb1545817a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.datasource.paimon.source;
 
-import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
@@ -29,7 +28,6 @@ import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalTable;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
@@ -40,7 +38,6 @@ import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TPaimonDeletionFileDesc;
 import org.apache.doris.thrift.TPaimonFileDesc;
-import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import com.google.common.base.Preconditions;
@@ -289,22 +286,6 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
     }
 
-    //When calling 'setPaimonParams' and 'getSplits', the column trimming has 
not been performed yet,
-    // Therefore, paimon_column_names is temporarily reset here
-    @Override
-    public void updateRequiredSlots(PlanTranslatorContext 
planTranslatorContext,
-            Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
-        super.updateRequiredSlots(planTranslatorContext, 
requiredByProjectSlotIdSet);
-        String cols = desc.getSlots().stream().map(slot -> 
slot.getColumn().getName())
-                .collect(Collectors.joining(","));
-        for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) {
-            List<TFileRangeDesc> ranges = 
tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges;
-            for (TFileRangeDesc tFileRangeDesc : ranges) {
-                
tFileRangeDesc.table_format_params.paimon_params.setPaimonColumnNames(cols);
-            }
-        }
-    }
-
     @Override
     public TFileType getLocationType() throws DdlException, 
MetaNotFoundException {
         return getLocationType(((FileStoreTable) 
source.getPaimonTable()).location().toString());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 65289ab5201..7247c9d9291 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -46,8 +46,6 @@ import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.es.source.EsScanNode;
@@ -274,6 +272,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 throw new AnalysisException("tables with unknown column stats: 
" + builder);
             }
         }
+        for (ScanNode scanNode : context.getScanNodes()) {
+            Utils.execWithUncheckedException(scanNode::finalizeForNereids);
+        }
         return rootFragment;
     }
 
@@ -635,7 +636,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
                 )
         );
-        Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
esScanNode, dataPartition);
         context.addPlanFragment(planFragment);
@@ -687,7 +687,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, 
context)
                 )
         );
-        Utils.execWithUncheckedException(scanNode::finalizeForNereids);
         // Create PlanFragment
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = createPlanFragment(scanNode, 
dataPartition, fileScan);
@@ -712,7 +711,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
                 )
         );
-        Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
jdbcScanNode, dataPartition);
         context.addPlanFragment(planFragment);
@@ -736,7 +734,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
                 )
         );
-        Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
odbcScanNode, dataPartition);
         context.addPlanFragment(planFragment);
@@ -817,8 +814,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             olapScanNode.setUseTopnOpt(true);
             context.getTopnFilterContext().addLegacyTarget(olapScan, 
olapScanNode);
         }
-        // TODO: we need to remove all finalizeForNereids
-        olapScanNode.finalizeForNereids();
         // Create PlanFragment
         // TODO: use a util function to convert distribution to DataPartition
         DataPartition dataPartition = DataPartition.RANDOM;
@@ -908,7 +903,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                                 .translateRuntimeFilterTarget(expr, 
finalScanNode, context)
                 )
         );
-        Utils.execWithUncheckedException(scanNode::finalizeForNereids);
         context.addScanNode(scanNode);
         PlanFragment planFragment = createPlanFragment(scanNode, 
DataPartition.RANDOM, schemaScan);
         context.addPlanFragment(planFragment);
@@ -930,7 +924,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                         .forEach(expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context)
                 )
         );
-        Utils.execWithUncheckedException(scanNode::finalizeForNereids);
         context.addScanNode(scanNode);
 
         // TODO: it is weird update label in this way
@@ -1976,6 +1969,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                     }
                     requiredSlotIdSet.add(lastSlot.getId());
                 }
+                ((OlapScanNode) inputPlanNode).updateRequiredSlots(context, 
requiredByProjectSlotIdSet);
             }
             updateScanSlotsMaterialization((ScanNode) inputPlanNode, 
requiredSlotIdSet,
                     requiredByProjectSlotIdSet, slotIdsByOrder, context);
@@ -2443,22 +2437,16 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         if (scanNode.getTupleDesc().getSlots().isEmpty()) {
             scanNode.getTupleDesc().getSlots().add(smallest);
         }
-        try {
-            if (context.getSessionVariable() != null
-                    && context.getSessionVariable().forbidUnknownColStats
-                    && 
!StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
-                for (SlotId slotId : requiredByProjectSlotIdSet) {
-                    if (context.isColumnStatsUnknown(scanNode, slotId)) {
-                        String colName = 
scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
-                        throw new AnalysisException("meet unknown column 
stats: " + colName);
-                    }
+        if (context.getSessionVariable() != null
+                && context.getSessionVariable().forbidUnknownColStats
+                && 
!StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) {
+            for (SlotId slotId : requiredByProjectSlotIdSet) {
+                if (context.isColumnStatsUnknown(scanNode, slotId)) {
+                    String colName = 
scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName();
+                    throw new AnalysisException("meet unknown column stats: " 
+ colName);
                 }
-                context.removeScanFromStatsUnknownColumnsMap(scanNode);
             }
-            scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet);
-        } catch (UserException e) {
-            Util.logAndThrowRuntimeException(LOG,
-                    "User Exception while reset external file scan node 
contexts.", e);
+            context.removeScanFromStatsUnknownColumnsMap(scanNode);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 52104f6e668..09496d02578 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1720,7 +1720,11 @@ public class OlapScanNode extends ScanNode {
                 : Sets.newTreeSet();
     }
 
-    @Override
+    /**
+     * Update required_slots in scan node contexts. This is called after 
Nereids planner do the projection.
+     * In the projection process, some slots may be removed. So call this to 
update the slots info.
+     * Currently, it is only used by ExternalFileScanNode, add the interface 
here to keep the Nereids code clean.
+     */
     public void updateRequiredSlots(PlanTranslatorContext context,
             Set<SlotId> requiredByProjectSlotIdSet) {
         outputColumnUniqueIds.clear();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index c2158b4a0d4..8400d1d0c27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -48,7 +48,6 @@ import org.apache.doris.datasource.FileScanNode;
 import org.apache.doris.datasource.SplitAssignment;
 import org.apache.doris.datasource.SplitGenerator;
 import org.apache.doris.datasource.SplitSource;
-import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.query.StatsDelta;
@@ -169,15 +168,6 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
         return false;
     }
 
-    /**
-     * Update required_slots in scan node contexts. This is called after 
Nereids planner do the projection.
-     * In the projection process, some slots may be removed. So call this to 
update the slots info.
-     * Currently, it is only used by ExternalFileScanNode, add the interface 
here to keep the Nereids code clean.
-     */
-    public void updateRequiredSlots(PlanTranslatorContext context,
-            Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
-    }
-
     private void computeColumnFilter(Column column, SlotDescriptor slotDesc, 
PartitionInfo partitionsInfo) {
         // Set `columnFilters` all the time because `DistributionPruner` also 
use this.
         // Maybe we could use `columnNameToRange` for `DistributionPruner` and


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to