This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c782a1a6079 [feature](cloud) Support decomission in cloud (#33201)
c782a1a6079 is described below

commit c782a1a607966ae0396741003759421a57e85892
Author: deardeng <565620...@qq.com>
AuthorDate: Wed Apr 24 19:42:07 2024 +0800

    [feature](cloud) Support decomission in cloud (#33201)
---
 .../main/java/org/apache/doris/common/Config.java  |  10 +-
 .../java/org/apache/doris/alter/SystemHandler.java |   4 +-
 .../java/org/apache/doris/catalog/Database.java    |   4 +
 .../doris/cloud/catalog/CloudClusterChecker.java   |   7 +-
 .../org/apache/doris/cloud/catalog/CloudEnv.java   |   7 +
 .../doris/cloud/catalog/CloudTabletRebalancer.java |   2 +-
 .../doris/cloud/catalog/CloudUpgradeMgr.java       | 153 +++++++++++++++++++++
 .../main/java/org/apache/doris/system/Backend.java |   2 +-
 8 files changed, 181 insertions(+), 8 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5c003995d1e..4579a95fe61 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2721,7 +2721,7 @@ public class Config extends ConfigBase {
     public static int cloud_min_balance_tablet_num_per_run = 2;
 
+    /**
+     * When true, CloudTabletRebalancer takes its warm-up path when picking a tablet to move; the
+     * visible part of that path skips a tablet when isConflict() reports a conflicting move.
+     * Renamed from cloud_preheating_enabled, with the default flipped from false to true.
+     * NOTE(review): fe.conf entries that still use the old name presumably stop taking effect --
+     * confirm whether a compatibility alias or a release note is needed.
+     */
     @ConfField(mutable = true, masterOnly = true)
-    public static boolean cloud_preheating_enabled = false;
+    public static boolean enable_cloud_warm_up_for_rebalance = true;

     @ConfField(mutable = true, masterOnly = false)
     public static String security_checker_class_name = "";
@@ -2763,6 +2763,14 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static long batch_insert_cluster_cache_hotspot_num = 50;

+    /**
+     * Interval, in seconds, between CloudUpgradeMgr rounds that check whether the watershed
+     * transactions of decommissioning BEs have finished.
+     * NOTE(review): CloudUpgradeMgr reads this only in its constructor, so changing it at runtime
+     * (mutable = true) presumably does not affect the already-running daemon -- confirm.
+     */
+    @ConfField(mutable = true)
+    public static int cloud_upgrade_mgr_interval_second = 15;
+
+    /**
+     * When true, CloudUpgradeMgr marks a decommissioning BE inactive only after
+     * isPreviousNonTimeoutTxnFinished() reports true for each of its watershed transactions;
+     * when false, the BE is marked inactive at the first CloudUpgradeMgr round after registration.
+     */
+    @ConfField(mutable = true)
+    public static boolean enable_cloud_running_txn_check = true;
     
//==========================================================================
     //                      end of cloud config
     
//==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
index 57e00f5ab14..394c827e163 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java
@@ -78,7 +78,9 @@ public class SystemHandler extends AlterHandler {
     @Override
     protected void runAfterCatalogReady() {
         super.runAfterCatalogReady();
-        runAlterJobV2();
+        // runAlterJobV2() is now skipped in cloud mode.
+        // NOTE(review): presumably because cloud decommission is tracked by CloudClusterChecker and
+        // CloudUpgradeMgr instead of alter jobs -- confirm no other alter job type relies on this path.
+        if (Config.isNotCloudMode()) {
+            runAlterJobV2();
+        }
     }
 
     // check all decommissioned backends, if there is no available tablet on 
that backend, drop it.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 5e12fe1d426..76512a35ed0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -470,6 +470,10 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
         return idToTable;
     }
 
+    /**
+     * Returns a new list holding the ids of all tables currently in idToTable; later changes to
+     * idToTable are not reflected in the returned list. This method takes no lock itself, so
+     * callers that need a consistent view presumably should hold this database's read lock.
+     */
+    public List<Long> getTableIds() {
+        return new ArrayList<>(idToTable.keySet());
+    }
+
     // tables must get read or write table in fixed order to avoid potential 
dead lock
     public List<Table> getTablesOnIdOrder() {
         return idToTable.values().stream()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 801f3166861..273ec422a51 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -26,6 +26,7 @@ import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.metric.MetricRepo;
@@ -172,14 +173,12 @@ public class CloudClusterChecker extends MasterDaemon {
             if (status == Cloud.NodeStatusPB.NODE_STATUS_DECOMMISSIONING) {
                 if (!be.isDecommissioned()) {
                     LOG.info("decommissioned backend: {} status: {}", be, 
status);
-                    // TODO(merge-cloud): add it when has CloudUpgradeMgr.
-                    /*
                     try {
-                    } catch (AnalysisException e) {
+                        ((CloudEnv) 
Env.getCurrentEnv()).getCloudUpgradeMgr().registerWaterShedTxnId(be.getId());
+                    } catch (UserException e) {
                         LOG.warn("failed to register water shed txn id, 
decommission be {}", be.getId(), e);
                     }
                     be.setDecommissioned(true);
-                     */
                 }
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
index 223a78e2b28..551ab504d1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java
@@ -64,6 +64,7 @@ public class CloudEnv extends Env {
 
     private CloudInstanceStatusChecker cloudInstanceStatusChecker;
     private CloudClusterChecker cloudClusterCheck;
+    private CloudUpgradeMgr upgradeMgr;
 
     private CloudTabletRebalancer cloudTabletRebalancer;
     private CacheHotspotManager cacheHotspotMgr;
@@ -76,12 +77,17 @@ public class CloudEnv extends Env {
         this.cloudInstanceStatusChecker = new 
CloudInstanceStatusChecker((CloudSystemInfoService) systemInfo);
         this.cloudTabletRebalancer = new 
CloudTabletRebalancer((CloudSystemInfoService) systemInfo);
         this.cacheHotspotMgr = new 
CacheHotspotManager((CloudSystemInfoService) systemInfo);
+        this.upgradeMgr = new CloudUpgradeMgr((CloudSystemInfoService) 
systemInfo);
     }
 
     public CloudTabletRebalancer getCloudTabletRebalancer() {
         return this.cloudTabletRebalancer;
     }
 
+    /**
+     * Returns the CloudUpgradeMgr built in the constructor; CloudClusterChecker uses it to
+     * register watershed txns for a BE it first sees in the decommissioning state.
+     */
+    public CloudUpgradeMgr getCloudUpgradeMgr() {
+        return this.upgradeMgr;
+    }
+
     @Override
     protected void startMasterOnlyDaemonThreads() {
         LOG.info("start cloud Master only daemon threads");
@@ -91,6 +97,7 @@ public class CloudEnv extends Env {
         if (Config.enable_fetch_cluster_cache_hotspot) {
             cacheHotspotMgr.start();
         }
+        upgradeMgr.start();
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 309ab01859c..262fdaad2f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -799,7 +799,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
             Tablet pickedTablet = beToTablets.get(srcBe).get(randomIndex);
             CloudReplica cloudReplica = (CloudReplica) 
pickedTablet.getReplicas().get(0);
 
-            if (Config.cloud_preheating_enabled) {
+            if (Config.enable_cloud_warm_up_for_rebalance) {
                 if (isConflict(srcBe, destBe, cloudReplica, balanceType, 
futurePartitionToTablets,
                         futureBeToTabletsInTable)) {
                     continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
new file mode 100644
index 00000000000..1cb1aeb2568
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudUpgradeMgr.java
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.Backend;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+public class CloudUpgradeMgr extends MasterDaemon {
+
+    /** A (database id, watershed txn id) pair recorded when a BE starts decommissioning. */
+    @Getter
+    @Setter
+    @RequiredArgsConstructor
+    private static class DbWithWaterTxn {
+        private final Long dbId;
+        private final Long txnId;
+    }
+
+    /** All watershed txns registered for one decommissioning BE. */
+    @Getter
+    @Setter
+    @RequiredArgsConstructor
+    private static class DbWithWaterTxnInBe {
+        private final LinkedBlockingQueue<DbWithWaterTxn> dbWithWaterTxnQueue;
+        private final Long beId;
+    }
+
+    private static final Logger LOG = LogManager.getLogger(CloudUpgradeMgr.class);
+
+    /*
+     * One entry per decommissioning BE:
+     *   (<(<dbid1, txn1>, <dbid2, txn2>, ... <dbid3, txn3>), be>, <...>, ...)
+     * NOTE(review): entries live only in memory (registration writes no edit log), so they are
+     * presumably lost on FE restart / master switch -- confirm how such BEs get re-registered.
+     */
+    private final LinkedBlockingQueue<DbWithWaterTxnInBe> txnBePairList = new LinkedBlockingQueue<>();
+
+    private final CloudSystemInfoService cloudSystemInfoService;
+
+    public CloudUpgradeMgr(CloudSystemInfoService cloudSystemInfoService) {
+        // The interval is read once here; NOTE(review): later runtime changes to
+        // cloud_upgrade_mgr_interval_second presumably do not reach this daemon.
+        super("cloud upgrade manager", Config.cloud_upgrade_mgr_interval_second * 1000L);
+        this.cloudSystemInfoService = cloudSystemInfoService;
+    }
+
+    /**
+     * One check round: every registered BE whose watershed txns are all finished is marked
+     * inactive and removed from the pending list. BEs that are not finished yet, or whose check
+     * failed, stay queued and are re-checked next round.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.info("before cloud upgrade mgr txnBePairList size {}", txnBePairList.size());
+        if (!txnBePairList.isEmpty()) {
+            // The table list does not depend on the BE or txn being checked, so build it once per
+            // round rather than once per (BE, db) pair -- each build takes every db's read lock.
+            List<Long> tableIdList = getAllTables();
+            // LinkedBlockingQueue iterators are weakly consistent, so removing the current entry
+            // while iterating is safe.
+            for (DbWithWaterTxnInBe txnBe : txnBePairList) {
+                long be = txnBe.beId;
+                if (isWaterShedTxnsFinished(txnBe, tableIdList)) {
+                    setBeStateInactive(be);
+                    LOG.info("BE {} is inactive", be);
+                    txnBePairList.remove(txnBe);
+                }
+            }
+        }
+        LOG.info("finish cloud upgrade mgr");
+    }
+
+    /**
+     * Returns true when the BE can be marked inactive. That is the case when the catalog has no
+     * tables, when the running-txn check is disabled, or when isPreviousNonTimeoutTxnFinished()
+     * reports true for every registered watershed txn. A failed check is logged and treated as
+     * "not finished yet". It is retried next round instead of aborting the checks of the other
+     * BEs in this round.
+     */
+    private boolean isWaterShedTxnsFinished(DbWithWaterTxnInBe txnBe, List<Long> tableIdList) {
+        if (tableIdList.isEmpty()) {
+            /* no table in this cluster */
+            return true;
+        }
+        if (!Config.enable_cloud_running_txn_check) {
+            return true;
+        }
+        CloudGlobalTransactionMgr txnMgr = (CloudGlobalTransactionMgr) Env.getCurrentGlobalTransactionMgr();
+        for (DbWithWaterTxn dbWithWaterTxn : txnBe.dbWithWaterTxnQueue) {
+            boolean isFinished;
+            try {
+                isFinished = txnMgr.isPreviousNonTimeoutTxnFinished(dbWithWaterTxn.txnId,
+                        dbWithWaterTxn.dbId, tableIdList);
+            } catch (AnalysisException e) {
+                LOG.warn("failed to check watershed txn {} of db {} for BE {}, retry next round",
+                        dbWithWaterTxn.txnId, dbWithWaterTxn.dbId, txnBe.beId, e);
+                return false;
+            }
+            if (!isFinished) {
+                LOG.info("BE {} is still active, waiting db {} txn {}",
+                        txnBe.beId, dbWithWaterTxn.dbId, dbWithWaterTxn.txnId);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Records the next transaction id as a watershed for BE {@code be}, once for every database
+     * currently in the internal catalog. CloudClusterChecker calls this when it first sees the
+     * BE in NODE_STATUS_DECOMMISSIONING. The BE is marked inactive by a later check round, once
+     * all of its watershed txns are reported finished.
+     *
+     * @param be id of the backend that started decommissioning
+     * @throws UserException presumably propagated from allocating the next transaction id
+     */
+    public void registerWaterShedTxnId(long be) throws UserException {
+        LinkedBlockingQueue<DbWithWaterTxn> txnIds = new LinkedBlockingQueue<>();
+        List<Long> dbids = Env.getCurrentInternalCatalog().getDbIds();
+        for (long dbid : dbids) {
+            txnIds.offer(new DbWithWaterTxn(dbid, Env.getCurrentGlobalTransactionMgr().getNextTransactionId()));
+        }
+        txnBePairList.offer(new DbWithWaterTxnInBe(txnIds, be));
+        LOG.info("register watershedtxnid {} for BE {}", txnIds.stream()
+                .map(dbWithWaterTxn -> "(" + dbWithWaterTxn.dbId + ":" + dbWithWaterTxn.txnId + ")")
+                .collect(Collectors.joining(", ", "{", "}")), be);
+    }
+
+    /**
+     * Collects the ids of all tables of all databases in the internal catalog. Each database's
+     * read lock is held only while its ids are copied, and is always released.
+     */
+    private List<Long> getAllTables() {
+        List<Long> mergedList = new ArrayList<>();
+        for (Long dbId : Env.getCurrentInternalCatalog().getDbIds()) {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                // dropped between getDbIds() and this lookup
+                continue;
+            }
+            db.readLock();
+            try {
+                mergedList.addAll(db.getTableIds());
+            } finally {
+                db.readUnlock();
+            }
+        }
+        return mergedList;
+    }
+
+    /**
+     * Marks the backend inactive and writes a modify-backend edit log entry for it.
+     * Logs a warning and does nothing else if the backend is unknown.
+     */
+    public void setBeStateInactive(long beId) {
+        Backend be = cloudSystemInfoService.getBackend(beId);
+        if (be == null) {
+            LOG.warn("cannot get be {} to set inactive state", beId);
+            return;
+        }
+        be.setActive(false); /* now user can get BE inactive status from `show backends;` */
+        Env.getCurrentEnv().getEditLog().logModifyBackend(be);
+        LOG.info("finished to modify backend {} ", be);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index 347fc7663df..4fe258cba21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -734,7 +734,7 @@ public class Backend implements Writable {
     public String toString() {
+        // isDecommissioned is included so that log lines printing a Backend (for example
+        // CloudClusterChecker's "decommissioned backend" message) show decommission state.
         return "Backend [id=" + id + ", host=" + host + ", heartbeatPort=" + 
heartbeatPort + ", alive=" + isAlive.get()
                 + ", lastStartTime=" + 
TimeUtils.longToTimeString(lastStartTime) + ", process epoch=" + lastStartTime
-                + ", tags: " + tagMap + "]";
+                + ", isDecommissioned=" + isDecommissioned + ", tags: " + 
tagMap + "]";
     }
 
     public String getHealthyStatus() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to