This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new df2bd045131 [enhance](mtmv) Refuse to execute insert overwrite on the same table (#40558) df2bd045131 is described below commit df2bd045131dfdb9f31dc78233ade28331422493 Author: zhangdong <493738...@qq.com> AuthorDate: Thu Sep 19 15:06:20 2024 +0800 [enhance](mtmv) Refuse to execute insert overwrite on the same table (#40558) Refuse to execute insert overwrite on the same table: - Sometimes it is not possible to cancel a running insert overwrite - When executing insert overwrite in parallel, it causes temporary partitions to affect each other --- .../insertoverwrite/InsertOverwriteManager.java | 58 ++++++++++++++++ .../apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++ .../insert/InsertOverwriteTableCommand.java | 71 ++++++++++++++++++-- .../java/org/apache/doris/qe/StmtExecutor.java | 21 ++++++ .../InsertOverwriteManagerTest.java | 77 ++++++++++++++++++++++ .../org/apache/doris/regression/suite/Suite.groovy | 24 ++++++- .../suites/mtmv_p0/test_alter_job_mtmv.groovy | 67 +++++++++++++++++++ 7 files changed, 315 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index 81524ae0208..a00107c76a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -17,17 +17,21 @@ package org.apache.doris.insertoverwrite; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -40,7 +44,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class InsertOverwriteManager extends MasterDaemon implements Writable { private static final Logger LOG = LogManager.getLogger(InsertOverwriteManager.class); @@ -62,6 +68,11 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { @SerializedName(value = "partitionPairs") private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap(); + // TableId running insert overwrite + // dbId ==> Set<tableId> + private Map<Long, Set<Long>> runningTables = Maps.newHashMap(); + private ReentrantReadWriteLock runningLock = new ReentrantReadWriteLock(true); + public InsertOverwriteManager() { super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 1000); } @@ -270,6 +281,53 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames()); } + /** + * If the current table id has a running insert overwrite, throw an exception. + * If not, record it in runningTables + * + * @param db Run the db for insert overwrite + * @param table Run the table for insert overwrite + */ + public void recordRunningTableOrException(DatabaseIf db, TableIf table) { + long dbId = db.getId(); + long tableId = table.getId(); + runningLock.writeLock().lock(); + try { + if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) { + throw new AnalysisException( + String.format("Not allowed running Insert Overwrite on same table: %s.%s", db.getFullName(), + table.getName())); + } + if (runningTables.containsKey(dbId)) { + runningTables.get(dbId).add(tableId); + } else { + runningTables.put(dbId, Sets.newHashSet(tableId)); + } + } finally { + runningLock.writeLock().unlock(); + } + } + + /** + * Remove from running records + * + * @param dbId Run the dbId for insert overwrite + * @param tableId Run the tableId for insert overwrite + */ + public void dropRunningRecord(long dbId, long tableId) { + runningLock.writeLock().lock(); + try { + if (runningTables.containsKey(dbId) && runningTables.get(dbId).contains(tableId)) { + runningTables.get(dbId).remove(tableId); + if (runningTables.get(dbId).size() == 0) { + runningTables.remove(dbId); + } + } + } finally { + runningLock.writeLock().unlock(); + } + } + /** * replay logs * diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 0fe60a94e56..59a421509d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -229,6 +229,10 @@ public class MTMVTask extends AbstractTask { ctx.setQueryId(queryId); ctx.getState().setNereids(true); command.run(ctx, executor); + if (getStatus() == TaskStatus.CANCELED) { + // Throwing an exception to interrupt subsequent partition update tasks + throw new JobException("task is CANCELED"); + } if (ctx.getState().getStateType() != MysqlStateType.OK) { throw new JobException(ctx.getState().getErrorMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index afcb5ee81d2..064fccaf521 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -28,6 +28,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.InternalDatabaseUtil; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; +import org.apache.doris.insertoverwrite.InsertOverwriteManager; import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -60,11 +61,14 @@ import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.awaitility.Awaitility; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * insert into select command implementation @@ -81,6 +85,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS private LogicalPlan logicalQuery; private Optional<String> labelName; private final Optional<LogicalPlan> cte; + private AtomicBoolean isCancelled = new AtomicBoolean(false); + private AtomicBoolean isRunning = new AtomicBoolean(false); /** * constructor @@ -157,35 +163,88 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS // Do not create temp partition on FE partitionNames = new ArrayList<>(); } + InsertOverwriteManager insertOverwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); + insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(), targetTable); + isRunning.set(true); long taskId = 0; try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + taskId = insertOverwriteManager.registerTaskGroup(); // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); + insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); - taskId = Env.getCurrentEnv().getInsertOverwriteManager() + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before registerTask, queryId: {}", + ctx.getQueryIdentifier()); + return; + } + taskId = insertOverwriteManager .registerTask(targetTable.getDatabase().getId(), targetTable.getId(), tempPartitionNames); + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before addTempPartitions, queryId: {}", + ctx.getQueryIdentifier()); + // not need deal temp partition + insertOverwriteManager.taskSuccess(taskId); + return; + } InsertOverwriteUtil.addTempPartitions(targetTable, partitionNames, tempPartitionNames); + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before insertInto, queryId: {}", ctx.getQueryIdentifier()); + insertOverwriteManager.taskFail(taskId); + return; + } insertInto(ctx, executor, tempPartitionNames); + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before replacePartition, queryId: {}", + ctx.getQueryIdentifier()); + insertOverwriteManager.taskFail(taskId); + return; + } InsertOverwriteUtil.replacePartition(targetTable, partitionNames, tempPartitionNames); - Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); + if (isCancelled.get()) { + LOG.info("insert overwrite is cancelled before taskSuccess, do nothing, queryId: {}", + ctx.getQueryIdentifier()); + } + insertOverwriteManager.taskSuccess(taskId); } } catch (Exception e) { LOG.warn("insert into overwrite failed with task(or group) id " + taskId); if (isAutoDetectOverwrite()) { - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId); + insertOverwriteManager.taskGroupFail(taskId); } else { - Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId); + insertOverwriteManager.taskFail(taskId); } throw e; } finally { ConnectContext.get().setSkipAuth(false); + insertOverwriteManager + .dropRunningRecord(targetTable.getDatabase().getId(), targetTable.getId()); + isRunning.set(false); + } + } + + /** + * cancel insert overwrite + */ + public void cancel() { + this.isCancelled.set(true); + } + + /** + * wait insert overwrite not running + */ + public void waitNotRunning() { + long waitMaxTimeSecond = 10L; + try { + Awaitility.await().atMost(waitMaxTimeSecond, TimeUnit.SECONDS).untilFalse(isRunning); + } catch (Exception e) { + LOG.warn("waiting time exceeds {} second, stop wait, labelName: {}", waitMaxTimeSecond, + labelName.isPresent() ? labelName.get() : "", e); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index f1dda99ff2d..672f11aab61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1516,6 +1516,11 @@ public class StmtExecutor { } return; } + Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand(); + if (insertOverwriteTableCommand.isPresent()) { + // If the be scheduling has not been triggered yet, cancel the scheduling first + insertOverwriteTableCommand.get().cancel(); + } Coordinator coordRef = coord; if (coordRef != null) { coordRef.cancel(); @@ -1526,6 +1531,22 @@ public class StmtExecutor { if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) { Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context); } + if (insertOverwriteTableCommand.isPresent()) { + // Wait for the command to run or cancel completion + insertOverwriteTableCommand.get().waitNotRunning(); + } + } + + private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() { + if (parsedStmt instanceof LogicalPlanAdapter) { + LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) parsedStmt; + LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan(); + if (logicalPlan instanceof InsertOverwriteTableCommand) { + InsertOverwriteTableCommand insertOverwriteTableCommand = (InsertOverwriteTableCommand) logicalPlan; + return Optional.of(insertOverwriteTableCommand); + } + } + return Optional.empty(); } // Because this is called by other thread diff --git a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java new file mode 100644 index 00000000000..4bf6c9f12d5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.insertoverwrite; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; + +import mockit.Expectations; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class InsertOverwriteManagerTest { + @Mocked + private DatabaseIf db; + + @Mocked + private TableIf table; + + @Before + public void setUp() + throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException { + + new Expectations() { + { + db.getId(); + minTimes = 0; + result = 1L; + + db.getFullName(); + minTimes = 0; + result = "db1"; + + table.getId(); + minTimes = 0; + result = 2L; + + table.getName(); + minTimes = 0; + result = "table1"; + } + }; + } + + @Test + public void testParallel() { + InsertOverwriteManager manager = new InsertOverwriteManager(); + manager.recordRunningTableOrException(db, table); + try { + manager.recordRunningTableOrException(db, table); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Not allowed")); + } + manager.dropRunningRecord(db.getId(), table.getId()); + manager.recordRunningTableOrException(db, table); + } + +} diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 5bf81a49b6a..8e6b04978bc 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1293,7 +1293,29 @@ class Suite implements GroovyInterceptable { } logger.info("The state of ${showTasks} is ${status}") Thread.sleep(1000); - } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL')) + } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL')) + if (status != "SUCCESS") { + logger.info("status is not success") + } + Assert.assertEquals("SUCCESS", status) + } + + void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) { + Thread.sleep(2000); + String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC" + String status = "NULL" + List<List<Object>> result + long startTime = System.currentTimeMillis() + long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min + do { + result = sql(showTasks) + logger.info("result: " + result.toString()) + if (!result.isEmpty()) { + status = result.last().get(4) + } + logger.info("The state of ${showTasks} is ${status}") + Thread.sleep(1000); + } while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL' || status == 'CANCELED')) if (status != "SUCCESS") { logger.info("status is not success") } diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy new file mode 100644 index 00000000000..fa1618d5bf5 --- /dev/null +++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert; + +suite("test_alter_job_mtmv") { + String suiteName = "test_alter_job_mtmv" + String tableName = "${suiteName}_table" + String mvName = "${suiteName}_mv" + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" + + sql """ + CREATE TABLE `${tableName}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY RANGE(`date`) + (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')), + PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')), + PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01'))) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + sql """ + insert into ${tableName} values(1,"2017-01-15",1),(1,"2017-02-15",2),(1,"2017-03-15",3); + """ + + //This is an immediately built materialized view that cancels running tasks and creates new ones after updating job information. + // Due to the uncertainty of the case, there may be several situations here: + // 1. The task has not been created yet, so it has not been cancelled + // 2. The task has been completed, so there was no cancellation + // 3. The task has been created but not yet completed + // But regardless of the status of the previous case, + // this case is used to ensure that the newly launched task can run successfully after modifying the materialized view + sql """ + CREATE MATERIALIZED VIEW ${mvName} + REFRESH COMPLETE ON MANUAL + partition by(`date`) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + SELECT * FROM ${tableName}; + """ + sql """alter MATERIALIZED VIEW ${mvName} refresh COMPLETE on commit; """ + waitingMTMVTaskFinishedByMvNameAllowCancel(mvName) + + sql """drop table if exists `${tableName}`""" + sql """drop materialized view if exists ${mvName};""" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org