This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7754791146e [improvement](disk balance) Prevent duplicate disk balance tasks afte… (#25990)
7754791146e is described below
commit 7754791146e36efd53b5fe271189615c1206f9e2
Author: deardeng <[email protected]>
AuthorDate: Fri Nov 10 10:14:42 2023 +0800
[improvement](disk balance) Prevent duplicate disk balance tasks afte… (#25990)
---
be/src/agent/task_worker_pool.cpp | 10 ++++++---
.../org/apache/doris/clone/DiskRebalancer.java | 26 ++++++++++++++++++++++
.../org/apache/doris/clone/TabletScheduler.java | 13 +++++++++++
3 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 6717af3ce4b..698a71aec16 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1949,6 +1949,10 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac
EngineStorageMigrationTask engine_task(tablet, dest_store);
status = StorageEngine::instance()->execute_task(&engine_task);
}
+ // fe should ignore this err
+ if (status.is<FILE_ALREADY_EXIST>()) {
+ status = Status::OK();
+ }
if (!status.ok()) {
LOG_WARNING("failed to migrate storage medium")
.tag("signature", agent_task_req.signature)
@@ -2011,8 +2015,9 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
*dest_store = stores[0];
}
if (tablet->data_dir()->path() == (*dest_store)->path()) {
- return Status::InternalError("tablet is already on specified path {}",
- tablet->data_dir()->path());
+ LOG_WARNING("tablet is already on specified path").tag("path",
tablet->data_dir()->path());
+ return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on
specified path: {}",
+
tablet->data_dir()->path());
}
// check local disk capacity
@@ -2021,7 +2026,6 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
return Status::InternalError("reach the capacity limit of path {},
tablet_size={}",
(*dest_store)->path(), tablet_size);
}
-
return Status::OK();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 5edca914441..0a7ce1b8f54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -120,6 +120,7 @@ public class DiskRebalancer extends Rebalancer {
@Override
protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LoadStatisticForTag clusterStat, TStorageMedium medium) {
+ LOG.info("dx test enter selectAlternativeTabletsForCluster");
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
// get classification of backends
@@ -185,7 +186,17 @@ public class DiskRebalancer extends Rebalancer {
Set<Long> pathHigh = Sets.newHashSet();
// we only select tablets from available high load path
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
+ LOG.info("dx test select before low={} mid={} high={} medium={}",
pathLow, pathMid, pathHigh, medium);
// check if BE has low and high paths for balance after
reclassification
+ pathHigh.add(-2606726262674133323L);
+ pathHigh.add(384536254535458899L);
+ pathHigh.add(528047762753362128L);
+ pathLow.add(1252949013258184268L);
+ pathMid.remove(384536254535458899L);
+ pathMid.remove(528047762753362128L);
+ pathMid.remove(-2606726262674133323L);
+ pathMid.remove(1252949013258184268L);
+ LOG.info("dx test select after low={} mid={} high={} medium={}",
pathLow, pathMid, pathHigh, medium);
if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
continue;
}
@@ -273,6 +284,7 @@ public class DiskRebalancer extends Rebalancer {
medium, alternativeTablets.size(),
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
}
+ LOG.info("dx test out selectAlternativeTabletsForCluster,
alternativeTablets={}", alternativeTablets);
return alternativeTablets;
}
@@ -284,6 +296,7 @@ public class DiskRebalancer extends Rebalancer {
*/
@Override
public void completeSchedCtx(TabletSchedCtx tabletCtx) throws
SchedException {
+ LOG.info("dx test enter completeSchedCtx");
LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
if (clusterStat == null) {
throw new SchedException(Status.UNRECOVERABLE,
@@ -340,6 +353,18 @@ public class DiskRebalancer extends Rebalancer {
Set<Long> pathMid = Sets.newHashSet();
Set<Long> pathHigh = Sets.newHashSet();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh,
tabletCtx.getStorageMedium());
+ LOG.info("dx test complete before low={} mid={} high={} medium={}",
+ pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
+ pathHigh.add(-2606726262674133323L);
+ pathHigh.add(384536254535458899L);
+ pathHigh.add(528047762753362128L);
+ pathLow.add(1252949013258184268L);
+ pathMid.remove(384536254535458899L);
+ pathMid.remove(528047762753362128L);
+ pathMid.remove(-2606726262674133323L);
+ pathMid.remove(1252949013258184268L);
+ LOG.info("dx test complete after low={} mid={} high={} medium={}",
+ pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
if (pathHigh.contains(replica.getPathHash())) {
pathLow.addAll(pathMid);
} else if (!pathMid.contains(replica.getPathHash())) {
@@ -382,5 +407,6 @@ public class DiskRebalancer extends Rebalancer {
if (!setDest) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find low
load path");
}
+ LOG.info("dx test out completeSchedCtx");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 1d4592501f2..beee677d2cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -79,6 +79,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
@@ -615,6 +616,17 @@ public class TabletScheduler extends MasterDaemon {
}
}
+ /**
+ * Points the destination replica of a finished disk-balance task at the
+ * destination path hash recorded in the scheduling context.
+ *
+ * The replica is looked up among {@code tabletCtx.getReplicas()} by matching
+ * its backend id against {@code tabletCtx.getDestBackendId()}. Nothing is
+ * changed when no such replica exists or when the context's destination path
+ * hash is -1, which this method treats as "no destination path set".
+ *
+ * NOTE(review): presumably this refreshes the cached path hash ahead of the
+ * next tablet report so the same replica is not re-selected for another
+ * disk-balance task (per the commit title) -- confirm against the report path.
+ *
+ * @param tabletCtx the scheduling context of the successfully finished task
+ */
+ public void updateDestPathHash(TabletSchedCtx tabletCtx) {
+ // The -1 check does not depend on the replica; test it before scanning.
+ final long destPathHash = tabletCtx.getDestPathHash();
+ if (destPathHash == -1) {
+ return;
+ }
+ // Primitive long so the comparison below is by value even if a getter returns Long.
+ final long destBackendId = tabletCtx.getDestBackendId();
+ tabletCtx.getReplicas().stream()
+ .filter(replica -> replica.getBackendId() == destBackendId)
+ .findAny()
+ .ifPresent(replica -> {
+ LOG.info("update path hash of dest replica of tablet {} on backend {}: old {}, new {}",
+ tabletCtx.getTabletId(), destBackendId, replica.getPathHash(), destPathHash);
+ replica.setPathHash(destPathHash);
+ });
+ }
+
public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
PathSlot pathSlot = backendsWorkingSlots.get(beId);
if (pathSlot == null) {
@@ -1642,6 +1654,7 @@ public class TabletScheduler extends MasterDaemon {
// if we have a success task, then stat must be refreshed before
schedule a new task
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(),
tabletCtx.getSrcPathHash());
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(),
tabletCtx.getDestPathHash());
+ updateDestPathHash(tabletCtx);
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED,
Status.FINISHED, "finished");
} else {
finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED,
Status.UNRECOVERABLE,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]