This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 54a8c83f6a187d2c2392594fbf62cad292c48966
Author: zhengyu <freeman.zhang1...@gmail.com>
AuthorDate: Wed Mar 6 21:57:51 2024 +0800

    [enhancement](cloud) add CloudTabletStatMgr to capture stats in cloud mode (#31818)
    
    Signed-off-by: freemandealer <freeman.zhang1...@gmail.com>
---
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../apache/doris/catalog/CloudTabletStatMgr.java   | 273 +++++++++++++++++++++
 .../java/org/apache/doris/catalog/Database.java    |   5 +
 .../main/java/org/apache/doris/catalog/Env.java    |   9 +-
 .../java/org/apache/doris/catalog/Replica.java     |  17 ++
 5 files changed, 306 insertions(+), 1 deletion(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index cbbf9368720..418122745d4 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2583,6 +2583,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int cloud_cold_read_percent = 10; // 10%
 
+    @ConfField(mutable = true)
+    public static int get_tablet_stat_batch_size = 1000;
+
     // The original meta read lock is not enough to keep a snapshot of 
partition versions,
     // so the execution of `createScanRangeLocations` are delayed to 
`Coordinator::exec`,
     // to help to acquire a snapshot of partition versions.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
new file mode 100644
index 00000000000..136c9264a4e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.cloud.proto.Cloud.GetTabletStatsRequest;
+import org.apache.doris.cloud.proto.Cloud.GetTabletStatsResponse;
+import org.apache.doris.cloud.proto.Cloud.MetaServiceCode;
+import org.apache.doris.cloud.proto.Cloud.TabletIndexPB;
+import org.apache.doris.cloud.proto.Cloud.TabletStatsPB;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.rpc.RpcException;
+
+import lombok.Getter;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ForkJoinPool;
+
+/*
+ * CloudTabletStatMgr periodically collects tablet(replica) statistics in cloud mode.
+ * The statistics are requested in batches through MetaServiceProxy#getTabletStats, stored on the
+ * first replica of every tablet, and then rolled up into index row counts and per-table totals.
+ * Each FE will collect by itself.
+ */
+public class CloudTabletStatMgr extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(CloudTabletStatMgr.class);
+
+    // NOTE(review): not referenced anywhere in this class; kept exactly as declared.
+    private ForkJoinPool taskPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
+
+    // <(dbId, tableId) -> CloudTableStats>
+    // The reference is replaced wholesale at the end of every round and handed out through
+    // getCloudTableStatsMap(); volatile makes the newly published map visible to other threads.
+    private volatile ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> cloudTableStatsMap =
+            new ConcurrentHashMap<>();
+
+    public CloudTabletStatMgr() {
+        super("cloud tablet stat mgr", Config.tablet_stat_update_interval_second * 1000);
+    }
+
+    /**
+     * One collection round: build the batched requests, fetch and apply the tablet statistics,
+     * then recompute index row counts and publish a fresh per-table statistics map.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.info("cloud tablet stat begin");
+        long start = System.currentTimeMillis();
+
+        List<Long> dbIds = Env.getCurrentInternalCatalog().getDbIds();
+        // All requests are built before the first one is sent.
+        for (GetTabletStatsRequest req : buildTabletStatsRequests(dbIds)) {
+            fetchAndApplyTabletStats(req);
+        }
+
+        LOG.info("finished to get tablet stat of all backends. cost: {} ms",
+                (System.currentTimeMillis() - start));
+
+        // after the replicas have been updated, update index row num and table totals
+        start = System.currentTimeMillis();
+        this.cloudTableStatsMap = collectTableStats(dbIds);
+        LOG.info("finished to update index row num of all databases. cost: {} ms",
+                (System.currentTimeMillis() - start));
+    }
+
+    /**
+     * Walks the visible indexes of every OLAP table in the given databases and packs their tablets
+     * into requests. A request is closed as soon as it holds Config.get_tablet_stat_batch_size
+     * entries; the last, partially filled request is added as well.
+     */
+    private List<GetTabletStatsRequest> buildTabletStatsRequests(List<Long> dbIds) {
+        List<GetTabletStatsRequest> reqList = new ArrayList<>();
+        GetTabletStatsRequest.Builder builder = GetTabletStatsRequest.newBuilder();
+        for (Long dbId : dbIds) {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+
+            for (Table table : db.getTables()) {
+                if (table.getType() != TableType.OLAP) {
+                    continue;
+                }
+
+                table.readLock();
+                try {
+                    OlapTable tbl = (OlapTable) table;
+                    for (Partition partition : tbl.getAllPartitions()) {
+                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                            // The id list comes from the index itself, so the id is used directly
+                            // instead of looking the tablet up only to read its id back.
+                            for (Long tabletId : index.getTabletIdsInOrder()) {
+                                TabletIndexPB.Builder tabletBuilder = TabletIndexPB.newBuilder();
+                                tabletBuilder.setDbId(dbId);
+                                tabletBuilder.setTableId(table.getId());
+                                tabletBuilder.setIndexId(index.getId());
+                                tabletBuilder.setPartitionId(partition.getId());
+                                tabletBuilder.setTabletId(tabletId);
+                                builder.addTabletIdx(tabletBuilder);
+
+                                if (builder.getTabletIdxCount() >= Config.get_tablet_stat_batch_size) {
+                                    reqList.add(builder.build());
+                                    builder = GetTabletStatsRequest.newBuilder();
+                                }
+                            }
+                        }
+                    } // partitions
+                } finally {
+                    table.readUnlock();
+                }
+            } // tables
+        } // end for dbs
+
+        if (builder.getTabletIdxCount() > 0) {
+            reqList.add(builder.build());
+        }
+        return reqList;
+    }
+
+    /**
+     * Sends one request and applies the returned statistics to the replicas. A failed RPC or a
+     * non-OK status is logged and the batch is skipped, so the remaining batches still run.
+     */
+    private void fetchAndApplyTabletStats(GetTabletStatsRequest req) {
+        GetTabletStatsResponse resp;
+        try {
+            resp = getTabletStats(req);
+        } catch (RpcException e) {
+            LOG.warn("get tablet stats exception:", e);
+            return;
+        }
+
+        if (resp.getStatus().getCode() != MetaServiceCode.OK) {
+            LOG.warn("get tablet stats failed, code: {}", resp.getStatus().getCode());
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            // Request and response entries are paired by position; never read past the end of
+            // the response in case it carries fewer entries than were requested.
+            int pairs = Math.min(req.getTabletIdxCount(), resp.getTabletStatsCount());
+            for (int i = 0; i < pairs; i++) {
+                TabletIndexPB idx = req.getTabletIdx(i);
+                LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id: {} size: {}",
+                        idx.getDbId(), idx.getTableId(), idx.getIndexId(), idx.getTabletId(),
+                        resp.getTabletStats(i).getDataSize());
+            }
+        }
+        updateTabletStat(resp);
+    }
+
+    /**
+     * Recomputes the row count of every visible index and the totals of every OLAP table from the
+     * replica statistics. For each tablet the largest value found among its replicas is used.
+     * Tables whose write lock cannot be taken by writeLockIfExist() are left out of the result.
+     */
+    private ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> collectTableStats(List<Long> dbIds) {
+        ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> newCloudTableStatsMap = new ConcurrentHashMap<>();
+        for (Long dbId : dbIds) {
+            Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+
+            for (Table table : db.getTables()) {
+                if (table.getType() != TableType.OLAP) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) table;
+
+                String dbName = db.getName();
+                Long tableId = table.getId();
+                String tableName = table.getName();
+
+                // Primitive accumulators: boxed Long would unbox and rebox on every addition.
+                long tableDataSize = 0L;
+                long tableRowsetCount = 0L;
+                long tableSegmentCount = 0L;
+                long tableRowCount = 0L;
+
+                // The write lock is needed because index.setRowCount() mutates the index.
+                if (!table.writeLockIfExist()) {
+                    continue;
+                }
+
+                try {
+                    for (Partition partition : olapTable.getAllPartitions()) {
+                        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                            long indexRowCount = 0L;
+                            for (Tablet tablet : index.getTablets()) {
+                                long tabletDataSize = 0L;
+                                long tabletRowsetCount = 0L;
+                                long tabletSegmentCount = 0L;
+                                long tabletRowCount = 0L;
+
+                                for (Replica replica : tablet.getReplicas()) {
+                                    tabletDataSize = Math.max(tabletDataSize, replica.getDataSize());
+                                    tabletRowsetCount = Math.max(tabletRowsetCount, replica.getRowsetCount());
+                                    tabletSegmentCount = Math.max(tabletSegmentCount, replica.getSegmentCount());
+                                    tabletRowCount = Math.max(tabletRowCount, replica.getRowCount());
+                                }
+
+                                tableDataSize += tabletDataSize;
+                                tableRowsetCount += tabletRowsetCount;
+                                tableSegmentCount += tabletSegmentCount;
+                                tableRowCount += tabletRowCount;
+
+                                indexRowCount += tabletRowCount;
+                            } // end for tablets
+                            index.setRowCount(indexRowCount);
+                        } // end for indices
+                    } // end for partitions
+                    LOG.debug("finished to set row num for table: {} in database: {}",
+                             table.getName(), db.getFullName());
+                } finally {
+                    table.writeUnlock();
+                }
+
+                newCloudTableStatsMap.put(Pair.of(dbId, tableId), new CloudTableStats(dbName, tableName,
+                        tableDataSize, tableRowsetCount, tableSegmentCount, tableRowCount));
+            }
+        }
+        return newCloudTableStatsMap;
+    }
+
+    /**
+     * Copies the statistics of every tablet in the response onto the first replica registered for
+     * that tablet in the inverted index. Tablets unknown to the inverted index are ignored.
+     */
+    private void updateTabletStat(GetTabletStatsResponse response) {
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        for (TabletStatsPB stat : response.getTabletStatsList()) {
+            long tabletId = stat.getIdx().getTabletId();
+            if (invertedIndex.getTabletMeta(tabletId) == null) {
+                continue;
+            }
+            List<Replica> replicas = invertedIndex.getReplicasByTabletId(tabletId);
+            if (replicas != null && !replicas.isEmpty() && replicas.get(0) != null) {
+                replicas.get(0).updateCloudStat(stat.getDataSize(), stat.getNumRowsets(),
+                        stat.getNumSegments(), stat.getNumRows());
+            }
+        }
+    }
+
+    /**
+     * Forwards the request to MetaServiceProxy. The exception is not logged here because the
+     * caller already logs it; logging in both places produced duplicate stack traces.
+     *
+     * @throws RpcException if the underlying RPC fails
+     */
+    private GetTabletStatsResponse getTabletStats(GetTabletStatsRequest request) throws RpcException {
+        return MetaServiceProxy.getInstance().getTabletStats(request);
+    }
+
+    /** Returns the per-table statistics map published by the most recently completed round. */
+    public ConcurrentHashMap<Pair<Long, Long>, CloudTableStats> getCloudTableStatsMap() {
+        return this.cloudTableStatsMap;
+    }
+
+    /** Immutable totals of one table, as computed by a single collection round. */
+    public static class CloudTableStats {
+        @Getter
+        private final String dbName;
+        @Getter
+        private final String tableName;
+
+        @Getter
+        private final Long tableDataSize;
+        @Getter
+        private final Long tableRowsetCount;
+        @Getter
+        private final Long tableSegmentCount;
+        @Getter
+        private final Long tableRowCount;
+
+        public CloudTableStats(String dbName, String tableName, Long tableDataSize, Long tableRowsetCount,
+                Long tableSegmentCount, Long tableRowCount) {
+            this.dbName = dbName;
+            this.tableName = tableName;
+            this.tableDataSize = tableDataSize;
+            this.tableRowsetCount = tableRowsetCount;
+            this.tableSegmentCount = tableSegmentCount;
+            this.tableRowCount = tableRowCount;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index fd709963a7b..d1f33b2b10c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -213,6 +213,11 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
         return fullQualifiedName;
     }
 
+    /**
+     * Returns one segment of the full qualified name: when splitting it on ':' yields exactly two
+     * parts the second part is returned, otherwise the first part is returned.
+     */
+    public String getName() {
+        final String[] parts = fullQualifiedName.split(":");
+        if (parts.length == 2) {
+            return parts[1];
+        }
+        return parts[0];
+    }
+
     public void setNameWithLock(String newName) {
         writeLock();
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index effd7b6ca64..38fe843d5e7 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -448,6 +448,8 @@ public class Env {
 
     private TabletStatMgr tabletStatMgr;
 
+    private CloudTabletStatMgr cloudTabletStatMgr;
+
     private Auth auth;
     private AccessControllerManager accessManager;
 
@@ -692,6 +694,7 @@ public class Env {
         this.globalTransactionMgr = new GlobalTransactionMgr(this);
 
         this.tabletStatMgr = new TabletStatMgr();
+        this.cloudTabletStatMgr = new CloudTabletStatMgr();
 
         this.auth = new Auth();
         this.accessManager = new AccessControllerManager(auth);
@@ -1670,7 +1673,11 @@ public class Env {
     private void startNonMasterDaemonThreads() {
         // start load manager thread
         loadManager.start();
-        tabletStatMgr.start();
+        if (Config.isNotCloudMode()) {
+            tabletStatMgr.start();
+        } else {
+            cloudTabletStatMgr.start();
+        }
         // load and export job label cleaner thread
         labelCleaner.start();
         // es repository
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index 0dad3612023..213fc94ec38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -159,6 +159,8 @@ public class Replica implements Writable {
      */
     private long preWatermarkTxnId = -1;
     private long postWatermarkTxnId = -1;
+    private long segmentCount = 0L;
+    private long rowsetCount = 0L;
 
     private long userDropTime = -1;
 
@@ -241,6 +243,14 @@ public class Replica implements Writable {
         return rowCount;
     }
 
+    /** Returns the segment count currently stored on this replica. */
+    public long getSegmentCount() {
+        return this.segmentCount;
+    }
+
+    /** Returns the rowset count currently stored on this replica. */
+    public long getRowsetCount() {
+        return this.rowsetCount;
+    }
+
     public long getLastFailedVersion() {
         return lastFailedVersion;
     }
@@ -334,6 +344,13 @@ public class Replica implements Writable {
         this.versionCount = versionCount;
     }
 
+    /**
+     * Overwrites the statistics held by this replica with the values supplied by the caller.
+     *
+     * @param dataSize new data size
+     * @param rowsetNum new rowset count
+     * @param segmentNum new segment count
+     * @param rowNum new row count
+     */
+    public synchronized void updateCloudStat(long dataSize, long rowsetNum, long segmentNum, long rowNum) {
+        this.rowCount = rowNum;
+        this.segmentCount = segmentNum;
+        this.rowsetCount = rowsetNum;
+        this.dataSize = dataSize;
+    }
+
     public synchronized void updateVersionInfo(long newVersion, long 
newDataSize, long newRemoteDataSize,
                                                long newRowCount) {
         updateReplicaInfo(newVersion, this.lastFailedVersion, 
this.lastSuccessVersion, newDataSize, newRemoteDataSize,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to