This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/2.1-tmp by this push: new 16f8afc4083 [refactor](coordinator) split profile logic and instance report logic (#32010) 16f8afc4083 is described below commit 16f8afc4083d84f3ce4a4317e2a558b907ff535d Author: yiguolei <676222...@qq.com> AuthorDate: Wed Apr 3 18:48:19 2024 +0800 [refactor](coordinator) split profile logic and instance report logic (#32010) Co-authored-by: yiguolei <yiguo...@gmail.com> --- .../main/java/org/apache/doris/common/Config.java | 1 + .../main/java/org/apache/doris/common/Status.java | 5 + .../doris/common/profile/ExecutionProfile.java | 309 +++++------ .../org/apache/doris/common/profile/Profile.java | 80 ++- .../doris/common/profile/SummaryProfile.java | 16 +- .../apache/doris/common/util/ProfileManager.java | 82 ++- .../apache/doris/common/util/RuntimeProfile.java | 34 +- .../apache/doris/load/loadv2/BrokerLoadJob.java | 21 +- .../apache/doris/load/loadv2/LoadLoadingTask.java | 16 +- .../nereids/trees/plans/commands/LoadCommand.java | 4 +- .../java/org/apache/doris/qe/CoordInterface.java | 2 - .../main/java/org/apache/doris/qe/Coordinator.java | 603 +++++++++++---------- .../java/org/apache/doris/qe/PointQueryExec.java | 6 - .../main/java/org/apache/doris/qe/QeProcessor.java | 2 - .../java/org/apache/doris/qe/QeProcessorImpl.java | 79 ++- .../java/org/apache/doris/qe/SessionVariable.java | 9 + .../java/org/apache/doris/qe/StmtExecutor.java | 51 +- .../org/apache/doris/rpc/BackendServiceClient.java | 3 +- .../org/apache/doris/rpc/BackendServiceProxy.java | 7 +- .../doris/common/util/RuntimeProfileTest.java | 2 +- .../java/org/apache/doris/qe/StmtExecutorTest.java | 4 +- 21 files changed, 715 insertions(+), 621 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 5a6bdf1b6d5..a727c23bb74 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2520,6 +2520,7 @@ public class Config extends ConfigBase { "Whether to enable proxy protocol" }) public static boolean enable_proxy_protocol = false; + public static int profile_async_collect_expire_time_secs = 5; //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java index 1961f9b8cc5..555a82751ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java @@ -128,4 +128,9 @@ public class Status { } } } + + @Override + public String toString() { + return "Status [errorCode=" + errorCode + ", errorMsg=" + errorMsg + "]"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index 24bd2355c56..f339da82924 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -17,33 +17,31 @@ package org.apache.doris.common.profile; -import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; -import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.thrift.TDetailedReportParams; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUnit; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import 
com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * ExecutionProfile is used to collect profile of a complete query plan(including query or load). + * root is used to collect profile of a complete query plan(including query or load). * Need to call addToProfileAsChild() to add it to the root profile. * It has the following structure: * Execution Profile: @@ -59,53 +57,62 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class ExecutionProfile { private static final Logger LOG = LogManager.getLogger(ExecutionProfile.class); + private final TUniqueId queryId; + private boolean isFinished = false; + private long startTime = 0L; + private long queryFinishTime = 0L; // The root profile of this execution task - private RuntimeProfile executionProfile; + private RuntimeProfile root; // Profiles for each fragment. And the InstanceProfile is the child of fragment profile. // Which will be added to fragment profile when calling Coordinator::sendFragment() - private List<RuntimeProfile> fragmentProfiles; + // Could not use array list because fragment id is not continuous, planner may cut fragment + // during planning. + private Map<Integer, RuntimeProfile> fragmentProfiles; // Profile for load channels. Only for load job. private RuntimeProfile loadChannelProfile; - // A countdown latch to mark the completion of each instance. 
- // use for old pipeline - // instance id -> dummy value - private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal; + // FragmentId -> InstanceId -> RuntimeProfile + private Map<PlanFragmentId, Map<TUniqueId, RuntimeProfile>> fragmentInstancesProfiles; + private boolean isPipelineXProfile = false; - // A countdown latch to mark the completion of each fragment. use for pipelineX - // fragmentId -> dummy value - private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal; - - // fragmentId -> The number of BE without 'done. - private Map<Integer, Integer> befragmentDone; + // use to merge profile from multi be + private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null; - // lock befragmentDone - private ReadWriteLock lock; + // Not serialize this property, it is only used to get profile id. + private SummaryProfile summaryProfile; - // use to merge profile from multi be - private List<Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null; + // BE only has instance id, does not have fragmentid, so should use this map to find fragmentid. 
+ private Map<TUniqueId, PlanFragmentId> instanceIdToFragmentId; + private Map<Integer, Integer> fragmentIdBeNum; + private Map<Integer, Integer> seqNoToFragmentId; - public ExecutionProfile(TUniqueId queryId, int fragmentNum) { - executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); + public ExecutionProfile(TUniqueId queryId, List<PlanFragment> fragments) { + this.queryId = queryId; + root = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); - executionProfile.addChild(fragmentsProfile); - fragmentProfiles = Lists.newArrayList(); - multiBeProfile = Lists.newArrayList(); - for (int i = 0; i < fragmentNum; i++) { - fragmentProfiles.add(new RuntimeProfile("Fragment " + i)); - fragmentsProfile.addChild(fragmentProfiles.get(i)); - multiBeProfile.add(new ConcurrentHashMap<TNetworkAddress, List<RuntimeProfile>>()); + root.addChild(fragmentsProfile); + fragmentProfiles = Maps.newHashMap(); + multiBeProfile = Maps.newHashMap(); + fragmentIdBeNum = Maps.newHashMap(); + seqNoToFragmentId = Maps.newHashMap(); + int i = 0; + for (PlanFragment planFragment : fragments) { + RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i); + fragmentProfiles.put(planFragment.getFragmentId().asInt(), runtimeProfile); + fragmentsProfile.addChild(runtimeProfile); + multiBeProfile.put(planFragment.getFragmentId().asInt(), + new ConcurrentHashMap<TNetworkAddress, List<RuntimeProfile>>()); + fragmentIdBeNum.put(planFragment.getFragmentId().asInt(), 0); + seqNoToFragmentId.put(i, planFragment.getFragmentId().asInt()); + ++i; } loadChannelProfile = new RuntimeProfile("LoadChannels"); - executionProfile.addChild(loadChannelProfile); + root.addChild(loadChannelProfile); + fragmentInstancesProfiles = Maps.newHashMap(); + instanceIdToFragmentId = Maps.newHashMap(); } - public void addMultiBeProfileByPipelineX(int profileFragmentId, TNetworkAddress address, - 
List<RuntimeProfile> taskProfile) { - multiBeProfile.get(profileFragmentId).put(address, taskProfile); - } - - private List<List<RuntimeProfile>> getMultiBeProfile(int profileFragmentId) { - Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(profileFragmentId); + private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) { + Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(fragmentId); List<List<RuntimeProfile>> allPipelines = Lists.newArrayList(); int pipelineSize = 0; for (List<RuntimeProfile> profiles : multiPipeline.values()) { @@ -130,7 +137,7 @@ public class ExecutionProfile { for (int i = 0; i < fragmentProfiles.size(); ++i) { RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i); fragmentsProfile.addChild(newFragmentProfile); - List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(i); + List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i)); int pipelineIdx = 0; for (List<RuntimeProfile> allPipelineTask : allPipelines) { RuntimeProfile mergedpipelineProfile = new RuntimeProfile( @@ -148,7 +155,7 @@ public class ExecutionProfile { private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) { RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); for (int i = 0; i < fragmentProfiles.size(); ++i) { - RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i); + RuntimeProfile oldFragmentProfile = fragmentProfiles.get(seqNoToFragmentId.get(i)); RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i); fragmentsProfile.addChild(newFragmentProfile); List<RuntimeProfile> allInstanceProfiles = new ArrayList<RuntimeProfile>(); @@ -164,7 +171,7 @@ public class ExecutionProfile { } public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) { - if (enablePipelineX()) { + if (isPipelineXProfile) { /* * Fragment 0 * ---Pipeline 0 @@ -211,143 +218,143 @@ public class 
ExecutionProfile { } } - public RuntimeProfile getExecutionProfile() { - return executionProfile; - } - - public RuntimeProfile getLoadChannelProfile() { - return loadChannelProfile; - } - - public List<RuntimeProfile> getFragmentProfiles() { - return fragmentProfiles; + public RuntimeProfile getRoot() { + return root; } - public void addToProfileAsChild(RuntimeProfile rootProfile) { - rootProfile.addChild(executionProfile); + public void setPipelineX() { + this.isPipelineXProfile = true; } - public void markInstances(Set<TUniqueId> instanceIds) { - profileDoneSignal = new MarkedCountDownLatch<>(instanceIds.size()); - for (TUniqueId instanceId : instanceIds) { - profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */); + // The execution profile is maintained in ProfileManager, if it is finished, then should + // remove it from it as soon as possible. + public void update(long startTime, boolean isFinished) { + if (this.isFinished) { + return; } - } - - private boolean enablePipelineX() { - return profileFragmentDoneSignal != null; - } - - public void markFragments(int fragments) { - profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments); - lock = new ReentrantReadWriteLock(); - befragmentDone = new HashMap<>(); - for (int fragmentId = 0; fragmentId < fragments; fragmentId++) { - profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is meaningless */); - befragmentDone.put(fragmentId, 0); + this.isFinished = isFinished; + this.startTime = startTime; + if (startTime > 0) { + root.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime)); } - } - public void addFragments(int fragmentId) { - lock.writeLock().lock(); - try { - befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) + 1); - } finally { - lock.writeLock().unlock(); + for (RuntimeProfile fragmentProfile : fragmentProfiles.values()) { + fragmentProfile.sortChildren(); } } - public void update(long startTime, boolean isFinished) { - if (startTime 
> 0) { - executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime)); - } - // Wait for all backends to finish reporting when writing profile last time. - if (isFinished && profileDoneSignal != null) { - try { - profileDoneSignal.await(2, TimeUnit.SECONDS); - } catch (InterruptedException e1) { - LOG.warn("signal await error", e1); + public void updateProfile(TReportExecStatusParams params, TNetworkAddress address) { + if (isPipelineXProfile) { + int pipelineIdx = 0; + List<RuntimeProfile> taskProfile = Lists.newArrayList(); + for (TDetailedReportParams param : params.detailed_report) { + String name = "Pipeline :" + pipelineIdx + " " + + " (host=" + address + ")"; + RuntimeProfile profile = new RuntimeProfile(name); + taskProfile.add(profile); + if (param.isSetProfile()) { + profile.update(param.profile); + } + if (params.done) { + profile.setIsDone(true); + } + pipelineIdx++; + fragmentProfiles.get(params.fragment_id).addChild(profile); } - } - - if (isFinished && profileFragmentDoneSignal != null) { - try { - profileFragmentDoneSignal.await(2, TimeUnit.SECONDS); - } catch (InterruptedException e1) { - LOG.warn("signal await error", e1); + // TODO ygl: is this right? there maybe multi Backends, what does + // update load profile do??? 
+ if (params.isSetLoadChannelProfile()) { + loadChannelProfile.update(params.loadChannelProfile); + } + multiBeProfile.get(params.fragment_id).put(address, taskProfile); + } else { + PlanFragmentId fragmentId = instanceIdToFragmentId.get(params.fragment_instance_id); + if (fragmentId == null) { + LOG.warn("Could not find related fragment for instance {}", + DebugUtil.printId(params.fragment_instance_id)); + return; + } + // Do not use fragment id in params, because non-pipeline engine will set it to -1 + Map<TUniqueId, RuntimeProfile> instanceProfiles = fragmentInstancesProfiles.get(fragmentId); + if (instanceProfiles == null) { + LOG.warn("Could not find related instances for fragment {}", fragmentId); + return; + } + RuntimeProfile instanceProfile = instanceProfiles.get(params.fragment_instance_id); + if (instanceProfile == null) { + LOG.warn("Could not find related instance {}", params.fragment_instance_id); + return; + } + if (params.isSetProfile()) { + instanceProfile.update(params.profile); + } + if (params.isSetDone() && params.isDone()) { + instanceProfile.setIsDone(true); + } + if (params.isSetLoadChannelProfile()) { + loadChannelProfile.update(params.loadChannelProfile); } - } - - for (RuntimeProfile fragmentProfile : fragmentProfiles) { - fragmentProfile.sortChildren(); } } - public void onCancel() { - if (profileDoneSignal != null) { - // count down to zero to notify all objects waiting for this - profileDoneSignal.countDownToZero(new Status()); + // MultiInstances may update the profile concurrently + public synchronized void addInstanceProfile(PlanFragmentId fragmentId, TUniqueId instanceId, + RuntimeProfile instanceProfile) { + Map<TUniqueId, RuntimeProfile> instanceProfiles = fragmentInstancesProfiles.get(fragmentId); + if (instanceProfiles == null) { + instanceProfiles = Maps.newHashMap(); + fragmentInstancesProfiles.put(fragmentId, instanceProfiles); } - if (profileFragmentDoneSignal != null) { - profileFragmentDoneSignal.countDownToZero(new 
Status()); + RuntimeProfile existingInstanceProfile = instanceProfiles.get(instanceId); + if (existingInstanceProfile == null) { + instanceProfiles.put(instanceId, instanceProfile); + instanceIdToFragmentId.put(instanceId, fragmentId); + fragmentProfiles.get(fragmentId.asInt()).addChild(instanceProfile); + return; } } - public void markOneInstanceDone(TUniqueId fragmentInstanceId) { - if (profileDoneSignal != null) { - if (!profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) { - LOG.warn("Mark instance {} done failed", DebugUtil.printId(fragmentInstanceId)); - } - } + public synchronized void addFragmentBackend(PlanFragmentId fragmentId, Long backendId) { + fragmentIdBeNum.put(fragmentId.asInt(), fragmentIdBeNum.get(fragmentId.asInt()) + 1); } - public void markOneFragmentDone(int fragmentId) { - if (profileFragmentDoneSignal != null) { - lock.writeLock().lock(); - try { - befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) - 1); - if (befragmentDone.get(fragmentId) == 0) { - if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) { - LOG.warn("Mark fragment {} done failed", fragmentId); - } - } - } finally { - lock.writeLock().unlock(); - } - } + public TUniqueId getQueryId() { + return queryId; } - public boolean awaitAllInstancesDone(long waitTimeS) throws InterruptedException { - if (profileDoneSignal == null) { - return true; + // Check all fragments's child, if all finished, then this execution profile is finished + public boolean isCompleted() { + for (Entry<Integer, RuntimeProfile> element : fragmentProfiles.entrySet()) { + RuntimeProfile fragmentProfile = element.getValue(); + // If any fragment is empty, it means BE does not report the profile, then the total + // execution profile is not completed. 
+ if (fragmentProfile.isEmpty() + || fragmentProfile.getChildList().size() < fragmentIdBeNum.get(element.getKey())) { + return false; + } + for (Pair<RuntimeProfile, Boolean> runtimeProfile : fragmentProfile.getChildList()) { + // If any child instance profile is not ready, then return false. + if (!(runtimeProfile.first.getIsDone() || runtimeProfile.first.getIsCancel())) { + return false; + } + } } - return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS); + return true; } - public boolean awaitAllFragmentsDone(long waitTimeS) throws InterruptedException { - if (profileFragmentDoneSignal == null) { - return true; - } - return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS); + public long getQueryFinishTime() { + return queryFinishTime; } - public boolean isAllInstancesDone() { - if (profileDoneSignal == null) { - return true; - } - return profileDoneSignal.getCount() == 0; + public void setQueryFinishTime(long queryFinishTime) { + this.queryFinishTime = queryFinishTime; } - public boolean isAllFragmentsDone() { - if (profileFragmentDoneSignal == null) { - return true; - } - return profileFragmentDoneSignal.getCount() == 0; + public SummaryProfile getSummaryProfile() { + return summaryProfile; } - public void addInstanceProfile(int fragmentId, RuntimeProfile instanceProfile) { - Preconditions.checkArgument(fragmentId < fragmentProfiles.size(), - fragmentId + " vs. 
" + fragmentProfiles.size()); - fragmentProfiles.get(fragmentId).addChild(instanceProfile); + public void setSummaryProfile(SummaryProfile summaryProfile) { + this.summaryProfile = summaryProfile; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index 5f3ed601630..12ba687bfd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -45,11 +45,16 @@ import java.util.Map; * ExecutionProfile1: Fragment 0: Fragment 1: ... * ExecutionProfile2: Fragment 0: Fragment 1: ... * + * ExecutionProfile: Fragment 0: Fragment 1: ... + * And also summary profile contains plan information, but execution profile is for + * be execution time. + * StmtExecutor(Profile) ---> Coordinator(ExecutionProfile) */ public class Profile { private static final Logger LOG = LogManager.getLogger(Profile.class); private static final int MergedProfileLevel = 1; - private RuntimeProfile rootProfile; + private final String name; + private final boolean isPipelineX; private SummaryProfile summaryProfile; private List<ExecutionProfile> executionProfiles = Lists.newArrayList(); private boolean isFinished; @@ -57,51 +62,57 @@ public class Profile { private int profileLevel = 3; - public Profile(String name, boolean isEnable) { - this.rootProfile = new RuntimeProfile(name); - this.summaryProfile = new SummaryProfile(rootProfile); + public Profile(String name, boolean isEnable, int profileLevel, boolean isPipelineX) { + this.name = name; + this.isPipelineX = isPipelineX; + this.summaryProfile = new SummaryProfile(); // if disabled, just set isFinished to true, so that update() will do nothing this.isFinished = !isEnable; + this.profileLevel = profileLevel; } + // For load task, the profile contains many execution profiles public void addExecutionProfile(ExecutionProfile executionProfile) { if 
(executionProfile == null) { LOG.warn("try to set a null excecution profile, it is abnormal", new Exception()); return; } + if (this.isPipelineX) { + executionProfile.setPipelineX(); + } + executionProfile.setSummaryProfile(summaryProfile); this.executionProfiles.add(executionProfile); - executionProfile.addToProfileAsChild(rootProfile); } - public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished, - int profileLevel, Planner planner, boolean isPipelineX) { + public List<ExecutionProfile> getExecutionProfiles() { + return this.executionProfiles; + } + + // This API will also add the profile to ProfileManager, so that we could get the profile from ProfileManager. + // isFinished ONLY means the cooridnator or stmtexecutor is finished. + public synchronized void updateSummary(long startTime, Map<String, String> summaryInfo, boolean isFinished, + Planner planner) { try { if (this.isFinished) { return; } summaryProfile.update(summaryInfo); for (ExecutionProfile executionProfile : executionProfiles) { + // Tell execution profile the start time executionProfile.update(startTime, isFinished); } - rootProfile.computeTimeInProfile(); // Nerids native insert not set planner, so it is null if (planner != null) { this.planNodeMap = planner.getExplainStringMap(); } - rootProfile.setIsPipelineX(isPipelineX); ProfileManager.getInstance().pushProfile(this); this.isFinished = isFinished; - this.profileLevel = profileLevel; } catch (Throwable t) { LOG.warn("update profile failed", t); throw t; } } - public RuntimeProfile getRootProfile() { - return this.rootProfile; - } - public SummaryProfile getSummaryProfile() { return summaryProfile; } @@ -110,7 +121,7 @@ public class Profile { StringBuilder builder = new StringBuilder(); // add summary to builder summaryProfile.prettyPrint(builder); - LOG.info(builder.toString()); + waitProfileCompleteIfNeeded(); // Only generate merged profile for select, insert into select. 
// Not support broker load now. if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) { @@ -125,7 +136,7 @@ public class Profile { try { for (ExecutionProfile executionProfile : executionProfiles) { builder.append("\n"); - executionProfile.getExecutionProfile().prettyPrint(builder, ""); + executionProfile.getRoot().prettyPrint(builder, ""); } } catch (Throwable aggProfileException) { LOG.warn("build profile failed", aggProfileException); @@ -134,7 +145,44 @@ public class Profile { return builder.toString(); } + // If the query is already finished, and user wants to get the profile, we should check + // if BE has reported all profiles, if not, sleep 2s. + private void waitProfileCompleteIfNeeded() { + if (!this.isFinished) { + return; + } + boolean allCompleted = true; + for (ExecutionProfile executionProfile : executionProfiles) { + if (!executionProfile.isCompleted()) { + allCompleted = false; + break; + } + } + if (!allCompleted) { + try { + Thread.currentThread().sleep(2000); + } catch (InterruptedException e) { + // Do nothing + } + } + } + + private RuntimeProfile composeRootProfile() { + + RuntimeProfile rootProfile = new RuntimeProfile(name); + rootProfile.setIsPipelineX(isPipelineX); + rootProfile.addChild(summaryProfile.getSummary()); + rootProfile.addChild(summaryProfile.getExecutionSummary()); + for (ExecutionProfile executionProfile : executionProfiles) { + rootProfile.addChild(executionProfile.getRoot()); + } + rootProfile.computeTimeInProfile(); + return rootProfile; + } + public String getProfileBrief() { + waitProfileCompleteIfNeeded(); + RuntimeProfile rootProfile = composeRootProfile(); Gson gson = new GsonBuilder().setPrettyPrinting().create(); return gson.toJson(rootProfile.toBrief()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index e9389b48b99..0d07e865d02 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -144,12 +144,22 @@ public class SummaryProfile { private long queryFetchResultConsumeTime = 0; private long queryWriteResultConsumeTime = 0; - public SummaryProfile(RuntimeProfile rootProfile) { + public SummaryProfile() { summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME); executionSummaryProfile = new RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME); init(); - rootProfile.addChild(summaryProfile); - rootProfile.addChild(executionSummaryProfile); + } + + public String getProfileId() { + return this.summaryProfile.getInfoString(PROFILE_ID); + } + + public RuntimeProfile getSummary() { + return summaryProfile; + } + + public RuntimeProfile getExecutionSummary() { + return executionSummaryProfile; } private void init() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 8644fea6221..70bb21a27e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -21,12 +21,14 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.AuthenticationException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.profile.MultiProfileTreeBuilder; import org.apache.doris.common.profile.Profile; import org.apache.doris.common.profile.ProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeNode; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.nereids.stats.StatsErrorEstimator; +import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ 
-104,6 +106,9 @@ public class ProfileManager { // record the order of profiles by queryId private Deque<String> queryIdDeque; private Map<String, ProfileElement> queryIdToProfileMap; // from QueryId to RuntimeProfile + // Sometimes one Profile is related with multiple execution profiles(Brokerload), so that + // execution profile's query id is not related with Profile's query id. + private Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles; public static ProfileManager getInstance() { if (INSTANCE == null) { @@ -122,23 +127,56 @@ public class ProfileManager { writeLock = lock.writeLock(); queryIdDeque = new LinkedList<>(); queryIdToProfileMap = new ConcurrentHashMap<>(); + queryIdToExecutionProfiles = Maps.newHashMap(); } - public ProfileElement createElement(Profile profile) { + private ProfileElement createElement(Profile profile) { ProfileElement element = new ProfileElement(profile); element.infoStrings.putAll(profile.getSummaryProfile().getAsInfoStings()); - MultiProfileTreeBuilder builder = new MultiProfileTreeBuilder(profile.getRootProfile()); + // Not init builder any more, we will not maintain it since 2.1.0, because the structure + // assume that the execution profiles structure is already known before execution. But in + // PipelineX Engine, it will changed during execution. 
+ return element; + } + + public void addExecutionProfile(ExecutionProfile executionProfile) { + if (executionProfile == null) { + return; + } + writeLock.lock(); try { - builder.build(); - } catch (Exception e) { - element.errMsg = e.getMessage(); + if (queryIdToExecutionProfiles.containsKey(executionProfile.getQueryId())) { + return; + } + queryIdToExecutionProfiles.put(executionProfile.getQueryId(), executionProfile); if (LOG.isDebugEnabled()) { - LOG.debug("failed to build profile tree", e); + LOG.debug("Add execution profile {} to profile manager", + DebugUtil.printId(executionProfile.getQueryId())); } - return element; + // Check if there are some query profiles that not finish collecting, should + // remove them to release memory. + if (queryIdToExecutionProfiles.size() > 2 * Config.max_query_profile_num) { + List<ExecutionProfile> finishOrExpireExecutionProfiles = Lists.newArrayList(); + for (ExecutionProfile tmpProfile : queryIdToExecutionProfiles.values()) { + if (System.currentTimeMillis() - tmpProfile.getQueryFinishTime() + > Config.profile_async_collect_expire_time_secs * 1000) { + finishOrExpireExecutionProfiles.add(tmpProfile); + } + } + for (ExecutionProfile tmp : finishOrExpireExecutionProfiles) { + queryIdToExecutionProfiles.remove(tmp.getQueryId()); + if (LOG.isDebugEnabled()) { + LOG.debug("Remove expired execution profile {}", DebugUtil.printId(tmp.getQueryId())); + } + } + } + } finally { + writeLock.unlock(); } - element.builder = builder; - return element; + } + + public ExecutionProfile getExecutionProfile(TUniqueId queryId) { + return this.queryIdToExecutionProfiles.get(queryId); } public void pushProfile(Profile profile) { @@ -148,14 +186,13 @@ public class ProfileManager { ProfileElement element = createElement(profile); // 'insert into' does have job_id, put all profiles key with query_id - String key = element.infoStrings.get(SummaryProfile.PROFILE_ID); + String key = element.profile.getSummaryProfile().getProfileId(); // check when 
push in, which can ensure every element in the list has QUERY_ID column, // so there is no need to check when remove element from list. if (Strings.isNullOrEmpty(key)) { LOG.warn("the key or value of Map is null, " + "may be forget to insert 'QUERY_ID' or 'JOB_ID' column into infoStrings"); } - writeLock.lock(); // a profile may be updated multiple times in queryIdToProfileMap, // and only needs to be inserted into the queryIdDeque for the first time. @@ -163,7 +200,13 @@ public class ProfileManager { try { if (!queryIdDeque.contains(key)) { if (queryIdDeque.size() >= Config.max_query_profile_num) { - queryIdToProfileMap.remove(queryIdDeque.getFirst()); + ProfileElement profileElementRemoved = queryIdToProfileMap.remove(queryIdDeque.getFirst()); + // If the Profile object is removed from manager, then related execution profile is also useless. + if (profileElementRemoved != null) { + for (ExecutionProfile executionProfile : profileElementRemoved.profile.getExecutionProfiles()) { + this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId()); + } + } queryIdDeque.removeFirst(); } queryIdDeque.addLast(key); @@ -173,6 +216,21 @@ public class ProfileManager { } } + public void removeProfile(String profileId) { + writeLock.lock(); + try { + ProfileElement profileElementRemoved = queryIdToProfileMap.remove(profileId); + // If the Profile object is removed from manager, then related execution profile is also useless. 
+ if (profileElementRemoved != null) { + for (ExecutionProfile executionProfile : profileElementRemoved.profile.getExecutionProfiles()) { + this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId()); + } + } + } finally { + writeLock.unlock(); + } + } + public List<List<String>> getAllQueries() { return getQueryWithType(null); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index eab9b3b6734..372b84fa3f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -26,6 +26,7 @@ import org.apache.doris.thrift.TRuntimeProfileTree; import org.apache.doris.thrift.TUnit; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -69,7 +70,8 @@ public class RuntimeProfile { private ReentrantReadWriteLock childLock = new ReentrantReadWriteLock(); private List<String> planNodeInfos = Lists.newArrayList(); - private String name; + // name should not changed. 
+ private final String name; private Long timestamp = -1L; @@ -85,22 +87,24 @@ public class RuntimeProfile { private int nodeid = -1; public RuntimeProfile(String name) { - this(); + this.localTimePercent = 0; + if (Strings.isNullOrEmpty(name)) { + throw new RuntimeException("Profile name must not be null"); + } this.name = name; this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1); + this.counterMap.put("TotalTime", counterTotalTime); } public RuntimeProfile(String name, int nodeId) { - this(); + this.localTimePercent = 0; + if (Strings.isNullOrEmpty(name)) { + throw new RuntimeException("Profile name must not be null"); + } this.name = name; this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 3); - this.nodeid = nodeId; - } - - public RuntimeProfile() { - this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1); - this.localTimePercent = 0; this.counterMap.put("TotalTime", counterTotalTime); + this.nodeid = nodeId; } public void setIsCancel(Boolean isCancel) { @@ -143,10 +147,6 @@ public class RuntimeProfile { this.isPipelineX = isPipelineX; } - public boolean getIsPipelineX() { - return this.isPipelineX; - } - public Map<String, Counter> getCounterMap() { return counterMap; } @@ -155,6 +155,10 @@ public class RuntimeProfile { return childList; } + public boolean isEmpty() { + return childList.isEmpty(); + } + public Map<String, RuntimeProfile> getChildMap() { return childMap; } @@ -750,10 +754,6 @@ public class RuntimeProfile { } } - public void setName(String name) { - this.name = name; - } - // Returns the value to which the specified key is mapped; // or null if this map contains no mapping for the key. 
public String getInfoString(String key) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 27503d4cc78..02f9bb0a3e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -199,7 +199,9 @@ public class BrokerLoadJob extends BulkLoadJob { // divide job into broker loading task by table List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList(); if (enableProfile) { - this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true); + this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + label, true, + Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), + false); } ProgressManager progressManager = Env.getCurrentProgressManager(); progressManager.registerProgressSimple(String.valueOf(id)); @@ -329,16 +331,6 @@ public class BrokerLoadJob extends BulkLoadJob { } } - private void writeProfile() { - if (!enableProfile) { - return; - } - jobProfile.update(createTimestamp, getSummaryInfo(true), true, - Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), null, false); - // jobProfile has been pushed into ProfileManager, remove reference in brokerLoadJob - jobProfile = null; - } - private Map<String, String> getSummaryInfo(boolean isFinished) { long currentTimestamp = System.currentTimeMillis(); SummaryBuilder builder = new SummaryBuilder(); @@ -410,7 +402,12 @@ public class BrokerLoadJob extends BulkLoadJob { @Override public void afterVisible(TransactionState txnState, boolean txnOperated) { super.afterVisible(txnState, txnOperated); - writeProfile(); + if (!enableProfile) { + return; + } + jobProfile.updateSummary(createTimestamp, getSummaryInfo(true), true, null); + // jobProfile has been pushed into ProfileManager, remove reference in brokerLoadJob + jobProfile = 
null; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 94fb49d6c85..ef7a07cb9a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -172,8 +172,11 @@ public class LoadLoadingTask extends LoadTask { } try { - QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator); + QeProcessorImpl.INSTANCE.registerQuery(loadId, new QeProcessorImpl.QueryInfo(curCoordinator)); actualExecute(curCoordinator, timeoutS); + if (this.jobProfile != null) { + curCoordinator.getExecutionProfile().update(beginTime, true); + } } finally { QeProcessorImpl.INSTANCE.unregisterQuery(loadId); } @@ -198,8 +201,6 @@ public class LoadLoadingTask extends LoadTask { ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos() .stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList()))); curCoordinator.getErrorTabletInfos().clear(); - // Create profile of this task and add to the job profile. 
- createProfile(curCoordinator); } else { throw new LoadException(status.getErrorMsg()); } @@ -212,15 +213,6 @@ public class LoadLoadingTask extends LoadTask { return jobDeadlineMs - System.currentTimeMillis(); } - private void createProfile(Coordinator coord) { - if (jobProfile == null) { - // No need to gather profile - return; - } - // Summary profile - coord.getExecutionProfile().update(beginTime, true); - } - @Override public void updateRetryInfo() { super.updateRetryInfo(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index cea0efc6fe7..45c96bd742a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -127,7 +127,9 @@ public class LoadCommand extends Command implements ForwardWithSync { if (!Config.enable_nereids_load) { throw new AnalysisException("Fallback to legacy planner temporary."); } - this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile); + this.profile = new Profile("Query", ctx.getSessionVariable().enableProfile, + ctx.getSessionVariable().profileLevel, + ctx.getSessionVariable().getEnablePipelineXEngine()); profile.getSummaryProfile().setQueryBeginTime(); if (sourceInfos.size() == 1) { plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)), diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java index 50a6bd6495b..5718e68c6b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java @@ -25,8 +25,6 @@ public interface CoordInterface { public RowBatch getNext() throws Exception; - public int getInstanceTotalNum(); - public void 
cancel(Types.PPlanFragmentCancelReason cancelReason); // When call exec or get next data finished, should call this method to release diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1b99da152f9..64b07e518a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -23,9 +23,11 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.Config; +import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.Status; +import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.util.DebugUtil; @@ -80,7 +82,6 @@ import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TBrokerScanRange; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDescriptorTable; -import org.apache.doris.thrift.TDetailedReportParams; import org.apache.doris.thrift.TErrorTabletInfo; import org.apache.doris.thrift.TEsScanRange; import org.apache.doris.thrift.TExecPlanFragmentParams; @@ -122,6 +123,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; @@ -142,13 +146,13 @@ import java.util.Optional; import 
java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import java.util.stream.Stream; public class Coordinator implements CoordInterface { private static final Logger LOG = LogManager.getLogger(Coordinator.class); @@ -158,6 +162,9 @@ public class Coordinator implements CoordInterface { // Random is used to shuffle instances of partitioned private static final Random instanceRandom = new SecureRandom(); + private static ExecutorService backendRpcCallbackExecutor = ThreadPoolManager.newDaemonProfileThreadPool(32, 100, + "backend-rpc-callback", true); + // Overall status of the entire query; set to the first reported fragment error // status or to CANCELLED, if Cancel() is called. Status queryStatus = new Status(); @@ -188,15 +195,12 @@ public class Coordinator implements CoordInterface { // coordinator still needs to wait for cleanup on remote fragments (e.g. queries // with limit) // Once this is set to true, errors from remote fragments are ignored. - private boolean returnedAllResults; - - private List<RuntimeProfile> fragmentProfile; + private boolean returnedAllResults = false; // populated in computeFragmentExecParams() private final Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = Maps.newHashMap(); private final List<PlanFragment> fragments; - private int instanceTotalNum; private Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap(); private Map<Long, PipelineExecContexts> beToPipelineExecCtxs = Maps.newHashMap(); @@ -267,6 +271,15 @@ public class Coordinator implements CoordInterface { private StatsErrorEstimator statsErrorEstimator; + // A countdown latch to mark the completion of each instance. 
+ // use for old pipeline + // instance id -> dummy value + private MarkedCountDownLatch<TUniqueId, Long> instancesDoneLatch = null; + + // A countdown latch to mark the completion of each fragment. use for pipelineX + // fragmentid -> backendid + private MarkedCountDownLatch<Integer, Long> fragmentsDoneLatch = null; + public void setTWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) { this.tWorkloadGroups = tWorkloadGroups; } @@ -335,7 +348,7 @@ public class Coordinator implements CoordInterface { nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); this.assignedRuntimeFilters = planner.getRuntimeFilters(); - this.executionProfile = new ExecutionProfile(queryId, fragments.size()); + this.executionProfile = new ExecutionProfile(queryId, fragments); } @@ -357,7 +370,7 @@ public class Coordinator implements CoordInterface { this.nextInstanceId = new TUniqueId(); nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); - this.executionProfile = new ExecutionProfile(queryId, fragments.size()); + this.executionProfile = new ExecutionProfile(queryId, fragments); } private void setFromUserProperty(ConnectContext connectContext) { @@ -510,11 +523,6 @@ public class Coordinator implements CoordInterface { return result; } - @Override - public int getInstanceTotalNum() { - return instanceTotalNum; - } - // Initialize private void prepare() { for (PlanFragment fragment : fragments) { @@ -600,7 +608,6 @@ public class Coordinator implements CoordInterface { Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum); LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet()); - executionProfile.markInstances(instanceIds); List<TExecPlanFragmentParams> tExecPlanFragmentParams = ((FragmentExecParams) this.fragmentExecParamsMap.values().toArray()[0]).toThrift(0); TExecPlanFragmentParams fragmentParams = tExecPlanFragmentParams.get(0); @@ -708,11 +715,6 @@ public 
class Coordinator implements CoordInterface { Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum); LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet()); } - if (enablePipelineXEngine) { - executionProfile.markFragments(fragments.size()); - } else { - executionProfile.markInstances(instanceIds); - } if (enablePipelineEngine) { sendPipelineCtx(); @@ -775,7 +777,6 @@ public class Coordinator implements CoordInterface { // 1. set up exec states int instanceNum = params.instanceExecParams.size(); Preconditions.checkState(instanceNum > 0); - instanceTotalNum += instanceNum; List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx); // 2. update memory limit for colocate join @@ -802,8 +803,7 @@ public class Coordinator implements CoordInterface { for (TExecPlanFragmentParams tParam : tParams) { BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++, - profileFragmentId, tParam, this.addressToBackendID, - executionProfile.getLoadChannelProfile()); + tParam, this.addressToBackendID, executionProfile); // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine whether all Fragments have been executed based on this information. // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons. 
@@ -870,9 +870,6 @@ public class Coordinator implements CoordInterface { } waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start"); } - if (context != null && context.getSessionVariable().enableProfile()) { - attachInstanceProfileToFragmentProfile(); - } } finally { unlock(); } @@ -891,8 +888,9 @@ public class Coordinator implements CoordInterface { int backendIdx = 0; int profileFragmentId = 0; beToPipelineExecCtxs.clear(); - // If #fragments > 1 and BE amount is bigger than 1, use twoPhaseExecution with - // exec_plan_fragments_prepare and exec_plan_fragments_start, + // fragment:backend + List<Pair<PlanFragmentId, Long>> backendFragments = Lists.newArrayList(); + // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start, // else use exec_plan_fragments directly. // we choose #fragments > 1 because in some cases // we need ensure that A fragment is already prepared to receive data before B fragment sends data. @@ -918,12 +916,10 @@ public class Coordinator implements CoordInterface { needCheckBackendState = true; } - Map<TUniqueId, RuntimeProfile> fragmentInstancesMap = new HashMap<TUniqueId, RuntimeProfile>(); + Map<TUniqueId, Boolean> fragmentInstancesMap = new HashMap<TUniqueId, Boolean>(); for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) { for (TPipelineInstanceParams instanceParam : entry.getValue().local_params) { - String name = "Instance " + DebugUtil.printId(instanceParam.fragment_instance_id) - + " (host=" + entry.getKey() + ")"; - fragmentInstancesMap.put(instanceParam.fragment_instance_id, new RuntimeProfile(name)); + fragmentInstancesMap.put(instanceParam.fragment_instance_id, false); } } @@ -932,10 +928,10 @@ public class Coordinator implements CoordInterface { // So that we can use one RPC to send all fragment instances of a BE. 
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) { Long backendId = this.addressToBackendID.get(entry.getKey()); + backendFragments.add(Pair.of(fragment.getFragmentId(), backendId)); PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(), - profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap, - executionProfile.getLoadChannelProfile(), this.enablePipelineXEngine, - this.executionProfile); + entry.getValue(), backendId, fragmentInstancesMap, + this.enablePipelineXEngine, executionProfile); // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine whether all Fragments have been executed based on this information. // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons. @@ -988,6 +984,14 @@ public class Coordinator implements CoordInterface { profileFragmentId += 1; } // end for fragments + // Init the mark done in order to track the finished state of the query + if (this.enablePipelineXEngine) { + fragmentsDoneLatch = new MarkedCountDownLatch<>(backendFragments.size()); + for (Pair<PlanFragmentId, Long> pair : backendFragments) { + fragmentsDoneLatch.addMark(pair.first.asInt(), pair.second); + } + } + // 4. 
send and wait fragments rpc List<Triple<PipelineExecContexts, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>> futures = Lists.newArrayList(); @@ -1019,9 +1023,6 @@ public class Coordinator implements CoordInterface { } waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start"); } - if (context != null && context.getSessionVariable().enableProfile()) { - attachInstanceProfileToFragmentProfile(); - } } finally { unlock(); } @@ -1446,8 +1447,10 @@ public class Coordinator implements CoordInterface { lock(); try { if (!queryStatus.ok()) { - // we can't cancel twice - return; + // Print an error stack here to know why send cancel again. + LOG.warn("Query {} already in abnormal status {}, but received cancel again," + + "so that send cancel to BE again", + DebugUtil.printId(queryId), queryStatus.toString(), new Exception()); } else { queryStatus.setStatus(Status.CANCELLED); } @@ -1459,6 +1462,15 @@ public class Coordinator implements CoordInterface { } } + private void cancelLatch() { + if (instancesDoneLatch != null) { + instancesDoneLatch.countDownToZero(new Status()); + } + if (fragmentsDoneLatch != null) { + fragmentsDoneLatch.countDownToZero(new Status()); + } + } + private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) { if (null != receiver) { receiver.cancel(cancelReason.toString()); @@ -1468,7 +1480,7 @@ public class Coordinator implements CoordInterface { return; } cancelRemoteFragmentsAsync(cancelReason); - executionProfile.onCancel(); + cancelLatch(); } private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) { @@ -1507,6 +1519,12 @@ public class Coordinator implements CoordInterface { } } + // Init instancesDoneLatch, it will be used to track if the instances has finished for insert stmt + instancesDoneLatch = new MarkedCountDownLatch<>(instanceIds.size()); + for (TUniqueId instanceId : instanceIds) { + instancesDoneLatch.addMark(instanceId, -1L /* 
value is meaningless */); + } + // compute multi cast fragment params computeMultiCastFragmentParams(); @@ -2437,31 +2455,30 @@ public class Coordinator implements CoordInterface { public void updateFragmentExecStatus(TReportExecStatusParams params) { if (enablePipelineXEngine) { PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId())); - if (!ctx.updateProfile(params)) { + if (ctx == null || !ctx.updatePipelineStatus(params)) { return; } - // print fragment instance profile - if (LOG.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - ctx.printProfile(builder); - if (LOG.isDebugEnabled()) { - LOG.debug("profile for query_id={} fragment_id={}\n{}", - DebugUtil.printId(queryId), - params.getFragmentId(), - builder.toString()); - } - } - Status status = new Status(params.status); // for now, abort the query if we see any error except if the error is cancelled // and returned_all_results_ is true. // (UpdateStatus() initiates cancellation, if it hasn't already been initiated) - if (!(returnedAllResults && status.isCancelled()) && !status.ok()) { - LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}", - DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()), - status.getErrorMsg()); - updateStatus(status); + if (!status.ok()) { + if (returnedAllResults && status.isCancelled()) { + LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}" + + " is reporting failed status {}", + DebugUtil.printId(queryId), params.getFragmentId(), + DebugUtil.printId(params.getFragmentInstanceId()), + params.getBackendId(), + status.toString()); + } else { + LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={}," + + " error message: {}", + DebugUtil.printId(queryId), params.getFragmentId(), + DebugUtil.printId(params.getFragmentInstanceId()), + params.getBackendId(), status.toString()); + updateStatus(status); + } 
} if (params.isSetDeltaUrls()) { updateDeltas(params.getDeltaUrls()); @@ -2489,39 +2506,36 @@ public class Coordinator implements CoordInterface { if (ctx.done) { if (LOG.isDebugEnabled()) { LOG.debug("Query {} fragment {} is marked done", - DebugUtil.printId(queryId), ctx.profileFragmentId); + DebugUtil.printId(queryId), ctx.fragmentId); } - executionProfile.markOneFragmentDone(ctx.profileFragmentId); + fragmentsDoneLatch.markedCountDown(params.getFragmentId(), params.getBackendId()); } } else if (enablePipelineEngine) { PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId())); - if (!ctx.updateProfile(params)) { + if (ctx == null || !ctx.updatePipelineStatus(params)) { return; } - // print fragment instance profile - if (LOG.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - ctx.printProfile(builder); - if (LOG.isDebugEnabled()) { - LOG.debug("profile for query_id={} instance_id={}\n{}", - DebugUtil.printId(queryId), - DebugUtil.printId(params.getFragmentInstanceId()), - builder.toString()); - } - } - Status status = new Status(params.status); // for now, abort the query if we see any error except if the error is cancelled // and returned_all_results_ is true. 
// (UpdateStatus() initiates cancellation, if it hasn't already been initiated) - if (!(returnedAllResults && status.isCancelled()) && !status.ok()) { - LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={}," - + " error message: {}", - DebugUtil.printId(queryId), params.getFragmentId(), - DebugUtil.printId(params.getFragmentInstanceId()), - params.getBackendId(), status.getErrorMsg()); - updateStatus(status); + if (!status.ok()) { + if (returnedAllResults && status.isCancelled()) { + LOG.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}" + + " is reporting failed status {}", + DebugUtil.printId(queryId), params.getFragmentId(), + DebugUtil.printId(params.getFragmentInstanceId()), + params.getBackendId(), + status.toString()); + } else { + LOG.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={}," + + " error message: {}", + DebugUtil.printId(queryId), params.getFragmentId(), + DebugUtil.printId(params.getFragmentInstanceId()), + params.getBackendId(), status.toString()); + updateStatus(status); + } } // params.isDone() should be promised. @@ -2530,7 +2544,7 @@ public class Coordinator implements CoordInterface { // The last report causes the counter to decrease to zero, // but it is possible that the report without commit-info triggered the commit operation, // resulting in the data not being published. 
- if (ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone() && params.isDone()) { + if (ctx.fragmentInstancesMap.get(params.fragment_instance_id) && params.isDone()) { if (params.isSetDeltaUrls()) { updateDeltas(params.getDeltaUrls()); } @@ -2556,7 +2570,7 @@ public class Coordinator implements CoordInterface { LOG.debug("Query {} instance {} is marked done", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId())); } - executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); + instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L); } else { if (LOG.isDebugEnabled()) { LOG.debug("Query {} instance {} is not marked done", @@ -2571,22 +2585,14 @@ public class Coordinator implements CoordInterface { return; } BackendExecState execState = backendExecStates.get(params.backend_num); - if (!execState.updateProfile(params)) { + if (!execState.updateInstanceStatus(params)) { + // Has to return here, to avoid out of order report messages. For example, + // the first message is done, then we update commit messages, but the new + // message is running, then we will also update commit messages. It will + // lead to data corruption. return; } - // print fragment instance profile - if (LOG.isDebugEnabled()) { - StringBuilder builder = new StringBuilder(); - execState.printProfile(builder); - if (LOG.isDebugEnabled()) { - LOG.debug("profile for query_id={} instance_id={}\n{}", - DebugUtil.printId(queryId), - DebugUtil.printId(params.getFragmentInstanceId()), - builder.toString()); - } - } - Status status = new Status(params.status); // for now, abort the query if we see any error except if the error is cancelled // and returned_all_results_ is true.
@@ -2595,11 +2601,11 @@ public class Coordinator implements CoordInterface { if (status.isCancelled() && returnedAllResults) { LOG.warn("Query {} has returned all results, its instance {} is reporting failed status {}", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()), - status.getErrorMsg()); + status.toString()); } else { LOG.warn("Instance {} of query {} report failed status, error msg: {}", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()), - status.getErrorMsg()); + status.toString()); updateStatus(status); } } @@ -2632,7 +2638,7 @@ public class Coordinator implements CoordInterface { if (params.isSetHivePartitionUpdates()) { updateHivePartitionUpdates(params.getHivePartitionUpdates()); } - executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); + instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L); } } @@ -2667,10 +2673,10 @@ public class Coordinator implements CoordInterface { long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime); boolean awaitRes = false; try { - if (enablePipelineXEngine) { - awaitRes = executionProfile.awaitAllFragmentsDone(waitTime); + if (fragmentsDoneLatch != null) { + awaitRes = fragmentsDoneLatch.await(waitTime, TimeUnit.SECONDS); } else { - awaitRes = executionProfile.awaitAllInstancesDone(waitTime); + awaitRes = instancesDoneLatch.await(waitTime, TimeUnit.SECONDS); } } catch (InterruptedException e) { // Do nothing @@ -2714,10 +2720,10 @@ public class Coordinator implements CoordInterface { } public boolean isDone() { - if (enablePipelineXEngine) { - return executionProfile.isAllFragmentsDone(); + if (fragmentsDoneLatch != null) { + return fragmentsDoneLatch.getCount() == 0; } else { - return executionProfile.isAllInstancesDone(); + return instancesDoneLatch.getCount() == 0; } } @@ -3024,20 +3030,17 @@ public class Coordinator implements CoordInterface { PlanFragmentId fragmentId; boolean initiated; volatile boolean done; - boolean 
hasCanceled; - int profileFragmentId; - RuntimeProfile instanceProfile; - RuntimeProfile loadChannelProfile; TNetworkAddress brpcAddress; TNetworkAddress address; Backend backend; long lastMissingHeartbeatTime = -1; TUniqueId instanceId; + private boolean hasCancelled = false; + private boolean cancelInProcess = false; - public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId, + public BackendExecState(PlanFragmentId fragmentId, int instanceId, TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID, - RuntimeProfile loadChannelProfile) { - this.profileFragmentId = profileFragmentId; + ExecutionProfile executionProfile) { this.fragmentId = fragmentId; this.rpcParams = rpcParams; this.initiated = false; @@ -3047,12 +3050,10 @@ public class Coordinator implements CoordInterface { this.address = fi.host; this.backend = idToBackend.get(addressToBackendID.get(address)); this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); - - String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; - this.loadChannelProfile = loadChannelProfile; - this.instanceProfile = new RuntimeProfile(name); - this.hasCanceled = false; this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); + String profileName = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; + RuntimeProfile instanceProfile = new RuntimeProfile(profileName); + executionProfile.addInstanceProfile(fragmentId, fi.instanceId, instanceProfile); } /** @@ -3070,19 +3071,12 @@ public class Coordinator implements CoordInterface { this.rpcParams.setIsSimplifiedParam(true); } - // update profile. - // return true if profile is updated. Otherwise, return false. - public synchronized boolean updateProfile(TReportExecStatusParams params) { + // update the instance status, if it is already finished, then not update any more. 
+ public synchronized boolean updateInstanceStatus(TReportExecStatusParams params) { if (this.done) { // duplicate packet return false; } - if (params.isSetProfile()) { - instanceProfile.update(params.profile); - } - if (params.isSetLoadChannelProfile()) { - loadChannelProfile.update(params.loadChannelProfile); - } this.done = params.done; if (statsErrorEstimator != null) { statsErrorEstimator.updateExactReturnedRows(params); @@ -3090,54 +3084,75 @@ public class Coordinator implements CoordInterface { return true; } - public synchronized void printProfile(StringBuilder builder) { - this.instanceProfile.computeTimeInProfile(); - this.instanceProfile.prettyPrint(builder, ""); - } - // cancel the fragment instance. // return true if cancel success. Otherwise, return false - public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) { - LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {}," + public synchronized void cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) { + LOG.warn("cancelRemoteFragments initiated={} done={} backend: {}," + " fragment instance id={}, reason: {}", - this.initiated, this.done, this.hasCanceled, backend.getId(), + this.initiated, this.done, backend.getId(), DebugUtil.printId(fragmentInstanceId()), cancelReason.name()); try { if (!this.initiated) { - return false; + return; } // don't cancel if it is already finished if (this.done) { - return false; + return; } - if (this.hasCanceled) { - return false; + if (this.hasCancelled || this.cancelInProcess) { + LOG.info("Fragment instance has already been cancelled {} or under cancel {}." 
+ + " initiated={} done={} backend: {}," + + "fragment instance id={}, reason: {}", + this.hasCancelled, this.cancelInProcess, + this.initiated, this.done, backend.getId(), + DebugUtil.printId(fragmentInstanceId()), cancelReason.name()); + return; } - try { - BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress, - fragmentInstanceId(), cancelReason); + ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult = + BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress, + fragmentInstanceId(), cancelReason); + Futures.addCallback(cancelResult, new FutureCallback<InternalService.PCancelPlanFragmentResult>() { + public void onSuccess(InternalService.PCancelPlanFragmentResult result) { + cancelInProcess = false; + if (result.hasStatus()) { + Status status = new Status(); + status.setPstatus(result.getStatus()); + if (status.getErrorCode() == TStatusCode.OK) { + hasCancelled = true; + } else { + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment instance id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + DebugUtil.printId(fragmentInstanceId()), status.toString()); + } + } + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment instance id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + DebugUtil.printId(fragmentInstanceId()), "without status"); + } + + public void onFailure(Throwable t) { + cancelInProcess = false; + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment instance id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + DebugUtil.printId(fragmentInstanceId()), cancelReason.name(), t); + } + }, backendRpcCallbackExecutor); + cancelInProcess = true; } catch (RpcException e) { LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(), brpcAddress.getPort()); 
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage()); } - this.hasCanceled = true; } catch (Exception e) { LOG.warn("catch a exception", e); - return false; - } - return true; - } - - public synchronized boolean computeTimeInProfile(int maxFragmentId) { - if (this.profileFragmentId < 0 || this.profileFragmentId > maxFragmentId) { - LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId); - return false; + return; } - instanceProfile.computeTimeInProfile(); - return true; + return; } public boolean isBackendStateHealthy() { @@ -3163,16 +3178,12 @@ public class Coordinator implements CoordInterface { TPipelineFragmentParams rpcParams; PlanFragmentId fragmentId; boolean initiated; - volatile boolean done; - boolean hasCanceled; + boolean done; // use for pipeline - Map<TUniqueId, RuntimeProfile> fragmentInstancesMap; + Map<TUniqueId, Boolean> fragmentInstancesMap; // use for pipelineX - List<RuntimeProfile> taskProfile; boolean enablePipelineX; - RuntimeProfile loadChannelProfile; - int profileFragmentId; TNetworkAddress brpcAddress; TNetworkAddress address; Backend backend; @@ -3180,19 +3191,17 @@ public class Coordinator implements CoordInterface { long profileReportProgress = 0; long beProcessEpoch = 0; private final int numInstances; - final ExecutionProfile executionProfile; + private boolean hasCancelled = false; + private boolean cancelInProcess = false; - public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId, + public PipelineExecContext(PlanFragmentId fragmentId, TPipelineFragmentParams rpcParams, Long backendId, - Map<TUniqueId, RuntimeProfile> fragmentInstancesMap, - RuntimeProfile loadChannelProfile, boolean enablePipelineX, final ExecutionProfile executionProfile) { - this.profileFragmentId = profileFragmentId; + Map<TUniqueId, Boolean> fragmentInstancesMap, + boolean enablePipelineX, ExecutionProfile executionProfile) { this.fragmentId = fragmentId; this.rpcParams = 
rpcParams; this.numInstances = rpcParams.local_params.size(); this.fragmentInstancesMap = fragmentInstancesMap; - this.taskProfile = new ArrayList<RuntimeProfile>(); - this.loadChannelProfile = loadChannelProfile; this.initiated = false; this.done = false; @@ -3202,27 +3211,18 @@ public class Coordinator implements CoordInterface { this.brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort()); this.beProcessEpoch = backend.getProcessEpoch(); - this.hasCanceled = false; this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); this.enablePipelineX = enablePipelineX; - this.executionProfile = executionProfile; - if (enablePipelineX) { - executionProfile.addFragments(profileFragmentId); - } - } - - public Stream<RuntimeProfile> profileStream() { - if (enablePipelineX) { - return taskProfile.stream(); + if (this.enablePipelineX) { + executionProfile.addFragmentBackend(fragmentId, backendId); + } else { + for (TPipelineInstanceParams instanceParam : rpcParams.local_params) { + String profileName = "Instance " + DebugUtil.printId(instanceParam.fragment_instance_id) + + " (host=" + address + ")"; + executionProfile.addInstanceProfile(fragmentId, instanceParam.fragment_instance_id, + new RuntimeProfile(profileName)); + } } - return fragmentInstancesMap.values().stream(); - } - - public void attachPipelineProfileToFragmentProfile() { - profileStream() - .forEach(p -> executionProfile.addInstanceProfile(this.profileFragmentId, p)); - executionProfile.addMultiBeProfileByPipelineX(profileFragmentId, address, - taskProfile); } /** @@ -3243,46 +3243,33 @@ public class Coordinator implements CoordInterface { // update profile. // return true if profile is updated. Otherwise, return false. - public synchronized boolean updateProfile(TReportExecStatusParams params) { + // Has to use synchronized to ensure there are not concurrent update threads. Or the done + // state maybe update wrong and will lose data. 
see https://github.com/apache/doris/pull/29802/files. + public synchronized boolean updatePipelineStatus(TReportExecStatusParams params) { + // The fragment or instance is not finished, not need update + if (!params.done) { + return false; + } if (enablePipelineX) { - taskProfile.clear(); - int pipelineIdx = 0; - for (TDetailedReportParams param : params.detailed_report) { - String name = "Pipeline :" + pipelineIdx + " " - + " (host=" + address + ")"; - RuntimeProfile profile = new RuntimeProfile(name); - taskProfile.add(profile); - if (param.isSetProfile()) { - profile.update(param.profile); - } - if (params.done) { - profile.setIsDone(true); - } - pipelineIdx++; - } - if (params.isSetLoadChannelProfile()) { - loadChannelProfile.update(params.loadChannelProfile); - } - this.done = params.done; - attachPipelineProfileToFragmentProfile(); - return this.done; - } else { - RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id); - if (params.done && profile.getIsDone()) { + if (this.done) { // duplicate packet return false; } - - if (params.isSetProfile()) { - profile.update(params.profile); - } - if (params.isSetLoadChannelProfile()) { - loadChannelProfile.update(params.loadChannelProfile); + this.done = true; + return true; + } else { + // could not find the related instances, not update and return false, to indicate + // that the caller should not update any more. 
+ if (!fragmentInstancesMap.containsKey(params.fragment_instance_id)) { + return false; } - if (params.done) { - profile.setIsDone(true); - profileReportProgress++; + Boolean instanceDone = fragmentInstancesMap.get(params.fragment_instance_id); + if (instanceDone) { + // duplicate packet + return false; } + fragmentInstancesMap.put(params.fragment_instance_id, true); + profileReportProgress++; if (profileReportProgress == numInstances) { this.done = true; } @@ -3290,32 +3277,57 @@ public class Coordinator implements CoordInterface { } } - public synchronized void printProfile(StringBuilder builder) { - this.profileStream().forEach(p -> { - p.computeTimeInProfile(); - p.prettyPrint(builder, ""); - }); - } - - // cancel all fragment instances. - // return true if cancel success. Otherwise, return false - - private synchronized boolean cancelFragment(Types.PPlanFragmentCancelReason cancelReason) { - for (RuntimeProfile profile : taskProfile) { - profile.setIsCancel(true); - } + // Just send the cancel message to BE, not care about the result, because there is no retry + // logic in upper logic. + private synchronized void cancelFragment(Types.PPlanFragmentCancelReason cancelReason) { if (LOG.isDebugEnabled()) { - LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {}," + LOG.debug("cancelRemoteFragments initiated={} done={} backend: {}," + " fragment id={} query={}, reason: {}", - this.initiated, this.done, this.hasCanceled, backend.getId(), - this.profileFragmentId, + this.initiated, this.done, backend.getId(), + this.fragmentId, DebugUtil.printId(queryId), cancelReason.name()); } + + if (this.hasCancelled || this.cancelInProcess) { + LOG.info("Frangment has already been cancelled. 
Query {} backend: {}, fragment id={}", + DebugUtil.printId(queryId), backend.getId(), this.fragmentId); + return; + } try { try { - BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress, + ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult = + BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress, this.fragmentId, queryId, cancelReason); - this.hasCanceled = true; + Futures.addCallback(cancelResult, new FutureCallback<InternalService.PCancelPlanFragmentResult>() { + public void onSuccess(InternalService.PCancelPlanFragmentResult result) { + cancelInProcess = false; + if (result.hasStatus()) { + Status status = new Status(); + status.setPstatus(result.getStatus()); + if (status.getErrorCode() == TStatusCode.OK) { + hasCancelled = true; + } else { + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + fragmentId, status.toString()); + } + } + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + fragmentId, "without status"); + } + + public void onFailure(Throwable t) { + cancelInProcess = false; + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + fragmentId, cancelReason.name(), t); + } + }, backendRpcCallbackExecutor); + cancelInProcess = true; } catch (RpcException e) { LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(), brpcAddress.getPort()); @@ -3323,78 +3335,91 @@ public class Coordinator implements CoordInterface { } } catch (Exception e) { LOG.warn("catch a exception", e); - return false; + return; } - return true; + return; } - private synchronized boolean 
cancelInstance(Types.PPlanFragmentCancelReason cancelReason) { + // Just send the cancel logic to BE, not care about the result, and there is no retry logic + // in upper logic. + private synchronized void cancelInstance(Types.PPlanFragmentCancelReason cancelReason) { for (TPipelineInstanceParams localParam : rpcParams.local_params) { - LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={} backend:{}," + LOG.warn("cancelRemoteFragments initiated={} done={} backend:{}," + " fragment instance id={} query={}, reason: {}", - this.initiated, this.done, this.hasCanceled, backend.getId(), + this.initiated, this.done, backend.getId(), DebugUtil.printId(localParam.fragment_instance_id), DebugUtil.printId(queryId), cancelReason.name()); - - RuntimeProfile profile = fragmentInstancesMap.get(localParam.fragment_instance_id); - if (profile.getIsDone() || profile.getIsCancel()) { - continue; + if (this.hasCancelled || this.cancelInProcess) { + LOG.info("fragment instance has already been cancelled {} or in process {}. 
" + + "initiated={} done={} backend:{}," + + " fragment instance id={} query={}, reason: {}", + this.hasCancelled, this.cancelInProcess, + this.initiated, this.done, backend.getId(), + DebugUtil.printId(localParam.fragment_instance_id), + DebugUtil.printId(queryId), cancelReason.name()); + return; } - - this.hasCanceled = true; try { - try { - BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress, - localParam.fragment_instance_id, cancelReason); - } catch (RpcException e) { - LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(), - brpcAddress.getPort()); - SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage()); - } + ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult = + BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress, + localParam.fragment_instance_id, cancelReason); + Futures.addCallback(cancelResult, new FutureCallback<InternalService.PCancelPlanFragmentResult>() { + public void onSuccess(InternalService.PCancelPlanFragmentResult result) { + cancelInProcess = false; + if (result.hasStatus()) { + Status status = new Status(); + status.setPstatus(result.getStatus()); + if (status.getErrorCode() == TStatusCode.OK) { + hasCancelled = true; + } else { + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment instance id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + DebugUtil.printId(localParam.fragment_instance_id), status.toString()); + } + } + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment instance id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + DebugUtil.printId(localParam.fragment_instance_id), "without status"); + } + + public void onFailure(Throwable t) { + cancelInProcess = false; + LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {}," + + "fragment 
instance id={}, reason: {}", + DebugUtil.printId(queryId), initiated, done, backend.getId(), + DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name(), t); + } + }, backendRpcCallbackExecutor); + cancelInProcess = true; } catch (Exception e) { LOG.warn("catch a exception", e); - return false; + return; } } - if (!this.hasCanceled) { - return false; - } - for (int i = 0; i < this.numInstances; i++) { - fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true); - } - return true; + return; } /// TODO: refactor rpcParams - public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) { + public synchronized void cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) { if (!this.initiated) { LOG.warn("Query {}, ccancel before initiated", DebugUtil.printId(queryId)); - return false; + return; } // don't cancel if it is already finished if (this.done) { LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId)); - return false; - } - if (this.hasCanceled) { - LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId)); - return false; + return; } if (this.enablePipelineX) { - return cancelFragment(cancelReason); + cancelFragment(cancelReason); + return; } else { - return cancelInstance(cancelReason); - } - } - - public synchronized boolean computeTimeInProfile(int maxFragmentId) { - if (this.profileFragmentId < 0 || this.profileFragmentId > maxFragmentId) { - LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId); - return false; + cancelInstance(cancelReason); + return; } - // profile.computeTimeInProfile(); - return true; } public boolean isBackendStateHealthy() { @@ -4020,22 +4045,6 @@ public class Coordinator implements CoordInterface { return result; } - private void attachInstanceProfileToFragmentProfile() { - if (enablePipelineEngine) { - for (PipelineExecContext ctx : pipelineExecContexts.values()) { - if 
(!enablePipelineXEngine) { - ctx.profileStream() - .forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p)); - } - } - } else { - for (BackendExecState backendExecState : backendExecStates) { - executionProfile.addInstanceProfile(backendExecState.profileFragmentId, - backendExecState.instanceProfile); - } - } - } - // Runtime filter target fragment instance param static class FRuntimeFilterTargetParam { public TUniqueId targetFragmentInstanceId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java index 4b0302cc403..f41e9a4d896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java @@ -168,12 +168,6 @@ public class PointQueryExec implements CoordInterface { requestBuilder.addKeyTuples(kBuilder); } - @Override - public int getInstanceTotalNum() { - // TODO - return 1; - } - @Override public void cancel(Types.PPlanFragmentCancelReason cancelReason) { // Do nothing diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java index 44999ecef64..c5aff2c9d5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java @@ -31,8 +31,6 @@ public interface QeProcessor { TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr); - void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException; - void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) throws UserException; void registerInstances(TUniqueId queryId, Integer instancesNum) throws UserException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 03144fc797c..a62f1b66f08 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -23,6 +23,7 @@ import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.ProfileManager; import org.apache.doris.metric.MetricRepo; import org.apache.doris.resource.workloadgroup.QueueToken.TokenState; import org.apache.doris.thrift.TNetworkAddress; @@ -53,6 +54,7 @@ public final class QeProcessorImpl implements QeProcessor { private Map<TUniqueId, Integer> queryToInstancesNum; private Map<String, AtomicInteger> userToInstancesCount; + private ExecutorService writeProfileExecutor; public static final QeProcessor INSTANCE; @@ -60,15 +62,13 @@ public final class QeProcessorImpl implements QeProcessor { INSTANCE = new QeProcessorImpl(); } - private ExecutorService writeProfileExecutor; - private QeProcessorImpl() { coordinatorMap = new ConcurrentHashMap<>(); - // write profile to ProfileManager when query is running. - writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 100, - "profile-write-pool", true); queryToInstancesNum = new ConcurrentHashMap<>(); userToInstancesCount = new ConcurrentHashMap<>(); + // write profile to ProfileManager when query is running. 
+ writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(3, 100, + "profile-write-pool", true); } @Override @@ -90,11 +90,6 @@ public final class QeProcessorImpl implements QeProcessor { return res; } - @Override - public void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException { - registerQuery(queryId, new QueryInfo(coord)); - } - @Override public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException { if (LOG.isDebugEnabled()) { @@ -104,6 +99,10 @@ public final class QeProcessorImpl implements QeProcessor { if (result != null) { throw new UserException("queryId " + queryId + " already exists"); } + + // Should add the execution profile to profile manager, BE will report the profile to FE and FE + // will update it in ProfileManager + ProfileManager.getInstance().addExecutionProfile(info.getCoord().getExecutionProfile()); } @Override @@ -145,7 +144,18 @@ public final class QeProcessorImpl implements QeProcessor { if (LOG.isDebugEnabled()) { LOG.debug("Deregister query id {}", DebugUtil.printId(queryId)); } - + ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(queryId); + if (executionProfile != null) { + executionProfile.setQueryFinishTime(System.currentTimeMillis()); + if (queryInfo.connectContext != null) { + long autoProfileThresholdMs = queryInfo.connectContext + .getSessionVariable().getAutoProfileThresholdMs(); + if (autoProfileThresholdMs > 0 && System.currentTimeMillis() - queryInfo.getStartExecTime() + < autoProfileThresholdMs) { + ProfileManager.getInstance().removeProfile(executionProfile.getSummaryProfile().getProfileId()); + } + } + } if (queryInfo.getConnectContext() != null && !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser()) ) { @@ -187,7 +197,7 @@ public final class QeProcessorImpl implements QeProcessor { .connId(String.valueOf(context.getConnectionId())).db(context.getDatabase()) .catalog(context.getDefaultCatalog()) 
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) - .profile(info.getCoord().getExecutionProfile().getExecutionProfile()) + .profile(info.getCoord().getExecutionProfile().getRoot()) .isReportSucc(context.getSessionVariable().enableProfile()).build(); querySet.put(queryIdStr, item); } @@ -196,13 +206,25 @@ public final class QeProcessorImpl implements QeProcessor { @Override public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr) { - if (params.isSetProfile()) { + if (params.isSetProfile() || params.isSetLoadChannelProfile()) { LOG.info("ReportExecStatus(): fragment_instance_id={}, query id={}, backend num: {}, ip: {}", DebugUtil.printId(params.fragment_instance_id), DebugUtil.printId(params.query_id), params.backend_num, beAddr); if (LOG.isDebugEnabled()) { LOG.debug("params: {}", params); } + ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(params.query_id); + if (executionProfile != null) { + // Update profile may cost a lot of time, use a seperate pool to deal with it. 
+ writeProfileExecutor.submit(new Runnable() { + @Override + public void run() { + executionProfile.updateProfile(params, beAddr); + } + }); + } else { + LOG.info("Could not find execution profile with query id {}", DebugUtil.printId(params.query_id)); + } } final TReportExecStatusResult result = new TReportExecStatusResult(); @@ -229,12 +251,9 @@ public final class QeProcessorImpl implements QeProcessor { } try { info.getCoord().updateFragmentExecStatus(params); - if (params.isSetProfile()) { - writeProfileExecutor.submit(new WriteProfileTask(params, info)); - } } catch (Exception e) { LOG.warn("Exception during handle report, response: {}, query: {}, instance: {}", result.toString(), - DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id)); + DebugUtil.printId(params.query_id), DebugUtil.printId(params.fragment_instance_id), e); return result; } result.setStatus(new TStatus(TStatusCode.OK)); @@ -266,6 +285,7 @@ public final class QeProcessorImpl implements QeProcessor { private final ConnectContext connectContext; private final Coordinator coord; private final String sql; + private long registerTimeMs = 0L; // from Export, Pull load, Insert public QueryInfo(Coordinator coord) { @@ -277,6 +297,7 @@ public final class QeProcessorImpl implements QeProcessor { this.connectContext = connectContext; this.coord = coord; this.sql = sql; + this.registerTimeMs = System.currentTimeMillis(); } public ConnectContext getConnectContext() { @@ -295,7 +316,7 @@ public final class QeProcessorImpl implements QeProcessor { if (coord.getQueueToken() != null) { return coord.getQueueToken().getQueueEndTime(); } - return -1; + return registerTimeMs; } public long getQueueStartTime() { @@ -319,26 +340,4 @@ public final class QeProcessorImpl implements QeProcessor { return null; } } - - private class WriteProfileTask implements Runnable { - private TReportExecStatusParams params; - - private QueryInfo queryInfo; - - WriteProfileTask(TReportExecStatusParams 
params, QueryInfo queryInfo) { - this.params = params; - this.queryInfo = queryInfo; - } - - @Override - public void run() { - QueryInfo info = coordinatorMap.get(params.query_id); - if (info == null) { - return; - } - - ExecutionProfile executionProfile = info.getCoord().getExecutionProfile(); - executionProfile.update(-1, false); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7ae0b9c6301..20a9ad42bd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -84,6 +84,7 @@ public class SessionVariable implements Serializable, Writable { public static final String MAX_EXECUTION_TIME = "max_execution_time"; public static final String INSERT_TIMEOUT = "insert_timeout"; public static final String ENABLE_PROFILE = "enable_profile"; + public static final String AUTO_PROFILE_THRESHOLD_MS = "auto_profile_threshold_ms"; public static final String SQL_MODE = "sql_mode"; public static final String WORKLOAD_VARIABLE = "workload_group"; public static final String RESOURCE_VARIABLE = "resource_group"; @@ -629,6 +630,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true) public boolean enableProfile = false; + // if true, need report to coordinator when plan fragment execute successfully. 
+ @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true) + public int autoProfileThresholdMs = -1; + @VariableMgr.VarAttr(name = "runtime_filter_prune_for_external") public boolean runtimeFilterPruneForExternal = true; @@ -1969,6 +1974,10 @@ public class SessionVariable implements Serializable, Writable { return enableProfile; } + public int getAutoProfileThresholdMs() { + return this.autoProfileThresholdMs; + } + public boolean enableSingleDistinctColumnOpt() { return enableSingleDistinctColumnOpt; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index f31fe76bca4..6236009a7ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -220,7 +220,6 @@ public class StmtExecutor { private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); public static final int MAX_DATA_TO_SEND_FOR_TXN = 100; public static final String NULL_VALUE_FOR_LOAD = "\\N"; - private final Object writeProfileLock = new Object(); private ConnectContext context; private final StatementContext statementContext; private MysqlSerializer serializer; @@ -260,7 +259,9 @@ public class StmtExecutor { this.isProxy = isProxy; this.statementContext = new StatementContext(context, originStmt); this.context.setStatementContext(statementContext); - this.profile = new Profile("Query", this.context.getSessionVariable().enableProfile); + this.profile = new Profile("Query", this.context.getSessionVariable().enableProfile, + this.context.getSessionVariable().profileLevel, + this.context.getSessionVariable().getEnablePipelineXEngine()); } // for test @@ -290,7 +291,8 @@ public class StmtExecutor { this.statementContext.setParsedStatement(parsedStmt); } this.context.setStatementContext(statementContext); - this.profile = new Profile("Query", context.getSessionVariable().enableProfile()); + this.profile 
= new Profile("Query", context.getSessionVariable().enableProfile(), + context.getSessionVariable().profileLevel, context.getSessionVariable().getEnablePipelineXEngine()); } public static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException { @@ -993,9 +995,7 @@ public class StmtExecutor { // and ensure the sql is finished normally. For example, if update profile // failed, the insert stmt should be success try { - profile.update(context.startTime, getSummaryInfo(isFinished), isFinished, - context.getSessionVariable().profileLevel, this.planner, - context.getSessionVariable().getEnablePipelineXEngine()); + profile.updateSummary(context.startTime, getSummaryInfo(isFinished), isFinished, this.planner); } catch (Throwable t) { LOG.warn("failed to update profile, ingore this error", t); } @@ -1600,9 +1600,9 @@ public class StmtExecutor { context.getSessionVariable().getMaxMsgSizeOfResultReceiver()); } else { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); + profile.addExecutionProfile(coord.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - profile.addExecutionProfile(coord.getExecutionProfile()); coordBase = coord; } @@ -1610,35 +1610,10 @@ public class StmtExecutor { coordBase.exec(); profile.getSummaryProfile().setQueryScheduleFinishTime(); updateProfile(false); - if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Start to execute fragment. 
user: {}, db: {}, sql: {}, fragment instance num: {}", - context.getQualifiedUser(), context.getDatabase(), - parsedStmt.getOrigStmt().originStmt.replace("\n", " "), - coordBase.getInstanceTotalNum()); - } - } catch (Exception e) { - LOG.warn("Fail to print fragment concurrency for Query.", e); - } - } if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) { Preconditions.checkState(!context.isReturnResultFromLocal()); profile.getSummaryProfile().setTempStartTime(); - if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, " - + "fragment instance num: {}", - context.getQualifiedUser(), context.getDatabase(), - parsedStmt.getOrigStmt().originStmt.replace("\n", " "), - coordBase.getInstanceTotalNum()); - } - } catch (Exception e) { - LOG.warn("Fail to print fragment concurrency for Query.", e); - } - } return; } @@ -1723,18 +1698,6 @@ public class StmtExecutor { throw e; } finally { coordBase.close(); - if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) { - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Finish to execute fragment. 
user: {}, db: {}, sql: {}, fragment instance num: {}", - context.getQualifiedUser(), context.getDatabase(), - parsedStmt.getOrigStmt().originStmt.replace("\n", " "), - coordBase.getInstanceTotalNum()); - } - } catch (Exception e) { - LOG.warn("Fail to print fragment concurrency for Query.", e); - } - } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index 50c24a42330..3f4bb846767 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -22,6 +22,7 @@ import org.apache.doris.proto.InternalService; import org.apache.doris.proto.PBackendServiceGrpc; import org.apache.doris.thrift.TNetworkAddress; +import com.google.common.util.concurrent.ListenableFuture; import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; @@ -82,7 +83,7 @@ public class BackendServiceClient { .execPlanFragmentStart(request); } - public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync( + public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync( InternalService.PCancelPlanFragmentRequest request) { return stub.cancelPlanFragment(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index af21194263f..5a89614bab7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -36,6 +36,7 @@ import org.apache.doris.thrift.TPipelineFragmentParamsList; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -225,7 +226,7 @@ public class BackendServiceProxy { } } - public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(TNetworkAddress address, + public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(TNetworkAddress address, TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException { final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest.newBuilder() @@ -241,8 +242,8 @@ public class BackendServiceProxy { } } - public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address, - PlanFragmentId fragmentId, TUniqueId queryId, + public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync( + TNetworkAddress address, PlanFragmentId fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException { final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest .newBuilder() diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java index 15b4175759c..56ed66c0504 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java @@ -98,7 +98,7 @@ public class RuntimeProfileTest { @Test public void testCounter() { - RuntimeProfile profile = new RuntimeProfile(); + RuntimeProfile profile = new RuntimeProfile("test counter"); profile.addCounter("key", TUnit.UNIT, ""); Assert.assertNotNull(profile.getCounterMap().get("key")); Assert.assertNull(profile.getCounterMap().get("key2")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 29c0adae124..cd9e0cb048a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -33,6 +33,7 @@ import org.apache.doris.analysis.UseStmt; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.common.profile.Profile; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; @@ -172,7 +173,8 @@ public class StmtExecutorTest { public void testSelect(@Mocked QueryStmt queryStmt, @Mocked SqlParser parser, @Mocked OriginalPlanner planner, - @Mocked Coordinator coordinator) throws Exception { + @Mocked Coordinator coordinator, + @Mocked Profile profile) throws Exception { Env env = Env.getCurrentEnv(); Deencapsulation.setField(env, "canRead", new AtomicBoolean(true)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org