This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0a21b0e13d0 branch-3.0: [fix](restore) Add synchronized to avoid concurrent modification (#43258)
0a21b0e13d0 is described below

commit 0a21b0e13d08d72f888913ac8bd8c0e97d2ad9d6
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 6 18:39:10 2024 +0800

    branch-3.0: [fix](restore) Add synchronized to avoid concurrent modification (#43258)
    
    Cherry-picked from #43172
    
    Co-authored-by: walter <w41te...@gmail.com>
---
 .../java/org/apache/doris/backup/BackupJob.java    |   4 +-
 .../java/org/apache/doris/backup/RestoreJob.java   | 206 +++++++++++----------
 .../test_backup_restore_atomic_with_alter.groovy   |   2 +-
 3 files changed, 107 insertions(+), 105 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index dbc7bb08fd4..ab7bfd8a03f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -100,7 +100,7 @@ public class BackupJob extends AbstractJob {
     private List<TableRef> tableRefs = Lists.newArrayList();
 
     @SerializedName("st")
-    private BackupJobState state;
+    private volatile BackupJobState state;
 
     @SerializedName("sft")
     private long snapshotFinishedTime = -1;
@@ -1025,7 +1025,7 @@ public class BackupJob extends AbstractJob {
         LOG.info("finished to cancel backup job. current state: {}. {}", 
curState.name(), this);
     }
 
-    public List<String> getInfo() {
+    public synchronized List<String> getInfo() {
         List<String> info = Lists.newArrayList();
         info.add(String.valueOf(jobId));
         info.add(label);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 1b992cf353a..cdf12c27790 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -62,7 +62,6 @@ import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.util.DbUtil;
 import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.DynamicPartitionUtil;
 import org.apache.doris.common.util.PropertyAnalyzer;
@@ -132,7 +131,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
     public enum RestoreJobState {
         PENDING, // Job is newly created. Check and prepare meta in catalog. 
Create replica if necessary.
                  // Waiting for replica creation finished synchronously, then 
sending snapshot tasks.
-                 // then transfer to SNAPSHOTING
+                 // then transfer to CREATING.
+        CREATING, // Creating replica on BE. Transfer to SNAPSHOTING after all 
replicas created.
         SNAPSHOTING, // Waiting for snapshot finished. Than transfer to 
DOWNLOAD.
         DOWNLOAD, // Send download tasks.
         DOWNLOADING, // Waiting for download finished.
@@ -153,7 +153,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
     private boolean allowLoad;
 
     @SerializedName("st")
-    private RestoreJobState state;
+    private volatile RestoreJobState state;
 
     @SerializedName("meta")
     private BackupMeta backupMeta;
@@ -215,6 +215,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
     @SerializedName("prop")
     private Map<String, String> properties = Maps.newHashMap();
 
+    private MarkedCountDownLatch<Long, Long> createReplicaTasksLatch = null;
+
     public RestoreJob() {
         super(JobType.RESTORE);
     }
@@ -270,10 +272,6 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         return state;
     }
 
-    public RestoreFileMapping getFileMapping() {
-        return fileMapping;
-    }
-
     public int getMetaVersion() {
         return metaVersion;
     }
@@ -421,7 +419,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
     }
 
     @Override
-    public void run() {
+    public synchronized void run() {
         if (state == RestoreJobState.FINISHED || state == 
RestoreJobState.CANCELLED) {
             return;
         }
@@ -447,8 +445,8 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         checkIfNeedCancel();
 
         if (status.ok()) {
-            if (state != RestoreJobState.PENDING && label.equals(
-                    
DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB", ""))) 
{
+            if (state != RestoreJobState.PENDING && state != 
RestoreJobState.CREATING
+                    && 
label.equals(DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_NON_PENDING_RESTORE_JOB",
 ""))) {
                 LOG.info("pause restore job by debug point: {}", this);
                 return;
             }
@@ -457,6 +455,9 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
                 case PENDING:
                     checkAndPrepareMeta();
                     break;
+                case CREATING:
+                    waitingAllReplicasCreated();
+                    break;
                 case SNAPSHOTING:
                     waitingAllSnapshotsFinished();
                     break;
@@ -486,7 +487,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
      * return true if some restored objs have been dropped.
      */
     private void checkIfNeedCancel() {
-        if (state == RestoreJobState.PENDING) {
+        if (state == RestoreJobState.PENDING || state == 
RestoreJobState.CREATING) {
             return;
         }
 
@@ -956,121 +957,122 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         }
 
         // Send create replica task to BE outside the db lock
-        boolean ok = false;
         int numBatchTasks = batchTaskPerTable.values()
                 .stream()
                 .mapToInt(AgentBatchTask::getTaskNum)
                 .sum();
-        MarkedCountDownLatch<Long, Long> latch = new 
MarkedCountDownLatch<Long, Long>(numBatchTasks);
-        if (batchTaskPerTable.size() > 0) {
+        createReplicaTasksLatch = new MarkedCountDownLatch<>(numBatchTasks);
+        if (numBatchTasks > 0) {
+            LOG.info("begin to send create replica tasks to BE for restore. 
total {} tasks. {}", numBatchTasks, this);
             for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
                 for (AgentTask task : batchTask.getAllTasks()) {
-                    latch.addMark(task.getBackendId(), task.getTabletId());
-                    ((CreateReplicaTask) task).setLatch(latch);
+                    createReplicaTasksLatch.addMark(task.getBackendId(), 
task.getTabletId());
+                    ((CreateReplicaTask) 
task).setLatch(createReplicaTasksLatch);
                     AgentTaskQueue.addTask(task);
                 }
                 AgentTaskExecutor.submit(batchTask);
             }
-
-            // estimate timeout
-            long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks) / 
1000;
-            try {
-                LOG.info("begin to send create replica tasks to BE for 
restore. total {} tasks. timeout: {}s",
-                        numBatchTasks, timeout);
-                for (long elapsed = 0; elapsed <= timeout; elapsed++) {
-                    if (latch.await(1, TimeUnit.SECONDS)) {
-                        ok = true;
-                        break;
-                    }
-                    if (state != RestoreJobState.PENDING) {  // user cancelled
-                        return;
-                    }
-                    if (elapsed % 5 == 0) {
-                        LOG.info("waiting {} create replica tasks for restore 
to finish, total {} tasks, elapsed {}s",
-                                latch.getCount(), numBatchTasks, elapsed);
-                    }
-                }
-            } catch (InterruptedException e) {
-                LOG.warn("InterruptedException: ", e);
-                ok = false;
-            }
-        } else {
-            ok = true;
         }
 
-        if (ok && latch.getStatus().ok()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("finished to create all restored replicas. {}", 
this);
-            }
-            // add restored partitions.
-            // table should be in State RESTORE, so no other partitions can be
-            // added to or removed from this table during the restore process.
-            for (Pair<String, Partition> entry : restoredPartitions) {
-                OlapTable localTbl = (OlapTable) 
db.getTableNullable(entry.first);
-                localTbl.writeLock();
-                try {
-                    Partition restoredPart = entry.second;
-                    OlapTable remoteTbl = (OlapTable) 
backupMeta.getTable(entry.first);
-                    if (localTbl.getPartitionInfo().getType() == 
PartitionType.RANGE
-                            || localTbl.getPartitionInfo().getType() == 
PartitionType.LIST) {
-
-                        PartitionInfo remotePartitionInfo = 
remoteTbl.getPartitionInfo();
-                        PartitionInfo localPartitionInfo = 
localTbl.getPartitionInfo();
-                        BackupPartitionInfo backupPartitionInfo
-                                = 
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
-                        long remotePartId = backupPartitionInfo.id;
-                        PartitionItem remoteItem = 
remoteTbl.getPartitionInfo().getItem(remotePartId);
-                        DataProperty remoteDataProperty = 
remotePartitionInfo.getDataProperty(remotePartId);
-                        ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
-                        if (reserveReplica) {
-                            restoreReplicaAlloc = 
remotePartitionInfo.getReplicaAllocation(remotePartId);
-                        }
-                        localPartitionInfo.addPartition(restoredPart.getId(), 
false, remoteItem,
-                                remoteDataProperty, restoreReplicaAlloc,
-                                
remotePartitionInfo.getIsInMemory(remotePartId),
-                                
remotePartitionInfo.getIsMutable(remotePartId));
-                    }
-                    localTbl.addPartition(restoredPart);
-                } finally {
-                    localTbl.writeUnlock();
-                }
+        // No log here, PENDING state restore job will redo this method
+        state = RestoreJobState.CREATING;
+    }
 
+    private void waitingAllReplicasCreated() {
+        boolean ok = true;
+        try {
+            if (!createReplicaTasksLatch.await(0, TimeUnit.SECONDS)) {
+                LOG.info("waiting {} create replica tasks for restore to 
finish. {}",
+                        createReplicaTasksLatch.getCount(), this);
+                return;
             }
+        } catch (InterruptedException e) {
+            LOG.warn("InterruptedException, {}", this, e);
+            ok = false;
+        }
 
-            // add restored tables
-            for (Table tbl : restoredTbls) {
-                if (!db.writeLockIfExist()) {
-                    status = new Status(ErrCode.COMMON_ERROR, "Database " + 
db.getFullName()
-                            + " has been dropped");
-                    return;
-                }
-                tbl.writeLock();
-                try {
-                    if (!db.registerTable(tbl)) {
-                        status = new Status(ErrCode.COMMON_ERROR, "Table " + 
tbl.getName()
-                                + " already exist in db: " + db.getFullName());
-                        return;
-                    }
-                } finally {
-                    tbl.writeUnlock();
-                    db.writeUnlock();
-                }
-            }
-        } else {
+        if (!(ok && createReplicaTasksLatch.getStatus().ok())) {
             // only show at most 10 results
-            List<String> subList = latch.getLeftMarks().stream().limit(10)
+            List<String> subList = 
createReplicaTasksLatch.getLeftMarks().stream().limit(10)
                     .map(item -> "(backendId = " + item.getKey() + ", tabletId 
= "  + item.getValue() + ")")
                     .collect(Collectors.toList());
             String idStr = Joiner.on(", ").join(subList);
             String reason = "TIMEDOUT";
-            if (!latch.getStatus().ok()) {
-                reason = latch.getStatus().getErrorMsg();
+            if (!createReplicaTasksLatch.getStatus().ok()) {
+                reason = createReplicaTasksLatch.getStatus().getErrorMsg();
             }
             String errMsg = String.format(
                     "Failed to create replicas for restore: %s, unfinished 
marks: %s", reason, idStr);
             status = new Status(ErrCode.COMMON_ERROR, errMsg);
             return;
         }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("finished to create all restored replicas. {}", this);
+        }
+        allReplicasCreated();
+    }
+
+    private void allReplicasCreated() {
+        Database db = env.getInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            status = new Status(ErrCode.NOT_FOUND, "database " + dbId + " does 
not exist");
+            return;
+        }
+
+        // add restored partitions.
+        // table should be in State RESTORE, so no other partitions can be
+        // added to or removed from this table during the restore process.
+        for (Pair<String, Partition> entry : restoredPartitions) {
+            OlapTable localTbl = (OlapTable) db.getTableNullable(entry.first);
+            localTbl.writeLock();
+            try {
+                Partition restoredPart = entry.second;
+                OlapTable remoteTbl = (OlapTable) 
backupMeta.getTable(entry.first);
+                if (localTbl.getPartitionInfo().getType() == 
PartitionType.RANGE
+                        || localTbl.getPartitionInfo().getType() == 
PartitionType.LIST) {
+
+                    PartitionInfo remotePartitionInfo = 
remoteTbl.getPartitionInfo();
+                    PartitionInfo localPartitionInfo = 
localTbl.getPartitionInfo();
+                    BackupPartitionInfo backupPartitionInfo
+                            = 
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
+                    long remotePartId = backupPartitionInfo.id;
+                    PartitionItem remoteItem = 
remoteTbl.getPartitionInfo().getItem(remotePartId);
+                    DataProperty remoteDataProperty = 
remotePartitionInfo.getDataProperty(remotePartId);
+                    ReplicaAllocation restoreReplicaAlloc = replicaAlloc;
+                    if (reserveReplica) {
+                        restoreReplicaAlloc = 
remotePartitionInfo.getReplicaAllocation(remotePartId);
+                    }
+                    localPartitionInfo.addPartition(restoredPart.getId(), 
false, remoteItem,
+                            remoteDataProperty, restoreReplicaAlloc,
+                            remotePartitionInfo.getIsInMemory(remotePartId),
+                            remotePartitionInfo.getIsMutable(remotePartId));
+                }
+                localTbl.addPartition(restoredPart);
+            } finally {
+                localTbl.writeUnlock();
+            }
+        }
+
+        // add restored tables
+        for (Table tbl : restoredTbls) {
+            if (!db.writeLockIfExist()) {
+                status = new Status(ErrCode.COMMON_ERROR, "Database " + 
db.getFullName() + " has been dropped");
+                return;
+            }
+            tbl.writeLock();
+            try {
+                if (!db.registerTable(tbl)) {
+                    status = new Status(ErrCode.COMMON_ERROR, "Table " + 
tbl.getName()
+                            + " already exist in db: " + db.getFullName());
+                    return;
+                }
+            } finally {
+                tbl.writeUnlock();
+                db.writeUnlock();
+            }
+        }
+
         LOG.info("finished to prepare meta. {}", this);
 
         if (jobInfo.content == null || jobInfo.content == BackupContent.ALL) {
@@ -2211,7 +2213,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         return getInfo(false);
     }
 
-    public List<String> getInfo(boolean isBrief) {
+    public synchronized List<String> getInfo(boolean isBrief) {
         List<String> info = Lists.newArrayList();
         info.add(String.valueOf(jobId));
         info.add(label);
@@ -2270,7 +2272,7 @@ public class RestoreJob extends AbstractJob implements 
GsonPostProcessable {
         return Status.OK;
     }
 
-    public void cancelInternal(boolean isReplay) {
+    private void cancelInternal(boolean isReplay) {
         // We need to clean the residual due to current state
         if (!isReplay) {
             switch (state) {
diff --git a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
index 46a3ca5b29d..b66ff391f15 100644
--- a/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
+++ b/regression-test/suites/backup_restore/test_backup_restore_atomic_with_alter.groovy
@@ -112,7 +112,7 @@ suite("test_backup_restore_atomic_with_alter", 
"backup_restore") {
     boolean restore_paused = false
     for (int k = 0; k < 60; k++) {
         def records = sql_return_maparray """ SHOW RESTORE FROM ${dbName} 
WHERE Label = "${snapshotName}" """
-        if (records.size() == 1 && records[0].State != 'PENDING') {
+        if (records.size() == 1 && (records[0].State != 'PENDING' && 
records[0].State != 'CREATING')) {
             restore_paused = true
             break
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to