This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3eaaad5b805a25a540d08fd544c51d5327cefe7a Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Sat Jul 8 21:59:17 2023 +0800 [refactor](planner) refactor automatically set instance_num (#21640) refactor automatically set instance_num --- be/src/agent/task_worker_pool.cpp | 7 +- .../org/apache/doris/master/ReportHandler.java | 41 ++++++--- .../java/org/apache/doris/qe/SessionVariable.java | 6 +- .../main/java/org/apache/doris/system/Backend.java | 98 +++++++++------------- .../org/apache/doris/system/SystemInfoService.java | 22 +++-- gensrc/thrift/MasterService.thrift | 1 + 6 files changed, 92 insertions(+), 83 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 0102fa885d..f81adb118b 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -613,9 +613,10 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() { disk.__set_used(root_path_info.is_used); request.disks[root_path_info.path] = disk; } - int num_cores = config::pipeline_executor_size > 0 ? config::pipeline_executor_size - : CpuInfo::num_cores(); - request.__set_num_cores(num_cores); + request.__set_num_cores(CpuInfo::num_cores()); + request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 + ? 
config::pipeline_executor_size + : CpuInfo::num_cores()); _handle_report(request, ReportType::DISK); } StorageEngine::instance()->deregister_report_listener(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index d506eb2432..0dd73b3c72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -181,7 +181,8 @@ public class ReportHandler extends Daemon { } ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, - request.getStoragePolicy(), request.getResource()); + request.getStoragePolicy(), request.getResource(), request.getNumCores(), + request.getPipelineExecutorSize()); try { putToQueue(reportTask); } catch (Exception e) { @@ -192,14 +193,8 @@ public class ReportHandler extends Daemon { tStatus.setErrorMsgs(errorMsgs); return result; } - LOG.info("receive report from be {}. 
type: {}, current queue size: {}", backend.getId(), reportType, reportQueue.size()); - if (reportType == ReportType.DISK) { - Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); - int numCores = request.getNumCores(); - beinfoCollector.addBeInfo(beId, numCores); - } return result; } @@ -236,11 +231,14 @@ public class ReportHandler extends Daemon { private List<TStoragePolicy> storagePolicies; private List<TStorageResource> storageResources; + private int cpuCores; + private int pipelineExecutorSize; public ReportTask(long beId, Map<TTaskType, Set<Long>> tasks, - Map<String, TDisk> disks, - Map<Long, TTablet> tablets, long reportVersion, - List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources) { + Map<String, TDisk> disks, + Map<Long, TTablet> tablets, long reportVersion, + List<TStoragePolicy> storagePolicies, List<TStorageResource> storageResources, int cpuCores, + int pipelineExecutorSize) { this.beId = beId; this.tasks = tasks; this.disks = disks; @@ -248,6 +246,8 @@ public class ReportHandler extends Daemon { this.reportVersion = reportVersion; this.storagePolicies = storagePolicies; this.storageResources = storageResources; + this.cpuCores = cpuCores; + this.pipelineExecutorSize = pipelineExecutorSize; } @Override @@ -257,6 +257,7 @@ public class ReportHandler extends Daemon { } if (disks != null) { ReportHandler.diskReport(beId, disks); + ReportHandler.cpuReport(beId, cpuCores, pipelineExecutorSize); } if (Config.enable_storage_policy && storagePolicies != null && storageResources != null) { storagePolicyReport(beId, storagePolicies, storageResources); @@ -557,12 +558,30 @@ public class ReportHandler extends Daemon { LOG.warn("backend doesn't exist. 
id: " + backendId); return; } - backend.updateDisks(backendDisks); LOG.info("finished to handle disk report from backend {}, cost: {} ms", backendId, (System.currentTimeMillis() - start)); } + private static void cpuReport(long backendId, int cpuCores, int pipelineExecutorSize) { + LOG.info("begin to handle cpu report from backend {}", backendId); + long start = System.currentTimeMillis(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + if (backend == null) { + LOG.warn("backend doesn't exist. id: " + backendId); + return; + } + if (backend.updateCpuInfo(cpuCores, pipelineExecutorSize)) { + // cpu info is changed + LOG.info("new cpu info. backendId: {}, cpucores: {}, pipelineExecutorSize: {}", backendId, cpuCores, + pipelineExecutorSize); + // log change + Env.getCurrentEnv().getEditLog().logBackendStateChange(backend); + } + LOG.info("finished to handle cpu report from backend {}, cost: {} ms", + backendId, (System.currentTimeMillis() - start)); + } + private static void sync(Map<Long, TTablet> backendTablets, ListMultimap<Long, Long> tabletSyncMap, long backendId, long backendReportVersion) { TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 2461e69b2b..3deda35d43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -19,6 +19,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.SetVar; import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExperimentalUtil.ExperimentalType; @@ -28,7 +29,6 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.nereids.metrics.Event; import 
org.apache.doris.nereids.metrics.EventSwitchParser; import org.apache.doris.qe.VariableMgr.VarAttr; -import org.apache.doris.system.Backend; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterType; @@ -1398,8 +1398,8 @@ public class SessionVariable implements Serializable, Writable { public int getParallelExecInstanceNum() { if (enablePipelineEngine && parallelPipelineTaskNum == 0) { - Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); - return beinfoCollector.getParallelExecInstanceNum(); + int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize(); + return (size + 1) / 2; } else if (enablePipelineEngine) { return parallelPipelineTaskNum; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index c6a06d1b13..ba10130d49 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -48,7 +48,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -119,6 +118,14 @@ public class Backend implements Writable { @SerializedName("tagMap") private Map<String, String> tagMap = Maps.newHashMap(); + // cpu cores + @SerializedName("cpuCores") + private int cpuCores = 1; + + // from config::pipeline_executor_size , default equal cpuCores + @SerializedName("pipelineExecutorSize") + private int pipelineExecutorSize = 1; + // Counter of heartbeat failure. // Once a heartbeat failed, increase this counter by one. 
// And if it reaches Config.max_backend_heartbeat_failure_tolerance_count, this backend @@ -278,6 +285,14 @@ public class Backend implements Writable { this.brpcPort = brpcPort; } + public void setCpuCores(int cpuCores) { + this.cpuCores = cpuCores; + } + + public void setPipelineExecutorSize(int pipelineExecutorSize) { + this.pipelineExecutorSize = pipelineExecutorSize; + } + public long getLastUpdateMs() { return this.lastUpdateMs; } @@ -294,6 +309,14 @@ public class Backend implements Writable { this.lastStartTime = currentTime; } + public int getCputCores() { + return cpuCores; + } + + public int getPipelineExecutorSize() { + return pipelineExecutorSize; + } + public long getLastMissingHeartbeatTime() { return lastMissingHeartbeatTime; } @@ -519,6 +542,20 @@ public class Backend implements Writable { } } + public boolean updateCpuInfo(int cpuCores, int pipelineExecutorSize) { + boolean isChanged = false; + + if (this.cpuCores != cpuCores) { + this.cpuCores = cpuCores; + isChanged = true; + } + if (this.pipelineExecutorSize != pipelineExecutorSize) { + this.pipelineExecutorSize = pipelineExecutorSize; + isChanged = true; + } + return isChanged; + } + /** * In old version, there is only one tag for a Backend, and it is a "location" type tag. 
* But in new version, a Backend can have multi tag, so we need to put locationTag to @@ -729,63 +766,4 @@ public class Backend implements Writable { return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}"; } - public static BeInfoCollector getBeInfoCollector() { - return BeInfoCollector.get(); - } - - public static class BeInfoCollector { - private int numCores = 1; - private static volatile BeInfoCollector instance = null; - private static final Map<Long, BeInfoCollector> Info = new ConcurrentHashMap<>(); - - private BeInfoCollector(int numCores) { - this.numCores = numCores; - } - - public static BeInfoCollector get() { - if (instance == null) { - synchronized (BeInfoCollector.class) { - if (instance == null) { - instance = new BeInfoCollector(Integer.MAX_VALUE); - } - } - } - return instance; - } - - public int getNumCores() { - return numCores; - } - - public void clear() { - Info.clear(); - } - - public void addBeInfo(long beId, int numCores) { - Info.put(beId, new BeInfoCollector(numCores)); - } - - public void dropBeInfo(long beId) { - Info.remove(beId); - } - - public int getMinNumCores() { - int minNumCores = Integer.MAX_VALUE; - for (BeInfoCollector beinfo : Info.values()) { - minNumCores = Math.min(minNumCores, beinfo.getNumCores()); - } - return Math.max(1, minNumCores); - } - - public int getParallelExecInstanceNum() { - if (getMinNumCores() == Integer.MAX_VALUE) { - return 1; - } - return (getMinNumCores() + 1) / 2; - } - - public BeInfoCollector getBeInfoCollectorById(long beId) { - return Info.get(beId); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index f93f906cc3..12483ff2f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -247,9 +247,6 @@ public class SystemInfoService { throw new 
DdlException("Backend[" + backendId + "] does not exist"); } dropBackend(backend.getHost(), backend.getHeartbeatPort()); - // update BeInfoCollector - Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); - beinfoCollector.dropBeInfo(backendId); } // final entry of dropping backend @@ -770,9 +767,6 @@ public class SystemInfoService { ImmutableMap<Long, AtomicLong> newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions); idToReportVersionRef = newIdToReportVersion; - // update BeInfoCollector - Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector(); - beinfoCollector.dropBeInfo(backend.getId()); } public void updateBackendState(Backend be) { @@ -791,6 +785,8 @@ public class SystemInfoService { memoryBe.setLastUpdateMs(be.getLastUpdateMs()); memoryBe.setLastStartTime(be.getLastStartTime()); memoryBe.setDisks(be.getDisks()); + memoryBe.setCpuCores(be.getCputCores()); + memoryBe.setPipelineExecutorSize(be.getPipelineExecutorSize()); } } @@ -963,4 +959,18 @@ public class SystemInfoService { List<Backend> bes = getMixBackends(); return bes.stream().filter(b -> b.getLocationTag().equals(tag)).collect(Collectors.toList()); } + + public int getMinPipelineExecutorSize() { + if (idToBackendRef.size() == 0) { + return 1; + } + int minPipelineExecutorSize = Integer.MAX_VALUE; + for (Backend be : idToBackendRef.values()) { + int size = be.getPipelineExecutorSize(); + if (size > 0) { + minPipelineExecutorSize = Math.min(minPipelineExecutorSize, size); + } + } + return minPipelineExecutorSize; + } } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 18d95b3854..3643df178b 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -102,6 +102,7 @@ struct TReportRequest { 9: optional list<AgentService.TStoragePolicy> storage_policy // only id and version 10: optional list<AgentService.TStorageResource> resource // only id and version 11: i32 num_cores + 12: i32 
pipeline_executor_size } struct TMasterResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org