This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 461c4dfaae [fix](tablet clone) fix single replica load failed during migration (#22077) 461c4dfaae is described below commit 461c4dfaae3a6cb4b79e07df9f3bc955a2dc9458 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Thu Jul 27 20:38:03 2023 +0800 [fix](tablet clone) fix single replica load failed during migration (#22077) --- .../java/org/apache/doris/catalog/Replica.java | 29 ++++++++--- .../main/java/org/apache/doris/catalog/Tablet.java | 21 +++----- .../org/apache/doris/clone/TabletSchedCtx.java | 10 +++- .../org/apache/doris/clone/TabletScheduler.java | 57 +++++++++++++++------- .../doris/transaction/DatabaseTransactionMgr.java | 29 +++++++++++ .../doris/transaction/GlobalTransactionMgr.java | 13 +++++ .../apache/doris/transaction/TransactionState.java | 4 -- .../java/org/apache/doris/clone/RebalanceTest.java | 4 +- 8 files changed, 120 insertions(+), 47 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index ef32ee979b..c444cdf1db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -129,9 +129,16 @@ public class Replica implements Writable { private long furtherRepairSetTime = -1; private static final long FURTHER_REPAIR_TIMEOUT_MS = 20 * 60 * 1000L; // 20min - // if this watermarkTxnId is set, which means before deleting a replica, - // we should ensure that all txns on this replicas are finished. - private long watermarkTxnId = -1; + + /* Decommission a backend B, steps are as follow: + * 1. wait peer backends catchup with B; + * 2. B change state to DECOMMISSION, set preWatermarkTxnId. B can load data now. + * 3. wait txn before preWatermarkTxnId finished, set postWatermarkTxnId. B can't load data now. + * 4. wait txn before postWatermarkTxnId finished, delete B. 
+ * + */ + private long preWatermarkTxnId = -1; + private long postWatermarkTxnId = -1; public Replica() { } @@ -568,12 +575,20 @@ public class Replica implements Writable { } } - public void setWatermarkTxnId(long watermarkTxnId) { - this.watermarkTxnId = watermarkTxnId; + public void setPreWatermarkTxnId(long preWatermarkTxnId) { + this.preWatermarkTxnId = preWatermarkTxnId; + } + + public long getPreWatermarkTxnId() { + return preWatermarkTxnId; + } + + public void setPostWatermarkTxnId(long postWatermarkTxnId) { + this.postWatermarkTxnId = postWatermarkTxnId; } - public long getWatermarkTxnId() { - return watermarkTxnId; + public long getPostWatermarkTxnId() { + return postWatermarkTxnId; } public boolean isAlive() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index 0e93a4dd0f..af83e1ba8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -211,19 +211,7 @@ public class Tablet extends MetaObject implements Writable { } public List<Long> getNormalReplicaBackendIds() { - List<Long> beIds = Lists.newArrayList(); - SystemInfoService infoService = Env.getCurrentSystemInfo(); - for (Replica replica : replicas) { - if (replica.isBad()) { - continue; - } - - ReplicaState state = replica.getState(); - if (infoService.checkBackendAlive(replica.getBackendId()) && state.canLoad()) { - beIds.add(replica.getBackendId()); - } - } - return beIds; + return Lists.newArrayList(getNormalReplicaBackendPathMap().keySet()); } // return map of (BE id -> path hash) of normal replicas @@ -232,12 +220,17 @@ public class Tablet extends MetaObject implements Writable { Multimap<Long, Long> map = HashMultimap.create(); SystemInfoService infoService = Env.getCurrentSystemInfo(); for (Replica replica : replicas) { + if (!infoService.checkBackendAlive(replica.getBackendId())) { + continue; + } + if 
(replica.isBad()) { continue; } ReplicaState state = replica.getState(); - if (infoService.checkBackendLoadAvailable(replica.getBackendId()) && state.canLoad()) { + if (state.canLoad() + || (state == ReplicaState.DECOMMISSION && replica.getPostWatermarkTxnId() < 0)) { map.put(replica.getBackendId(), replica.getPathHash()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index a3734dc9e0..876b8d10fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -748,7 +748,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // // If we do not reset this replica state to NORMAL, the tablet's health status will be in VERSION_INCOMPLETE // forever, because the replica in the DECOMMISSION state will not receive the load task. - chosenReplica.setWatermarkTxnId(-1); + chosenReplica.setPreWatermarkTxnId(-1); + chosenReplica.setPostWatermarkTxnId(-1); chosenReplica.setState(ReplicaState.NORMAL); setDecommissionTime(-1); LOG.info("choose replica {} on backend {} of tablet {} as dest replica for version incomplete," @@ -1142,6 +1143,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60 * 1000L; value += 5000L * (failedSchedCounter / 10); + if (schedFailedCode == SubCode.WAITING_DECOMMISSION) { + value += 5 * 1000L; + } + if (type == Type.BALANCE) { value += 30 * 60 * 1000L; } @@ -1200,7 +1205,8 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // any intermediate state it set during the scheduling process. 
if (replica.getState() == ReplicaState.DECOMMISSION) { replica.setState(ReplicaState.NORMAL); - replica.setWatermarkTxnId(-1); + replica.setPreWatermarkTxnId(-1); + replica.setPostWatermarkTxnId(-1); LOG.debug("reset replica {} on backend {} of tablet {} state from DECOMMISSION to NORMAL", replica.getId(), replica.getBackendId(), tabletId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index ab50bb095b..de168634e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -1065,7 +1065,7 @@ public class TabletScheduler extends MasterDaemon { if (matchupReplicaCount <= 1) { LOG.info("can not delete only one replica, tabletId = {} replicaId = {}", tabletCtx.getTabletId(), replica.getId()); - throw new SchedException(Status.FINISHED, "the only one latest replia can not be dropped, tabletId = " + throw new SchedException(Status.UNRECOVERABLE, "the only one latest replia can not be dropped, tabletId = " + tabletCtx.getTabletId() + ", replicaId = " + replica.getId()); } @@ -1080,25 +1080,46 @@ public class TabletScheduler extends MasterDaemon { * If all are finished, which means this replica is * safe to be deleted. */ - if (!force && !Config.enable_force_drop_redundant_replica && replica.getState().canLoad() - && replica.getWatermarkTxnId() == -1 && !FeConstants.runningUnitTest) { - long nextTxnId = Env.getCurrentGlobalTransactionMgr() - .getTransactionIDGenerator().getNextTransactionId(); - replica.setWatermarkTxnId(nextTxnId); - replica.setState(ReplicaState.DECOMMISSION); - // set priority to normal because it may wait for a long time. Remain it as VERY_HIGH may block other task. 
- tabletCtx.setPriority(Priority.NORMAL); - LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}", - replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason); - throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION, - "set watermark txn " + nextTxnId); - } else if (replica.getState() == ReplicaState.DECOMMISSION && replica.getWatermarkTxnId() != -1) { - long watermarkTxnId = replica.getWatermarkTxnId(); + if (!force && !Config.enable_force_drop_redundant_replica + && !FeConstants.runningUnitTest + && (replica.getState().canLoad() || replica.getState() == ReplicaState.DECOMMISSION)) { + if (replica.getState() != ReplicaState.DECOMMISSION) { + replica.setState(ReplicaState.DECOMMISSION); + // set priority to normal because it may wait for a long time. + // Remain it as VERY_HIGH may block other task. + tabletCtx.setPriority(Priority.NORMAL); + LOG.info("set replica {} on backend {} of tablet {} state to DECOMMISSION due to reason {}", + replica.getId(), replica.getBackendId(), tabletCtx.getTabletId(), reason); + } + + long preWatermarkTxnId = replica.getPreWatermarkTxnId(); + if (preWatermarkTxnId == -1) { + preWatermarkTxnId = Env.getCurrentGlobalTransactionMgr() + .getTransactionIDGenerator().getNextTransactionId(); + replica.setPreWatermarkTxnId(preWatermarkTxnId); + } + + long postWatermarkTxnId = replica.getPostWatermarkTxnId(); + if (postWatermarkTxnId == -1) { + try { + if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(preWatermarkTxnId, + tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) { + throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION, + "wait txn before pre watermark txn " + preWatermarkTxnId + " to be finished"); + } + } catch (AnalysisException e) { + throw new SchedException(Status.UNRECOVERABLE, e.getMessage()); + } + postWatermarkTxnId = Env.getCurrentGlobalTransactionMgr() + 
.getTransactionIDGenerator().getNextTransactionId(); + replica.setPostWatermarkTxnId(postWatermarkTxnId); + } + try { - if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId, - tabletCtx.getDbId(), Lists.newArrayList(tabletCtx.getTblId()))) { + if (!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(postWatermarkTxnId, + tabletCtx.getDbId(), tabletCtx.getTblId(), tabletCtx.getPartitionId())) { throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_DECOMMISSION, - "wait txn before " + watermarkTxnId + " to be finished"); + "wait txn before post watermark txn " + postWatermarkTxnId + " to be finished"); } } catch (AnalysisException e) { throw new SchedException(Status.UNRECOVERABLE, e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 9114186162..293b49368a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -1709,6 +1709,35 @@ public class DatabaseTransactionMgr { return true; } + public boolean isPreviousTransactionsFinished(long endTransactionId, long tableId, long partitionId) { + readLock(); + try { + for (Map.Entry<Long, TransactionState> entry : idToRunningTransactionState.entrySet()) { + TransactionState transactionState = entry.getValue(); + if (entry.getKey() > endTransactionId + || transactionState.getTransactionStatus().isFinalStatus() + || transactionState.getDbId() != dbId + || !transactionState.getTableIdList().contains(tableId)) { + continue; + } + + if (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED) { + TableCommitInfo tableCommitInfo = transactionState.getTableCommitInfo(tableId); + // txn not contains this partition + if (tableCommitInfo != null + && 
tableCommitInfo.getIdToPartitionCommitInfo().get(partitionId) == null) { + continue; + } + } + + return false; + } + } finally { + readUnlock(); + } + return true; + } + /** * check if there exists a intersection between the source tableId list and target tableId list * if one of them is null or empty, that means that we don't know related tables in tableList, diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java index 25b3f30259..d08b1a4981 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java @@ -425,6 +425,19 @@ public class GlobalTransactionMgr implements Writable { } } + /** + * Check whether a load job for a partition already exists before + * checking all `TransactionId` related to this load job have finished. + * + * + * @throws AnalysisException if the database does not exist anymore + */ + public boolean isPreviousTransactionsFinished(long endTransactionId, long dbId, long tableId, + long partitionId) throws AnalysisException { + DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(dbId); + return dbTransactionMgr.isPreviousTransactionsFinished(endTransactionId, tableId, partitionId); + } + /** * The txn cleaner will run at a fixed interval and try to delete expired and timeout txns: * expired: txn is in VISIBLE or ABORTED, and is expired. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 10ce3542f6..b15329e6b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -523,10 +523,6 @@ public class TransactionState implements Writable { return this.idToTableCommitInfos.get(tableId); } - public void removeTable(long tableId) { - this.idToTableCommitInfos.remove(tableId); - } - public void setTxnCommitAttachment(TxnCommitAttachment txnCommitAttachment) { this.txnCommitAttachment = txnCommitAttachment; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index 05929ff8ce..c36ef531c2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -307,8 +307,8 @@ public class RebalanceTest { Replica decommissionedReplica = replicas.stream() .filter(r -> r.getState() == Replica.ReplicaState.DECOMMISSION) .collect(MoreCollectors.onlyElement()); - // expected watermarkTxnId is 111 - Assert.assertEquals(111, decommissionedReplica.getWatermarkTxnId()); + Assert.assertEquals(111, decommissionedReplica.getPreWatermarkTxnId()); + Assert.assertEquals(112, decommissionedReplica.getPostWatermarkTxnId()); }); // Delete replica should change invertedIndex too --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org