This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3eaaad5b805a25a540d08fd544c51d5327cefe7a
Author: Mryange <59914473+mrya...@users.noreply.github.com>
AuthorDate: Sat Jul 8 21:59:17 2023 +0800

    [refactor](planner) refactor automatically set instance_num (#21640)
    
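    Refactor how instance_num is set automatically when the pipeline engine is
    enabled and parallel_pipeline_task_num is 0:
    
    - BE now reports its real CPU core count in num_cores, and the effective
      pipeline executor size (config::pipeline_executor_size, falling back to
      the core count) in the new pipeline_executor_size field of TReportRequest.
    - FE stores cpuCores and pipelineExecutorSize on each Backend. It updates
      them when processing disk reports and logs a backend state change to the
      edit log when either value differs.
    - SessionVariable.getParallelExecInstanceNum() now returns
      (min pipeline_executor_size across backends + 1) / 2. The minimum comes
      from the new SystemInfoService.getMinPipelineExecutorSize().
    - The static Backend.BeInfoCollector cache and its add/drop bookkeeping
      are removed.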
---
 be/src/agent/task_worker_pool.cpp                  |  7 +-
 .../org/apache/doris/master/ReportHandler.java     | 41 ++++++---
 .../java/org/apache/doris/qe/SessionVariable.java  |  6 +-
 .../main/java/org/apache/doris/system/Backend.java | 98 +++++++++-------------
 .../org/apache/doris/system/SystemInfoService.java | 22 +++--
 gensrc/thrift/MasterService.thrift                 |  1 +
 6 files changed, 92 insertions(+), 83 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 0102fa885d..f81adb118b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -613,9 +613,10 @@ void 
TaskWorkerPool::_report_disk_state_worker_thread_callback() {
             disk.__set_used(root_path_info.is_used);
             request.disks[root_path_info.path] = disk;
         }
-        int num_cores = config::pipeline_executor_size > 0 ? 
config::pipeline_executor_size
-                                                           : 
CpuInfo::num_cores();
-        request.__set_num_cores(num_cores);
+        request.__set_num_cores(CpuInfo::num_cores());
+        request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
+                                                     ? 
config::pipeline_executor_size
+                                                     : CpuInfo::num_cores());
         _handle_report(request, ReportType::DISK);
     }
     StorageEngine::instance()->deregister_report_listener(this);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d506eb2432..0dd73b3c72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -181,7 +181,8 @@ public class ReportHandler extends Daemon {
         }
 
         ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, 
reportVersion,
-                request.getStoragePolicy(), request.getResource());
+                request.getStoragePolicy(), request.getResource(), 
request.getNumCores(),
+                request.getPipelineExecutorSize());
         try {
             putToQueue(reportTask);
         } catch (Exception e) {
@@ -192,14 +193,8 @@ public class ReportHandler extends Daemon {
             tStatus.setErrorMsgs(errorMsgs);
             return result;
         }
-
         LOG.info("receive report from be {}. type: {}, current queue size: {}",
                 backend.getId(), reportType, reportQueue.size());
-        if (reportType == ReportType.DISK) {
-            Backend.BeInfoCollector beinfoCollector = 
Backend.getBeInfoCollector();
-            int numCores = request.getNumCores();
-            beinfoCollector.addBeInfo(beId, numCores);
-        }
         return result;
     }
 
@@ -236,11 +231,14 @@ public class ReportHandler extends Daemon {
 
         private List<TStoragePolicy> storagePolicies;
         private List<TStorageResource> storageResources;
+        private int cpuCores;
+        private int pipelineExecutorSize;
 
         public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks,
-                          Map<String, TDisk> disks,
-                          Map<Long, TTablet> tablets, long reportVersion,
-                          List<TStoragePolicy> storagePolicies, 
List<TStorageResource> storageResources) {
+                Map<String, TDisk> disks,
+                Map<Long, TTablet> tablets, long reportVersion,
+                List<TStoragePolicy> storagePolicies, List<TStorageResource> 
storageResources, int cpuCores,
+                int pipelineExecutorSize) {
             this.beId = beId;
             this.tasks = tasks;
             this.disks = disks;
@@ -248,6 +246,8 @@ public class ReportHandler extends Daemon {
             this.reportVersion = reportVersion;
             this.storagePolicies = storagePolicies;
             this.storageResources = storageResources;
+            this.cpuCores = cpuCores;
+            this.pipelineExecutorSize = pipelineExecutorSize;
         }
 
         @Override
@@ -257,6 +257,7 @@ public class ReportHandler extends Daemon {
             }
             if (disks != null) {
                 ReportHandler.diskReport(beId, disks);
+                ReportHandler.cpuReport(beId, cpuCores, pipelineExecutorSize);
             }
             if (Config.enable_storage_policy && storagePolicies != null && 
storageResources != null) {
                 storagePolicyReport(beId, storagePolicies, storageResources);
@@ -557,12 +558,30 @@ public class ReportHandler extends Daemon {
             LOG.warn("backend doesn't exist. id: " + backendId);
             return;
         }
-
         backend.updateDisks(backendDisks);
         LOG.info("finished to handle disk report from backend {}, cost: {} ms",
                 backendId, (System.currentTimeMillis() - start));
     }
 
+    private static void cpuReport(long backendId, int cpuCores, int 
pipelineExecutorSize) {
+        LOG.info("begin to handle cpu report from backend {}", backendId);
+        long start = System.currentTimeMillis();
+        Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
+        if (backend == null) {
+            LOG.warn("backend doesn't exist. id: " + backendId);
+            return;
+        }
+        if (backend.updateCpuInfo(cpuCores, pipelineExecutorSize)) {
+            // cpu info is changed
+            LOG.info("new cpu info. backendId: {}, cpucores: {}, 
pipelineExecutorSize: {}", backendId, cpuCores,
+                    pipelineExecutorSize);
+            // log change
+            Env.getCurrentEnv().getEditLog().logBackendStateChange(backend);
+        }
+        LOG.info("finished to handle cpu report from backend {}, cost: {} ms",
+                backendId, (System.currentTimeMillis() - start));
+    }
+
     private static void sync(Map<Long, TTablet> backendTablets, 
ListMultimap<Long, Long> tabletSyncMap,
                              long backendId, long backendReportVersion) {
         TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2461e69b2b..3deda35d43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.SetVar;
 import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ExperimentalUtil.ExperimentalType;
@@ -28,7 +29,6 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.nereids.metrics.Event;
 import org.apache.doris.nereids.metrics.EventSwitchParser;
 import org.apache.doris.qe.VariableMgr.VarAttr;
-import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TResourceLimit;
 import org.apache.doris.thrift.TRuntimeFilterType;
@@ -1398,8 +1398,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public int getParallelExecInstanceNum() {
         if (enablePipelineEngine && parallelPipelineTaskNum == 0) {
-            Backend.BeInfoCollector beinfoCollector = 
Backend.getBeInfoCollector();
-            return beinfoCollector.getParallelExecInstanceNum();
+            int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+            return (size + 1) / 2;
         } else if (enablePipelineEngine) {
             return parallelPipelineTaskNum;
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index c6a06d1b13..ba10130d49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -48,7 +48,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -119,6 +118,14 @@ public class Backend implements Writable {
     @SerializedName("tagMap")
     private Map<String, String> tagMap = Maps.newHashMap();
 
+    // cpu cores
+    @SerializedName("cpuCores")
+    private int cpuCores = 1;
+
+    // from config::pipeline_executor_size , default equal cpuCores
+    @SerializedName("pipelineExecutorSize")
+    private int pipelineExecutorSize = 1;
+
     // Counter of heartbeat failure.
     // Once a heartbeat failed, increase this counter by one.
     // And if it reaches Config.max_backend_heartbeat_failure_tolerance_count, 
this backend
@@ -278,6 +285,14 @@ public class Backend implements Writable {
         this.brpcPort = brpcPort;
     }
 
+    public void setCpuCores(int cpuCores) {
+        this.cpuCores = cpuCores;
+    }
+
+    public void setPipelineExecutorSize(int pipelineExecutorSize) {
+        this.pipelineExecutorSize = pipelineExecutorSize;
+    }
+
     public long getLastUpdateMs() {
         return this.lastUpdateMs;
     }
@@ -294,6 +309,14 @@ public class Backend implements Writable {
         this.lastStartTime = currentTime;
     }
 
+    public int getCputCores() {
+        return cpuCores;
+    }
+
+    public int getPipelineExecutorSize() {
+        return pipelineExecutorSize;
+    }
+
     public long getLastMissingHeartbeatTime() {
         return lastMissingHeartbeatTime;
     }
@@ -519,6 +542,20 @@ public class Backend implements Writable {
         }
     }
 
+    public boolean updateCpuInfo(int cpuCores, int pipelineExecutorSize) {
+        boolean isChanged = false;
+
+        if (this.cpuCores != cpuCores) {
+            this.cpuCores = cpuCores;
+            isChanged = true;
+        }
+        if (this.pipelineExecutorSize != pipelineExecutorSize) {
+            this.pipelineExecutorSize = pipelineExecutorSize;
+            isChanged = true;
+        }
+        return isChanged;
+    }
+
     /**
      * In old version, there is only one tag for a Backend, and it is a 
"location" type tag.
      * But in new version, a Backend can have multi tag, so we need to put 
locationTag to
@@ -729,63 +766,4 @@ public class Backend implements Writable {
         return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + 
"}";
     }
 
-    public static BeInfoCollector getBeInfoCollector() {
-        return BeInfoCollector.get();
-    }
-
-    public static class BeInfoCollector {
-        private int numCores = 1;
-        private static volatile BeInfoCollector instance = null;
-        private static final Map<Long, BeInfoCollector> Info = new 
ConcurrentHashMap<>();
-
-        private BeInfoCollector(int numCores) {
-            this.numCores = numCores;
-        }
-
-        public static BeInfoCollector get() {
-            if (instance == null) {
-                synchronized (BeInfoCollector.class) {
-                    if (instance == null) {
-                        instance = new BeInfoCollector(Integer.MAX_VALUE);
-                    }
-                }
-            }
-            return instance;
-        }
-
-        public int getNumCores() {
-            return numCores;
-        }
-
-        public void clear() {
-            Info.clear();
-        }
-
-        public void addBeInfo(long beId, int numCores) {
-            Info.put(beId, new BeInfoCollector(numCores));
-        }
-
-        public void dropBeInfo(long beId) {
-            Info.remove(beId);
-        }
-
-        public int getMinNumCores() {
-            int minNumCores = Integer.MAX_VALUE;
-            for (BeInfoCollector beinfo : Info.values()) {
-                minNumCores = Math.min(minNumCores, beinfo.getNumCores());
-            }
-            return Math.max(1, minNumCores);
-        }
-
-        public int getParallelExecInstanceNum() {
-            if (getMinNumCores() == Integer.MAX_VALUE) {
-                return 1;
-            }
-            return (getMinNumCores() + 1) / 2;
-        }
-
-        public BeInfoCollector getBeInfoCollectorById(long beId) {
-            return Info.get(beId);
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index f93f906cc3..12483ff2f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -247,9 +247,6 @@ public class SystemInfoService {
             throw new DdlException("Backend[" + backendId + "] does not 
exist");
         }
         dropBackend(backend.getHost(), backend.getHeartbeatPort());
-        // update BeInfoCollector
-        Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
-        beinfoCollector.dropBeInfo(backendId);
     }
 
     // final entry of dropping backend
@@ -770,9 +767,6 @@ public class SystemInfoService {
         ImmutableMap<Long, AtomicLong> newIdToReportVersion = 
ImmutableMap.copyOf(copiedReportVersions);
         idToReportVersionRef = newIdToReportVersion;
 
-        // update BeInfoCollector
-        Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
-        beinfoCollector.dropBeInfo(backend.getId());
     }
 
     public void updateBackendState(Backend be) {
@@ -791,6 +785,8 @@ public class SystemInfoService {
             memoryBe.setLastUpdateMs(be.getLastUpdateMs());
             memoryBe.setLastStartTime(be.getLastStartTime());
             memoryBe.setDisks(be.getDisks());
+            memoryBe.setCpuCores(be.getCputCores());
+            memoryBe.setPipelineExecutorSize(be.getPipelineExecutorSize());
         }
     }
 
@@ -963,4 +959,18 @@ public class SystemInfoService {
         List<Backend> bes = getMixBackends();
         return bes.stream().filter(b -> 
b.getLocationTag().equals(tag)).collect(Collectors.toList());
     }
+
+    public int getMinPipelineExecutorSize() {
+        if (idToBackendRef.size() == 0) {
+            return 1;
+        }
+        int minPipelineExecutorSize = Integer.MAX_VALUE;
+        for (Backend be : idToBackendRef.values()) {
+            int size = be.getPipelineExecutorSize();
+            if (size > 0) {
+                minPipelineExecutorSize = Math.min(minPipelineExecutorSize, 
size);
+            }
+        }
+        return minPipelineExecutorSize;
+    }
 }
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index 18d95b3854..3643df178b 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -102,6 +102,7 @@ struct TReportRequest {
     9: optional list<AgentService.TStoragePolicy> storage_policy // only id 
and version
     10: optional list<AgentService.TStorageResource> resource // only id and 
version
     11: i32 num_cores
+    12: i32 pipeline_executor_size
 }
 
 struct TMasterResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to