seawinde commented on code in PR #26146: URL: https://github.com/apache/doris/pull/26146#discussion_r1403249317
########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCacheManager.java: ########## @@ -0,0 +1,312 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.job.common.TaskStatus; +import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState; +import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.exceptions.ParseException; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; +import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; +import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.visitor.TableCollector; +import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext; +import org.apache.doris.persist.AlterMTMV; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * when do some operation, do something about cache + */ +public class MTMVCacheManager implements MTMVHookService { + private static final Logger LOG = LogManager.getLogger(MTMVCacheManager.class); + private Map<BaseTableInfo, Set<BaseTableInfo>> tableMTMVs = Maps.newConcurrentMap(); + + public Set<BaseTableInfo> getMtmvsByBaseTable(BaseTableInfo table) { + return tableMTMVs.get(table); + } + + public boolean isAvailableMTMV(MTMV mtmv, ConnectContext ctx) throws AnalysisException, DdlException { + // check session variable if enable rewrite + if (!ctx.getSessionVariable().isEnableMvRewrite()) { + return false; + } + MTMVCache mtmvCache = mtmv.getCache(); + if (mtmvCache == null) { + return false; + } + // chaek mv is normal + if (!(mtmv.getStatus().getState() == MTMVState.NORMAL + && mtmv.getStatus().getRefreshState() == MTMVRefreshState.SUCCESS)) { + return false; + } + // check external table + boolean containsExternalTable = containsExternalTable(mtmvCache.getBaseTables()); + if (containsExternalTable) { + return ctx.getSessionVariable().isEnableExternalMvRewrite(); + } + // check gracePeriod + Long gracePeriod = mtmv.getGracePeriod(); + // do not care data is delayed + if (gracePeriod < 0) { + return true; + } + // compare with base table + Long mtmvLastTime = getTableLastVisibleVersionTime(mtmv); + Long maxAvailableTime = mtmvLastTime + gracePeriod; + for (BaseTableInfo baseTableInfo : mtmvCache.getBaseTables()) { + long tableLastVisibleVersionTime = getTableLastVisibleVersionTime(baseTableInfo); + if (tableLastVisibleVersionTime > maxAvailableTime) { + return false; + } + } + return true; + } + + private long getTableLastVisibleVersionTime(BaseTableInfo baseTableInfo) throws AnalysisException, DdlException { + Table table = Env.getCurrentEnv().getInternalCatalog() + .getDbOrAnalysisException(baseTableInfo.getDbId()) + .getTableOrDdlException(baseTableInfo.getTableId(), TableType.OLAP); + return getTableLastVisibleVersionTime((OlapTable) table); + } + + private long getTableLastVisibleVersionTime(OlapTable table) { + long result = 0L; + long visibleVersionTime; + for (Partition partition : table.getAllPartitions()) { + visibleVersionTime = partition.getVisibleVersionTime(); + if (visibleVersionTime > result) { + result = visibleVersionTime; + } + } + return result; + } + + private boolean containsExternalTable(Set<BaseTableInfo> baseTableInfos) { + for (BaseTableInfo baseTableInfo : baseTableInfos) { + if (InternalCatalog.INTERNAL_CATALOG_ID != baseTableInfo.getCtlId()) { + return true; + } + } + return false; + } + + public static MTMVCache generateMTMVCache(MTMV mtmv, ConnectContext ctx) { + Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx); + return new MTMVCache(getBaseTables(plan), getBaseViews(plan)); + } + + private static Set<BaseTableInfo> getBaseTables(Plan plan) { + TableCollectorContext collectorContext = + new TableCollector.TableCollectorContext( + Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.OLAP)); + plan.accept(TableCollector.INSTANCE, collectorContext); + List<TableIf> collectedTables = collectorContext.getCollectedTables(); + return transferTableIfToInfo(collectedTables); + } + + private static Set<BaseTableInfo> getBaseViews(Plan plan) { + TableCollectorContext collectorContext = + new TableCollector.TableCollectorContext( + Sets.newHashSet(TableType.VIEW)); + plan.accept(TableCollector.INSTANCE, collectorContext); + List<TableIf> collectedTables = collectorContext.getCollectedTables(); + return transferTableIfToInfo(collectedTables); + } + + private static Set<BaseTableInfo> transferTableIfToInfo(List<TableIf> tables) { + Set<BaseTableInfo> result = Sets.newHashSet(); + for (TableIf table : tables) { + result.add(new BaseTableInfo(table)); + } + return result; + } + + private static Plan getPlanBySql(String querySql, ConnectContext ctx) { Review Comment: This plan should be rewrite plan, not physical plan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org