This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0a21b0e13d0  branch-3.0: [fix](restore) Add synchronized to avoid concurrent modification (#43258)
0a21b0e13d0 is described below

commit 0a21b0e13d08d72f888913ac8bd8c0e97d2ad9d6
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 6 18:39:10 2024 +0800

    branch-3.0: [fix](restore) Add synchronized to avoid concurrent modification (#43258)

    Cherry-picked from #43172

    Co-authored-by: walter <w41te...@gmail.com>
---
 .../java/org/apache/doris/backup/BackupJob.java    |   4 +-
 .../java/org/apache/doris/backup/RestoreJob.java   | 206 +++++++++++----------
 .../test_backup_restore_atomic_with_alter.groovy   |   2 +-
 3 files changed, 107 insertions(+), 105 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index dbc7bb08fd4..ab7bfd8a03f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -100,7 +100,7 @@ public class BackupJob extends AbstractJob {
     private List<TableRef> tableRefs = Lists.newArrayList();

     @SerializedName("st")
-    private BackupJobState state;
+    private volatile BackupJobState state;

     @SerializedName("sft")
     private long snapshotFinishedTime = -1;
@@ -1025,7 +1025,7 @@ public class BackupJob extends AbstractJob {
         LOG.info("finished to cancel backup job. current state: {}. {}", curState.name(), this);
     }

-    public List<String> getInfo() {
+    public synchronized List<String> getInfo() {
         List<String> info = Lists.newArrayList();
         info.add(String.valueOf(jobId));
         info.add(label);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 1b992cf353a..cdf12c27790 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -62,7 +62,6 @@ import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.util.DbUtil;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.PropertyAnalyzer;
@@ -132,7 +131,8 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     public enum RestoreJobState {
         PENDING, // Job is newly created. Check and prepare meta in catalog. Create replica if necessary.
                  // Waiting for replica creation finished synchronously, then sending snapshot tasks.
-                 // then transfer to SNAPSHOTING
+                 // then transfer to CREATING.
+        CREATING, // Creating replica on BE. Transfer to SNAPSHOTING after all replicas created.
         SNAPSHOTING, // Waiting for snapshot finished. Than transfer to DOWNLOAD.
         DOWNLOAD, // Send download tasks.
         DOWNLOADING, // Waiting for download finished.
@@ -153,7 +153,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     private boolean allowLoad;

     @SerializedName("st")
-    private RestoreJobState state;
+    private volatile RestoreJobState state;

     @SerializedName("meta")
     private BackupMeta backupMeta;
@@ -215,6 +215,8 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     @SerializedName("prop")
     private Map<String, String> properties = Maps.newHashMap();

+    private MarkedCountDownLatch<Long, Long> createReplicaTasksLatch = null;
+
     public RestoreJob() {
         super(JobType.RESTORE);
     }
@@ -270,10 +272,6 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
         return state;
     }

-    public RestoreFileMapping getFileMapping() {
-        return fileMapping;
-    }
-
     public int getMetaVersion() {
         return metaVersion;
     }
@@ -421,7 +419,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
     }

     @Override
-    public void run() {
+    public synchronized void run() {
         if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
             return;
         }
@@ -447,8 +445,8 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
         checkIfNeedCancel();

         if (status.ok()) {
-            if (state != RestoreJobState.PENDING && label.equals(
-                    DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) {
+            if (state != RestoreJobState.PENDING && state != RestoreJobState.CREATING
+                    && label.equals(DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) {
                 LOG.info("pause restore job by debug point: {}", this);
                 return;
             }
@@ -457,6 +455,9 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
             case PENDING:
                 checkAndPrepareMeta();
                 break;
+            case CREATING:
+                waitingAllReplicasCreated();
+                break;
             case SNAPSHOTING:
                 waitingAllSnapshotsFinished();
                 break;
@@ -486,7 +487,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
      * return true if some restored objs have been dropped.
      */
     private void checkIfNeedCancel() {
-        if (state == RestoreJobState.PENDING) {
+        if (state == RestoreJobState.PENDING || state == RestoreJobState.CREATING) {
             return;
         }

@@ -956,121 +957,122 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
         }

         // Send create replica task to BE outside the db lock
-        boolean ok = false;
         int numBatchTasks = batchTaskPerTable.values()
                 .stream()
                 .mapToInt(AgentBatchTask::getTaskNum)
                 .sum();
-        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(numBatchTasks);
-        if (batchTaskPerTable.size() > 0) {
+        createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks);
+        if (numBatchTasks > 0) {
+            LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. {}", numBatchTasks, this);
             for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
                 for (AgentTask task : batchTask.getAllTasks()) {
-                    latch.addMark(task.getBackendId(), task.getTabletId());
-                    ((CreateReplicaTask) task).setLatch(latch);
+                    createReplicaTasksLatch.addMark(task.getBackendId(), task.getTabletId());
+                    ((CreateReplicaTask) task).setLatch(createReplicaTasksLatch);
                     AgentTaskQueue.addTask(task);
                 }
                 AgentTaskExecutor.submit(batchTask);
             }
-
-            // estimate timeout
-            long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks) / 1000;
-            try {
-                LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. timeout: {}s",
-                        numBatchTasks, timeout);
-                for (long elapsed = 0; elapsed <= timeout; elapsed++) {
-                    if (latch.await(1, TimeUnit.SECONDS)) {
-                        ok = true;
-                        break;
-                    }
-                    if (state != RestoreJobState.PENDING) { // user cancelled
-                        return;
-                    }
-                    if (elapsed % 5 == 0) {
-                        LOG.info("waiting {} create replica tasks for restore to finish, total {} tasks, elapsed {}s",
-                                latch.getCount(), numBatchTasks, elapsed);
-                    }
-                }
-            } catch (InterruptedException e) {
-                LOG.warn("InterruptedException: ", e);
-                ok = false;
-            }
-        } else {
-            ok = true;
         }

-        if (ok && latch.getStatus().ok()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("finished to create all restored replicas. {}", this);
-            }
-            // add restored partitions.
-            // table should be in State RESTORE, so no other partitions can be
-            // added to or removed from this table during the restore process.
-            for (Pair<String, Partition> entry : restoredPartitions) {
-                OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
-                localTbl.writeLock();
-                try {
-                    Partition restoredPart = entry.second;
-                    OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
-                    if (localTbl.getPartitionInfo().getType() == PartitionType.RANGE
-                            || localTbl.getPartitionInfo().getType() == PartitionType.LIST) {
-
-                        PartitionInfo remotePartitionInfo = remoteTbl.getPartitionInfo();
-                        PartitionInfo localPartitionInfo = localTbl.getPartitionInfo();
-                        BackupPartitionInfo backupPartitionInfo
-                                = jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
-                        long remotePartId = backupPartitionInfo.id;
-                        PartitionItem remoteItem = remoteTbl.getPartitionInfo().getItem(remotePartId);
-                        DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
-                        ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
-                        if (reserveReplica) {
-                            restoreReplicaAlloc = remotePartitionInfo.getReplicaAllocation(remotePartId);
-                        }
-                        localPartitionInfo.addPartition(restoredPart.getId(), false, remoteItem,
-                                remoteDataProperty, restoreReplicaAlloc,
-                                remotePartitionInfo.getIsInMemory(remotePartId),
-                                remotePartitionInfo.getIsMutable(remotePartId));
-                    }
-                    localTbl.addPartition(restoredPart);
-                } finally {
-                    localTbl.writeUnlock();
-                }
+        // No log here, PENDING state restore job will redo this method
+        state = RestoreJobState.CREATING;
+    }
+
+    private void waitingAllReplicasCreated() {
+        boolean ok = true;
+        try {
+            if (!createReplicaTasksLatch.await(0, TimeUnit.SECONDS)) {
+                LOG.info("waiting {} create replica tasks for restore to finish. {}",
+                        createReplicaTasksLatch.getCount(), this);
+                return;
             }
+        } catch (InterruptedException e) {
+            LOG.warn("InterruptedException, {}", this, e);
+            ok = false;
+        }

-            // add restored tables
-            for (Table tbl : restoredTbls) {
-                if (!db.writeLockIfExist()) {
-                    status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName()
-                            + " has been dropped");
-                    return;
-                }
-                tbl.writeLock();
-                try {
-                    if (!db.registerTable(tbl)) {
-                        status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
-                                + " already exist in db: " + db.getFullName());
-                        return;
-                    }
-                } finally {
-                    tbl.writeUnlock();
-                    db.writeUnlock();
-                }
-            }
-        } else {
+        if (!(ok && createReplicaTasksLatch.getStatus().ok())) {
             // only show at most 10 results
-            List<String> subList = latch.getLeftMarks().stream().limit(10)
+            List<String> subList = createReplicaTasksLatch.getLeftMarks().stream().limit(10)
                     .map(item -> "(backendId = " + item.getKey() + ", tabletId = " + item.getValue() + ")")
                     .collect(Collectors.toList());
             String idStr = Joiner.on(", ").join(subList);
             String reason = "TIMEDOUT";
-            if (!latch.getStatus().ok()) {
-                reason = latch.getStatus().getErrorMsg();
+            if (!createReplicaTasksLatch.getStatus().ok()) {
+                reason = createReplicaTasksLatch.getStatus().getErrorMsg();
             }
             String errMsg = String.format(
                     "Failed to create replicas for restore: %s, unfinished marks: %s", reason, idStr);
             status = new Status(ErrCode.COMMON_ERROR, errMsg);
             return;
         }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("finished to create all restored replicas. {}", this);
+        }
+        allReplicasCreated();
+    }
+
+    private void allReplicasCreated() {
+        Database db = env.getInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does not exist");
+            return;
+        }
+
+        // add restored partitions.
+        // table should be in State RESTORE, so no other partitions can be
+        // added to or removed from this table during the restore process.
+        for (Pair<String, Partition> entry : restoredPartitions) {
+            OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
+            localTbl.writeLock();
+            try {
+                Partition restoredPart = entry.second;
+                OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
+                if (localTbl.getPartitionInfo().getType() == PartitionType.RANGE
+                        || localTbl.getPartitionInfo().getType() == PartitionType.LIST) {
+
+                    PartitionInfo remotePartitionInfo = remoteTbl.getPartitionInfo();
+                    PartitionInfo localPartitionInfo = localTbl.getPartitionInfo();
+                    BackupPartitionInfo backupPartitionInfo
+                            = jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
+                    long remotePartId = backupPartitionInfo.id;
+                    PartitionItem remoteItem = remoteTbl.getPartitionInfo().getItem(remotePartId);
+                    DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
+                    ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
+                    if (reserveReplica) {
+                        restoreReplicaAlloc = remotePartitionInfo.getReplicaAllocation(remotePartId);
+                    }
+                    localPartitionInfo.addPartition(restoredPart.getId(), false, remoteItem,
+                            remoteDataProperty, restoreReplicaAlloc,
+                            remotePartitionInfo.getIsInMemory(remotePartId),
+                            remotePartitionInfo.getIsMutable(remotePartId));
+                }
+                localTbl.addPartition(restoredPart);
+            } finally {
+                localTbl.writeUnlock();
+            }
+        }
+
+        // add restored tables
+        for (Table tbl : restoredTbls) {
+            if (!db.writeLockIfExist()) {
+                status = new Status(ErrCode.COMMON_ERROR, "Database " + db.getFullName() + " has been dropped");
+                return;
+            }
+            tbl.writeLock();
+            try {
+                if (!db.registerTable(tbl)) {
+                    status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
+                            + " already exist in db: " + db.getFullName());
+                    return;
+                }
+            } finally {
+                tbl.writeUnlock();
+                db.writeUnlock();
+            }
+        }
+
         LOG.info("finished to prepare meta. {}", this);

         if (jobInfo.content == null || jobInfo.content == BackupContent.ALL) {
@@ -2211,7 +2213,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
         return getInfo(false);
     }

-    public List<String> getInfo(boolean isBrief) {
+    public synchronized List<String> getInfo(boolean isBrief) {
         List<String> info = Lists.newArrayList();
         info.add(String.valueOf(jobId));
         info.add(label);
@@ -2270,7 +2272,7 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
         return Status.OK;
     }

-    public void cancelInternal(boolean isReplay) {
+    private void cancelInternal(boolean isReplay) {
         // We need to clean the residual due to current state
         if (!isReplay) {
             switch (state) {
diff --git a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
index 46a3ca5b29d..b66ff391f15 100644
--- a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
+++ b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
@@ -112,7 +112,7 @@ suite("test_backup_restore_atomic_with_alter", "backup_restore") {
     boolean restore_paused = false
     for (int k = 0; k < 60; k++) {
         def records = sql_return_maparray """ SHOW RESTORE FROM ${dbName} WHERE Label = "${snapshotName}" """
-        if (records.size() == 1 && records[0].State != 'PENDING') {
+        if (records.size() == 1 && (records[0].State != 'PENDING' && records[0].State != 'CREATING')) {
             restore_paused = true
             break
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org