This is an automated email from the ASF dual-hosted git repository. adonisling pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 81c5732dc7 [feature-wip](MTMV) Support creating materialized view for multiple tables (#11646) 81c5732dc7 is described below commit 81c5732dc7af361227d304153a3153fb07df4b22 Author: Adonis Ling <adonis0...@gmail.com> AuthorDate: Fri Sep 2 14:51:56 2022 +0800 [feature-wip](MTMV) Support creating materialized view for multiple tables (#11646) Support creating materialized view for multiple tables. Examples: mysql> CREATE TABLE t1 (pk INT, v1 INT SUM) AGGREGATE KEY (pk) DISTRIBUTED BY hash (pk) PROPERTIES ('replication_num' = '1'); mysql> CREATE TABLE t2 (pk INT, v2 INT SUM) AGGREGATE KEY (pk) DISTRIBUTED BY hash (pk) PROPERTIES ('replication_num' = '1'); mysql> CREATE MATERIALIZED VIEW mv BUILD IMMEDIATE REFRESH COMPLETE KEY (mv_pk) DISTRIBUTED BY HASH (mv_pk) PROPERTIES ('replication_num' = '1') AS SELECT t1.pk as mv_pk FROM t1, t2 WHERE t1.pk = t2.pk; --- fe/fe-core/src/main/cup/sql_parser.cup | 20 ++- .../main/java/org/apache/doris/alter/Alter.java | 10 +- .../doris/alter/MaterializedViewHandler.java | 12 ++ .../CreateMultiTableMaterializedViewStmt.java | 149 ++++++++++++++++--- .../org/apache/doris/analysis/CreateTableStmt.java | 17 ++- .../org/apache/doris/analysis/MVColumnItem.java | 32 +++-- .../org/apache/doris/analysis/MVRefreshInfo.java | 44 ++++-- .../analysis/MVRefreshIntervalTriggerInfo.java | 8 ++ .../doris/analysis/MVRefreshTriggerInfo.java | 24 +++- .../main/java/org/apache/doris/catalog/Env.java | 2 +- .../org/apache/doris/catalog/MaterializedView.java | 77 ++++++++++ .../org/apache/doris/catalog/OlapTableFactory.java | 157 +++++++++++++++++++++ .../main/java/org/apache/doris/catalog/Table.java | 2 + .../java/org/apache/doris/catalog/TableIf.java | 6 +- .../apache/doris/datasource/InternalCatalog.java | 19 ++- .../apache/doris/planner/SingleNodePlanner.java | 1 + .../main/java/org/apache/doris/qe/DdlExecutor.java | 4 +- .../CreateMultiTableMaterializedViewStmtTest.java | 145 +++++++++++++++++++ 18 files changed, 654 insertions(+), 75 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 6ebe1e6645..e4111d53d4 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -446,6 +446,7 @@ nonterminal DistributionDesc opt_distribution; nonterminal Integer opt_distribution_number; nonterminal Long opt_field_length; nonterminal KeysDesc opt_keys; +nonterminal KeysDesc opt_mv_keys; nonterminal PartitionKeyDesc partition_key_desc; nonterminal PartitionKeyDesc list_partition_key_desc; @@ -1337,6 +1338,16 @@ opt_mv_refersh_info ::= :} ; +opt_mv_keys ::= + {: + RESULT = new KeysDesc(KeysType.DUP_KEYS, Lists.newArrayList()); + :} + | KW_KEY LPAREN ident_list:keys RPAREN + {: + RESULT = new KeysDesc(KeysType.DUP_KEYS, keys); + :} + ; + // Create Statement create_stmt ::= /* Database */ @@ -1461,12 +1472,13 @@ create_stmt ::= :} | KW_CREATE KW_MATERIALIZED KW_VIEW ident:mvName build_mv:buildMethod opt_mv_refersh_info:refreshInfo - opt_partition:partition - opt_distribution:distribution - opt_properties:tblProperties + opt_mv_keys:keyDesc + opt_partition:partitionDesc + opt_distribution:distributionDesc + opt_properties:properties KW_AS query_stmt:query {: - RESULT = new CreateMultiTableMaterializedViewStmt(mvName, buildMethod, refreshInfo, partition, distribution, tblProperties, query); + RESULT = new CreateMultiTableMaterializedViewStmt(mvName, buildMethod, refreshInfo, keyDesc, partitionDesc, distributionDesc, properties, query); :} | KW_CREATE KW_INDEX opt_if_not_exists:ifNotExists ident:indexName KW_ON table_name:tableName LPAREN ident_list:cols RPAREN opt_index_type:indexType opt_comment:comment {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index ab259b78d7..c602c6dd7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -120,8 +120,14 @@ public class Alter { } public void processCreateMultiTableMaterializedView(CreateMultiTableMaterializedViewStmt stmt) - throws AnalysisException { - throw new AnalysisException("Create multi table materialized view is unsupported : " + stmt.toSql()); + throws UserException { + // check db + Database db = stmt.getDatabase(); + // check cluster capacity + Env.getCurrentSystemInfo().checkClusterCapacity(stmt.getClusterName()); + // check db quota + db.checkQuota(); + ((MaterializedViewHandler) materializedViewHandler).processCreateMultiTablesMaterializedView(stmt); } public void processDropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 11d025feee..7434dca654 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.AlterClause; import org.apache.doris.analysis.CancelAlterTableStmt; import org.apache.doris.analysis.CancelStmt; import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt; import org.apache.doris.analysis.DropMaterializedViewStmt; import org.apache.doris.analysis.DropRollupClause; import org.apache.doris.analysis.MVColumnItem; @@ -48,6 +49,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.IdGeneratorUtil; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.PropertyAnalyzer; @@ -1139,4 +1141,14 @@ public class MaterializedViewHandler extends AlterHandler { return tableRunningJobMap; } + public void processCreateMultiTablesMaterializedView(CreateMultiTableMaterializedViewStmt addMVClause) + throws UserException { + Map<String, OlapTable> olapTables = addMVClause.getOlapTables(); + try { + olapTables.values().forEach(Table::writeLock); + Env.getCurrentEnv().createTable(addMVClause); + } finally { + olapTables.values().forEach(Table::writeUnlock); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMultiTableMaterializedViewStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMultiTableMaterializedViewStmt.java index 5302cdfa90..2f25591985 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMultiTableMaterializedViewStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMultiTableMaterializedViewStmt.java @@ -17,35 +17,126 @@ package org.apache.doris.analysis; +import org.apache.doris.analysis.ColumnDef.DefaultValue; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; -public class CreateMultiTableMaterializedViewStmt extends DdlStmt { - private String mvName; - private MVRefreshInfo.BuildMode buildMethod; - private MVRefreshInfo refreshInfo; - private PartitionDesc partition; - private DistributionDesc distribution; - private Map<String, String> tblProperties; - private QueryStmt queryStmt; +public class CreateMultiTableMaterializedViewStmt extends CreateTableStmt { + private final String mvName; + private final MVRefreshInfo.BuildMode buildMethod; + private final MVRefreshInfo refreshInfo; + private final QueryStmt queryStmt; + private Database database; + private final Map<String, OlapTable> olapTables = Maps.newHashMap(); public CreateMultiTableMaterializedViewStmt(String mvName, MVRefreshInfo.BuildMode buildMethod, - MVRefreshInfo refreshInfo, PartitionDesc partition, DistributionDesc distribution, - Map<String, String> tblProperties, QueryStmt queryStmt) { + MVRefreshInfo refreshInfo, KeysDesc keyDesc, PartitionDesc partitionDesc, DistributionDesc distributionDesc, + Map<String, String> properties, QueryStmt queryStmt) { this.mvName = mvName; this.buildMethod = buildMethod; this.refreshInfo = refreshInfo; - this.partition = partition; - this.distribution = distribution; - this.tblProperties = tblProperties; this.queryStmt = queryStmt; + + this.keysDesc = keyDesc; + this.partitionDesc = partitionDesc; + this.distributionDesc = distributionDesc; + this.properties = properties; } @Override public void analyze(Analyzer analyzer) throws UserException { refreshInfo.analyze(analyzer); + queryStmt.analyze(analyzer); + if (queryStmt instanceof SelectStmt) { + analyzeSelectClause((SelectStmt) queryStmt); + } + tableName = new TableName(null, database.getFullName(), mvName); + super.analyze(analyzer); + } + + private void analyzeSelectClause(SelectStmt selectStmt) throws AnalysisException, DdlException { + for (TableRef tableRef : selectStmt.getTableRefs()) { + String dbName = tableRef.getName().getDb(); + if (database == null) { + database = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbName); + } else if (!dbName.equals(database.getFullName())) { + throw new AnalysisException("The databases of multiple tables must be the same."); + } + OlapTable table = (OlapTable) database.getTableOrAnalysisException(tableRef.getName().getTbl()); + olapTables.put(table.getName(), table); + } + columnDefs = generateColumnDefinitions(selectStmt.getSelectList()); + } + + private List<ColumnDef> generateColumnDefinitions(SelectList selectList) throws AnalysisException, DdlException { + List<MVColumnItem> mvColumnItems = generateMVColumnItems(olapTables, selectList); + List<Column> schema = generateSchema(mvColumnItems); + return schema.stream() + .map(column -> new ColumnDef( + column.getName(), + new TypeDef(column.getType()), + column.isKey(), + column.getAggregationType(), + column.isAllowNull(), + new DefaultValue(column.getDefaultValue() != null, column.getDefaultValue()), + column.getComment()) + ).collect(Collectors.toList()); + } + + private List<Column> generateSchema(List<MVColumnItem> mvColumnItems) throws DdlException { + List<Column> columns = Lists.newArrayList(); + for (MVColumnItem mvColumnItem : mvColumnItems) { + OlapTable olapTable = olapTables.get(mvColumnItem.getBaseTableName()); + columns.add(mvColumnItem.toMVColumn(olapTable)); + } + return columns; + } + + private List<MVColumnItem> generateMVColumnItems(Map<String, OlapTable> olapTables, SelectList selectList) + throws AnalysisException { + Map<String, MVColumnItem> uniqueMVColumnItems = Maps.newLinkedHashMap(); + for (SelectListItem item : selectList.getItems()) { + MVColumnItem mvColumnItem = generateMVColumnItem(item); + if (uniqueMVColumnItems.put(mvColumnItem.getName(), mvColumnItem) != null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, mvColumnItem.getName()); + } + } + return Lists.newArrayList(uniqueMVColumnItems.values().iterator()); + } + + private MVColumnItem generateMVColumnItem(SelectListItem item) { + Expr itemExpr = item.getExpr(); + MVColumnItem mvColumnItem = null; + if (itemExpr instanceof SlotRef) { + SlotRef slotRef = (SlotRef) itemExpr; + String alias = item.getAlias(); + String name = (alias != null) ? alias.toLowerCase() : slotRef.getColumnName().toLowerCase(); + mvColumnItem = new MVColumnItem( + name, + slotRef.getType(), + slotRef.getColumn().getAggregationType(), + slotRef.getColumn().isAggregationTypeImplicit(), + null, + slotRef.getColumnName(), + slotRef.getTableName().getTbl() + ); + } + return mvColumnItem; } @Override @@ -55,18 +146,38 @@ public class CreateMultiTableMaterializedViewStmt extends DdlStmt { if (refreshInfo != null) { sb.append(" ").append(refreshInfo.toString()); } - if (partition != null) { - sb.append(" ").append(partition.toString()); + if (partitionDesc != null) { + sb.append(" ").append(partitionDesc.toString()); } - if (distribution != null) { - sb.append(" ").append(distribution.toString()); + if (distributionDesc != null) { + sb.append(" ").append(distributionDesc.toString()); } - if (tblProperties != null && !tblProperties.isEmpty()) { + if (properties != null && !properties.isEmpty()) { sb.append("\nPROPERTIES ("); - sb.append(new PrintableMap<>(tblProperties, " = ", true, true, true)); + sb.append(new PrintableMap<>(properties, " = ", true, true, true)); sb.append(")"); } sb.append(" AS ").append(queryStmt.toSql()); return sb.toString(); } + + public String getMVName() { + return mvName; + } + + public Database getDatabase() { + return database; + } + + public Map<String, OlapTable> getOlapTables() { + return olapTables; + } + + public MVRefreshInfo getRefreshInfo() { + return refreshInfo; + } + + public QueryStmt getQueryStmt() { + return queryStmt; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 9e8d6f5c6a..5345110ba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -47,7 +47,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -63,17 +62,17 @@ public class CreateTableStmt extends DdlStmt { private boolean ifNotExists; private boolean isExternal; - private TableName tableName; - private List<ColumnDef> columnDefs; + protected TableName tableName; + protected List<ColumnDef> columnDefs; private List<IndexDef> indexDefs; - private KeysDesc keysDesc; - private PartitionDesc partitionDesc; - private DistributionDesc distributionDesc; - private Map<String, String> properties; + protected KeysDesc keysDesc; + protected PartitionDesc partitionDesc; + protected DistributionDesc distributionDesc; + protected Map<String, String> properties; private Map<String, String> extProperties; private String engineName; private String comment; - private List<AlterClause> rollupAlterClauseList; + private List<AlterClause> rollupAlterClauseList = Lists.newArrayList(); private static Set<String> engineNames; @@ -164,7 +163,7 @@ public class CreateTableStmt extends DdlStmt { this.ifNotExists = ifNotExists; this.comment = Strings.nullToEmpty(comment); - this.rollupAlterClauseList = rollupAlterClauseList == null ? new ArrayList<>() : rollupAlterClauseList; + this.rollupAlterClauseList = (rollupAlterClauseList == null) ? Lists.newArrayList() : rollupAlterClauseList; } // This is for iceberg/hudi table, which has no column schema diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java index fabf5b1020..b87a245c03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVColumnItem.java @@ -17,14 +17,13 @@ package org.apache.doris.analysis; +import org.apache.doris.analysis.ColumnDef.DefaultValue; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Type; import org.apache.doris.common.DdlException; -import com.google.common.base.Preconditions; - /** * This is a result of semantic analysis for AddMaterializedViewClause. * It is used to construct real mv column in MaterializedViewHandler. @@ -40,15 +39,22 @@ public class MVColumnItem { private boolean isAggregationTypeImplicit; private Expr defineExpr; private String baseColumnName; + private String baseTableName; public MVColumnItem(String name, Type type, AggregateType aggregateType, boolean isAggregationTypeImplicit, Expr defineExpr, String baseColumnName) { + this(name, type, aggregateType, isAggregationTypeImplicit, defineExpr, baseColumnName, null); + } + + public MVColumnItem(String name, Type type, AggregateType aggregateType, boolean isAggregationTypeImplicit, + Expr defineExpr, String baseColumnName, String baseTableName) { this.name = name; this.type = type; this.aggregationType = aggregateType; this.isAggregationTypeImplicit = isAggregationTypeImplicit; this.defineExpr = defineExpr; this.baseColumnName = baseColumnName; + this.baseTableName = baseTableName; } public MVColumnItem(String name, Type type) { @@ -102,21 +108,29 @@ public class MVColumnItem { return baseColumnName; } + public String getBaseTableName() { + return baseTableName; + } + public Column toMVColumn(OlapTable olapTable) throws DdlException { - Column baseColumn = olapTable.getBaseColumn(name); - if (baseColumn == null) { - Preconditions.checkNotNull(defineExpr != null); - Column result = new Column(name, type, isKey, aggregationType, ColumnDef.DefaultValue.ZERO, ""); + Column result; + if (defineExpr != null) { + result = new Column(name, type, isKey, aggregationType, DefaultValue.ZERO, ""); result.setDefineExpr(defineExpr); - return result; } else { - Column result = new Column(baseColumn); + Column baseColumn = olapTable.getBaseColumn(baseColumnName); + result = new Column(baseColumn); + result.setName(name); result.setIsKey(isKey); // If the mv column type is inconsistent with the base column type, the daily test will core. // So, I comment this line firstly. // result.setType(type); result.setAggregationType(aggregationType, isAggregationTypeImplicit); - return result; } + return result; + } + + public void setBaseTableName(String baseTableName) { + this.baseTableName = baseTableName; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java index fcb60146b5..aeabe1e410 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshInfo.java @@ -19,26 +19,31 @@ package org.apache.doris.analysis; import org.apache.doris.common.UserException; +import com.google.gson.annotations.SerializedName; + public class MVRefreshInfo { - private final boolean neverRefresh; + @SerializedName("neverRefresh") + private boolean neverRefresh; + @SerializedName("refreshMethod") private RefreshMethod refreshMethod; + @SerializedName("triggerInfo") private MVRefreshTriggerInfo triggerInfo; + // For deserialization + public MVRefreshInfo() {} + public MVRefreshInfo(boolean neverRefresh) { - this.neverRefresh = neverRefresh; - if (!neverRefresh) { - refreshMethod = RefreshMethod.COMPLETE; - triggerInfo = null; - } + this(neverRefresh, RefreshMethod.COMPLETE, null); } public MVRefreshInfo(RefreshMethod method, MVRefreshTriggerInfo trigger) { - this.neverRefresh = false; - this.refreshMethod = method; - if (!neverRefresh) { - refreshMethod = RefreshMethod.COMPLETE; - triggerInfo = trigger; - } + this(false, method, trigger); + } + + public MVRefreshInfo(boolean neverRefresh, RefreshMethod method, MVRefreshTriggerInfo trigger) { + this.neverRefresh = neverRefresh; + refreshMethod = method; + triggerInfo = trigger; } void analyze(Analyzer analyzer) throws UserException { @@ -60,8 +65,20 @@ public class MVRefreshInfo { return sb.toString(); } + public boolean isNeverRefresh() { + return neverRefresh; + } + + public RefreshMethod getRefreshMethod() { + return refreshMethod; + } + + public MVRefreshTriggerInfo getTriggerInfo() { + return triggerInfo; + } + enum RefreshMethod { - FAST, COMPLETE, FORCE + COMPLETE, FAST, FORCE } enum RefreshTrigger { @@ -72,4 +89,3 @@ public class MVRefreshInfo { IMMEDIATE, DEFERRED } } - diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshIntervalTriggerInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshIntervalTriggerInfo.java index 954017cb35..07f5d2c064 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshIntervalTriggerInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshIntervalTriggerInfo.java @@ -17,11 +17,19 @@ package org.apache.doris.analysis; +import com.google.gson.annotations.SerializedName; + public class MVRefreshIntervalTriggerInfo { + @SerializedName("startTime") private String startTime; + @SerializedName("interval") private long interval; + @SerializedName("timeUnit") private String timeUnit; + // For deserialization + public MVRefreshIntervalTriggerInfo() {} + public MVRefreshIntervalTriggerInfo(String startTime, long interval, String timeUnit) { this.startTime = startTime; this.interval = interval; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshTriggerInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshTriggerInfo.java index 8467b68459..8ed19d09a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshTriggerInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/MVRefreshTriggerInfo.java @@ -21,18 +21,28 @@ import org.apache.doris.analysis.MVRefreshInfo.RefreshTrigger; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import com.google.gson.annotations.SerializedName; + public class MVRefreshTriggerInfo { + @SerializedName("refreshTrigger") private RefreshTrigger refreshTrigger; + @SerializedName("intervalTrigger") private MVRefreshIntervalTriggerInfo intervalTrigger; + // For deserialization + public MVRefreshTriggerInfo() {} + + public MVRefreshTriggerInfo(RefreshTrigger trigger) { + this(trigger, null); + } + public MVRefreshTriggerInfo(MVRefreshIntervalTriggerInfo trigger) { - this.intervalTrigger = trigger; - this.refreshTrigger = RefreshTrigger.INTERVAL; + this(RefreshTrigger.INTERVAL, trigger); } - public MVRefreshTriggerInfo(RefreshTrigger trigger) { - this.intervalTrigger = null; - this.refreshTrigger = trigger; + public MVRefreshTriggerInfo(RefreshTrigger refreshTrigger, MVRefreshIntervalTriggerInfo intervalTrigger) { + this.refreshTrigger = refreshTrigger; + this.intervalTrigger = intervalTrigger; } void analyze(Analyzer analyzer) throws UserException { @@ -49,6 +59,10 @@ public class MVRefreshTriggerInfo { return refreshTrigger; } + public MVRefreshIntervalTriggerInfo getIntervalTrigger() { + return intervalTrigger; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ebc18ca24b..3f891fe487 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3634,7 +3634,7 @@ public class Env { this.alter.processCreateMaterializedView(stmt); } - public void createMultiTableMaterializedView(CreateMultiTableMaterializedViewStmt stmt) throws AnalysisException { + public void createMultiTableMaterializedView(CreateMultiTableMaterializedViewStmt stmt) throws UserException { this.alter.processCreateMultiTableMaterializedView(stmt); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java new file mode 100644 index 0000000000..fa9481380a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedView.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.analysis.MVRefreshInfo; +import org.apache.doris.catalog.OlapTableFactory.MaterializedViewParams; +import org.apache.doris.common.io.Text; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class MaterializedView extends OlapTable { + @SerializedName("refreshInfo") + private MVRefreshInfo refreshInfo; + @SerializedName("query") + private String query; + + // For deserialization + public MaterializedView() { + type = TableType.MATERIALIZED_VIEW; + } + + MaterializedView(MaterializedViewParams params) { + super( + params.tableId, + params.tableName, + params.schema, + params.keysType, + params.partitionInfo, + params.distributionInfo + ); + type = TableType.MATERIALIZED_VIEW; + refreshInfo = params.mvRefreshInfo; + query = params.queryStmt.toSql(); + } + + public MVRefreshInfo getRefreshInfo() { + return refreshInfo; + } + + public String getQuery() { + return query; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + MaterializedView materializedView = GsonUtils.GSON.fromJson(Text.readString(in), this.getClass()); + refreshInfo = materializedView.refreshInfo; + query = materializedView.query; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java new file mode 100644 index 0000000000..dfb044f54e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import org.apache.doris.analysis.CreateMultiTableMaterializedViewStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DdlStmt; +import org.apache.doris.analysis.MVRefreshInfo; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.catalog.TableIf.TableType; + +import com.google.common.base.Preconditions; + +import java.util.List; + +public class OlapTableFactory { + + public static class BuildParams { + public long tableId; + public String tableName; + public List<Column> schema; + public KeysType keysType; + public PartitionInfo partitionInfo; + public DistributionInfo distributionInfo; + } + + public static class OlapTableParams extends BuildParams { + public TableIndexes indexes; + } + + public static class MaterializedViewParams extends BuildParams { + public MVRefreshInfo mvRefreshInfo; + public QueryStmt queryStmt; + } + + private BuildParams params; + + public static TableType getTableType(DdlStmt stmt) { + if (stmt instanceof CreateMultiTableMaterializedViewStmt) { + return TableType.MATERIALIZED_VIEW; + } else if (stmt instanceof CreateTableStmt) { + return TableType.OLAP; + } else { + throw new IllegalArgumentException("Invalid DDL statement: " + stmt.toSql()); + } + } + + public OlapTableFactory init(TableType type) { + params = (type == TableType.OLAP) ? new OlapTableParams() : new MaterializedViewParams(); + return this; + } + + public Table build() { + Preconditions.checkNotNull(params, "The factory isn't initialized."); + + if (params instanceof OlapTableParams) { + OlapTableParams olapTableParams = (OlapTableParams) params; + return new OlapTable( + olapTableParams.tableId, + olapTableParams.tableName, + olapTableParams.schema, + olapTableParams.keysType, + olapTableParams.partitionInfo, + olapTableParams.distributionInfo, + olapTableParams.indexes + ); + } else { + MaterializedViewParams materializedViewParams = (MaterializedViewParams) params; + return new MaterializedView(materializedViewParams); + } + } + + public BuildParams getBuildParams() { + return params; + } + + public OlapTableFactory withTableId(long tableId) { + params.tableId = tableId; + return this; + } + + public OlapTableFactory withTableName(String tableName) { + params.tableName = tableName; + return this; + } + + public OlapTableFactory withSchema(List<Column> schema) { + params.schema = schema; + return this; + } + + public OlapTableFactory withKeysType(KeysType keysType) { + params.keysType = keysType; + return this; + } + + public OlapTableFactory withPartitionInfo(PartitionInfo partitionInfo) { + params.partitionInfo = partitionInfo; + return this; + } + + public OlapTableFactory withDistributionInfo(DistributionInfo distributionInfo) { + params.distributionInfo = distributionInfo; + return this; + } + + public OlapTableFactory withIndexes(TableIndexes indexes) { + Preconditions.checkState(params instanceof OlapTableParams, "Invalid argument for " + + params.getClass().getSimpleName()); + OlapTableParams olapTableParams = (OlapTableParams) params; + olapTableParams.indexes = indexes; + return this; + } + + public OlapTableFactory withQueryStmt(QueryStmt queryStmt) { + Preconditions.checkState(params instanceof MaterializedViewParams, "Invalid argument for " + + params.getClass().getSimpleName()); + MaterializedViewParams materializedViewParams = (MaterializedViewParams) params; + materializedViewParams.queryStmt = queryStmt; + return this; + } + + public OlapTableFactory withRefreshInfo(MVRefreshInfo mvRefreshInfo) { + Preconditions.checkState(params instanceof MaterializedViewParams, "Invalid argument for " + + params.getClass().getSimpleName()); + MaterializedViewParams materializedViewParams = (MaterializedViewParams) params; + materializedViewParams.mvRefreshInfo = mvRefreshInfo; + return this; + } + + public OlapTableFactory withExtraParams(DdlStmt stmt) { + boolean isMaterializedView = stmt instanceof CreateMultiTableMaterializedViewStmt; + if (!isMaterializedView) { + CreateTableStmt createOlapTableStmt = (CreateTableStmt) stmt; + return withIndexes(new TableIndexes(createOlapTableStmt.getIndexes())); + } else { + CreateMultiTableMaterializedViewStmt createMVStmt = (CreateMultiTableMaterializedViewStmt) stmt; + return withRefreshInfo(createMVStmt.getRefreshInfo()) + .withQueryStmt(createMVStmt.getQueryStmt()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 71298db5c4..5602fa9389 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -315,6 +315,8 @@ public abstract class Table extends MetaObject implements Writable, TableIf { TableType type = TableType.valueOf(Text.readString(in)); if (type == TableType.OLAP) { table = new OlapTable(); + } else if (type == TableType.MATERIALIZED_VIEW) { + table = new MaterializedView(); } else if (type == TableType.ODBC) { table = new OdbcTable(); } else if (type == TableType.MYSQL) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 9b7b1bb32b..ab8955ebe1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -93,14 +93,14 @@ public interface TableIf { String getComment(boolean escapeQuota); - public TTableDescriptor toThrift(); + TTableDescriptor toThrift(); /** * Doris table type. */ - public enum TableType { + enum TableType { MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, HUDI, - TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE; + TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW; public String toEngineName() { switch (this) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 3c5c460cc9..456ee1693e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -91,6 +91,7 @@ import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.OlapTable.OlapTableState; +import org.apache.doris.catalog.OlapTableFactory; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionItem; @@ -104,7 +105,6 @@ import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; -import org.apache.doris.catalog.TableIndexes; import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; @@ -1726,14 +1726,19 @@ public class InternalCatalog implements CatalogIf<Database> { short shortKeyColumnCount = Env.calcShortKeyColumnCount(baseSchema, stmt.getProperties()); LOG.debug("create table[{}] short key column count: {}", tableName, shortKeyColumnCount); - // indexes - TableIndexes indexes = new TableIndexes(stmt.getIndexes()); - // create table long tableId = idGeneratorBuffer.getNextId(); - OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo, - defaultDistributionInfo, indexes); - + TableType tableType = OlapTableFactory.getTableType(stmt); + OlapTable olapTable = (OlapTable) new OlapTableFactory() + .init(tableType) + .withTableId(tableId) + .withTableName(tableName) + .withSchema(baseSchema) + .withKeysType(keysType) + .withPartitionInfo(partitionInfo) + .withDistributionInfo(defaultDistributionInfo) + .withExtraParams(stmt) + .build(); olapTable.setComment(stmt.getComment()); // set base index id diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 90793bcddb..1b1acce0ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1726,6 +1726,7 @@ public class SingleNodePlanner { switch (tblRef.getTable().getType()) { case OLAP: + case MATERIALIZED_VIEW: OlapScanNode olapNode = new OlapScanNode(ctx.getNextNodeId(), tblRef.getDesc(), "OlapScanNode"); olapNode.setForceOpenPreAgg(tblRef.isForcePreAggOpened()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index 64c85c4d42..2abdb5b8cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -148,6 +148,8 @@ public class DdlExecutor { EncryptKeyHelper.createEncryptKey((CreateEncryptKeyStmt) ddlStmt); } else if (ddlStmt instanceof DropEncryptKeyStmt) { EncryptKeyHelper.dropEncryptKey((DropEncryptKeyStmt) ddlStmt); + } else if (ddlStmt instanceof CreateMultiTableMaterializedViewStmt) { + env.createMultiTableMaterializedView((CreateMultiTableMaterializedViewStmt) ddlStmt); } else if (ddlStmt instanceof CreateTableStmt) { env.createTable((CreateTableStmt) ddlStmt); } else if (ddlStmt instanceof CreateTableLikeStmt) { @@ -158,8 +160,6 @@ public class DdlExecutor { env.dropTable((DropTableStmt) ddlStmt); } else if (ddlStmt instanceof CreateMaterializedViewStmt) { env.createMaterializedView((CreateMaterializedViewStmt) ddlStmt); - } else if (ddlStmt instanceof CreateMultiTableMaterializedViewStmt) { - env.createMultiTableMaterializedView((CreateMultiTableMaterializedViewStmt) ddlStmt); } else if (ddlStmt instanceof AlterTableStmt) { env.alterTable((AlterTableStmt) ddlStmt); } else if (ddlStmt instanceof AlterTableStatsStmt) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateMultiTableMaterializedViewStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateMultiTableMaterializedViewStmtTest.java new file mode 100644 index 0000000000..1e73514ea0 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateMultiTableMaterializedViewStmtTest.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedView; +import org.apache.doris.catalog.OlapTableFactory; +import org.apache.doris.catalog.SinglePartitionInfo; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.ExceptionChecker; +import org.apache.doris.common.UserException; +import org.apache.doris.common.io.DataInputBuffer; +import org.apache.doris.common.io.DataOutputBuffer; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.common.util.Util; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TStorageType; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +public class CreateMultiTableMaterializedViewStmtTest extends TestWithFeService { + + @BeforeEach + protected void setUp() throws Exception { + createDatabase("test"); + connectContext.setDatabase("default_cluster:test"); + } + + @AfterEach + public void tearDown() { + Env.getCurrentEnv().clear(); + } + + @Test + public void testSimple() throws Exception { + createTable("create table test.t1 (pk int, v1 int sum) aggregate key (pk) " + + "distributed by hash (pk) buckets 1 properties ('replication_num' = '1');"); + createTable("create table test.t2 (pk int, v2 int sum) aggregate key (pk) " + + "distributed by hash (pk) buckets 1 properties ('replication_num' = '1');"); + StmtExecutor executor = new StmtExecutor(connectContext, "create materialized view mv " + + "build immediate refresh complete key (mv_pk) distributed by hash (mv_pk) " + + "as select test.t1.pk as mv_pk from test.t1, test.t2 where test.t1.pk = test.t2.pk"); + ExceptionChecker.expectThrowsNoException(executor::execute); + } + + @Test + public void testSerialization() throws Exception { + createTable("create table test.t1 (pk int, v1 int sum) aggregate key (pk) " + + "distributed by hash (pk) buckets 1 properties ('replication_num' = '1');"); + createTable("create table test.t2 (pk int, v2 int sum) aggregate key (pk) " + + "distributed by hash (pk) buckets 1 properties ('replication_num' = '1');"); + + String sql = "create materialized view mv build immediate refresh complete " + + "key (mv_pk) distributed by hash (mv_pk) " + + "as select test.t1.pk as mv_pk from test.t1, test.t2 where test.t1.pk = test.t2.pk"; + testSerialization(sql); + + sql = "create materialized view mv1 build immediate refresh complete start with '1:00' next 1 day " + + "key (mv_pk) distributed by hash (mv_pk) " + + "as select test.t1.pk as mv_pk from test.t1, test.t2 where test.t1.pk = test.t2.pk"; + testSerialization(sql); + } + + private void testSerialization(String sql) throws UserException, IOException { + MaterializedView mv = createMaterializedView(sql); + DataOutputBuffer out = new DataOutputBuffer(1024); + mv.write(out); + DataInputBuffer in = new DataInputBuffer(); + in.reset(out.getData(), out.getLength()); + MaterializedView other = new MaterializedView(); + other.readFields(in); + + Assertions.assertEquals(TableType.MATERIALIZED_VIEW, mv.getType()); + Assertions.assertEquals(mv.getType(), other.getType()); + Assertions.assertEquals(mv.getName(), other.getName()); + Assertions.assertEquals(mv.getQuery(), other.getQuery()); + + MVRefreshInfo refreshInfo = mv.getRefreshInfo(); + MVRefreshInfo otherRefreshInfo = other.getRefreshInfo(); + Assertions.assertEquals(refreshInfo.isNeverRefresh(), otherRefreshInfo.isNeverRefresh()); + Assertions.assertEquals(refreshInfo.getRefreshMethod(), otherRefreshInfo.getRefreshMethod()); + + Assertions.assertEquals( + refreshInfo.getTriggerInfo().getRefreshTrigger(), + otherRefreshInfo.getTriggerInfo().getRefreshTrigger() + ); + + MVRefreshIntervalTriggerInfo intervalTrigger = refreshInfo.getTriggerInfo().getIntervalTrigger(); + MVRefreshIntervalTriggerInfo otherIntervalTrigger = otherRefreshInfo.getTriggerInfo().getIntervalTrigger(); + if (intervalTrigger == null) { + Assertions.assertNull(otherIntervalTrigger); + } else { + Assertions.assertEquals(intervalTrigger.getStartTime(), otherIntervalTrigger.getStartTime()); + Assertions.assertEquals(intervalTrigger.getInterval(), otherIntervalTrigger.getInterval()); + Assertions.assertEquals(intervalTrigger.getTimeUnit(), otherIntervalTrigger.getTimeUnit()); + } + } + + private MaterializedView createMaterializedView(String sql) throws UserException { + CreateMultiTableMaterializedViewStmt stmt = (CreateMultiTableMaterializedViewStmt) SqlParserUtils + .parseAndAnalyzeStmt(sql, connectContext); + MaterializedView mv = (MaterializedView) new OlapTableFactory() + .init(OlapTableFactory.getTableType(stmt)) + .withTableId(0) + .withTableName(stmt.getMVName()) + .withKeysType(stmt.getKeysDesc().getKeysType()) + .withSchema(stmt.getColumns()) + .withPartitionInfo(new SinglePartitionInfo()) + .withDistributionInfo(stmt.getDistributionDesc().toDistributionInfo(stmt.getColumns())) + .withExtraParams(stmt) + .build(); + mv.setBaseIndexId(1); + mv.setIndexMeta( + 1, + stmt.getMVName(), + stmt.getColumns(), + 0, + Util.generateSchemaHash(), + Env.calcShortKeyColumnCount(stmt.getColumns(), stmt.getProperties()), + TStorageType.COLUMN, + stmt.keysDesc.getKeysType()); + return mv; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org