This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 40767003c62 [Fix](ScanNode) Move the finalize phase of ScanNode to after the end of the Physical Translate phase (#38604) 40767003c62 is described below commit 40767003c627d4adf9d3b7e059ca9a1ee722015e Author: Tiewei Fang <43782773+bepppo...@users.noreply.github.com> AuthorDate: Mon Aug 5 08:58:59 2024 +0800 [Fix](ScanNode) Move the finalize phase of ScanNode to after the end of the Physical Translate phase (#38604) bp: #37565 Currently, Doris first obtains splits and then performs projection. After column pruning, it calls `updateRequiredSlots` to update the scanRange information. However, the Trino connector's column pruning pushdown needs to be completed before obtaining splits. Therefore, we move the finalize phase of `ScanNode` to after the end of the `Physical Translate` phase, so that `createScanRangeLocations` can use the final columns that have been pruned. ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- .../apache/doris/datasource/FileQueryScanNode.java | 13 -------- .../doris/datasource/jdbc/source/JdbcScanNode.java | 9 ------ .../doris/datasource/odbc/source/OdbcScanNode.java | 9 ------ .../datasource/paimon/source/PaimonScanNode.java | 19 ------------ .../glue/translator/PhysicalPlanTranslator.java | 36 ++++++++-------------- .../org/apache/doris/planner/OlapScanNode.java | 6 +++- .../java/org/apache/doris/planner/ScanNode.java | 10 ------ 7 files changed, 17 insertions(+), 85 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 9822855aa72..517ba8be5f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -19,7 +19,6 @@ package org.apache.doris.datasource; import 
org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TableSample; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; @@ -40,7 +39,6 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hive.source.HiveSplit; import org.apache.doris.datasource.iceberg.source.IcebergSplit; -import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; @@ -80,7 +78,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; /** * FileQueryScanNode for querying the file access type of catalog, now only support @@ -182,16 +179,6 @@ public abstract class FileQueryScanNode extends FileScanNode { params.setSrcTupleId(-1); } - /** - * Reset required_slots in contexts. This is called after Nereids planner do the projection. - * In the projection process, some slots may be removed. So call this to update the slots info. 
- */ - @Override - public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext, - Set<SlotId> requiredByProjectSlotIdSet) throws UserException { - updateRequiredSlots(); - } - private void updateRequiredSlots() throws UserException { params.unsetRequiredSlots(); for (SlotDescriptor slot : desc.getSlots()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java index 0d292100fe0..a85dd4aaafb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java @@ -27,7 +27,6 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; @@ -39,7 +38,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalScanNode; import org.apache.doris.datasource.jdbc.JdbcExternalTable; -import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; @@ -59,7 +57,6 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; -import java.util.Set; public class JdbcScanNode extends ExternalScanNode { private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class); @@ -252,12 +249,6 @@ public class JdbcScanNode extends ExternalScanNode { createScanRangeLocations(); } - @Override - public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> 
requiredByProjectSlotIdSet) - throws UserException { - createJdbcColumns(); - } - @Override protected void createScanRangeLocations() throws UserException { scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java index 9b597dddb54..2f9aa4a8334 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; @@ -33,7 +32,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalScanNode; import org.apache.doris.datasource.jdbc.source.JdbcScanNode; -import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; @@ -53,7 +51,6 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; -import java.util.Set; /** * Full scan of an ODBC table. 
@@ -117,12 +114,6 @@ public class OdbcScanNode extends ExternalScanNode { createScanRangeLocations(); } - @Override - public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet) - throws UserException { - createOdbcColumns(); - } - @Override protected void createScanRangeLocations() throws UserException { scanRangeLocations = Lists.newArrayList(createSingleScanRangeLocations(backendPolicy)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index c248bd0635a..6eb1545817a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -17,7 +17,6 @@ package org.apache.doris.datasource.paimon.source; -import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; @@ -29,7 +28,6 @@ import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; -import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; @@ -40,7 +38,6 @@ import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TPaimonDeletionFileDesc; import org.apache.doris.thrift.TPaimonFileDesc; -import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; @@ -289,22 +286,6 @@ public class PaimonScanNode extends FileQueryScanNode { } } - //When 
calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet, - // Therefore, paimon_column_names is temporarily reset here - @Override - public void updateRequiredSlots(PlanTranslatorContext planTranslatorContext, - Set<SlotId> requiredByProjectSlotIdSet) throws UserException { - super.updateRequiredSlots(planTranslatorContext, requiredByProjectSlotIdSet); - String cols = desc.getSlots().stream().map(slot -> slot.getColumn().getName()) - .collect(Collectors.joining(",")); - for (TScanRangeLocations tScanRangeLocations : scanRangeLocations) { - List<TFileRangeDesc> ranges = tScanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges; - for (TFileRangeDesc tFileRangeDesc : ranges) { - tFileRangeDesc.table_format_params.paimon_params.setPaimonColumnNames(cols); - } - } - } - @Override public TFileType getLocationType() throws DdlException, MetaNotFoundException { return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 65289ab5201..7247c9d9291 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -46,8 +46,6 @@ import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.es.source.EsScanNode; @@ -274,6 +272,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla throw new 
AnalysisException("tables with unknown column stats: " + builder); } } + for (ScanNode scanNode : context.getScanNodes()) { + Utils.execWithUncheckedException(scanNode::finalizeForNereids); + } return rootFragment; } @@ -635,7 +636,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context) ) ); - Utils.execWithUncheckedException(esScanNode::finalizeForNereids); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition); context.addPlanFragment(planFragment); @@ -687,7 +687,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context) ) ); - Utils.execWithUncheckedException(scanNode::finalizeForNereids); // Create PlanFragment DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); @@ -712,7 +711,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context) ) ); - Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition); context.addPlanFragment(planFragment); @@ -736,7 +734,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context) ) ); - Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids); DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition); 
context.addPlanFragment(planFragment); @@ -817,8 +814,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla olapScanNode.setUseTopnOpt(true); context.getTopnFilterContext().addLegacyTarget(olapScan, olapScanNode); } - // TODO: we need to remove all finalizeForNereids - olapScanNode.finalizeForNereids(); // Create PlanFragment // TODO: use a util function to convert distribution to DataPartition DataPartition dataPartition = DataPartition.RANDOM; @@ -908,7 +903,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla .translateRuntimeFilterTarget(expr, finalScanNode, context) ) ); - Utils.execWithUncheckedException(scanNode::finalizeForNereids); context.addScanNode(scanNode); PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan); context.addPlanFragment(planFragment); @@ -930,7 +924,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla .forEach(expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, scanNode, context) ) ); - Utils.execWithUncheckedException(scanNode::finalizeForNereids); context.addScanNode(scanNode); // TODO: it is weird update label in this way @@ -1976,6 +1969,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } requiredSlotIdSet.add(lastSlot.getId()); } + ((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet); } updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet, requiredByProjectSlotIdSet, slotIdsByOrder, context); @@ -2443,22 +2437,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla if (scanNode.getTupleDesc().getSlots().isEmpty()) { scanNode.getTupleDesc().getSlots().add(smallest); } - try { - if (context.getSessionVariable() != null - && context.getSessionVariable().forbidUnknownColStats - && !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) { - for (SlotId 
slotId : requiredByProjectSlotIdSet) { - if (context.isColumnStatsUnknown(scanNode, slotId)) { - String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName(); - throw new AnalysisException("meet unknown column stats: " + colName); - } + if (context.getSessionVariable() != null + && context.getSessionVariable().forbidUnknownColStats + && !StatisticConstants.isSystemTable(scanNode.getTupleDesc().getTable())) { + for (SlotId slotId : requiredByProjectSlotIdSet) { + if (context.isColumnStatsUnknown(scanNode, slotId)) { + String colName = scanNode.getTupleDesc().getSlot(slotId.asInt()).getColumn().getName(); + throw new AnalysisException("meet unknown column stats: " + colName); } - context.removeScanFromStatsUnknownColumnsMap(scanNode); } - scanNode.updateRequiredSlots(context, requiredByProjectSlotIdSet); - } catch (UserException e) { - Util.logAndThrowRuntimeException(LOG, - "User Exception while reset external file scan node contexts.", e); + context.removeScanFromStatsUnknownColumnsMap(scanNode); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 52104f6e668..09496d02578 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1720,7 +1720,11 @@ public class OlapScanNode extends ScanNode { : Sets.newTreeSet(); } - @Override + /** + * Update required_slots in scan node contexts. This is called after Nereids planner do the projection. + * In the projection process, some slots may be removed. So call this to update the slots info. + * Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean. 
+ */ public void updateRequiredSlots(PlanTranslatorContext context, Set<SlotId> requiredByProjectSlotIdSet) { outputColumnUniqueIds.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index c2158b4a0d4..8400d1d0c27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -48,7 +48,6 @@ import org.apache.doris.datasource.FileScanNode; import org.apache.doris.datasource.SplitAssignment; import org.apache.doris.datasource.SplitGenerator; import org.apache.doris.datasource.SplitSource; -import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.query.StatsDelta; @@ -169,15 +168,6 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { return false; } - /** - * Update required_slots in scan node contexts. This is called after Nereids planner do the projection. - * In the projection process, some slots may be removed. So call this to update the slots info. - * Currently, it is only used by ExternalFileScanNode, add the interface here to keep the Nereids code clean. - */ - public void updateRequiredSlots(PlanTranslatorContext context, - Set<SlotId> requiredByProjectSlotIdSet) throws UserException { - } - private void computeColumnFilter(Column column, SlotDescriptor slotDesc, PartitionInfo partitionsInfo) { // Set `columnFilters` all the time because `DistributionPruner` also use this. // Maybe we could use `columnNameToRange` for `DistributionPruner` and --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org