This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new df2bd045131 [enhance](mtmv) Refuse to execute insert overwrite on the same table (#40558)
df2bd045131 is described below

commit df2bd045131dfdb9f31dc78233ade28331422493
Author: zhangdong <493738...@qq.com>
AuthorDate: Thu Sep 19 15:06:20 2024 +0800

    [enhance](mtmv) Refuse to execute insert overwrite on the same table (#40558)
    
    Refuse to run concurrent insert overwrite statements on the same table, because:
    - a running insert overwrite sometimes cannot be cancelled;
    - insert overwrites executed in parallel on one table let their temporary
    partitions interfere with each other.
---
 .../insertoverwrite/InsertOverwriteManager.java    | 58 ++++++++++++++++
 .../apache/doris/job/extensions/mtmv/MTMVTask.java |  4 ++
 .../insert/InsertOverwriteTableCommand.java        | 71 ++++++++++++++++++--
 .../java/org/apache/doris/qe/StmtExecutor.java     | 21 ++++++
 .../InsertOverwriteManagerTest.java                | 77 ++++++++++++++++++++++
 .../org/apache/doris/regression/suite/Suite.groovy | 24 ++++++-
 .../suites/mtmv_p0/test_alter_job_mtmv.groovy      | 67 +++++++++++++++++++
 7 files changed, 315 insertions(+), 7 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
index 81524ae0208..a00107c76a7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
@@ -17,17 +17,21 @@
 
 package org.apache.doris.insertoverwrite;
 
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.MasterDaemon;
 import 
org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType;
+import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -40,7 +44,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class InsertOverwriteManager extends MasterDaemon implements Writable {
     private static final Logger LOG = 
LogManager.getLogger(InsertOverwriteManager.class);
@@ -62,6 +68,11 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
     @SerializedName(value = "partitionPairs")
     private Map<Long, Map<Long, Long>> partitionPairs = 
Maps.newConcurrentMap();
 
+    // TableId running insert overwrite
+    // dbId ==> Set<tableId>
+    private Map<Long, Set<Long>> runningTables = Maps.newHashMap();
+    private ReentrantReadWriteLock runningLock = new 
ReentrantReadWriteLock(true);
+
     public InsertOverwriteManager() {
         super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND * 
1000);
     }
@@ -270,6 +281,53 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
         return InsertOverwriteUtil.dropPartitions(olapTable, 
task.getTempPartitionNames());
     }
 
+    /**
+     * Registers {@code table} as having a running insert overwrite, or fails if one is already
+     * registered for the same table. The check and the registration happen atomically under
+     * the write lock of {@code runningLock}.
+     *
+     * @param db database that owns the target table
+     * @param table target table of the insert overwrite
+     * @throws AnalysisException if an insert overwrite is already registered for this table
+     */
+    public void recordRunningTableOrException(DatabaseIf db, TableIf table) {
+        long dbId = db.getId();
+        long tableId = table.getId();
+        runningLock.writeLock().lock();
+        try {
+            // Set.add returns false when the table id is already present, i.e. a duplicate run.
+            Set<Long> tablesOfDb = runningTables.computeIfAbsent(dbId, id -> Sets.newHashSet());
+            if (!tablesOfDb.add(tableId)) {
+                String message = String.format("Not allowed running Insert Overwrite on same table: %s.%s",
+                        db.getFullName(), table.getName());
+                throw new AnalysisException(message);
+            }
+        } finally {
+            runningLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Removes the running-insert-overwrite record of a table. Ids that are not recorded are
+     * ignored. The database entry itself is removed once it no longer holds any table.
+     *
+     * @param dbId id of the database that owns the table
+     * @param tableId id of the table whose insert overwrite has ended
+     */
+    public void dropRunningRecord(long dbId, long tableId) {
+        runningLock.writeLock().lock();
+        try {
+            Set<Long> tablesOfDb = runningTables.get(dbId);
+            if (tablesOfDb != null && tablesOfDb.remove(tableId) && tablesOfDb.isEmpty()) {
+                runningTables.remove(dbId);
+            }
+        } finally {
+            runningLock.writeLock().unlock();
+        }
+    }
+
     /**
      * replay logs
      *
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 0fe60a94e56..59a421509d9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -229,6 +229,10 @@ public class MTMVTask extends AbstractTask {
         ctx.setQueryId(queryId);
         ctx.getState().setNereids(true);
         command.run(ctx, executor);
+        if (getStatus() == TaskStatus.CANCELED) {
+            // Throwing an exception to interrupt subsequent partition update 
tasks
+            throw new JobException("task is CANCELED");
+        }
         if (ctx.getState().getStateType() != MysqlStateType.OK) {
             throw new JobException(ctx.getState().getErrorMessage());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index afcb5ee81d2..064fccaf521 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.InternalDatabaseUtil;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.insertoverwrite.InsertOverwriteManager;
 import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
 import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -60,11 +61,14 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * insert into select command implementation
@@ -81,6 +85,8 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
     private LogicalPlan logicalQuery;
     private Optional<String> labelName;
     private final Optional<LogicalPlan> cte;
+    private AtomicBoolean isCancelled = new AtomicBoolean(false);
+    private AtomicBoolean isRunning = new AtomicBoolean(false);
 
     /**
      * constructor
@@ -157,35 +163,88 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
             // Do not create temp partition on FE
             partitionNames = new ArrayList<>();
         }
+        InsertOverwriteManager insertOverwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
+        
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(), 
targetTable);
+        isRunning.set(true);
         long taskId = 0;
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                taskId = insertOverwriteManager.registerTaskGroup();
                 // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement 
will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
+                insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) 
targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
-                taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+                if (isCancelled.get()) {
+                    LOG.info("insert overwrite is cancelled before 
registerTask, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    return;
+                }
+                taskId = insertOverwriteManager
                         .registerTask(targetTable.getDatabase().getId(), 
targetTable.getId(), tempPartitionNames);
+                if (isCancelled.get()) {
+                    LOG.info("insert overwrite is cancelled before 
addTempPartitions, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    // not need deal temp partition
+                    insertOverwriteManager.taskSuccess(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.addTempPartitions(targetTable, 
partitionNames, tempPartitionNames);
+                if (isCancelled.get()) {
+                    LOG.info("insert overwrite is cancelled before insertInto, 
queryId: {}", ctx.getQueryIdentifier());
+                    insertOverwriteManager.taskFail(taskId);
+                    return;
+                }
                 insertInto(ctx, executor, tempPartitionNames);
+                if (isCancelled.get()) {
+                    LOG.info("insert overwrite is cancelled before 
replacePartition, queryId: {}",
+                            ctx.getQueryIdentifier());
+                    insertOverwriteManager.taskFail(taskId);
+                    return;
+                }
                 InsertOverwriteUtil.replacePartition(targetTable, 
partitionNames, tempPartitionNames);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
+                if (isCancelled.get()) {
+                    LOG.info("insert overwrite is cancelled before 
taskSuccess, do nothing, queryId: {}",
+                            ctx.getQueryIdentifier());
+                }
+                insertOverwriteManager.taskSuccess(taskId);
             }
         } catch (Exception e) {
             LOG.warn("insert into overwrite failed with task(or group) id " + 
taskId);
             if (isAutoDetectOverwrite()) {
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
+                insertOverwriteManager.taskGroupFail(taskId);
             } else {
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId);
+                insertOverwriteManager.taskFail(taskId);
             }
             throw e;
         } finally {
             ConnectContext.get().setSkipAuth(false);
+            insertOverwriteManager
+                    .dropRunningRecord(targetTable.getDatabase().getId(), 
targetTable.getId());
+            isRunning.set(false);
+        }
+    }
+
+    /**
+     * Requests cancellation of this insert overwrite by raising the {@code isCancelled} flag.
+     * On the non auto-detect path, run() checks the flag between its steps and returns early.
+     */
+    public void cancel() {
+        isCancelled.set(true);
+    }
+
+    /**
+     * wait insert overwrite not running
+     */
+    public void waitNotRunning() {
+        long waitMaxTimeSecond = 10L;
+        try {
+            Awaitility.await().atMost(waitMaxTimeSecond, 
TimeUnit.SECONDS).untilFalse(isRunning);
+        } catch (Exception e) {
+            LOG.warn("waiting time exceeds {} second, stop wait, labelName: 
{}", waitMaxTimeSecond,
+                    labelName.isPresent() ? labelName.get() : "", e);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f1dda99ff2d..672f11aab61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1516,6 +1516,11 @@ public class StmtExecutor {
             }
             return;
         }
+        Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = 
getInsertOverwriteTableCommand();
+        if (insertOverwriteTableCommand.isPresent()) {
+            // If the be scheduling has not been triggered yet, cancel the 
scheduling first
+            insertOverwriteTableCommand.get().cancel();
+        }
         Coordinator coordRef = coord;
         if (coordRef != null) {
             coordRef.cancel();
@@ -1526,6 +1531,22 @@ public class StmtExecutor {
         if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof 
AnalyzeDBStmt) {
             Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
         }
+        if (insertOverwriteTableCommand.isPresent()) {
+            // Wait for the command to run or cancel completion
+            insertOverwriteTableCommand.get().waitNotRunning();
+        }
+    }
+
+    /**
+     * Returns the {@link InsertOverwriteTableCommand} wrapped by {@code parsedStmt}, or an empty
+     * Optional when the statement is not a Nereids insert overwrite.
+     */
+    private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
+        if (!(parsedStmt instanceof LogicalPlanAdapter)) {
+            return Optional.empty();
+        }
+        LogicalPlan plan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+        if (!(plan instanceof InsertOverwriteTableCommand)) {
+            return Optional.empty();
+        }
+        return Optional.of((InsertOverwriteTableCommand) plan);
     }
 
     // Because this is called by other thread
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
new file mode 100644
index 00000000000..4bf6c9f12d5
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.insertoverwrite;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the per-table guard in {@link InsertOverwriteManager} that rejects concurrent
+ * insert overwrites on the same table.
+ */
+public class InsertOverwriteManagerTest {
+    @Mocked
+    private DatabaseIf db;
+
+    @Mocked
+    private TableIf table;
+
+    @Before
+    public void setUp()
+            throws NoSuchMethodException, SecurityException, AnalysisException, DdlException, MetaNotFoundException {
+
+        new Expectations() {
+            {
+                db.getId();
+                minTimes = 0;
+                result = 1L;
+
+                db.getFullName();
+                minTimes = 0;
+                result = "db1";
+
+                table.getId();
+                minTimes = 0;
+                result = 2L;
+
+                table.getName();
+                minTimes = 0;
+                result = "table1";
+            }
+        };
+    }
+
+    /**
+     * A second registration of the same table must be rejected while the first is recorded,
+     * and the table must be registrable again once its record is dropped.
+     */
+    @Test
+    public void testParallel() {
+        InsertOverwriteManager manager = new InsertOverwriteManager();
+        manager.recordRunningTableOrException(db, table);
+        try {
+            manager.recordRunningTableOrException(db, table);
+            // Without this the test would pass even if the duplicate were accepted.
+            // Assert.fail throws AssertionError, which the catch (Exception) below does not intercept.
+            Assert.fail("expected a second insert overwrite on the same table to be rejected");
+        } catch (Exception e) {
+            Assert.assertTrue(e.getMessage().contains("Not allowed"));
+        }
+        manager.dropRunningRecord(db.getId(), table.getId());
+        manager.recordRunningTableOrException(db, table);
+    }
+
+}
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 5bf81a49b6a..8e6b04978bc 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1293,7 +1293,29 @@ class Suite implements GroovyInterceptable {
             }
             logger.info("The state of ${showTasks} is ${status}")
             Thread.sleep(1000);
-        } while (timeoutTimestamp > System.currentTimeMillis() && (status == 
'PENDING' || status == 'RUNNING' || status == 'NULL'))
+        } while (timeoutTimestamp > System.currentTimeMillis() && (status == 
'PENDING' || status == 'RUNNING'  || status == 'NULL'))
+        if (status != "SUCCESS") {
+            logger.info("status is not success")
+        }
+        Assert.assertEquals("SUCCESS", status)
+    }
+
+    void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
+        Thread.sleep(2000);
+        String showTasks = "select 
TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from 
tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
+        String status = "NULL"
+        List<List<Object>> result
+        long startTime = System.currentTimeMillis()
+        long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
+        do {
+            result = sql(showTasks)
+            logger.info("result: " + result.toString())
+            if (!result.isEmpty()) {
+                status = result.last().get(4)
+            }
+            logger.info("The state of ${showTasks} is ${status}")
+            Thread.sleep(1000);
+        } while (timeoutTimestamp > System.currentTimeMillis() && (status == 
'PENDING' || status == 'RUNNING'  || status == 'NULL' || status == 'CANCELED'))
         if (status != "SUCCESS") {
             logger.info("status is not success")
         }
diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy 
b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy
new file mode 100644
index 00000000000..fa1618d5bf5
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert;
+
+// Checks that altering an MV's refresh trigger while its initial refresh task may still be
+// running does not prevent the refresh task launched after the ALTER from succeeding.
+suite("test_alter_job_mtmv") {
+    String suiteName = "test_alter_job_mtmv"
+    String tableName = "${suiteName}_table"
+    String mvName = "${suiteName}_mv"
+    // Clean up leftovers from a previous run.
+    sql """drop table if exists `${tableName}`"""
+    sql """drop materialized view if exists ${mvName};"""
+
+    // Base table: three monthly range partitions on `date`, one row inserted into each.
+    sql """
+        CREATE TABLE `${tableName}` (
+          `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+          `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
+          `num` SMALLINT NOT NULL COMMENT '\"数量\"'
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`user_id`, `date`, `num`)
+        COMMENT 'OLAP'
+        PARTITION BY RANGE(`date`)
+        (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
+        PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
+        PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01')))
+        DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+        PROPERTIES ('replication_num' = '1') ;
+        """
+    sql """
+        insert into ${tableName} 
values(1,"2017-01-15",1),(1,"2017-02-15",2),(1,"2017-03-15",3);
+        """
+
+    // This materialized view is built immediately, so creating it starts a refresh task.
+    // Updating the MV's job information (the ALTER below) cancels a running task and creates
+    // new ones. Depending on timing, the initial task may:
+    //   1. not have been created yet, so nothing is cancelled;
+    //   2. have already finished, so nothing is cancelled;
+    //   3. have been created but not finished, in which case it is cancelled.
+    // Whichever case occurs, this test ensures that the task launched after the ALTER
+    // runs successfully.
+    sql """
+        CREATE MATERIALIZED VIEW ${mvName}
+            REFRESH COMPLETE ON MANUAL
+            partition by(`date`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS
+            SELECT * FROM ${tableName};
+    """
+    sql """alter MATERIALIZED VIEW ${mvName} refresh COMPLETE on commit; """
+    // Keeps polling while the newest task is PENDING, RUNNING or CANCELED, so a cancelled
+    // initial task does not end the wait early.
+    waitingMTMVTaskFinishedByMvNameAllowCancel(mvName)
+
+    sql """drop table if exists `${tableName}`"""
+    sql """drop materialized view if exists ${mvName};"""
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to