This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 42bbba1e0aa [branch-2.1](opt)(profile) parallel serialize fragment and add detail schedule profile (#33376)
42bbba1e0aa is described below

commit 42bbba1e0aac9e6284f249b7cbefb6641242a27d
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Wed Apr 10 15:24:57 2024 +0800

    [branch-2.1](opt)(profile) parallel serialize fragment and add detail schedule profile (#33376)

    bp #33379
---
 .../doris/common/profile/SummaryProfile.java       | 282 ++++++++++-----------
 .../org/apache/doris/common/util/BrokerUtil.java   |   3 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  81 ++++--
 .../java/org/apache/doris/qe/StmtExecutor.java     |   1 +
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  17 +-
 .../ExternalFileTableValuedFunction.java           |   2 +-
 6 files changed, 218 insertions(+), 168 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index e9389b48b99..28d179f2e77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -62,14 +62,18 @@ public class SummaryProfile {
     public static final String INIT_SCAN_NODE_TIME = "Init Scan Node Time";
     public static final String FINALIZE_SCAN_NODE_TIME = "Finalize Scan Node Time";
     public static final String GET_SPLITS_TIME = "Get Splits Time";
-    public static final String GET_PARTITIONS_TIME = "Get PARTITIONS Time";
-    public static final String GET_PARTITION_FILES_TIME = "Get PARTITION FILES Time";
+    public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
+    public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
     public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
     public static final String PLAN_TIME = "Plan Time";
     public static final String SCHEDULE_TIME = "Schedule Time";
+    public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
+    public static final String FRAGMENT_SERIALIZE_TIME = "Fragment Serialize Time";
+    public static final String SEND_FRAGMENT_PHASE1_TIME = "Fragment RPC Phase1 Time";
+    public static final String SEND_FRAGMENT_PHASE2_TIME = "Fragment RPC Phase2 Time";
+    public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time";
     public static final String FETCH_RESULT_TIME = "Fetch Result Time";
     public static final String WRITE_RESULT_TIME = "Write Result Time";
-    public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time";

     public static final String PARSE_SQL_TIME = "Parse SQL Time";
     public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
@@ -77,34 +81,75 @@ public class SummaryProfile {
     public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
     public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";

+    public static final String FRAGMENT_COMPRESSED_SIZE = "Fragment Compressed Size";
+    public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count";
+
     // These info will display on FE's web ui table, every one will be displayed as
     // a column, so that should not
     // add many columns here. Add to ExcecutionSummary list.
     public static final ImmutableList<String> SUMMARY_KEYS = ImmutableList.of(PROFILE_ID, TASK_TYPE,
             START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_DB, SQL_STATEMENT);

+    // The display order of execution summary items.
     public static final ImmutableList<String> EXECUTION_SUMMARY_KEYS = ImmutableList.of(
-            PARSE_SQL_TIME, NEREIDS_ANALYSIS_TIME, NEREIDS_REWRITE_TIME, NEREIDS_OPTIMIZE_TIME, NEREIDS_TRANSLATE_TIME,
-            WORKLOAD_GROUP, ANALYSIS_TIME,
-            PLAN_TIME, JOIN_REORDER_TIME, CREATE_SINGLE_NODE_TIME, QUERY_DISTRIBUTED_TIME,
-            INIT_SCAN_NODE_TIME, FINALIZE_SCAN_NODE_TIME, GET_SPLITS_TIME, GET_PARTITIONS_TIME,
-            GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME, SCHEDULE_TIME, FETCH_RESULT_TIME,
-            WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME, DORIS_VERSION, IS_NEREIDS, IS_PIPELINE,
-            IS_CACHED, TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID);
+            PARSE_SQL_TIME,
+            NEREIDS_ANALYSIS_TIME,
+            NEREIDS_REWRITE_TIME,
+            NEREIDS_OPTIMIZE_TIME,
+            NEREIDS_TRANSLATE_TIME,
+            WORKLOAD_GROUP,
+            ANALYSIS_TIME,
+            PLAN_TIME,
+            JOIN_REORDER_TIME,
+            CREATE_SINGLE_NODE_TIME,
+            QUERY_DISTRIBUTED_TIME,
+            INIT_SCAN_NODE_TIME,
+            FINALIZE_SCAN_NODE_TIME,
+            GET_SPLITS_TIME,
+            GET_PARTITIONS_TIME,
+            GET_PARTITION_FILES_TIME,
+            CREATE_SCAN_RANGE_TIME,
+            SCHEDULE_TIME,
+            ASSIGN_FRAGMENT_TIME,
+            FRAGMENT_SERIALIZE_TIME,
+            SEND_FRAGMENT_PHASE1_TIME,
+            SEND_FRAGMENT_PHASE2_TIME,
+            FRAGMENT_COMPRESSED_SIZE,
+            FRAGMENT_RPC_COUNT,
+            WAIT_FETCH_RESULT_TIME,
+            FETCH_RESULT_TIME,
+            WRITE_RESULT_TIME,
+            DORIS_VERSION,
+            IS_NEREIDS,
+            IS_PIPELINE,
+            IS_CACHED,
+            TOTAL_INSTANCES_NUM,
+            INSTANCES_NUM_PER_BE,
+            PARALLEL_FRAGMENT_EXEC_INSTANCE,
+            TRACE_ID);

     // Ident of each item. Default is 0, which doesn't need to present in this Map.
     // Please set this map for new profile items if they need ident.
-    public static ImmutableMap<String, Integer> EXECUTION_SUMMARY_KEYS_IDENTATION = ImmutableMap.of(
-            JOIN_REORDER_TIME, 1,
-            CREATE_SINGLE_NODE_TIME, 1,
-            QUERY_DISTRIBUTED_TIME, 1,
-            INIT_SCAN_NODE_TIME, 1,
-            FINALIZE_SCAN_NODE_TIME, 1,
-            GET_SPLITS_TIME, 2,
-            GET_PARTITIONS_TIME, 3,
-            GET_PARTITION_FILES_TIME, 3,
-            CREATE_SCAN_RANGE_TIME, 2
-    );
+    public static ImmutableMap<String, Integer> EXECUTION_SUMMARY_KEYS_IDENTATION
+            = ImmutableMap.<String, Integer>builder()
+            .put(JOIN_REORDER_TIME, 1)
+            .put(CREATE_SINGLE_NODE_TIME, 1)
+            .put(QUERY_DISTRIBUTED_TIME, 1)
+            .put(INIT_SCAN_NODE_TIME, 1)
+            .put(FINALIZE_SCAN_NODE_TIME, 1)
+            .put(GET_SPLITS_TIME, 2)
+            .put(GET_PARTITIONS_TIME, 3)
+            .put(GET_PARTITION_FILES_TIME, 3)
+            .put(CREATE_SCAN_RANGE_TIME, 2)
+            .put(FETCH_RESULT_TIME, 1)
+            .put(WRITE_RESULT_TIME, 1)
+            .put(ASSIGN_FRAGMENT_TIME, 1)
+            .put(FRAGMENT_SERIALIZE_TIME, 1)
+            .put(SEND_FRAGMENT_PHASE1_TIME, 1)
+            .put(SEND_FRAGMENT_PHASE2_TIME, 1)
+            .put(FRAGMENT_COMPRESSED_SIZE, 1)
+            .put(FRAGMENT_RPC_COUNT, 1)
+            .build();

     private RuntimeProfile summaryProfile;
     private RuntimeProfile executionSummaryProfile;
@@ -136,6 +181,12 @@ public class SummaryProfile {
     private long createScanRangeFinishTime = -1;
     // Plan end time
     private long queryPlanFinishTime = -1;
+    private long assignFragmentTime = -1;
+    private long fragmentSerializeTime = -1;
+    private long fragmentSendPhase1Time = -1;
+    private long fragmentSendPhase2Time = -1;
+    private long fragmentCompressedSize = 0;
+    private long fragmentRpcCount = 0;
     // Fragment schedule and send end time
     private long queryScheduleFinishTime = -1;
     // Query result fetch end time
@@ -197,23 +248,47 @@ public class SummaryProfile {
         executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
         executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
         executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME,
                 getPrettyNereidsTranslateTime());
-        executionSummaryProfile.addInfoString(ANALYSIS_TIME, getPrettyQueryAnalysisFinishTime());
-        executionSummaryProfile.addInfoString(PLAN_TIME, getPrettyQueryPlanFinishTime());
-        executionSummaryProfile.addInfoString(JOIN_REORDER_TIME, getPrettyQueryJoinReorderFinishTime());
-        executionSummaryProfile.addInfoString(CREATE_SINGLE_NODE_TIME, getPrettyCreateSingleNodeFinishTime());
-        executionSummaryProfile.addInfoString(QUERY_DISTRIBUTED_TIME, getPrettyQueryDistributedFinishTime());
-        executionSummaryProfile.addInfoString(INIT_SCAN_NODE_TIME, getPrettyInitScanNodeTime());
-        executionSummaryProfile.addInfoString(FINALIZE_SCAN_NODE_TIME, getPrettyFinalizeScanNodeTime());
-        executionSummaryProfile.addInfoString(GET_SPLITS_TIME, getPrettyGetSplitsTime());
-        executionSummaryProfile.addInfoString(GET_PARTITIONS_TIME, getPrettyGetPartitionsTime());
-        executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME, getPrettyGetPartitionFilesTime());
-        executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME, getPrettyCreateScanRangeTime());
-        executionSummaryProfile.addInfoString(SCHEDULE_TIME, getPrettyQueryScheduleFinishTime());
+        executionSummaryProfile.addInfoString(ANALYSIS_TIME,
+                getPrettyTime(queryAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(PLAN_TIME,
+                getPrettyTime(queryPlanFinishTime, queryAnalysisFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(JOIN_REORDER_TIME,
+                getPrettyTime(queryJoinReorderFinishTime, queryAnalysisFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(CREATE_SINGLE_NODE_TIME,
+                getPrettyTime(queryCreateSingleNodeFinishTime, queryJoinReorderFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(QUERY_DISTRIBUTED_TIME,
+                getPrettyTime(queryDistributedFinishTime, queryCreateSingleNodeFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(INIT_SCAN_NODE_TIME,
+                getPrettyTime(initScanNodeFinishTime, initScanNodeStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(FINALIZE_SCAN_NODE_TIME,
+                getPrettyTime(finalizeScanNodeFinishTime, finalizeScanNodeStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(GET_SPLITS_TIME,
+                getPrettyTime(getSplitsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(GET_PARTITIONS_TIME,
+                getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
+                getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
+                getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SCHEDULE_TIME,
+                getPrettyTime(queryScheduleFinishTime, queryPlanFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(ASSIGN_FRAGMENT_TIME,
+                getPrettyTime(assignFragmentTime, queryPlanFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(FRAGMENT_SERIALIZE_TIME,
+                getPrettyTime(fragmentSerializeTime, assignFragmentTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SEND_FRAGMENT_PHASE1_TIME,
+                getPrettyTime(fragmentSendPhase1Time, fragmentSerializeTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SEND_FRAGMENT_PHASE2_TIME,
+                getPrettyTime(fragmentSendPhase2Time, fragmentSendPhase1Time, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(FRAGMENT_COMPRESSED_SIZE,
+                RuntimeProfile.printCounter(fragmentCompressedSize, TUnit.BYTES));
+        executionSummaryProfile.addInfoString(FRAGMENT_RPC_COUNT, "" + fragmentRpcCount);
+        executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME,
+                getPrettyTime(queryFetchResultFinishTime, queryScheduleFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(FETCH_RESULT_TIME,
                 RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(WRITE_RESULT_TIME,
                 RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS));
-        executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, getPrettyQueryFetchResultFinishTime());
     }

     public void setParseSqlStartTime(long parseSqlStartTime) {
@@ -320,8 +395,28 @@ public class SummaryProfile {
         this.queryWriteResultConsumeTime += TimeUtils.getStartTimeMs() - tempStarTime;
     }

-    public long getQueryBeginTime() {
-        return queryBeginTime;
+    public void setAssignFragmentTime() {
+        this.assignFragmentTime = TimeUtils.getStartTimeMs();
+    }
+
+    public void setFragmentSerializeTime() {
+        this.fragmentSerializeTime = TimeUtils.getStartTimeMs();
+    }
+
+    public void setFragmentSendPhase1Time() {
+        this.fragmentSendPhase1Time = TimeUtils.getStartTimeMs();
+    }
+
+    public void setFragmentSendPhase2Time() {
+        this.fragmentSendPhase2Time = TimeUtils.getStartTimeMs();
+    }
+
+    public void updateFragmentCompressedSize(long size) {
+        this.fragmentCompressedSize += size;
+    }
+
+    public void updateFragmentRpcCount(long count) {
+        this.fragmentRpcCount += count;
     }

     public static class SummaryBuilder {
@@ -423,128 +518,29 @@ public class SummaryProfile {
     }

     public String getPrettyParseSqlTime() {
-        if (parseSqlStartTime == -1 || parseSqlFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(parseSqlFinishTime - parseSqlStartTime, TUnit.TIME_MS);
+        return getPrettyTime(parseSqlStartTime, parseSqlFinishTime, TUnit.TIME_MS);
     }

     public String getPrettyNereidsAnalysisTime() {
-        if (nereidsAnalysisFinishTime == -1 || queryAnalysisFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS);
     }

     public String getPrettyNereidsRewriteTime() {
-        if (nereidsRewriteFinishTime == -1 || nereidsAnalysisFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsRewriteFinishTime - nereidsAnalysisFinishTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsRewriteFinishTime, nereidsAnalysisFinishTime, TUnit.TIME_MS);
     }

     public String getPrettyNereidsOptimizeTime() {
-        if (nereidsOptimizeFinishTime == -1 || nereidsRewriteFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsOptimizeFinishTime - nereidsRewriteFinishTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsOptimizeFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
     }

     public String getPrettyNereidsTranslateTime() {
-        if (nereidsTranslateFinishTime == -1 || nereidsOptimizeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsTranslateFinishTime - nereidsOptimizeFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryAnalysisFinishTime() {
-        if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryJoinReorderFinishTime() {
-        if (queryAnalysisFinishTime == -1 || queryJoinReorderFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryJoinReorderFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyCreateSingleNodeFinishTime() {
-        if (queryJoinReorderFinishTime == -1 || queryCreateSingleNodeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryCreateSingleNodeFinishTime - queryJoinReorderFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryDistributedFinishTime() {
-        if (queryCreateSingleNodeFinishTime == -1 || queryDistributedFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryDistributedFinishTime - queryCreateSingleNodeFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyInitScanNodeTime() {
-        if (initScanNodeStartTime == -1 || initScanNodeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(initScanNodeFinishTime - initScanNodeStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyFinalizeScanNodeTime() {
-        if (finalizeScanNodeFinishTime == -1 || finalizeScanNodeStartTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(finalizeScanNodeFinishTime - finalizeScanNodeStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyGetSplitsTime() {
-        if (getSplitsFinishTime == -1 || getSplitsStartTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(getSplitsFinishTime - getSplitsStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyGetPartitionsTime() {
-        if (getSplitsStartTime == -1 || getPartitionsFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(getPartitionsFinishTime - getSplitsStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyGetPartitionFilesTime() {
-        if (getPartitionsFinishTime == -1 || getPartitionFilesFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(getPartitionFilesFinishTime - getPartitionsFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyCreateScanRangeTime() {
-        if (getSplitsFinishTime == -1 || createScanRangeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(createScanRangeFinishTime - getSplitsFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryPlanFinishTime() {
-        if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryScheduleFinishTime() {
-        if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsTranslateFinishTime, nereidsOptimizeFinishTime, TUnit.TIME_MS);
     }

-    private String getPrettyQueryFetchResultFinishTime() {
-        if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) {
+    private String getPrettyTime(long end, long start, TUnit unit) {
+        if (start == -1 || end == -1) {
             return "N/A";
         }
-        return RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_MS);
+        return RuntimeProfile.printCounter(end - start, unit);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index fef91f6f022..a4d8186d1df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -90,8 +90,7 @@ public class BrokerUtil {
                     brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
             Status st = fileSystem.list(path, rfiles, false);
             if (!st.ok()) {
-                throw new UserException(brokerDesc.getName() + " list path failed. path=" + path
-                        + ",msg=" + st.getErrMsg());
+                throw new UserException(st.getErrMsg());
             }
         } catch (Exception e) {
             LOG.warn("{} list path exception, path={}", brokerDesc.getName(), path, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 66b6ae39688..83b428a9d04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.Reference;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
+import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.common.util.RuntimeProfile;
@@ -121,11 +122,14 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
 import org.jetbrains.annotations.NotNull;

 import java.security.SecureRandom;
@@ -144,8 +148,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

@@ -706,6 +712,7 @@ public class Coordinator implements CoordInterface {
             executionProfile.markInstances(instanceIds);
         }

+        updateProfileIfPresent(profile -> profile.setAssignFragmentTime());
         if (enablePipelineEngine) {
             sendPipelineCtx();
         } else {
@@ -978,35 +985,44 @@ public class Coordinator implements CoordInterface {
         } // end for fragments

         // 4. send and wait fragments rpc
+        // 4.1 serialize fragment
+        // unsetFields() must be called serially.
+        beToPipelineExecCtxs.values().stream().forEach(ctxs -> ctxs.unsetFields());
+        // serializeFragments() can be called in parallel.
+        final AtomicLong compressedSize = new AtomicLong(0);
+        beToPipelineExecCtxs.values().parallelStream().forEach(ctxs -> {
+            try {
+                compressedSize.addAndGet(ctxs.serializeFragments());
+            } catch (TException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        updateProfileIfPresent(profile -> profile.updateFragmentCompressedSize(compressedSize.get()));
+        updateProfileIfPresent(profile -> profile.setFragmentSerializeTime());
+
+        // 4.2 send fragments rpc
         List<Triple<PipelineExecContexts, BackendServiceProxy,
                 Future<InternalService.PExecPlanFragmentResult>>> futures = Lists.newArrayList();
-
+        BackendServiceProxy proxy = BackendServiceProxy.getInstance();
         for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
             if (LOG.isDebugEnabled()) {
-                String infos = "";
-                for (PipelineExecContext pec : ctxs.ctxs) {
-                    infos += pec.fragmentId + " ";
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("query {}, sending pipeline fragments: {} to be {} bprc address {}",
-                            DebugUtil.printId(queryId), infos, ctxs.beId, ctxs.brpcAddr.toString());
-                }
+                LOG.debug(ctxs.debugInfo());
             }
-
-            ctxs.unsetFields();
-            BackendServiceProxy proxy = BackendServiceProxy.getInstance();
             futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy)));
         }
         waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
+        updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
+        updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time());

         if (twoPhaseExecution) {
             // 5. send and wait execution start rpc
             futures.clear();
             for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
-                BackendServiceProxy proxy = BackendServiceProxy.getInstance();
                 futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy)));
             }
             waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
+            updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
+            updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time());
         }
         if (context != null && context.getSessionVariable().enableProfile()) {
             attachInstanceProfileToFragmentProfile();
@@ -3491,6 +3507,7 @@ public class Coordinator implements CoordInterface {
         List<PipelineExecContext> ctxs = Lists.newArrayList();
         boolean twoPhaseExecution = false;
         int instanceNumber;
+        ByteString serializedFragments = null;

         public PipelineExecContexts(long beId, TNetworkAddress brpcAddr,
                 boolean twoPhaseExecution, int instanceNumber) {
@@ -3524,15 +3541,10 @@ public class Coordinator implements CoordInterface {
             }
         }

-        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy)
-                throws TException {
+        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy) {
+            Preconditions.checkNotNull(serializedFragments);
             try {
-                TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
-                for (PipelineExecContext cts : ctxs) {
-                    cts.initiated = true;
-                    paramsList.addToParamsList(cts.rpcParams);
-                }
-                return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution);
+                return proxy.execPlanFragmentsAsync(brpcAddr, serializedFragments, twoPhaseExecution);
             } catch (RpcException e) {
                 // DO NOT throw exception here, return a complete future with error code,
                 // so that the following logic will cancel the fragment.
@@ -3586,6 +3598,26 @@ public class Coordinator implements CoordInterface {
                 }
             };
         }
+
+        public long serializeFragments() throws TException {
+            TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
+            for (PipelineExecContext cts : ctxs) {
+                cts.initiated = true;
+                paramsList.addToParamsList(cts.rpcParams);
+            }
+            serializedFragments = ByteString.copyFrom(
+                    new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList));
+            return serializedFragments.size();
+        }
+
+        public String debugInfo() {
+            String infos = "";
+            for (PipelineExecContext pec : ctxs) {
+                infos += pec.fragmentId + " ";
+            }
+            return String.format("query %s, sending pipeline fragments: %s to be %s bprc address %s",
+                    DebugUtil.printId(queryId), infos, beId, brpcAddr.toString());
+        }
     }

     // execution parameters for a single fragment,
@@ -4018,5 +4050,12 @@ public class Coordinator implements CoordInterface {
             this.targetFragmentInstanceAddr = host;
         }
     }
+
+    private void updateProfileIfPresent(Consumer<SummaryProfile> profileAction) {
+        Optional.ofNullable(context)
+                .map(ConnectContext::getExecutor)
+                .map(StmtExecutor::getSummaryProfile)
+                .ifPresent(profileAction);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f31fe76bca4..423c24cf967 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -255,6 +255,7 @@ public class StmtExecutor {
     public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
         Preconditions.checkState(context.getConnectType().equals(ConnectType.MYSQL));
         this.context = context;
+        this.context.setExecutor(this);
         this.originStmt = originStmt;
         this.serializer = context.getMysqlChannel().getSerializer();
         this.isProxy = isProxy;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index af21194263f..15d571cb789 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -197,8 +197,23 @@ public class BackendServiceProxy {
         }
         // VERSION 3 means we send TPipelineFragmentParamsList
         builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_3);
+        return execPlanFragmentsAsync(address, builder.build(), twoPhaseExecution);
+    }

-        final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
+            ByteString serializedFragments, boolean twoPhaseExecution) throws RpcException {
+        InternalService.PExecPlanFragmentRequest.Builder builder =
+                InternalService.PExecPlanFragmentRequest.newBuilder();
+        builder.setRequest(serializedFragments);
+        builder.setCompact(true);
+        // VERSION 3 means we send TPipelineFragmentParamsList
+        builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_3);
+        return execPlanFragmentsAsync(address, builder.build(), twoPhaseExecution);
+    }
+
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
+            InternalService.PExecPlanFragmentRequest pRequest, boolean twoPhaseExecution)
+            throws RpcException {
         MetricRepo.BE_COUNTER_QUERY_RPC_ALL.getOrAdd(address.hostname).increase(1L);
         MetricRepo.BE_COUNTER_QUERY_RPC_SIZE.getOrAdd(address.hostname).increase((long)
                 pRequest.getSerializedSize());
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 451b37d1311..260f7b2df44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -168,7 +168,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         try {
             BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
         } catch (UserException e) {
-            throw new AnalysisException("parse file failed, path = " + path, e);
+            throw new AnalysisException("parse file failed, err: " + e.getMessage(), e);
        }
     }
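For readers skimming the patch, the sketch below restates its core idea outside of Doris: per-backend Thrift fragment parameters are independent, so they can be serialized in parallel with TCompactProtocol while the total serialized size is accumulated for the profile (reported above as "Fragment Compressed Size"). This is an illustrative, self-contained approximation, not the committed code; the ExecContext class and its rpcParams/serializedFragments fields are hypothetical stand-ins for Coordinator.PipelineExecContexts.

import com.google.protobuf.ByteString;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

class ParallelFragmentSerializeSketch {

    /** Hypothetical stand-in for one backend's PipelineExecContexts. */
    static class ExecContext {
        TBase<?, ?> rpcParams;          // Thrift params destined for one backend
        ByteString serializedFragments; // filled in by serialize()

        long serialize() throws TException {
            // A fresh TSerializer per call: TSerializer is not thread-safe, so the
            // parallel stream below must not share a single instance.
            serializedFragments = ByteString.copyFrom(
                    new TSerializer(new TCompactProtocol.Factory()).serialize(rpcParams));
            return serializedFragments.size();
        }
    }

    /**
     * Serialize every per-backend context in parallel and return the total
     * serialized size. Any ordering-sensitive mutation (unsetFields() in the
     * patch) has to run serially before this step.
     */
    static long serializeAll(Collection<ExecContext> perBackendCtxs) {
        AtomicLong totalSize = new AtomicLong(0);
        perBackendCtxs.parallelStream().forEach(ctx -> {
            try {
                totalSize.addAndGet(ctx.serialize());
            } catch (TException e) {
                throw new RuntimeException(e); // propagate out of the parallel stream
            }
        });
        return totalSize.get();
    }
}

The split mirrors the patch's design: serialization is CPU-bound and per-backend, so it parallelizes cleanly, while the RPC send and the profile timestamps (phase 1 and phase 2) stay on the coordinating thread.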