This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 4414edd66d6 [enhance](mtmv)Mv refresh on commit (#35702) 4414edd66d6 is described below commit 4414edd66d6b0d1ed5b4d3de3606a91164007011 Author: zhangdong <493738...@qq.com> AuthorDate: Fri May 31 13:57:57 2024 +0800 [enhance](mtmv)Mv refresh on commit (#35702) pick from master #34548 The modification involving CloudGlobalTransactionMgr was not picked up to 2.1 because the 2.1 branch does not yet have the Thunderbolt CloudGlobalTransactionMgr --- .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 1 + .../main/java/org/apache/doris/catalog/Env.java | 20 ++++ .../apache/doris/datasource/InternalCatalog.java | 14 ++- .../DataChangeEvent.java} | 50 +------- .../DropPartitionEvent.java} | 50 +------- .../main/java/org/apache/doris/event/Event.java | 60 ++++++++++ .../EventException.java} | 48 ++------ .../EventListener.java} | 49 +------- .../org/apache/doris/event/EventProcessor.java | 57 +++++++++ .../MTMVRefreshEnum.java => event/EventType.java} | 52 +-------- .../ReplacePartitionEvent.java} | 50 +------- .../MTMVRefreshEnum.java => event/TableEvent.java} | 60 ++++------ .../apache/doris/job/extensions/mtmv/MTMVJob.java | 29 ++++- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 1 + .../java/org/apache/doris/mtmv/BaseTableInfo.java | 20 ++-- .../java/org/apache/doris/mtmv/MTMVJobManager.java | 19 ++- .../org/apache/doris/mtmv/MTMVRefreshEnum.java | 1 + .../java/org/apache/doris/mtmv/MTMVService.java | 31 ++++- .../doris/nereids/parser/LogicalPlanBuilder.java | 3 + .../doris/transaction/DatabaseTransactionMgr.java | 32 +++++ regression-test/data/mtmv_p0/test_commit_mtmv.out | 40 +++++++ .../suites/mtmv_p0/test_commit_mtmv.groovy | 130 +++++++++++++++++++++ 22 files changed, 490 insertions(+), 327 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index c78f11b0007..a7b4c4788cf 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -215,6 +215,7 @@ buildMode refreshTrigger : ON MANUAL | ON SCHEDULE refreshSchedule + | ON COMMIT ; refreshSchedule diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 81a8b682c73..93edb9dce5f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -141,6 +141,8 @@ import org.apache.doris.deploy.DeployManager; import org.apache.doris.deploy.impl.AmbariDeployManager; import org.apache.doris.deploy.impl.K8sDeployManager; import org.apache.doris.deploy.impl.LocalFileDeployManager; +import org.apache.doris.event.EventProcessor; +import org.apache.doris.event.ReplacePartitionEvent; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.ha.HAProtocol; @@ -529,6 +531,7 @@ public class Env { private TopicPublisherThread topicPublisherThread; private MTMVService mtmvService; + private EventProcessor eventProcessor; private InsertOverwriteManager insertOverwriteManager; @@ -772,6 +775,7 @@ public class Env { this.topicPublisherThread = new TopicPublisherThread( "TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo); this.mtmvService = new MTMVService(); + this.eventProcessor = new EventProcessor(mtmvService); this.insertOverwriteManager = new InsertOverwriteManager(); this.dnsCache = new DNSCache(); this.sqlCacheManager = new NereidsSqlCacheManager(); @@ -839,6 +843,10 @@ public class Env { return mtmvService; } + public EventProcessor getEventProcessor() { + return eventProcessor; + } + public InsertOverwriteManager getInsertOverwriteManager() { return insertOverwriteManager; } @@ -5547,6 +5555,18 @@ public class Env { long version = olapTable.getNextVersion(); long versionTime = System.currentTimeMillis(); olapTable.updateVisibleVersionAndTime(version, versionTime); + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of replace the partition + try { + Env.getCurrentEnv().getEventProcessor().processEvent( + new ReplacePartitionEvent(db.getCatalog().getId(), db.getId(), + olapTable.getId())); + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed: ", t); + } // write log ReplacePartitionOperationLog info = new ReplacePartitionOperationLog(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 7a107321f4d..3bb5cb0bcb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -139,6 +139,7 @@ import org.apache.doris.datasource.es.EsRepository; import org.apache.doris.datasource.hive.HMSCachedClient; import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.doris.event.DropPartitionEvent; import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.persist.AlterDatabasePropertyInfo; @@ -1809,11 +1810,22 @@ public class InternalCatalog implements CatalogIf<Database> { long version = olapTable.getNextVersion(); long versionTime = System.currentTimeMillis(); olapTable.updateVisibleVersionAndTime(version, versionTime); + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of deleting the partition + try { + Env.getCurrentEnv().getEventProcessor().processEvent( + new DropPartitionEvent(db.getCatalog().getId(), db.getId(), + olapTable.getId())); + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed: ", t); + } // log DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition, clause.isForceDrop(), recycleTime, version, versionTime); Env.getCurrentEnv().getEditLog().logDropPartition(info); - LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp : {}, is force : {}", partitionName, olapTable.getId(), olapTable.getName(), isTempPartition, clause.isForceDrop()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java index 0f4f904c573..d58e62bfdde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DataChangeEvent.java @@ -15,52 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.event; -/** - * refresh enum - */ -public class MTMVRefreshEnum { - - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE, //complete - AUTO //try to update incrementally, if not possible, update in full - } - - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred - } - - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule - } - - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE - } - - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS +public class DataChangeEvent extends TableEvent { + public DataChangeEvent(long ctlId, long dbId, long tableId) { + super(EventType.DATA_CHANGE, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java index 0f4f904c573..67339ebd05a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/DropPartitionEvent.java @@ -15,52 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.event; -/** - * refresh enum - */ -public class MTMVRefreshEnum { - - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE, //complete - AUTO //try to update incrementally, if not possible, update in full - } - - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred - } - - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule - } - - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE - } - - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS +public class DropPartitionEvent extends TableEvent { + public DropPartitionEvent(long ctlId, long dbId, long tableId) { + super(EventType.DROP_PARTITION, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/Event.java b/fe/fe-core/src/main/java/org/apache/doris/event/Event.java new file mode 100644 index 00000000000..e049a1aeb8c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/Event.java @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +import org.apache.doris.catalog.Env; + +import java.util.Objects; + +public abstract class Event { + protected final long eventId; + + // eventTime of the event. Used instead of calling getter on event everytime + protected final long eventTime; + + // eventType from the NotificationEvent + protected final EventType eventType; + + protected Event(EventType eventType) { + Objects.requireNonNull(eventType, "require eventType"); + this.eventId = Env.getCurrentEnv().getNextId(); + this.eventTime = System.currentTimeMillis(); + this.eventType = eventType; + } + + public long getEventId() { + return eventId; + } + + public long getEventTime() { + return eventTime; + } + + public EventType getEventType() { + return eventType; + } + + @Override + public String toString() { + return "Event{" + + "eventId=" + eventId + + ", eventTime=" + eventTime + + ", eventType=" + eventType + + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventException.java similarity index 52% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/event/EventException.java index 0f4f904c573..425ca03d65f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventException.java @@ -15,52 +15,20 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; -/** - * refresh enum - */ -public class MTMVRefreshEnum { +package org.apache.doris.event; - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE, //complete - AUTO //try to update incrementally, if not possible, update in full - } - - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred - } +public class EventException extends Exception { - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule + public EventException(String msg, Throwable cause) { + super(msg, cause); } - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE + public EventException(String msg) { + super(msg); } - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS + public EventException(Exception e) { + super(e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java index 0f4f904c573..d5c142bf934 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventListener.java @@ -15,52 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.event; -/** - * refresh enum - */ -public class MTMVRefreshEnum { +public interface EventListener { - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE, //complete - AUTO //try to update incrementally, if not possible, update in full - } - - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred - } - - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule - } - - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE - } - - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS - } + void processEvent(Event event) throws EventException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java new file mode 100644 index 00000000000..4731a17a372 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventProcessor.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.event; + +import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; +import java.util.Set; + +public class EventProcessor { + + private static final Logger LOG = LogManager.getLogger(EventProcessor.class); + + private Set<EventListener> listeners = Sets.newHashSet(); + + public EventProcessor(EventListener... args) { + for (EventListener listener : args) { + this.listeners.add(listener); + } + } + + public boolean processEvent(Event event) { + Objects.requireNonNull(event); + if (LOG.isDebugEnabled()) { + LOG.debug("processEvent: {}", event); + } + boolean result = true; + for (EventListener listener : listeners) { + try { + listener.processEvent(event); + } catch (EventException e) { + // A listener processing failure does not affect other listeners + LOG.warn("[{}] process event failed, event: {}, errMsg: {}", listener.getClass().getName(), event, + e.getMessage()); + result = false; + } + } + return result; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/event/EventType.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/event/EventType.java index 0f4f904c573..be942141fd3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/EventType.java @@ -15,52 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.event; -/** - * refresh enum - */ -public class MTMVRefreshEnum { - - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE, //complete - AUTO //try to update incrementally, if not possible, update in full - } - - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred - } - - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule - } - - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE - } - - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS - } +public enum EventType { + DATA_CHANGE, + REPLACE_PARTITION, + DROP_PARTITION } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java index 0f4f904c573..371d5cd553c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/ReplacePartitionEvent.java @@ -15,52 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.event; -/** - * refresh enum - */ -public class MTMVRefreshEnum { - - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE, //complete - AUTO //try to update incrementally, if not possible, update in full - } - - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred - } - - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule - } - - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE - } - - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS +public class ReplacePartitionEvent extends TableEvent { + public ReplacePartitionEvent(long ctlId, long dbId, long tableId) { + super(EventType.REPLACE_PARTITION, ctlId, dbId, tableId); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java similarity index 52% copy from fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java copy to fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java index 0f4f904c573..210ad2df40f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/event/TableEvent.java @@ -15,52 +15,38 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.mtmv; +package org.apache.doris.event; -/** - * refresh enum - */ -public class MTMVRefreshEnum { +public abstract class TableEvent extends Event { + protected final long ctlId; + protected final long dbId; + protected final long tableId; - /** - * RefreshMethod - */ - public enum RefreshMethod { - COMPLETE, //complete - AUTO //try to update incrementally, if not possible, update in full + public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) { + super(eventType); + this.ctlId = ctlId; + this.dbId = dbId; + this.tableId = tableId; } - /** - * BuildMode - */ - public enum BuildMode { - IMMEDIATE, //right now - DEFERRED // deferred + public long getCtlId() { + return ctlId; } - /** - * RefreshTrigger - */ - public enum RefreshTrigger { - MANUAL, //manual - SCHEDULE // schedule + public long getDbId() { + return dbId; } - /** - * MTMVState - */ - public enum MTMVState { - INIT, - NORMAL, - SCHEMA_CHANGE + public long getTableId() { + return tableId; } - /** - * MTMVRefreshState - */ - public enum MTMVRefreshState { - INIT, - FAIL, - SUCCESS + @Override + public String toString() { + return "TableEvent{" + + "ctlId=" + ctlId + + ", dbId=" + dbId + + ", tableId=" + tableId + + "} " + super.toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java index 4f44b2e14b9..5d7cf4435b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVJob.java @@ -140,27 +140,44 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> { /** * if user trigger, return true - * if system trigger, Check if there are any system triggered tasks, and if so, return false + * else, only can have 2 task. because every task can refresh all data. * * @param taskContext * @return */ @Override public boolean isReadyForScheduling(MTMVTaskContext taskContext) { - if (taskContext != null) { + if (isManual(taskContext)) { return true; } List<MTMVTask> runningTasks = getRunningTasks(); + int runningNum = 0; for (MTMVTask task : runningTasks) { - if (task.getTaskContext() == null || task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) { - LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}", - task); - return false; + if (!isManual(task.getTaskContext())) { + runningNum++; + // Prerequisite: Each refresh will calculate which partitions to refresh + // + // For example, there is currently a running task that is refreshing partition p1. + // If the data of p2 changes at this time and triggers a refresh task t2, + // according to the logic (>=1), t2 will be lost + // + // If the logic is >=2, t2 will wait lock of MTMVJob. + // If the p3 data changes again and triggers the refresh task t3, + // then t3 will be discarded. However, when t2 runs, both p2 and p3 data will be refreshed. + if (runningNum >= 2) { + LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}", + task); + return false; + } } } return true; } + private boolean isManual(MTMVTaskContext taskContext) { + return taskContext != null && taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL; + } + @Override public ShowResultSetMetaData getJobMetaData() { return JOB_META_DATA; diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 240c7de6a71..517909f5e1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -107,6 +107,7 @@ public class MTMVTask extends AbstractTask { public enum MTMVTaskTriggerMode { MANUAL, + COMMIT, SYSTEM } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java index 9b3b6be04f1..bc9a3fdd205 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/BaseTableInfo.java @@ -32,18 +32,24 @@ public class BaseTableInfo { private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class); @SerializedName("ti") - private Long tableId; + private long tableId; @SerializedName("di") - private Long dbId; + private long dbId; @SerializedName("ci") - private Long ctlId; + private long ctlId; - public BaseTableInfo(Long tableId, Long dbId) { + public BaseTableInfo(long tableId, long dbId) { this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID; } + public BaseTableInfo(long tableId, long dbId, long ctlId) { + this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null"); + this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null"); + this.ctlId = java.util.Objects.requireNonNull(ctlId, "ctlId is null"); + } + public BaseTableInfo(TableIf table) { DatabaseIf database = table.getDatabase(); java.util.Objects.requireNonNull(database, "database is null"); @@ -54,15 +60,15 @@ public class BaseTableInfo { this.ctlId = catalog.getId(); } - public Long getTableId() { + public long getTableId() { return tableId; } - public Long getDbId() { + public long getDbId() { return dbId; } - public Long getCtlId() { + public long getCtlId() { return ctlId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java index f53a7b60868..bed44e8d37d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java @@ -45,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; import org.apache.doris.persist.AlterMTMV; import org.apache.doris.qe.ConnectContext; +import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -79,11 +80,10 @@ public class MTMVJobManager implements MTMVHookService { private JobExecutionConfiguration getJobConfig(MTMV mtmv) { JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration(); - if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger() - .equals(RefreshTrigger.SCHEDULE)) { + RefreshTrigger refreshTrigger = mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger(); + if (refreshTrigger.equals(RefreshTrigger.SCHEDULE)) { setScheduleJobConfig(jobExecutionConfiguration, mtmv); - } else if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger() - .equals(RefreshTrigger.MANUAL)) { + } else if (refreshTrigger.equals(RefreshTrigger.MANUAL) || refreshTrigger.equals(RefreshTrigger.COMMIT)) { setManualJobConfig(jobExecutionConfiguration, mtmv); } return jobExecutionConfiguration; @@ -210,9 +210,20 @@ public class MTMVJobManager implements MTMVHookService { job.cancelTaskById(info.getTaskId()); } + public void onCommit(MTMV mtmv) throws DdlException, JobException { + MTMVJob job = getJobByMTMV(mtmv); + MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.COMMIT, Lists.newArrayList(), + false); + Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext); + } + private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException { Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb()); MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW); + return getJobByMTMV(mtmv); + } + + private MTMVJob getJobByMTMV(MTMV mtmv) throws DdlException { List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager() .queryJobs(JobType.MV, mtmv.getJobInfo().getJobName()); if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java index 0f4f904c573..b9d27db9c22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshEnum.java @@ -43,6 +43,7 @@ public class MTMVRefreshEnum { */ public enum RefreshTrigger { MANUAL, //manual + COMMIT, //manual SCHEDULE // schedule } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java index cbbaef6b917..d5d86b7eeda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java @@ -22,8 +22,13 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.event.Event; +import org.apache.doris.event.EventException; +import org.apache.doris.event.EventListener; +import org.apache.doris.event.TableEvent; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.extensions.mtmv.MTMVTask; +import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger; import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo; import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo; @@ -36,8 +41,9 @@ import org.apache.logging.log4j.Logger; import java.util.Map; import java.util.Objects; +import java.util.Set; -public class MTMVService { +public class MTMVService implements EventListener { private static final Logger LOG = LogManager.getLogger(MTMVService.class); private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap(); @@ -162,4 +168,27 @@ public class MTMVService { mtmvHookService.cancelMTMVTask(info); } } + + @Override + public void processEvent(Event event) throws EventException { + Objects.requireNonNull(event); + if (!(event instanceof TableEvent)) { + return; + } + TableEvent tableEvent = (TableEvent) event; + LOG.info("processEvent, Event: {}", event); + Set<BaseTableInfo> mtmvs = relationManager.getMtmvsByBaseTableOneLevel( + new BaseTableInfo(tableEvent.getTableId(), tableEvent.getDbId(), tableEvent.getCtlId())); + for (BaseTableInfo baseTableInfo : mtmvs) { + try { + // check if mtmv should trigger by event + MTMV mtmv = MTMVUtil.getMTMV(baseTableInfo.getDbId(), baseTableInfo.getTableId()); + if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) { + jobManager.onCommit(mtmv); + } + } catch (Exception e) { + throw new EventException(e); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index a80c0612deb..4faceb3ae94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -700,6 +700,9 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { if (ctx.MANUAL() != null) { return new MTMVRefreshTriggerInfo(RefreshTrigger.MANUAL); } + if (ctx.COMMIT() != null) { + return new MTMVRefreshTriggerInfo(RefreshTrigger.COMMIT); + } if (ctx.SCHEDULE() != null) { return new MTMVRefreshTriggerInfo(RefreshTrigger.SCHEDULE, visitRefreshSchedule(ctx.refreshSchedule())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 6d53148e5af..0462a1ec1ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -49,6 +49,7 @@ import org.apache.doris.common.util.InternalDatabaseUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.event.DataChangeEvent; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; @@ -1052,6 +1053,17 @@ public class DatabaseTransactionMgr { } finally { MetaLockUtils.writeUnlockTables(tableList); } + // Here, we only wait for the EventProcessor to finish processing the event, + // but regardless of the success or failure of the result, + // it does not affect the logic of transaction + try { + produceEvent(transactionState, db); + } catch (Throwable t) { + // According to normal logic, no exceptions will be thrown, + // but in order to avoid bugs affecting the original logic, all exceptions are caught + LOG.warn("produceEvent failed: ", t); + } + // The visible latch should only be counted down after all things are done // (finish transaction, write edit log, etc). // Otherwise, there is no way for stream load to query the result right after loading finished, @@ -1075,6 +1087,26 @@ public class DatabaseTransactionMgr { } } + private void produceEvent(TransactionState transactionState, Database db) { + Collection<TableCommitInfo> tableCommitInfos; + if (!transactionState.getSubTxnIdToTableCommitInfo().isEmpty()) { + tableCommitInfos = transactionState.getSubTxnTableCommitInfos(); + } else { + tableCommitInfos = transactionState.getIdToTableCommitInfos().values(); + } + for (TableCommitInfo tableCommitInfo : tableCommitInfos) { + long tableId = tableCommitInfo.getTableId(); + OlapTable table = (OlapTable) db.getTableNullable(tableId); + if (table == null) { + LOG.warn("table {} does not exist when produceEvent. transaction: {}, db: {}", + tableId, transactionState.getTransactionId(), db.getId()); + continue; + } + Env.getCurrentEnv().getEventProcessor().processEvent( + new DataChangeEvent(db.getCatalog().getId(), db.getId(), tableId)); + } + } + private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db, List<Pair<OlapTable, Partition>> relatedTblPartitions) { Iterator<TableCommitInfo> tableCommitInfoIterator diff --git a/regression-test/data/mtmv_p0/test_commit_mtmv.out b/regression-test/data/mtmv_p0/test_commit_mtmv.out new file mode 100644 index 00000000000..fafb8f883a4 --- /dev/null +++ b/regression-test/data/mtmv_p0/test_commit_mtmv.out @@ -0,0 +1,40 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !mv1 -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !task1 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv2 -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !task2 -- +{"triggerMode":"COMMIT","partitions":[],"isComplete":false} + +-- !mv1_2 -- +1 2017-01-15 1 +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv2_2 -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv1_init -- +1 2017-01-15 1 +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv1_drop -- +2 2017-02-15 2 +3 2017-03-15 3 + +-- !mv1_replace -- +3 2017-03-15 3 + diff --git a/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy new file mode 100644 index 00000000000..cd02dcd57d7 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_commit_mtmv.groovy @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_commit_mtmv") { + def tableName = "test_commit_mtmv_table" + def mvName1 = "test_commit_mtmv1" + def mvName2 = "test_commit_mtmv2" + def dbName = "regression_test_mtmv_p0" + sql """drop materialized view if exists ${mvName1};""" + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists `${tableName}`""" + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + + sql """ + CREATE MATERIALIZED VIEW ${mvName1} + BUILD DEFERRED REFRESH AUTO ON COMMIT + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + sql """ + CREATE MATERIALIZED VIEW ${mvName2} + BUILD DEFERRED REFRESH AUTO ON COMMIT + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${mvName1}; + """ + sql """ + insert into ${tableName} values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);; + """ + def jobName1 = getJobName(dbName, mvName1); + waitingMTMVTaskFinished(jobName1) + order_qt_mv1 "SELECT * FROM ${mvName1}" + order_qt_task1 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvName1}' order by CreateTime desc limit 1" + + def jobName2 = getJobName(dbName, mvName2); + waitingMTMVTaskFinished(jobName2) + order_qt_mv2 "SELECT * FROM ${mvName2}" + order_qt_task2 "SELECT TaskContext from tasks('type'='mv') where MvName='${mvName2}' order by CreateTime desc limit 1" + + // on manual can not trigger by commit + sql """ + alter MATERIALIZED VIEW ${mvName2} REFRESH ON MANUAL; + """ + + sql """ + insert into ${tableName} values(1,"2017-01-15",1);; + """ + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_2 "SELECT * FROM ${mvName1}" + waitingMTMVTaskFinished(jobName2) + order_qt_mv2_2 "SELECT * FROM ${mvName2}" + + sql """drop materialized view if exists ${mvName1};""" + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists `${tableName}`""" + + // test drop partition + sql """ + CREATE TABLE IF NOT EXISTS `${tableName}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY RANGE(`date`) + (PARTITION p201701 VALUES [('0000-01-01'), ('2017-02-01')), + PARTITION p201702 VALUES [('2017-02-01'), ('2017-03-01')), + PARTITION p201703 VALUES [('2017-03-01'), ('2017-04-01'))) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + CREATE MATERIALIZED VIEW ${mvName1} + BUILD DEFERRED REFRESH AUTO ON COMMIT + PARTITION BY (`date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + sql """ + insert into ${tableName} values(1,"2017-01-15",1),(2,"2017-02-15",2),(3,"2017-03-15",3);; + """ + jobName1 = getJobName(dbName, mvName1); + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_init "SELECT * FROM ${mvName1}" + + sql """alter table ${tableName} drop PARTITION p201701""" + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_drop "SELECT * FROM ${mvName1}" + + // test replace partition + sql """ALTER TABLE ${tableName} ADD TEMPORARY PARTITION p201702_t VALUES [('2017-02-01'), ('2017-03-01'));""" + sql """ALTER TABLE ${tableName} REPLACE PARTITION (p201702) WITH TEMPORARY PARTITION (p201702_t);""" + waitingMTMVTaskFinished(jobName1) + order_qt_mv1_replace "SELECT * FROM ${mvName1}" + + sql """drop materialized view if exists ${mvName1};""" + sql """drop materialized view if exists ${mvName2};""" + sql """drop table if exists `${tableName}`""" + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org