This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new dab01385671 [branch-2.1](insert-overwrite) Fix insert overwrite auto 
detect transaction safe (#38103) (#38442)
dab01385671 is described below

commit dab013856719ef899c39cd69026bf2784d1b6c73
Author: zclllhhjj <zhaochan...@selectdb.com>
AuthorDate: Mon Jul 29 10:21:03 2024 +0800

    [branch-2.1](insert-overwrite) Fix insert overwrite auto detect transaction 
safe (#38103) (#38442)
    
    pick https://github.com/apache/doris/pull/38103
---
 .../java/org/apache/doris/catalog/OlapTable.java   |   9 +-
 .../insertoverwrite/InsertOverwriteManager.java    |  66 ++++++++++---
 .../doris/insertoverwrite/InsertOverwriteUtil.java |   1 +
 .../insert/InsertOverwriteTableCommand.java        |  11 ++-
 .../apache/doris/service/FrontendServiceImpl.java  | 107 ++++++++++++++-------
 .../test_iot_auto_detect_concurrent.groovy         |   2 +
 6 files changed, 143 insertions(+), 53 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index b0169571dfc..55ec9938d9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1112,10 +1112,13 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
         return Sets.newHashSet(nameToPartition.keySet());
     }
 
-    public List<String> uncheckedGetPartNamesById(List<Long> partitionIds) {
+    // for positions where both partition id lists hold the same id, get the partition names.
+    public List<String> getEqualPartitionNames(List<Long> partitionIds1, 
List<Long> partitionIds2) {
         List<String> names = new ArrayList<String>();
-        for (Long id : partitionIds) {
-            names.add(idToPartition.get(id).getName());
+        for (int i = 0; i < partitionIds1.size(); i++) {
+            if (partitionIds1.get(i).equals(partitionIds2.get(i))) {
+                names.add(getPartition(partitionIds1.get(i)).getName());
+            }
         }
         return names;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
index e0c46dde920..81524ae0208 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
@@ -58,7 +58,7 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
     // but we only change one time and save the relations in partitionPairs. 
they're protected by taskLocks
     @SerializedName(value = "taskLocks")
     private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap();
-    // <groupId, <oldPartId, newPartId>>
+    // <groupId, <oldPartId, newPartId>>. no need concern which task it 
belongs to.
     @SerializedName(value = "partitionPairs")
     private Map<Long, Map<Long, Long>> partitionPairs = 
Maps.newConcurrentMap();
 
@@ -91,7 +91,7 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
      *
      * @return group id, like a transaction id.
      */
-    public long preRegisterTask() {
+    public long registerTaskGroup() {
         long groupId = Env.getCurrentEnv().getNextId();
         taskGroups.put(groupId, new ArrayList<Long>());
         taskLocks.put(groupId, new ReentrantLock());
@@ -107,44 +107,81 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
         taskGroups.get(groupId).add(taskId);
     }
 
-    public List<Long> tryReplacePartitionIds(long groupId, List<Long> 
oldPartitionIds) {
+    /**
+     * this func should be called within the lock scope of getLock(groupId)
+     *
+     * @param newIds output list. for each old id: the recorded new id if it has been replaced, otherwise the old id itself.
+     */
+    public boolean tryReplacePartitionIds(long groupId, List<Long> 
oldPartitionIds, List<Long> newIds) {
         Map<Long, Long> relations = partitionPairs.get(groupId);
-        List<Long> newIds = new ArrayList<Long>();
-        for (Long id : oldPartitionIds) {
+        boolean needReplace = false;
+        for (int i = 0; i < oldPartitionIds.size(); i++) {
+            long id = oldPartitionIds.get(i);
             if (relations.containsKey(id)) {
                 // if we replaced it. then return new one.
                 newIds.add(relations.get(id));
             } else {
-                // otherwise itself. we will deal it soon.
                 newIds.add(id);
+                needReplace = true;
             }
         }
-        return newIds;
+        return needReplace;
     }
 
+    // this func should be called within the lock scope of getLock(groupId)
     public void recordPartitionPairs(long groupId, List<Long> oldIds, 
List<Long> newIds) {
         Map<Long, Long> relations = partitionPairs.get(groupId);
         Preconditions.checkArgument(oldIds.size() == newIds.size());
         for (int i = 0; i < oldIds.size(); i++) {
             relations.put(oldIds.get(i), newIds.get(i));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("recorded partition pairs: [" + oldIds.get(i) + ", " 
+ newIds.get(i) + "]");
+            }
         }
     }
 
+    // the lock is a sign that the TaskGroup exists. if there is none, the group has already failed.
     public ReentrantLock getLock(long groupId) {
         return taskLocks.get(groupId);
     }
 
+    // When goes into failure, some BE may still not know and send new request.
+    // it will cause ConcurrentModification or NullPointer.
     public void taskGroupFail(long groupId) {
         LOG.info("insert overwrite auto detect partition task group [" + 
groupId + "] failed");
-        for (Long taskId : taskGroups.get(groupId)) {
-            taskFail(taskId);
+        ReentrantLock lock = getLock(groupId);
+        lock.lock();
+        try {
+            // will rollback temp partitions in `taskFail`
+            for (Long taskId : taskGroups.get(groupId)) {
+                taskFail(taskId);
+            }
+            cleanTaskGroup(groupId);
+        } finally {
+            lock.unlock();
         }
-        cleanTaskGroup(groupId);
     }
 
-    public void taskGroupSuccess(long groupId) {
+    // here we will make all replacements of this group visible. if any of them 
fails, nothing happens.
+    public void taskGroupSuccess(long groupId, OlapTable targetTable) throws 
DdlException {
+        try {
+            Map<Long, Long> relations = partitionPairs.get(groupId);
+            ArrayList<String> oldNames = new ArrayList<>();
+            ArrayList<String> newNames = new ArrayList<>();
+            for (Entry<Long, Long> partitionPair : relations.entrySet()) {
+                
oldNames.add(targetTable.getPartition(partitionPair.getKey()).getName());
+                
newNames.add(targetTable.getPartition(partitionPair.getValue()).getName());
+            }
+            InsertOverwriteUtil.replacePartition(targetTable, oldNames, 
newNames);
+        } catch (Exception e) {
+            LOG.warn("insert overwrite task making replacement failed because 
" + e.getMessage()
+                    + "all new partition will not be visible and will be 
recycled by partition GC.");
+            throw e;
+        }
         LOG.info("insert overwrite auto detect partition task group [" + 
groupId + "] succeed");
         for (Long taskId : taskGroups.get(groupId)) {
+            Env.getCurrentEnv().getEditLog()
+                    .logInsertOverwrite(new InsertOverwriteLog(taskId, 
tasks.get(taskId), InsertOverwriteOpType.ADD));
             taskSuccess(taskId);
         }
         cleanTaskGroup(groupId);
@@ -164,6 +201,9 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
     public void taskFail(long taskId) {
         LOG.info("insert overwrite task [" + taskId + "] failed");
         boolean rollback = rollback(taskId);
+        if (!rollback) {
+            LOG.warn("roll back task [" + taskId + "] failed");
+        }
         if (rollback) {
             removeTask(taskId);
         } else {
@@ -192,6 +232,7 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
         }
     }
 
+    // cancel it. should try to remove them after.
     private void cancelTask(long taskId) {
         if (tasks.containsKey(taskId)) {
             LOG.info("cancel insert overwrite task: {}", tasks.get(taskId));
@@ -201,6 +242,7 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
         }
     }
 
+    // task and partitions has been removed. it's safe to remove task.
     private void removeTask(long taskId) {
         if (tasks.containsKey(taskId)) {
             LOG.info("remove insert overwrite task: {}", tasks.get(taskId));
@@ -222,7 +264,7 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
         try {
             olapTable = task.getTable();
         } catch (DdlException e) {
-            LOG.warn("can not get table, task: {}", task);
+            LOG.warn("can not get table, task: {}, reason: {}", task, 
e.getMessage());
             return true;
         }
         return InsertOverwriteUtil.dropPartitions(olapTable, 
task.getTempPartitionNames());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
index c4d3068e09f..a0e04a35bd9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
@@ -54,6 +54,7 @@ public class InsertOverwriteUtil {
             for (int i = 0; i < partitionNames.size(); i++) {
                 Env.getCurrentEnv().addPartitionLike((Database) 
tableIf.getDatabase(), tableIf.getName(),
                         new AddPartitionLikeClause(tempPartitionNames.get(i), 
partitionNames.get(i), true));
+                LOG.info("successfully add temp partition [{}] for [{}]", 
tempPartitionNames.get(i), tableIf.getName());
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index bd5af3225f0..75b80ade581 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -169,11 +169,12 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
         try {
             if (isAutoDetectOverwrite()) {
                 // taskId here is a group id. it contains all replace tasks 
made and registered in rpc process.
-                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask();
-                // When inserting, BE will call to replace partition by 
FrontendService. FE do the real
-                // add&replacement and return replace result. So there's no 
need to do anything else.
+                taskId = 
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+                // When inserting, BE will call to replace partition by 
FrontendService. FE will register new temp
+                // partitions and return. to stay transactional, the replacement 
will only really occur once the insert has succeeded,
+                // i.e. `insertInto` finished. then we call taskGroupSuccess 
to make replacement.
                 insertInto(ctx, executor, taskId);
-                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId);
+                
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, 
(OlapTable) targetTable);
             } else {
                 List<String> tempPartitionNames = 
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
                 taskId = Env.getCurrentEnv().getInsertOverwriteManager()
@@ -184,7 +185,7 @@ public class InsertOverwriteTableCommand extends Command 
implements ForwardWithS
                 
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
             }
         } catch (Exception e) {
-            LOG.warn("insert into overwrite failed");
+            LOG.warn("insert into overwrite failed with task(or group) id " + 
taskId);
             if (isAutoDetectOverwrite()) {
                 
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
             } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 4935e026f11..77bf3173377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -276,7 +276,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 // Frontend service used to serve all request for this frontend through
 // thrift protocol
@@ -3568,7 +3567,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         LOG.info("Receive replace partition request: {}", request);
         long dbId = request.getDbId();
         long tableId = request.getTableId();
-        List<Long> partitionIds = request.getPartitionIds();
+        List<Long> reqPartitionIds = request.getPartitionIds();
         long taskGroupId = request.getOverwriteGroupId();
         TReplacePartitionResult result = new TReplacePartitionResult();
         TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
@@ -3607,41 +3606,60 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         OlapTable olapTable = (OlapTable) table;
         InsertOverwriteManager overwriteManager = 
Env.getCurrentEnv().getInsertOverwriteManager();
         ReentrantLock taskLock = overwriteManager.getLock(taskGroupId);
-        List<String> allReqPartNames; // all request partitions
+        if (taskLock == null) {
+            errorStatus.setErrorMsgs(Lists
+                    .newArrayList(new String("cannot find task group " + 
taskGroupId + ", maybe already failed.")));
+            result.setStatus(errorStatus);
+            LOG.warn("send create partition error status: {}", result);
+            return result;
+        }
+
+        ArrayList<Long> resultPartitionIds = new ArrayList<>(); // [1 2 5 6] 
-> [7 8 5 6]
+        ArrayList<Long> pendingPartitionIds = new ArrayList<>(); // pending: 
[1 2]
+        ArrayList<Long> newPartitionIds = new ArrayList<>(); // requested temp 
partition ids. for [7 8]
+        boolean needReplace = false;
         try {
             taskLock.lock();
+            // double check lock. maybe taskLock is not null, but has been 
removed from the Map. means the task failed.
+            if (overwriteManager.getLock(taskGroupId) == null) {
+                errorStatus.setErrorMsgs(Lists
+                        .newArrayList(new String("cannot find task group " + 
taskGroupId + ", maybe already failed.")));
+                result.setStatus(errorStatus);
+                LOG.warn("send create partition error status: {}", result);
+                return result;
+            }
+
             // we dont lock the table. other thread in this txn will be 
controled by taskLock.
-            // if we have already replaced. dont do it again, but acquire the 
recorded new partition directly.
+            // if we have already replaced, dont do it again, but acquire the 
recorded new partition directly.
             // if not by this txn, just let it fail naturally is ok.
-            List<Long> replacedPartIds = 
overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
-            // here if replacedPartIds still have null. this will throw 
exception.
-            allReqPartNames = 
olapTable.uncheckedGetPartNamesById(replacedPartIds);
-
-            List<Long> pendingPartitionIds = IntStream.range(0, 
partitionIds.size())
-                    .filter(i -> partitionIds.get(i) == 
replacedPartIds.get(i)) // equal means not replaced
-                    .mapToObj(partitionIds::get)
-                    .collect(Collectors.toList());
-            // from here we ONLY deal the pending partitions. not include the 
dealed(by others).
-            if (!pendingPartitionIds.isEmpty()) {
-                // below two must have same order inner.
-                List<String> pendingPartitionNames = 
olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
-                List<String> tempPartitionNames = InsertOverwriteUtil
-                        .generateTempPartitionNames(pendingPartitionNames);
+            needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId, 
reqPartitionIds, resultPartitionIds);
+            // request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need 
replace.
+            if (needReplace) {
+                // names for [1 2]
+                List<String> pendingPartitionNames = 
olapTable.getEqualPartitionNames(reqPartitionIds,
+                                resultPartitionIds);
+                for (String name : pendingPartitionNames) {
+                    
pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2]
+                }
 
-                long taskId = overwriteManager.registerTask(dbId, tableId, 
tempPartitionNames);
+                // names for [7 8]
+                List<String> newTempNames = InsertOverwriteUtil
+                        .generateTempPartitionNames(pendingPartitionNames);
+                // a task means one time insert overwrite
+                long taskId = overwriteManager.registerTask(dbId, tableId, 
newTempNames);
                 overwriteManager.registerTaskInGroup(taskGroupId, taskId);
-                InsertOverwriteUtil.addTempPartitions(olapTable, 
pendingPartitionNames, tempPartitionNames);
-                InsertOverwriteUtil.replacePartition(olapTable, 
pendingPartitionNames, tempPartitionNames);
+                InsertOverwriteUtil.addTempPartitions(olapTable, 
pendingPartitionNames, newTempNames);
                 // now temp partitions are bumped up and use new names. we get 
their ids and record them.
-                List<Long> newPartitionIds = new ArrayList<Long>();
-                for (String newPartName : pendingPartitionNames) {
-                    
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
+                for (String newPartName : newTempNames) {
+                    
newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8]
                 }
                 overwriteManager.recordPartitionPairs(taskGroupId, 
pendingPartitionIds, newPartitionIds);
+
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("partition replacement: ");
                     for (int i = 0; i < pendingPartitionIds.size(); i++) {
-                        LOG.debug("[" + pendingPartitionIds.get(i) + ", " + 
newPartitionIds.get(i) + "], ");
+                        LOG.debug("[" + pendingPartitionIds.get(i) + " - " + 
pendingPartitionNames.get(i) + ", "
+                                + newPartitionIds.get(i) + " - " + 
newTempNames.get(i) + "], ");
                     }
                 }
             }
@@ -3654,15 +3672,38 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             taskLock.unlock();
         }
 
-        // build partition & tablets. now all partitions in allReqPartNames 
are replaced
-        // an recorded.
-        // so they won't be changed again. if other transaction changing it. 
just let it
-        // fail.
-        List<TOlapTablePartition> partitions = Lists.newArrayList();
-        List<TTabletLocation> tablets = Lists.newArrayList();
+        // result: [1 2 5 6], make it [7 8 5 6]
+        int idx = 0;
+        if (needReplace) {
+            for (int i = 0; i < reqPartitionIds.size(); i++) {
+                if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) {
+                    resultPartitionIds.set(i, newPartitionIds.get(idx++));
+                }
+            }
+        }
+        if (idx != newPartitionIds.size()) {
+            errorStatus.addToErrorMsgs("changed partition number " + idx + " 
is not correct");
+            result.setStatus(errorStatus);
+            LOG.warn("send create partition error status: {}", result);
+            return result;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("replace partition origin ids: ["
+                    + String.join(", ", 
reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+                    + ']');
+            LOG.debug("replace partition result ids: ["
+                    + String.join(", ", 
resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+                    + ']');
+        }
+
+        // build partition & tablets. now all partitions in allReqPartNames 
are replaced and recorded.
+        // so they won't be changed again. if other transaction changing it. 
just let it fail.
+        List<TOlapTablePartition> partitions = new ArrayList<>();
+        List<TTabletLocation> tablets = new ArrayList<>();
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
-        for (String partitionName : allReqPartNames) {
-            Partition partition = table.getPartition(partitionName);
+        for (long partitionId : resultPartitionIds) {
+            Partition partition = olapTable.getPartition(partitionId);
             TOlapTablePartition tPartition = new TOlapTablePartition();
             tPartition.setId(partition.getId());
 
diff --git 
a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
 
b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
index 200dd874df9..96b285ea4ca 100644
--- 
a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
+++ 
b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
@@ -83,11 +83,13 @@ suite("test_iot_auto_detect_concurrent") {
     thread5.join()
     // suppose result: success zero or one
     if (success_status) { // success zero
+        log.info("test 1: success zero")
         result = sql " select count(k0) from test_concurrent_write; "
         assertEquals(result[0][0], 1000)
         result = sql " select count(distinct k0) from test_concurrent_write; "
         assertEquals(result[0][0], 1000)
     } else { // success one
+        log.info("test 1: success one")
         result = sql " select count(k0) from test_concurrent_write; "
         assertEquals(result[0][0], 100)
         result = sql " select count(distinct k0) from test_concurrent_write; "


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to