This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c782a1a6079 [feature](cloud) Support decomission in cloud (#33201) c782a1a6079 is described below commit c782a1a607966ae0396741003759421a57e85892 Author: deardeng <565620...@qq.com> AuthorDate: Wed Apr 24 19:42:07 2024 +0800 [feature](cloud) Support decomission in cloud (#33201) --- .../main/java/org/apache/doris/common/Config.java | 10 +- .../java/org/apache/doris/alter/SystemHandler.java | 4 +- .../java/org/apache/doris/catalog/Database.java | 4 + .../doris/cloud/catalog/CloudClusterChecker.java | 7 +- .../org/apache/doris/cloud/catalog/CloudEnv.java | 7 + .../doris/cloud/catalog/CloudTabletRebalancer.java | 2 +- .../doris/cloud/catalog/CloudUpgradeMgr.java | 153 +++++++++++++++++++++ .../main/java/org/apache/doris/system/Backend.java | 2 +- 8 files changed, 181 insertions(+), 8 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 5c003995d1e..4579a95fe61 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2721,7 +2721,7 @@ public class Config extends ConfigBase { public static int cloud_min_balance_tablet_num_per_run = 2; @ConfField(mutable = true, masterOnly = true) - public static boolean cloud_preheating_enabled = false; + public static boolean enable_cloud_warm_up_for_rebalance = true; @ConfField(mutable = true, masterOnly = false) public static String security_checker_class_name = ""; @@ -2763,6 +2763,14 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static long batch_insert_cluster_cache_hotspot_num = 50; + /** + * intervals between be status checks for CloudUpgradeMgr + */ + @ConfField(mutable = true) + public static int cloud_upgrade_mgr_interval_second = 15; + + @ConfField(mutable = true) + public static boolean enable_cloud_running_txn_check = true; //========================================================================== // end of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 57e00f5ab14..394c827e163 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -78,7 +78,9 @@ public class SystemHandler extends AlterHandler { @Override protected void runAfterCatalogReady() { super.runAfterCatalogReady(); - runAlterJobV2(); + if (Config.isNotCloudMode()) { + runAlterJobV2(); + } } // check all decommissioned backends, if there is no available tablet on that backend, drop it. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 5e12fe1d426..76512a35ed0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -470,6 +470,10 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table> return idToTable; } + public List<Long> getTableIds() { + return new ArrayList<>(idToTable.keySet()); + } + // tables must get read or write table in fixed order to avoid potential dead lock public List<Table> getTablesOnIdOrder() { return idToTable.values().stream() diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java index 801f3166861..273ec422a51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java @@ -26,6 +26,7 @@ import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; import org.apache.doris.cloud.system.CloudSystemInfoService; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.metric.MetricRepo; @@ -172,14 +173,12 @@ public class CloudClusterChecker extends MasterDaemon { if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) { if (!be.isDecommissioned()) { LOG.info("decommissioned backend: {} status: {}", be, status); - // TODO(merge-cloud): add it when has CloudUpgradeMgr. - /* try { - } catch (AnalysisException e) { + ((CloudEnv) Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(be.getId()); + } catch (UserException e) { LOG.warn("failed to register water shed txn id, decommission be {}", be.getId(), e); } be.setDecommissioned(true); - */ } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 223a78e2b28..551ab504d1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -64,6 +64,7 @@ public class CloudEnv extends Env { private CloudInstanceStatusChecker cloudInstanceStatusChecker; private CloudClusterChecker cloudClusterCheck; + private CloudUpgradeMgr upgradeMgr; private CloudTabletRebalancer cloudTabletRebalancer; private CacheHotspotManager cacheHotspotMgr; @@ -76,12 +77,17 @@ public class CloudEnv extends Env { this.cloudInstanceStatusChecker = new CloudInstanceStatusChecker((CloudSystemInfoService) systemInfo); this.cloudTabletRebalancer = new CloudTabletRebalancer((CloudSystemInfoService) systemInfo); this.cacheHotspotMgr = new CacheHotspotManager((CloudSystemInfoService) systemInfo); + this.upgradeMgr = new CloudUpgradeMgr((CloudSystemInfoService) systemInfo); } public CloudTabletRebalancer getCloudTabletRebalancer() { return this.cloudTabletRebalancer; } + public CloudUpgradeMgr getCloudUpgradeMgr() { + return this.upgradeMgr; + } + @Override protected void startMasterOnlyDaemonThreads() { LOG.info("start cloud Master only daemon threads"); @@ -91,6 +97,7 @@ public class CloudEnv extends Env { if (Config.enable_fetch_cluster_cache_hotspot) { cacheHotspotMgr.start(); } + upgradeMgr.start(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 309ab01859c..262fdaad2f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -799,7 +799,7 @@ public class CloudTabletRebalancer extends MasterDaemon { Tablet pickedTablet = beToTablets.get(srcBe).get(randomIndex); CloudReplica cloudReplica = (CloudReplica) pickedTablet.getReplicas().get(0); - if (Config.cloud_preheating_enabled) { + if (Config.enable_cloud_warm_up_for_rebalance) { if (isConflict(srcBe, destBe, cloudReplica, balanceType, futurePartitionToTablets, futureBeToTabletsInTable)) { continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java new file mode 100644 index 00000000000..1cb1aeb2568 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.catalog; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.system.Backend; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; + +public class CloudUpgradeMgr extends MasterDaemon { + + @Getter + @Setter + @RequiredArgsConstructor + private static class DbWithWaterTxn { + private final Long dbId; + private final Long txnId; + } + + @Getter + @Setter + @RequiredArgsConstructor + private static class DbWithWaterTxnInBe { + private final LinkedBlockingQueue<DbWithWaterTxn> dbWithWaterTxnQueue; + private final Long beId; + + } + + private static final Logger LOG = LogManager.getLogger(CloudUpgradeMgr.class); + /* (<(<dbid1, txn1>, <dbid2, txn2>, ... <dbid3, txn3>), be>, <...>, ...) */ + private final LinkedBlockingQueue<DbWithWaterTxnInBe> txnBePairList = new LinkedBlockingQueue<>(); + + private final CloudSystemInfoService cloudSystemInfoService; + + public CloudUpgradeMgr(CloudSystemInfoService cloudSystemInfoService) { + super("cloud upgrade manager", Config.cloud_upgrade_mgr_interval_second * 1000L); + this.cloudSystemInfoService = cloudSystemInfoService; + } + + @Override + protected void runAfterCatalogReady() { + LOG.info("before cloud upgrade mgr txnBePairList size {}", txnBePairList.size()); + for (DbWithWaterTxnInBe txnBe : txnBePairList) { + LinkedBlockingQueue<DbWithWaterTxn> txnList = txnBe.dbWithWaterTxnQueue; + long be = txnBe.beId; + + boolean isFinished = false; + boolean isBeInactive = true; + for (DbWithWaterTxn dbWithWaterTxn : txnList) { + List<Long> tableIdList = getAllTables(); + if (tableIdList.isEmpty()) { + /* no table in this cluster */ + break; + } + try { + if (Config.enable_cloud_running_txn_check) { + isFinished = ((CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr()) + .isPreviousNonTimeoutTxnFinished(dbWithWaterTxn.txnId, + dbWithWaterTxn.dbId, tableIdList); + } else { + isFinished = true; + } + } catch (AnalysisException e) { + throw new RuntimeException(e); + } + if (!isFinished) { + isBeInactive = false; + LOG.info("BE {} is still active, waiting db {} txn {}", + be, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId); + break; + } + } + if (isBeInactive) { + setBeStateInactive(be); + LOG.info("BE {} is inactive", be); + txnBePairList.remove(txnBe); + } + } + LOG.info("finish cloud upgrade mgr"); + } + + /* called after tablets migrating to new BE process complete */ + public void registerWaterShedTxnId(long be) throws UserException { + LinkedBlockingQueue<DbWithWaterTxn> txnIds = new LinkedBlockingQueue<>(); + List<Long> dbids = Env.getCurrentInternalCatalog().getDbIds(); + for (long dbid : dbids) { + txnIds.offer(new DbWithWaterTxn(dbid, Env.getCurrentGlobalTransactionMgr().getNextTransactionId())); + } + txnBePairList.offer(new DbWithWaterTxnInBe(txnIds, be)); + LOG.info("register watershedtxnid {} for BE {}", txnIds.stream() + .map(dbWithWaterTxn -> "(" + dbWithWaterTxn.dbId + ":" + dbWithWaterTxn.txnId + ")") + .collect((Collectors.joining(", ", "{", "}"))), be); + } + + private List<Long> getAllTables() { + List<Long> mergedList = new ArrayList<>(); + + List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds(); + for (Long dbId : dbIds) { + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + continue; + } + db.readLock(); + List<Long> tableList = db.getTableIds(); + db.readUnlock(); + mergedList.addAll(tableList); + } + return mergedList; + } + + public void setBeStateInactive(long beId) { + Backend be = cloudSystemInfoService.getBackend(beId); + if (be == null) { + LOG.warn("cannot get be {} to set inactive state", beId); + return; + } + be.setActive(false); /* now user can get BE inactive status from `show backends;` */ + Env.getCurrentEnv().getEditLog().logModifyBackend(be); + LOG.info("finished to modify backend {} ", be); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index 347fc7663df..4fe258cba21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -734,7 +734,7 @@ public class Backend implements Writable { public String toString() { return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + heartbeatPort + ", alive=" + isAlive.get() + ", lastStartTime=" + TimeUtils.longToTimeString(lastStartTime) + ", process epoch=" + lastStartTime - + ", tags: " + tagMap + "]"; + + ", isDecommissioned=" + isDecommissioned + ", tags: " + tagMap + "]"; } public String getHealthyStatus() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org