This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 461c4dfaae [fix](tablet clone) fix single replica load failed during migration (#22077)
461c4dfaae is described below

commit 461c4dfaae3a6cb4b79e07df9f3bc955a2dc9458
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Thu Jul 27 20:38:03 2023 +0800

    [fix](tablet clone) fix single replica load failed during migration (#22077)
---
 .../java/org/apache/doris/catalog/Replica.java     | 29 ++++++++---
 .../main/java/org/apache/doris/catalog/Tablet.java | 21 +++-----
 .../org/apache/doris/clone/TabletSchedCtx.java     | 10 +++-
 .../org/apache/doris/clone/TabletScheduler.java    | 57 +++++++++++++++-------
 .../doris/transaction/DatabaseTransactionMgr.java  | 29 +++++++++++
 .../doris/transaction/GlobalTransactionMgr.java    | 13 +++++
 .../apache/doris/transaction/TransactionState.java |  4 --
 .../java/org/apache/doris/clone/RebalanceTest.java |  4 +-
 8 files changed, 120 insertions(+), 47 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index ef32ee979b..c444cdf1db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -129,9 +129,16 @@ public class Replica implements Writable {
     private long furtherRepairSetTime = -1;
     private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; // 20min
 
-    // if this watermarkTxnId is set, which means before deleting a replica,
-    // we should ensure that all txns on this replicas are finished.
-    private long watermarkTxnId = -1;
+
+    /* Decommission a backend B, steps are as follow:
+     * 1. wait peer backends catchup with B;
+     * 2. B change state to DECOMMISSION, set preWatermarkTxnId. B can load data now.
+     * 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now.
+     * 4. wait txn before postWatermarkTxnId finished, delete B.
+     *
+     */
+    private long preWatermarkTxnId = -1;
+    private long postWatermarkTxnId = -1;
 
     public Replica() {
     }
@@ -568,12 +575,20 @@ public class Replica implements Writable {
         }
     }
 
-    public void setWatermarkTxnId(long watermarkTxnId) {
-        this.watermarkTxnId = watermarkTxnId;
+    public void setPreWatermarkTxnId(long preWatermarkTxnId) {
+        this.preWatermarkTxnId = preWatermarkTxnId;
+    }
+
+    public long getPreWatermarkTxnId() {
+        return preWatermarkTxnId;
+    }
+
+    public void setPostWatermarkTxnId(long postWatermarkTxnId) {
+        this.postWatermarkTxnId = postWatermarkTxnId;
     }
 
-    public long getWatermarkTxnId() {
-        return watermarkTxnId;
+    public long getPostWatermarkTxnId() {
+        return postWatermarkTxnId;
     }
 
     public boolean isAlive() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 0e93a4dd0f..af83e1ba8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -211,19 +211,7 @@ public class Tablet extends MetaObject implements Writable {
     }
 
     public List<Long> getNormalReplicaBackendIds() {
-        List<Long> beIds = Lists.newArrayList();
-        SystemInfoService infoService = Env.getCurrentSystemInfo();
-        for (Replica replica : replicas) {
-            if (replica.isBad()) {
-                continue;
-            }
-
-            ReplicaState state = replica.getState();
-            if (infoService.checkBackendAlive(replica.getBackendId()) && 
state.canLoad()) {
-                beIds.add(replica.getBackendId());
-            }
-        }
-        return beIds;
+        return Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
     }
 
     // return map of (BE id -> path hash) of normal replicas
@@ -232,12 +220,17 @@ public class Tablet extends MetaObject implements Writable {
         Multimap<Long, Long> map = HashMultimap.create();
         SystemInfoService infoService = Env.getCurrentSystemInfo();
         for (Replica replica : replicas) {
+            if (!infoService.checkBackendAlive(replica.getBackendId())) {
+                continue;
+            }
+
             if (replica.isBad()) {
                 continue;
             }
 
             ReplicaState state = replica.getState();
-            if (infoService.checkBackendLoadAvailable(replica.getBackendId()) 
&& state.canLoad()) {
+            if (state.canLoad()
+                    || (state == ReplicaState.DECOMMISSION && 
replica.getPostWatermarkTxnId() < 0)) {
                 map.put(replica.getBackendId(), replica.getPathHash());
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index a3734dc9e0..876b8d10fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -748,7 +748,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
             //
             // If we do not reset this replica state to NORMAL, the tablet's health status will be in VERSION_INCOMPLETE
             // forever, because the replica in the DECOMMISSION state will not receive the load task.
-            chosenReplica.setWatermarkTxnId(-1);
+            chosenReplica.setPreWatermarkTxnId(-1);
+            chosenReplica.setPostWatermarkTxnId(-1);
             chosenReplica.setState(ReplicaState.NORMAL);
             setDecommissionTime(-1);
             LOG.info("choose replica {} on backend {} of tablet {} as dest 
replica for version incomplete,"
@@ -1142,6 +1143,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
         value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60  
* 1000L;
         value += 5000L * (failedSchedCounter / 10);
 
+        if (schedFailedCode == SubCode.WAITING_DECOMMISSION) {
+            value += 5 * 1000L;
+        }
+
         if (type == Type.BALANCE) {
             value += 30 * 60 * 1000L;
         }
@@ -1200,7 +1205,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
                 // any intermediate state it set during the scheduling process.
                 if (replica.getState() == ReplicaState.DECOMMISSION) {
                     replica.setState(ReplicaState.NORMAL);
-                    replica.setWatermarkTxnId(-1);
+                    replica.setPreWatermarkTxnId(-1);
+                    replica.setPostWatermarkTxnId(-1);
                     LOG.debug("reset replica {} on backend {} of tablet {} 
state from DECOMMISSION to NORMAL",
                             replica.getId(), replica.getBackendId(), tabletId);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index ab50bb095b..de168634e3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1065,7 +1065,7 @@ public class TabletScheduler extends MasterDaemon {
         if (matchupReplicaCount <= 1) {
             LOG.info("can not delete only one replica, tabletId = {} replicaId 
= {}", tabletCtx.getTabletId(),
                      replica.getId());
-            throw new SchedException(Status.FINISHED, "the only one latest 
replia can not be dropped, tabletId = "
+            throw new SchedException(Status.UNRECOVERABLE, "the only one 
latest replia can not be dropped, tabletId = "
                                         + tabletCtx.getTabletId() + ", 
replicaId = " + replica.getId());
         }
 
@@ -1080,25 +1080,46 @@ public class TabletScheduler extends MasterDaemon {
          *      If all are finished, which means this replica is
          *      safe to be deleted.
          */
-        if (!force && !Config.enable_force_drop_redundant_replica && 
replica.getState().canLoad()
-                && replica.getWatermarkTxnId() == -1 && 
!FeConstants.runningUnitTest) {
-            long nextTxnId = Env.getCurrentGlobalTransactionMgr()
-                    .getTransactionIDGenerator().getNextTransactionId();
-            replica.setWatermarkTxnId(nextTxnId);
-            replica.setState(ReplicaState.DECOMMISSION);
-            // set priority to normal because it may wait for a long time. 
Remain it as VERY_HIGH may block other task.
-            tabletCtx.setPriority(Priority.NORMAL);
-            LOG.info("set replica {} on backend {} of tablet {} state to 
DECOMMISSION due to reason {}",
-                    replica.getId(), replica.getBackendId(), 
tabletCtx.getTabletId(), reason);
-            throw new SchedException(Status.SCHEDULE_FAILED, 
SubCode.WAITING_DECOMMISSION,
-                    "set watermark txn " + nextTxnId);
-        } else if (replica.getState() == ReplicaState.DECOMMISSION && 
replica.getWatermarkTxnId() != -1) {
-            long watermarkTxnId = replica.getWatermarkTxnId();
+        if (!force && !Config.enable_force_drop_redundant_replica
+                && !FeConstants.runningUnitTest
+                && (replica.getState().canLoad() || replica.getState() == 
ReplicaState.DECOMMISSION)) {
+            if (replica.getState() != ReplicaState.DECOMMISSION) {
+                replica.setState(ReplicaState.DECOMMISSION);
+                // set priority to normal because it may wait for a long time.
+                // Remain it as VERY_HIGH may block other task.
+                tabletCtx.setPriority(Priority.NORMAL);
+                LOG.info("set replica {} on backend {} of tablet {} state to 
DECOMMISSION due to reason {}",
+                        replica.getId(), replica.getBackendId(), 
tabletCtx.getTabletId(), reason);
+            }
+
+            long preWatermarkTxnId = replica.getPreWatermarkTxnId();
+            if (preWatermarkTxnId == -1) {
+                preWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
+                        .getTransactionIDGenerator().getNextTransactionId();
+                replica.setPreWatermarkTxnId(preWatermarkTxnId);
+            }
+
+            long postWatermarkTxnId = replica.getPostWatermarkTxnId();
+            if (postWatermarkTxnId == -1) {
+                try {
+                    if 
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(preWatermarkTxnId,
+                            tabletCtx.getDbId(), tabletCtx.getTblId(), 
tabletCtx.getPartitionId())) {
+                        throw new SchedException(Status.SCHEDULE_FAILED, 
SubCode.WAITING_DECOMMISSION,
+                                "wait txn before pre watermark txn " + 
preWatermarkTxnId + " to be finished");
+                    }
+                } catch (AnalysisException e) {
+                    throw new SchedException(Status.UNRECOVERABLE, 
e.getMessage());
+                }
+                postWatermarkTxnId = Env.getCurrentGlobalTransactionMgr()
+                        .getTransactionIDGenerator().getNextTransactionId();
+                replica.setPostWatermarkTxnId(postWatermarkTxnId);
+            }
+
             try {
-                if 
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
-                        tabletCtx.getDbId(), 
Lists.newArrayList(tabletCtx.getTblId()))) {
+                if 
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId,
+                        tabletCtx.getDbId(), tabletCtx.getTblId(), 
tabletCtx.getPartitionId())) {
                     throw new SchedException(Status.SCHEDULE_FAILED, 
SubCode.WAITING_DECOMMISSION,
-                            "wait txn before " + watermarkTxnId + " to be 
finished");
+                            "wait txn before post watermark txn  " + 
postWatermarkTxnId + " to be finished");
                 }
             } catch (AnalysisException e) {
                 throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9114186162..293b49368a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -1709,6 +1709,35 @@ public class DatabaseTransactionMgr {
         return true;
     }
 
+    public boolean isPreviousTransactionsFinished(long endTransactionId, long 
tableId, long partitionId) {
+        readLock();
+        try {
+            for (Map.Entry<Long, TransactionState> entry : 
idToRunningTransactionState.entrySet()) {
+                TransactionState transactionState = entry.getValue();
+                if (entry.getKey() > endTransactionId
+                        || 
transactionState.getTransactionStatus().isFinalStatus()
+                        || transactionState.getDbId() != dbId
+                        || 
!transactionState.getTableIdList().contains(tableId)) {
+                    continue;
+                }
+
+                if (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED) {
+                    TableCommitInfo tableCommitInfo = 
transactionState.getTableCommitInfo(tableId);
+                    // txn not contains this partition
+                    if (tableCommitInfo != null
+                            && 
tableCommitInfo.getIdToPartitionCommitInfo().get(partitionId) == null) {
+                        continue;
+                    }
+                }
+
+                return false;
+            }
+        } finally {
+            readUnlock();
+        }
+        return true;
+    }
+
     /**
      * check if there exists a intersection between the source tableId list 
and target tableId list
      * if one of them is null or empty, that means that we don't know related 
tables in tableList,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 25b3f30259..d08b1a4981 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -425,6 +425,19 @@ public class GlobalTransactionMgr implements Writable {
         }
     }
 
+    /**
+     * Check whether a load job for a partition already exists before
+     * checking all `TransactionId` related with this load job have finished.
+     * finished
+     *
+     * @throws AnalysisException is database does not exist anymore
+     */
+    public boolean isPreviousTransactionsFinished(long endTransactionId, long 
dbId, long tableId,
+            long partitionId) throws AnalysisException {
+        DatabaseTransactionMgr dbTransactionMgr = 
getDatabaseTransactionMgr(dbId);
+        return 
dbTransactionMgr.isPreviousTransactionsFinished(endTransactionId, tableId, 
partitionId);
+    }
+
     /**
      * The txn cleaner will run at a fixed interval and try to delete expired 
and timeout txns:
      * expired: txn is in VISIBLE or ABORTED, and is expired.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 10ce3542f6..b15329e6b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -523,10 +523,6 @@ public class TransactionState implements Writable {
         return this.idToTableCommitInfos.get(tableId);
     }
 
-    public void removeTable(long tableId) {
-        this.idToTableCommitInfos.remove(tableId);
-    }
-
     public void setTxnCommitAttachment(TxnCommitAttachment 
txnCommitAttachment) {
         this.txnCommitAttachment = txnCommitAttachment;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
index 05929ff8ce..c36ef531c2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -307,8 +307,8 @@ public class RebalanceTest {
             Replica decommissionedReplica = replicas.stream()
                     .filter(r -> r.getState() == 
Replica.ReplicaState.DECOMMISSION)
                     .collect(MoreCollectors.onlyElement());
-            // expected watermarkTxnId is 111
-            Assert.assertEquals(111, 
decommissionedReplica.getWatermarkTxnId());
+            Assert.assertEquals(111, 
decommissionedReplica.getPreWatermarkTxnId());
+            Assert.assertEquals(112, 
decommissionedReplica.getPostWatermarkTxnId());
         });
 
         // Delete replica should change invertedIndex too


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to