This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new dab01385671 [branch-2.1](insert-overwrite) Fix insert overwrite auto detect transaction safe (#38103) (#38442) dab01385671 is described below commit dab013856719ef899c39cd69026bf2784d1b6c73 Author: zclllhhjj <zhaochan...@selectdb.com> AuthorDate: Mon Jul 29 10:21:03 2024 +0800 [branch-2.1](insert-overwrite) Fix insert overwrite auto detect transaction safe (#38103) (#38442) pick https://github.com/apache/doris/pull/38103 --- .../java/org/apache/doris/catalog/OlapTable.java | 9 +- .../insertoverwrite/InsertOverwriteManager.java | 66 ++++++++++--- .../doris/insertoverwrite/InsertOverwriteUtil.java | 1 + .../insert/InsertOverwriteTableCommand.java | 11 ++- .../apache/doris/service/FrontendServiceImpl.java | 107 ++++++++++++++------- .../test_iot_auto_detect_concurrent.groovy | 2 + 6 files changed, 143 insertions(+), 53 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index b0169571dfc..55ec9938d9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1112,10 +1112,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { return Sets.newHashSet(nameToPartition.keySet()); } - public List<String> uncheckedGetPartNamesById(List<Long> partitionIds) { + // for those elements equal in partiton ids, get their names. + public List<String> getEqualPartitionNames(List<Long> partitionIds1, List<Long> partitionIds2) { List<String> names = new ArrayList<String>(); - for (Long id : partitionIds) { - names.add(idToPartition.get(id).getName()); + for (int i = 0; i < partitionIds1.size(); i++) { + if (partitionIds1.get(i).equals(partitionIds2.get(i))) { + names.add(getPartition(partitionIds1.get(i)).getName()); + } } return names; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java index e0c46dde920..81524ae0208 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java @@ -58,7 +58,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { // but we only change one time and save the relations in partitionPairs. they're protected by taskLocks @SerializedName(value = "taskLocks") private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap(); - // <groupId, <oldPartId, newPartId>> + // <groupId, <oldPartId, newPartId>>. no need concern which task it belongs to. @SerializedName(value = "partitionPairs") private Map<Long, Map<Long, Long>> partitionPairs = Maps.newConcurrentMap(); @@ -91,7 +91,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { * * @return group id, like a transaction id. */ - public long preRegisterTask() { + public long registerTaskGroup() { long groupId = Env.getCurrentEnv().getNextId(); taskGroups.put(groupId, new ArrayList<Long>()); taskLocks.put(groupId, new ReentrantLock()); @@ -107,44 +107,81 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { taskGroups.get(groupId).add(taskId); } - public List<Long> tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds) { + /** + * this func should in lock scope of getLock(groupId) + * + * @param newIds if have replaced, replace with new. otherwise itself. + */ + public boolean tryReplacePartitionIds(long groupId, List<Long> oldPartitionIds, List<Long> newIds) { Map<Long, Long> relations = partitionPairs.get(groupId); - List<Long> newIds = new ArrayList<Long>(); - for (Long id : oldPartitionIds) { + boolean needReplace = false; + for (int i = 0; i < oldPartitionIds.size(); i++) { + long id = oldPartitionIds.get(i); if (relations.containsKey(id)) { // if we replaced it. then return new one. newIds.add(relations.get(id)); } else { - // otherwise itself. we will deal it soon. newIds.add(id); + needReplace = true; } } - return newIds; + return needReplace; } + // this func should in lock scope of getLock(groupId) public void recordPartitionPairs(long groupId, List<Long> oldIds, List<Long> newIds) { Map<Long, Long> relations = partitionPairs.get(groupId); Preconditions.checkArgument(oldIds.size() == newIds.size()); for (int i = 0; i < oldIds.size(); i++) { relations.put(oldIds.get(i), newIds.get(i)); + if (LOG.isDebugEnabled()) { + LOG.debug("recorded partition pairs: [" + oldIds.get(i) + ", " + newIds.get(i) + "]"); + } } } + // lock is a symbol of TaskGroup exist. if not, means already failed. public ReentrantLock getLock(long groupId) { return taskLocks.get(groupId); } + // When goes into failure, some BE may still not know and send new request. + // it will cause ConcurrentModification or NullPointer. public void taskGroupFail(long groupId) { LOG.info("insert overwrite auto detect partition task group [" + groupId + "] failed"); - for (Long taskId : taskGroups.get(groupId)) { - taskFail(taskId); + ReentrantLock lock = getLock(groupId); + lock.lock(); + try { + // will rollback temp partitions in `taskFail` + for (Long taskId : taskGroups.get(groupId)) { + taskFail(taskId); + } + cleanTaskGroup(groupId); + } finally { + lock.unlock(); } - cleanTaskGroup(groupId); } - public void taskGroupSuccess(long groupId) { + // here we will make all raplacement of this group visiable. if someone fails, nothing happen. + public void taskGroupSuccess(long groupId, OlapTable targetTable) throws DdlException { + try { + Map<Long, Long> relations = partitionPairs.get(groupId); + ArrayList<String> oldNames = new ArrayList<>(); + ArrayList<String> newNames = new ArrayList<>(); + for (Entry<Long, Long> partitionPair : relations.entrySet()) { + oldNames.add(targetTable.getPartition(partitionPair.getKey()).getName()); + newNames.add(targetTable.getPartition(partitionPair.getValue()).getName()); + } + InsertOverwriteUtil.replacePartition(targetTable, oldNames, newNames); + } catch (Exception e) { + LOG.warn("insert overwrite task making replacement failed because " + e.getMessage() + + "all new partition will not be visible and will be recycled by partition GC."); + throw e; + } LOG.info("insert overwrite auto detect partition task group [" + groupId + "] succeed"); for (Long taskId : taskGroups.get(groupId)) { + Env.getCurrentEnv().getEditLog() + .logInsertOverwrite(new InsertOverwriteLog(taskId, tasks.get(taskId), InsertOverwriteOpType.ADD)); taskSuccess(taskId); } cleanTaskGroup(groupId); @@ -164,6 +201,9 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { public void taskFail(long taskId) { LOG.info("insert overwrite task [" + taskId + "] failed"); boolean rollback = rollback(taskId); + if (!rollback) { + LOG.warn("roll back task [" + taskId + "] failed"); + } if (rollback) { removeTask(taskId); } else { @@ -192,6 +232,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { } } + // cancel it. should try to remove them after. private void cancelTask(long taskId) { if (tasks.containsKey(taskId)) { LOG.info("cancel insert overwrite task: {}", tasks.get(taskId)); @@ -201,6 +242,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { } } + // task and partitions has been removed. it's safe to remove task. private void removeTask(long taskId) { if (tasks.containsKey(taskId)) { LOG.info("remove insert overwrite task: {}", tasks.get(taskId)); @@ -222,7 +264,7 @@ public class InsertOverwriteManager extends MasterDaemon implements Writable { try { olapTable = task.getTable(); } catch (DdlException e) { - LOG.warn("can not get table, task: {}", task); + LOG.warn("can not get table, task: {}, reason: {}", task, e.getMessage()); return true; } return InsertOverwriteUtil.dropPartitions(olapTable, task.getTempPartitionNames()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java index c4d3068e09f..a0e04a35bd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java @@ -54,6 +54,7 @@ public class InsertOverwriteUtil { for (int i = 0; i < partitionNames.size(); i++) { Env.getCurrentEnv().addPartitionLike((Database) tableIf.getDatabase(), tableIf.getName(), new AddPartitionLikeClause(tempPartitionNames.get(i), partitionNames.get(i), true)); + LOG.info("successfully add temp partition [{}] for [{}]", tempPartitionNames.get(i), tableIf.getName()); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index bd5af3225f0..75b80ade581 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -169,11 +169,12 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS try { if (isAutoDetectOverwrite()) { // taskId here is a group id. it contains all replace tasks made and registered in rpc process. - taskId = Env.getCurrentEnv().getInsertOverwriteManager().preRegisterTask(); - // When inserting, BE will call to replace partition by FrontendService. FE do the real - // add&replacement and return replace result. So there's no need to do anything else. + taskId = Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup(); + // When inserting, BE will call to replace partition by FrontendService. FE will register new temp + // partitions and return. for transactional, the replacement will really occur when insert successed, + // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. insertInto(ctx, executor, taskId); - Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId); + Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId, (OlapTable) targetTable); } else { List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); taskId = Env.getCurrentEnv().getInsertOverwriteManager() @@ -184,7 +185,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId); } } catch (Exception e) { - LOG.warn("insert into overwrite failed"); + LOG.warn("insert into overwrite failed with task(or group) id " + taskId); if (isAutoDetectOverwrite()) { Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 4935e026f11..77bf3173377 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -276,7 +276,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import java.util.stream.IntStream; // Frontend service used to serve all request for this frontend through // thrift protocol @@ -3568,7 +3567,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { LOG.info("Receive replace partition request: {}", request); long dbId = request.getDbId(); long tableId = request.getTableId(); - List<Long> partitionIds = request.getPartitionIds(); + List<Long> reqPartitionIds = request.getPartitionIds(); long taskGroupId = request.getOverwriteGroupId(); TReplacePartitionResult result = new TReplacePartitionResult(); TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR); @@ -3607,41 +3606,60 @@ public class FrontendServiceImpl implements FrontendService.Iface { OlapTable olapTable = (OlapTable) table; InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager(); ReentrantLock taskLock = overwriteManager.getLock(taskGroupId); - List<String> allReqPartNames; // all request partitions + if (taskLock == null) { + errorStatus.setErrorMsgs(Lists + .newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed."))); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + + ArrayList<Long> resultPartitionIds = new ArrayList<>(); // [1 2 5 6] -> [7 8 5 6] + ArrayList<Long> pendingPartitionIds = new ArrayList<>(); // pending: [1 2] + ArrayList<Long> newPartitionIds = new ArrayList<>(); // requested temp partition ids. for [7 8] + boolean needReplace = false; try { taskLock.lock(); + // double check lock. maybe taskLock is not null, but has been removed from the Map. means the task failed. + if (overwriteManager.getLock(taskGroupId) == null) { + errorStatus.setErrorMsgs(Lists + .newArrayList(new String("cannot find task group " + taskGroupId + ", maybe already failed."))); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + // we dont lock the table. other thread in this txn will be controled by taskLock. - // if we have already replaced. dont do it again, but acquire the recorded new partition directly. + // if we have already replaced, dont do it again, but acquire the recorded new partition directly. // if not by this txn, just let it fail naturally is ok. - List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); - // here if replacedPartIds still have null. this will throw exception. - allReqPartNames = olapTable.uncheckedGetPartNamesById(replacedPartIds); - - List<Long> pendingPartitionIds = IntStream.range(0, partitionIds.size()) - .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced - .mapToObj(partitionIds::get) - .collect(Collectors.toList()); - // from here we ONLY deal the pending partitions. not include the dealed(by others). - if (!pendingPartitionIds.isEmpty()) { - // below two must have same order inner. - List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); - List<String> tempPartitionNames = InsertOverwriteUtil - .generateTempPartitionNames(pendingPartitionNames); + needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId, reqPartitionIds, resultPartitionIds); + // request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need replace. + if (needReplace) { + // names for [1 2] + List<String> pendingPartitionNames = olapTable.getEqualPartitionNames(reqPartitionIds, + resultPartitionIds); + for (String name : pendingPartitionNames) { + pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2] + } - long taskId = overwriteManager.registerTask(dbId, tableId, tempPartitionNames); + // names for [7 8] + List<String> newTempNames = InsertOverwriteUtil + .generateTempPartitionNames(pendingPartitionNames); + // a task means one time insert overwrite + long taskId = overwriteManager.registerTask(dbId, tableId, newTempNames); overwriteManager.registerTaskInGroup(taskGroupId, taskId); - InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); - InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); + InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, newTempNames); // now temp partitions are bumped up and use new names. we get their ids and record them. - List<Long> newPartitionIds = new ArrayList<Long>(); - for (String newPartName : pendingPartitionNames) { - newPartitionIds.add(olapTable.getPartition(newPartName).getId()); + for (String newPartName : newTempNames) { + newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8] } overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds); + if (LOG.isDebugEnabled()) { LOG.debug("partition replacement: "); for (int i = 0; i < pendingPartitionIds.size(); i++) { - LOG.debug("[" + pendingPartitionIds.get(i) + ", " + newPartitionIds.get(i) + "], "); + LOG.debug("[" + pendingPartitionIds.get(i) + " - " + pendingPartitionNames.get(i) + ", " + + newPartitionIds.get(i) + " - " + newTempNames.get(i) + "], "); } } } @@ -3654,15 +3672,38 @@ public class FrontendServiceImpl implements FrontendService.Iface { taskLock.unlock(); } - // build partition & tablets. now all partitions in allReqPartNames are replaced - // an recorded. - // so they won't be changed again. if other transaction changing it. just let it - // fail. - List<TOlapTablePartition> partitions = Lists.newArrayList(); - List<TTabletLocation> tablets = Lists.newArrayList(); + // result: [1 2 5 6], make it [7 8 5 6] + int idx = 0; + if (needReplace) { + for (int i = 0; i < reqPartitionIds.size(); i++) { + if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) { + resultPartitionIds.set(i, newPartitionIds.get(idx++)); + } + } + } + if (idx != newPartitionIds.size()) { + errorStatus.addToErrorMsgs("changed partition number " + idx + " is not correct"); + result.setStatus(errorStatus); + LOG.warn("send create partition error status: {}", result); + return result; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("replace partition origin ids: [" + + String.join(", ", reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList())) + + ']'); + LOG.debug("replace partition result ids: [" + + String.join(", ", resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList())) + + ']'); + } + + // build partition & tablets. now all partitions in allReqPartNames are replaced an recorded. + // so they won't be changed again. if other transaction changing it. just let it fail. + List<TOlapTablePartition> partitions = new ArrayList<>(); + List<TTabletLocation> tablets = new ArrayList<>(); PartitionInfo partitionInfo = olapTable.getPartitionInfo(); - for (String partitionName : allReqPartNames) { - Partition partition = table.getPartition(partitionName); + for (long partitionId : resultPartitionIds) { + Partition partition = olapTable.getPartition(partitionId); TOlapTablePartition tPartition = new TOlapTablePartition(); tPartition.setId(partition.getId()); diff --git a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy index 200dd874df9..96b285ea4ca 100644 --- a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy +++ b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy @@ -83,11 +83,13 @@ suite("test_iot_auto_detect_concurrent") { thread5.join() // suppose result: success zero or one if (success_status) { // success zero + log.info("test 1: success zero") result = sql " select count(k0) from test_concurrent_write; " assertEquals(result[0][0], 1000) result = sql " select count(distinct k0) from test_concurrent_write; " assertEquals(result[0][0], 1000) } else { // success one + log.info("test 1: success one") result = sql " select count(k0) from test_concurrent_write; " assertEquals(result[0][0], 100) result = sql " select count(distinct k0) from test_concurrent_write; " --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org