This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 50eaf238bf7 [fix](hudi) move wrong members in HMSExternalTable #36187 (#36282) 50eaf238bf7 is described below commit 50eaf238bf721914a3a4227b1fee40f7144c0028 Author: Mingyu Chen <morning...@163.com> AuthorDate: Mon Jun 17 11:28:55 2024 +0800 [fix](hudi) move wrong members in HMSExternalTable #36187 (#36282) pick from branch-2.1 #36187 --- .../doris/datasource/hive/HMSExternalTable.java | 94 --------- .../hudi/source/HudiCachedPartitionProcessor.java | 3 +- .../doris/datasource/hudi/source/HudiScanNode.java | 45 ++-- .../glue/translator/PhysicalPlanTranslator.java | 86 +++++--- .../org/apache/doris/nereids/rules/RuleSet.java | 2 + .../org/apache/doris/nereids/rules/RuleType.java | 1 + .../doris/nereids/rules/analysis/BindRelation.java | 17 +- .../doris/nereids/rules/analysis/CheckPolicy.java | 13 +- .../LogicalFileScanToPhysicalFileScan.java | 3 +- ...java => LogicalHudiScanToPhysicalHudiScan.java} | 34 ++-- .../doris/nereids/stats/StatsCalculator.java | 6 + .../apache/doris/nereids/trees/plans/PlanType.java | 1 + .../trees/plans/logical/LogicalFileScan.java | 8 +- .../trees/plans/logical/LogicalHudiScan.java | 226 +++++++++++++++++++++ .../trees/plans/physical/PhysicalFileScan.java | 44 +++- ...PhysicalFileScan.java => PhysicalHudiScan.java} | 125 +++++------- .../trees/plans/visitor/RelationVisitor.java | 10 + .../apache/doris/nereids/util/RelationUtil.java | 16 +- .../apache/doris/planner/SingleNodePlanner.java | 10 +- .../apache/doris/external/hms/HmsCatalogTest.java | 5 + 20 files changed, 483 insertions(+), 266 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 4cee3d311fd..2c879fba503 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -17,7 +17,6 @@ package org.apache.doris.datasource.hive; -import org.apache.doris.analysis.TableScanParams; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; @@ -31,23 +30,12 @@ import org.apache.doris.common.Config; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.hudi.HudiUtils; -import org.apache.doris.datasource.hudi.source.COWIncrementalRelation; -import org.apache.doris.datasource.hudi.source.IncrementalRelation; -import org.apache.doris.datasource.hudi.source.MORIncrementalRelation; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.nereids.exceptions.NotSupportedException; -import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.GreaterThan; -import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; -import org.apache.doris.nereids.trees.expressions.LessThanEqual; -import org.apache.doris.nereids.trees.expressions.Slot; -import org.apache.doris.nereids.trees.expressions.SlotReference; -import org.apache.doris.nereids.trees.expressions.literal.StringLiteral; import org.apache.doris.qe.GlobalVariable; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; @@ -60,7 +48,6 @@ import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -76,7 +63,6 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -162,10 +148,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI // record the event update time when enable hms event listener protected volatile long eventUpdateTime; - // for hudi incremental read - private TableScanParams scanParams = null; - private IncrementalRelation incrementalRelation = null; - public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG } @@ -305,82 +287,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI .orElse(Collections.emptyList()); } - public TableScanParams getScanParams() { - return scanParams; - } - - public void setScanParams(TableScanParams scanParams) { - if (scanParams != null && scanParams.incrementalRead()) { - Map<String, String> optParams = getHadoopProperties(); - if (scanParams.getParams().containsKey("beginTime")) { - optParams.put("hoodie.datasource.read.begin.instanttime", scanParams.getParams().get("beginTime")); - } - if (scanParams.getParams().containsKey("endTime")) { - optParams.put("hoodie.datasource.read.end.instanttime", scanParams.getParams().get("endTime")); - } - scanParams.getParams().forEach((k, v) -> { - if (k.startsWith("hoodie.")) { - optParams.put(k, v); - } - }); - HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(this); - try { - boolean isCowOrRoTable = isHoodieCowTable(); - if (isCowOrRoTable) { - Map<String, String> serd = remoteTable.getSd().getSerdeInfo().getParameters(); - if ("true".equals(serd.get("hoodie.query.as.ro.table")) - && remoteTable.getTableName().endsWith("_ro")) { - // Incremental read RO table as RT table, I don't know why? - isCowOrRoTable = false; - LOG.warn("Execute incremental read on RO table"); - } - } - if (isCowOrRoTable) { - incrementalRelation = new COWIncrementalRelation( - optParams, HiveMetaStoreClientHelper.getConfiguration(this), hudiClient); - } else { - incrementalRelation = new MORIncrementalRelation( - optParams, HiveMetaStoreClientHelper.getConfiguration(this), hudiClient); - } - } catch (Exception e) { - LOG.warn("Failed to create incremental relation", e); - } - } - this.scanParams = scanParams; - } - - public IncrementalRelation getIncrementalRelation() { - return incrementalRelation; - } - - /** - * replace incremental params as AND expression - * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') => - * _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <= '20240308110677278' - */ - public Set<Expression> generateIncrementalExpression(List<Slot> slots) { - if (incrementalRelation == null) { - return Collections.emptySet(); - } - SlotReference timeField = null; - for (Slot slot : slots) { - if ("_hoodie_commit_time".equals(slot.getName())) { - timeField = (SlotReference) slot; - break; - } - } - if (timeField == null) { - return Collections.emptySet(); - } - StringLiteral upperValue = new StringLiteral(incrementalRelation.getEndTs()); - StringLiteral lowerValue = new StringLiteral(incrementalRelation.getStartTs()); - ComparisonPredicate less = new LessThanEqual(timeField, upperValue); - ComparisonPredicate great = incrementalRelation.isIncludeStartTime() - ? new GreaterThanEqual(timeField, lowerValue) - : new GreaterThan(timeField, lowerValue); - return ImmutableSet.of(great, less); - } - public boolean isHiveTransactionalTable() { return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable) && isSupportedTransactionalFileFormat(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java index c8220349019..d9c1c208271 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java @@ -19,6 +19,7 @@ package org.apache.doris.datasource.hudi.source; import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; +import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.TablePartitionValues; import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey; @@ -163,7 +164,7 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor { } } catch (Exception e) { LOG.warn("Failed to get hudi partitions", e); - throw new CacheException("Failed to get hudi partitions", e); + throw new CacheException("Failed to get hudi partitions: " + Util.getRootCauseMessage(e), e); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index 61edc333f6c..8f2b3e598b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -97,9 +97,6 @@ public class HudiScanNode extends HiveScanNode { private List<String> columnNames; private List<String> columnTypes; - private boolean incrementalRead = false; - private IncrementalRelation incrementalRelation; - private boolean partitionInit = false; private HoodieTimeline timeline; private Option<String> snapshotTimestamp; @@ -108,25 +105,32 @@ public class HudiScanNode extends HiveScanNode { private Iterator<HivePartition> prunedPartitionsIter; private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION; + private boolean incrementalRead = false; + private TableScanParams scanParams; + private IncrementalRelation incrementalRelation; + /** * External file scan node for Query Hudi table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv * eg: s3 tvf * These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check */ - public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, + Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) { super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); isCowOrRoTable = hmsTable.isHoodieCowTable(); - if (isCowOrRoTable) { - if (LOG.isDebugEnabled()) { - LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName()); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", hmsTable.getName()); + if (LOG.isDebugEnabled()) { + if (isCowOrRoTable) { + LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getFullQualifiers()); + } else { + LOG.debug("Hudi table {} is a mor table, and will use JNI to read data in BE", + hmsTable.getFullQualifiers()); } } useHiveSyncPartition = hmsTable.useHiveSyncPartition(); + this.scanParams = scanParams.orElse(null); + this.incrementalRelation = incrementalRelation.orElse(null); + this.incrementalRead = (this.scanParams != null && this.scanParams.incrementalRead()); } @Override @@ -171,17 +175,9 @@ public class HudiScanNode extends HiveScanNode { columnTypes.add(columnType); } - TableScanParams scanParams = desc.getRef().getScanParams(); - if (scanParams != null) { - throw new UserException("Incremental read should turn on nereids planner"); - } - scanParams = hmsTable.getScanParams(); - if (scanParams != null) { - if (scanParams.incrementalRead()) { - incrementalRead = true; - } else { - throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table"); - } + if (scanParams != null && !scanParams.incrementalRead()) { + // Only support incremental read + throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table"); } if (incrementalRead) { if (isCowOrRoTable) { @@ -191,18 +187,15 @@ public class HudiScanNode extends HiveScanNode { && hmsTable.getRemoteTable().getTableName().endsWith("_ro")) { // Incremental read RO table as RT table, I don't know why? isCowOrRoTable = false; - LOG.warn("Execute incremental read on RO table"); + LOG.warn("Execute incremental read on RO table: {}", hmsTable.getFullQualifiers()); } } catch (Exception e) { // ignore } } - incrementalRelation = hmsTable.getIncrementalRelation(); if (incrementalRelation == null) { throw new UserException("Failed to create incremental relation"); } - } else { - incrementalRelation = null; } timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 2ef4fc1debf..664233baa88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -51,6 +51,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.es.source.EsScanNode; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.source.HudiScanNode; import org.apache.doris.datasource.iceberg.IcebergExternalTable; @@ -125,6 +126,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; @@ -209,6 +211,7 @@ import com.google.common.collect.Sets; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; import java.util.ArrayList; import java.util.Collection; @@ -569,9 +572,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla ScanNode scanNode; if (table instanceof HMSExternalTable) { switch (((HMSExternalTable) table).getDlaType()) { - case HUDI: - scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false); - break; case ICEBERG: scanNode = new IcebergScanNode(context.nextPlanNodeId(), tupleDescriptor, false); IcebergScanNode icebergScanNode = (IcebergScanNode) scanNode; @@ -607,33 +607,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } else { throw new RuntimeException("do not support table type " + table.getType()); } - scanNode.setNereidsId(fileScan.getId()); - scanNode.addConjuncts(translateToLegacyConjuncts(fileScan.getConjuncts())); - scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId())); - - TableName tableName = new TableName(null, "", ""); - TableRef ref = new TableRef(tableName, null, null); - BaseTableRef tableRef = new BaseTableRef(ref, table, tableName); - tupleDescriptor.setRef(tableRef); - if (fileScan.getStats() != null) { - scanNode.setCardinality((long) fileScan.getStats().getRowCount()); - } - Utils.execWithUncheckedException(scanNode::init); - context.addScanNode(scanNode, fileScan); - ScanNode finalScanNode = scanNode; - context.getRuntimeTranslator().ifPresent( - runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach( - expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context) - ) - ); - context.getTopnFilterContext().translateTarget(fileScan, scanNode, context); - Utils.execWithUncheckedException(scanNode::finalizeForNereids); - // Create PlanFragment - DataPartition dataPartition = DataPartition.RANDOM; - PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); - context.addPlanFragment(planFragment); - updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); - return planFragment; + return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); } @Override @@ -680,6 +654,58 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return planFragment; } + @Override + public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan, PlanTranslatorContext context) { + List<Slot> slots = fileScan.getOutput(); + ExternalTable table = fileScan.getTable(); + TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); + + if (!(table instanceof HMSExternalTable) || ((HMSExternalTable) table).getDlaType() != DLAType.HUDI) { + throw new RuntimeException("Invalid table type for Hudi scan: " + table.getType()); + } + Preconditions.checkState(fileScan instanceof PhysicalHudiScan, + "Invalid physical scan: " + fileScan.getClass().getSimpleName() + + " for Hudi table"); + PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan; + ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), tupleDescriptor, false, + hudiScan.getScanParams(), hudiScan.getIncrementalRelation()); + + return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); + } + + @NotNull + private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileScan, PlanTranslatorContext context, + ScanNode scanNode, + ExternalTable table, TupleDescriptor tupleDescriptor) { + scanNode.setNereidsId(fileScan.getId()); + scanNode.addConjuncts(translateToLegacyConjuncts(fileScan.getConjuncts())); + scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId())); + + TableName tableName = new TableName(null, "", ""); + TableRef ref = new TableRef(tableName, null, null); + BaseTableRef tableRef = new BaseTableRef(ref, table, tableName); + tupleDescriptor.setRef(tableRef); + if (fileScan.getStats() != null) { + scanNode.setCardinality((long) fileScan.getStats().getRowCount()); + } + Utils.execWithUncheckedException(scanNode::init); + context.addScanNode(scanNode, fileScan); + ScanNode finalScanNode = scanNode; + context.getRuntimeTranslator().ifPresent( + runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach( + expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, context) + ) + ); + context.getTopnFilterContext().translateTarget(fileScan, scanNode, context); + Utils.execWithUncheckedException(scanNode::finalizeForNereids); + // Create PlanFragment + DataPartition dataPartition = DataPartition.RANDOM; + PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); + context.addPlanFragment(planFragment); + updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); + return planFragment; + } + @Override public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTranslatorContext context) { List<Slot> slots = jdbcScan.getOutput(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index a5fb0b8736a..40f4b135837 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -63,6 +63,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFi import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter; import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate; import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink; +import org.apache.doris.nereids.rules.implementation.LogicalHudiScanToPhysicalHudiScan; import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink; import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect; import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan; @@ -164,6 +165,7 @@ public class RuleSet { .add(new LogicalOlapScanToPhysicalOlapScan()) .add(new LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan()) .add(new LogicalSchemaScanToPhysicalSchemaScan()) + .add(new LogicalHudiScanToPhysicalHudiScan()) .add(new LogicalFileScanToPhysicalFileScan()) .add(new LogicalJdbcScanToPhysicalJdbcScan()) .add(new LogicalOdbcScanToPhysicalOdbcScan()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index bde86b61c29..94f11cb2d88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -429,6 +429,7 @@ public enum RuleType { LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_HUDI_SCAN_TO_PHYSICAL_HUDI_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 444f7f83776..d66b20e36a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.es.EsExternalTable; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import org.apache.doris.nereids.CTEContext; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.SqlCacheContext; @@ -55,6 +56,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; @@ -267,10 +269,17 @@ public class BindRelation extends OneAnalysisRuleFactory { Plan hiveViewPlan = parseAndAnalyzeHiveView(hmsTable, hiveCatalog, ddlSql, cascadesContext); return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan); } - hmsTable.setScanParams(unboundRelation.getScanParams()); - return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, - qualifierWithoutTableName, unboundRelation.getTableSample(), - unboundRelation.getTableSnapshot()); + if (hmsTable.getDlaType() == DLAType.HUDI) { + LogicalHudiScan hudiScan = new LogicalHudiScan(unboundRelation.getRelationId(), hmsTable, + qualifierWithoutTableName, unboundRelation.getTableSample(), + unboundRelation.getTableSnapshot()); + hudiScan = hudiScan.withScanParams(hmsTable, unboundRelation.getScanParams()); + return hudiScan; + } else { + return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, + qualifierWithoutTableName, unboundRelation.getTableSample(), + unboundRelation.getTableSnapshot()); + } case ICEBERG_EXTERNAL_TABLE: case PAIMON_EXTERNAL_TABLE: case MAX_COMPUTE_EXTERNAL_TABLE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java index 1e7d4dbb09d..94f7c36b108 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java @@ -25,8 +25,8 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy.RelatedPolicy; -import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.util.ExpressionUtils; @@ -65,12 +65,11 @@ public class CheckPolicy implements AnalysisRuleFactory { Set<Expression> combineFilter = new LinkedHashSet<>(); // replace incremental params as AND expression - if (relation instanceof LogicalFileScan) { - LogicalFileScan fileScan = (LogicalFileScan) relation; - if (fileScan.getTable() instanceof HMSExternalTable) { - HMSExternalTable hmsTable = (HMSExternalTable) fileScan.getTable(); - combineFilter.addAll(hmsTable.generateIncrementalExpression( - fileScan.getLogicalProperties().getOutput())); + if (relation instanceof LogicalHudiScan) { + LogicalHudiScan hudiScan = (LogicalHudiScan) relation; + if (hudiScan.getTable() instanceof HMSExternalTable) { + combineFilter.addAll(hudiScan.generateIncrementalExpression( + hudiScan.getLogicalProperties().getOutput())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index 8edb683151e..4946dcd56c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.implementation; import org.apache.doris.nereids.properties.DistributionSpecAny; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; import java.util.Optional; @@ -30,7 +31,7 @@ import java.util.Optional; public class LogicalFileScanToPhysicalFileScan extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalFileScan().then(fileScan -> + return logicalFileScan().when(plan -> !(plan instanceof LogicalHudiScan)).then(fileScan -> new PhysicalFileScan( fileScan.getRelationId(), fileScan.getTable(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHudiScanToPhysicalHudiScan.java similarity index 53% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHudiScanToPhysicalHudiScan.java index 8edb683151e..a5d676eab67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHudiScanToPhysicalHudiScan.java @@ -20,28 +20,30 @@ package org.apache.doris.nereids.rules.implementation; import org.apache.doris.nereids.properties.DistributionSpecAny; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; -import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan; import java.util.Optional; /** - * Implementation rule that convert logical FileScan to physical FileScan. + * Implementation rule that convert logical HudiScan to physical HudiScan. */ -public class LogicalFileScanToPhysicalFileScan extends OneImplementationRuleFactory { +public class LogicalHudiScanToPhysicalHudiScan extends OneImplementationRuleFactory { @Override public Rule build() { - return logicalFileScan().then(fileScan -> - new PhysicalFileScan( - fileScan.getRelationId(), - fileScan.getTable(), - fileScan.getQualifier(), - DistributionSpecAny.INSTANCE, - Optional.empty(), - fileScan.getLogicalProperties(), - fileScan.getConjuncts(), - fileScan.getSelectedPartitions(), - fileScan.getTableSample(), - fileScan.getTableSnapshot()) - ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE); + return logicalHudiScan().then(fileScan -> + new PhysicalHudiScan( + fileScan.getRelationId(), + fileScan.getTable(), + fileScan.getQualifier(), + DistributionSpecAny.INSTANCE, + Optional.empty(), + fileScan.getLogicalProperties(), + fileScan.getConjuncts(), + fileScan.getSelectedPartitions(), + fileScan.getTableSample(), + fileScan.getTableSnapshot(), + fileScan.getScanParams(), + fileScan.getIncrementalRelation()) + ).toRule(RuleType.LOGICAL_HUDI_SCAN_TO_PHYSICAL_HUDI_SCAN_RULE); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index 04e71b53d03..e42c58ca6b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -69,6 +69,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalExcept; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate; +import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect; import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; @@ -318,6 +319,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { return computeCatalogRelation(fileScan); } + @Override + public Statistics visitLogicalHudiScan(LogicalHudiScan fileScan, Void context) { + return computeCatalogRelation(fileScan); + } + @Override public Statistics visitLogicalTVFRelation(LogicalTVFRelation tvfRelation, Void context) { return tvfRelation.getFunction().computeStats(tvfRelation.getOutput()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 13fd64a8798..4df122c9fc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -90,6 +90,7 @@ public enum PlanType { PHYSICAL_EMPTY_RELATION, PHYSICAL_ES_SCAN, PHYSICAL_FILE_SCAN, + PHYSICAL_HUDI_SCAN, PHYSICAL_JDBC_SCAN, PHYSICAL_ODBC_SCAN, PHYSICAL_ONE_ROW_RELATION, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 06d349fe2a6..8dd47c44b15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -45,14 +45,14 @@ import java.util.Set; */ public class LogicalFileScan extends LogicalExternalRelation { - private final SelectedPartitions selectedPartitions; - private final Optional<TableSample> tableSample; - private final Optional<TableSnapshot> tableSnapshot; + protected final SelectedPartitions selectedPartitions; + protected final Optional<TableSample> tableSample; + protected final Optional<TableSnapshot> tableSnapshot; /** * Constructor for LogicalFileScan. */ - public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, + protected LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java new file mode 100644 index 00000000000..8659ad3d9c3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.analysis.TableScanParams; +import org.apache.doris.analysis.TableSnapshot; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; +import org.apache.doris.datasource.hudi.source.COWIncrementalRelation; +import org.apache.doris.datasource.hudi.source.IncrementalRelation; +import org.apache.doris.datasource.hudi.source.MORIncrementalRelation; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.TableSample; +import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; +import org.apache.doris.nereids.trees.expressions.LessThanEqual; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.StringLiteral; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Logical Hudi scan for Hudi table + */ +public class LogicalHudiScan extends LogicalFileScan { + private static final Logger LOG = LogManager.getLogger(LogicalHudiScan.class); + + // for hudi incremental read + private final Optional<TableScanParams> scanParams; + private final Optional<IncrementalRelation> incrementalRelation; + + /** + * Constructor for LogicalHudiScan. + */ + protected LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier, + Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, + Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample, + Optional<TableSnapshot> tableSnapshot, + Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) { + super(id, table, qualifier, groupExpression, logicalProperties, conjuncts, + selectedPartitions, tableSample, tableSnapshot); + Objects.requireNonNull(scanParams, "scanParams should not null"); + Objects.requireNonNull(incrementalRelation, "incrementalRelation should not null"); + this.scanParams = scanParams; + this.incrementalRelation = incrementalRelation; + } + + public LogicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier, + Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { + this(id, table, qualifier, Optional.empty(), Optional.empty(), + Sets.newHashSet(), SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot, + Optional.empty(), Optional.empty()); + } + + public Optional<TableScanParams> getScanParams() { + return scanParams; + } + + public Optional<IncrementalRelation> getIncrementalRelation() { + return incrementalRelation; + } + + /** + * replace incremental params as AND expression + * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') => + * _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <= '20240308110677278' + */ + public Set<Expression> generateIncrementalExpression(List<Slot> slots) { + if (!incrementalRelation.isPresent()) { + return Collections.emptySet(); + } + SlotReference timeField = null; + for (Slot slot : slots) { + if ("_hoodie_commit_time".equals(slot.getName())) { + timeField = (SlotReference) slot; + break; + } + } + if (timeField == null) { + return Collections.emptySet(); + } + StringLiteral upperValue = new StringLiteral(incrementalRelation.get().getEndTs()); + StringLiteral lowerValue = new StringLiteral(incrementalRelation.get().getStartTs()); + ComparisonPredicate less = new LessThanEqual(timeField, upperValue); + ComparisonPredicate great = incrementalRelation.get().isIncludeStartTime() + ? new GreaterThanEqual(timeField, lowerValue) + : new GreaterThan(timeField, lowerValue); + return ImmutableSet.of(great, less); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalHudiScan", + "qualified", qualifiedName(), + "output", getOutput() + ); + } + + @Override + public LogicalHudiScan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, groupExpression, + Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, + groupExpression, logicalProperties, conjuncts, selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); + } + + @Override + public LogicalHudiScan withConjuncts(Set<Expression> conjuncts) { + return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(), + Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); + } + + public LogicalHudiScan withSelectedPartitions(SelectedPartitions selectedPartitions) { + return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(), + Optional.of(getLogicalProperties()), conjuncts, selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); + } + + @Override + public LogicalHudiScan withRelationId(RelationId relationId) { + return new LogicalHudiScan(relationId, (ExternalTable) table, qualifier, Optional.empty(), + Optional.empty(), conjuncts, selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitLogicalHudiScan(this, context); + } + + /** + * Set scan params for incremental read + * + * @param table should be hudi table + * @param scanParams including incremental read params + */ + public LogicalHudiScan withScanParams(HMSExternalTable table, TableScanParams scanParams) { + Optional<IncrementalRelation> newIncrementalRelation = Optional.empty(); + Optional<TableScanParams> newScanParams = Optional.empty(); + if (scanParams != null && scanParams.incrementalRead()) { + Map<String, String> optParams = table.getHadoopProperties(); + if (scanParams.getParams().containsKey("beginTime")) { + optParams.put("hoodie.datasource.read.begin.instanttime", scanParams.getParams().get("beginTime")); + } + if (scanParams.getParams().containsKey("endTime")) { + optParams.put("hoodie.datasource.read.end.instanttime", scanParams.getParams().get("endTime")); + } + scanParams.getParams().forEach((k, v) -> { + if (k.startsWith("hoodie.")) { + optParams.put(k, v); + } + }); + HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(table); + try { + boolean isCowOrRoTable = table.isHoodieCowTable(); + if (isCowOrRoTable) { + Map<String, String> serd = table.getRemoteTable().getSd().getSerdeInfo().getParameters(); + if ("true".equals(serd.get("hoodie.query.as.ro.table")) + && table.getRemoteTable().getTableName().endsWith("_ro")) { + // Incremental read RO table as RT table, I don't know why? + isCowOrRoTable = false; + LOG.warn("Execute incremental read on RO table: {}", table.getFullQualifiers()); + } + } + if (isCowOrRoTable) { + newIncrementalRelation = Optional.of(new COWIncrementalRelation( + optParams, HiveMetaStoreClientHelper.getConfiguration(table), hudiClient)); + } else { + newIncrementalRelation = Optional.of(new MORIncrementalRelation( + optParams, HiveMetaStoreClientHelper.getConfiguration(table), hudiClient)); + } + } catch (Exception e) { + throw new AnalysisException( + "Failed to create incremental relation for table: " + table.getFullQualifiers(), e); + } + } + newScanParams = Optional.ofNullable(scanParams); + return new LogicalHudiScan(relationId, table, qualifier, Optional.empty(), + Optional.empty(), conjuncts, selectedPartitions, tableSample, tableSnapshot, + newScanParams, newIncrementalRelation); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java index 8706db65f1e..639e026949e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java @@ -42,11 +42,11 @@ import java.util.Set; */ public class PhysicalFileScan extends PhysicalCatalogRelation { - private final DistributionSpec distributionSpec; - private final Set<Expression> conjuncts; - private final SelectedPartitions selectedPartitions; - private final Optional<TableSample> tableSample; - private final Optional<TableSnapshot> tableSnapshot; + protected final DistributionSpec distributionSpec; + protected final Set<Expression> conjuncts; + protected final SelectedPartitions selectedPartitions; + protected final Optional<TableSample> tableSample; + protected final Optional<TableSnapshot> tableSnapshot; /** * Constructor for PhysicalFileScan. @@ -56,7 +56,32 @@ public class PhysicalFileScan extends PhysicalCatalogRelation { LogicalProperties logicalProperties, Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { - super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties); + this(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, distributionSpec, groupExpression, + logicalProperties, conjuncts, selectedPartitions, tableSample, tableSnapshot); + } + + /** + * Constructor for PhysicalFileScan. + */ + public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, + DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression, + LogicalProperties logicalProperties, PhysicalProperties physicalProperties, + Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions, + Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { + this(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, distributionSpec, groupExpression, + logicalProperties, physicalProperties, statistics, conjuncts, selectedPartitions, tableSample, + tableSnapshot); + } + + /** + * For hudi file scan to specified PlanTye + */ + protected PhysicalFileScan(RelationId id, PlanType type, ExternalTable table, List<String> qualifier, + DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression, + LogicalProperties logicalProperties, Set<Expression> conjuncts, + SelectedPartitions selectedPartitions, Optional<TableSample> tableSample, + Optional<TableSnapshot> tableSnapshot) { + super(id, type, table, qualifier, groupExpression, logicalProperties); this.distributionSpec = distributionSpec; this.conjuncts = conjuncts; this.selectedPartitions = selectedPartitions; @@ -64,15 +89,12 @@ public class PhysicalFileScan extends PhysicalCatalogRelation { this.tableSnapshot = tableSnapshot; } - /** - * Constructor for PhysicalFileScan. - */ - public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, + protected PhysicalFileScan(RelationId id, PlanType type, ExternalTable table, List<String> qualifier, DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { - super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties, + super(id, type, table, qualifier, groupExpression, logicalProperties, physicalProperties, statistics); this.distributionSpec = distributionSpec; this.conjuncts = conjuncts; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHudiScan.java similarity index 56% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHudiScan.java index 8706db65f1e..d5bc299a2ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHudiScan.java @@ -17,8 +17,10 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.analysis.TableScanParams; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.hudi.source.IncrementalRelation; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.DistributionSpec; import org.apache.doris.nereids.properties.LogicalProperties; @@ -34,113 +36,98 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; /** - * Physical file scan for external catalog. + * Physical Hudi scan for Hudi table. */ -public class PhysicalFileScan extends PhysicalCatalogRelation { +public class PhysicalHudiScan extends PhysicalFileScan { - private final DistributionSpec distributionSpec; - private final Set<Expression> conjuncts; - private final SelectedPartitions selectedPartitions; - private final Optional<TableSample> tableSample; - private final Optional<TableSnapshot> tableSnapshot; + // for hudi incremental read + private final Optional<TableScanParams> scanParams; + private final Optional<IncrementalRelation> incrementalRelation; /** - * Constructor for PhysicalFileScan. + * Constructor for PhysicalHudiScan. */ - public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, + public PhysicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier, DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, Set<Expression> conjuncts, SelectedPartitions selectedPartitions, Optional<TableSample> tableSample, - Optional<TableSnapshot> tableSnapshot) { - super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties); - this.distributionSpec = distributionSpec; - this.conjuncts = conjuncts; - this.selectedPartitions = selectedPartitions; - this.tableSample = tableSample; - this.tableSnapshot = tableSnapshot; + Optional<TableSnapshot> tableSnapshot, + Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) { + super(id, PlanType.PHYSICAL_HUDI_SCAN, table, qualifier, distributionSpec, groupExpression, logicalProperties, + conjuncts, selectedPartitions, tableSample, tableSnapshot); + Objects.requireNonNull(scanParams, "scanParams should not null"); + Objects.requireNonNull(incrementalRelation, "incrementalRelation should not null"); + this.scanParams = scanParams; + this.incrementalRelation = incrementalRelation; } /** - * Constructor for PhysicalFileScan. + * Constructor for PhysicalHudiScan. */ - public PhysicalFileScan(RelationId id, ExternalTable table, List<String> qualifier, + public PhysicalHudiScan(RelationId id, ExternalTable table, List<String> qualifier, DistributionSpec distributionSpec, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, Set<Expression> conjuncts, SelectedPartitions selectedPartitions, - Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) { - super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier, groupExpression, logicalProperties, - physicalProperties, statistics); - this.distributionSpec = distributionSpec; - this.conjuncts = conjuncts; - this.selectedPartitions = selectedPartitions; - this.tableSample = tableSample; - this.tableSnapshot = tableSnapshot; + Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot, + Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation) { + super(id, PlanType.PHYSICAL_HUDI_SCAN, table, qualifier, distributionSpec, groupExpression, logicalProperties, + physicalProperties, statistics, conjuncts, selectedPartitions, tableSample, tableSnapshot); + this.scanParams = scanParams; + this.incrementalRelation = incrementalRelation; } - public DistributionSpec getDistributionSpec() { - return distributionSpec; + public Optional<TableScanParams> getScanParams() { + return scanParams; } - public Set<Expression> getConjuncts() { - return conjuncts; + public Optional<IncrementalRelation> getIncrementalRelation() { + return incrementalRelation; } - public SelectedPartitions getSelectedPartitions() { - return selectedPartitions; + @Override + public PhysicalHudiScan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new PhysicalHudiScan(relationId, getTable(), qualifier, distributionSpec, + groupExpression, getLogicalProperties(), conjuncts, selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); } - public Optional<TableSample> getTableSample() { - return tableSample; + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new PhysicalHudiScan(relationId, getTable(), qualifier, distributionSpec, + groupExpression, logicalProperties.get(), conjuncts, selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); } - public Optional<TableSnapshot> getTableSnapshot() { - return tableSnapshot; + @Override + public PhysicalHudiScan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, + Statistics statistics) { + return new PhysicalHudiScan(relationId, getTable(), qualifier, distributionSpec, + groupExpression, getLogicalProperties(), physicalProperties, statistics, conjuncts, + selectedPartitions, tableSample, tableSnapshot, + scanParams, incrementalRelation); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPhysicalHudiScan(this, context); } @Override public String toString() { - return Utils.toSqlString("PhysicalFileScan", + return Utils.toSqlString("PhysicalHudiScan", "qualified", Utils.qualifiedName(qualifier, table.getName()), "output", getOutput(), "stats", statistics, "conjuncts", conjuncts, "selected partitions num", - selectedPartitions.isPruned ? selectedPartitions.selectedPartitions.size() : "unknown" + selectedPartitions.isPruned ? selectedPartitions.selectedPartitions.size() : "unknown", + "isIncremental", incrementalRelation.isPresent() ); } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitPhysicalFileScan(this, context); - } - - @Override - public PhysicalFileScan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec, - groupExpression, getLogicalProperties(), conjuncts, selectedPartitions, tableSample, tableSnapshot); - } - - @Override - public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, - Optional<LogicalProperties> logicalProperties, List<Plan> children) { - return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec, - groupExpression, logicalProperties.get(), conjuncts, selectedPartitions, tableSample, tableSnapshot); - } - - @Override - public ExternalTable getTable() { - return (ExternalTable) table; - } - - @Override - public PhysicalFileScan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, - Statistics statistics) { - return new PhysicalFileScan(relationId, getTable(), qualifier, distributionSpec, - groupExpression, getLogicalProperties(), physicalProperties, statistics, conjuncts, - selectedPartitions, tableSample, tableSnapshot); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java index 046964c351d..0871e3dca37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan; import org.apache.doris.nereids.trees.plans.logical.LogicalExternalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan; import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; @@ -40,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOla import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; @@ -105,6 +107,10 @@ public interface RelationVisitor<R, C> { return visitLogicalExternalRelation(fileScan, context); } + default R visitLogicalHudiScan(LogicalHudiScan fileScan, C context) { + return visitLogicalFileScan(fileScan, context); + } + default R visitLogicalJdbcScan(LogicalJdbcScan jdbcScan, C context) { return visitLogicalExternalRelation(jdbcScan, context); } @@ -154,6 +160,10 @@ public interface RelationVisitor<R, C> { return visitPhysicalCatalogRelation(fileScan, context); } + default R visitPhysicalHudiScan(PhysicalHudiScan hudiScan, C context) { + return visitPhysicalFileScan(hudiScan, context); + } + default R visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, C context) { return visitPhysicalCatalogRelation(jdbcScan, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java index a625e4490e1..b145338ff81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java @@ -25,6 +25,7 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import java.util.List; @@ -42,13 +43,24 @@ public class RelationUtil { case 1: { // table // Use current database name from catalog. String tableName = nameParts.get(0); - String catalogName = context.getCurrentCatalog().getName(); + CatalogIf catalogIf = context.getCurrentCatalog(); + if (catalogIf == null) { + throw new IllegalStateException("Current catalog is not set."); + } + String catalogName = catalogIf.getName(); String dbName = context.getDatabase(); + if (Strings.isNullOrEmpty(dbName)) { + throw new IllegalStateException("Current database is not set."); + } return ImmutableList.of(catalogName, dbName, tableName); } case 2: { // db.table // Use database name from table name parts. - String catalogName = context.getCurrentCatalog().getName(); + CatalogIf catalogIf = context.getCurrentCatalog(); + if (catalogIf == null) { + throw new IllegalStateException("Current catalog is not set."); + } + String catalogName = catalogIf.getName(); // if the relation is view, nameParts.get(0) is dbName. String dbName = nameParts.get(0); String tableName = nameParts.get(1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index d60ab89c9c7..b91d5378cb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -98,6 +98,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -1964,7 +1965,14 @@ public class SingleNodePlanner { TableIf table = tblRef.getDesc().getTable(); switch (((HMSExternalTable) table).getDlaType()) { case HUDI: - scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + // Old planner does not support hudi incremental read, + // so just pass Optional.empty() to HudiScanNode + if (tblRef.getScanParams() != null) { + throw new UserException("Hudi incremental read is not supported, " + + "please set enable_nereids_planner = true to enable new optimizer"); + } + scanNode = new HudiScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true, + Optional.empty(), Optional.empty()); break; case ICEBERG: scanNode = new IcebergScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java index 8dd8421ebc2..b968d27b5b8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java @@ -33,6 +33,7 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase; import org.apache.doris.qe.SessionVariable; @@ -133,6 +134,10 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase { tbl.getDatabase(); minTimes = 0; result = db; + + tbl.getDlaType(); + minTimes = 0; + result = DLAType.HIVE; } }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org