This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1aedb81b23925eb0424acc425a373cd43c7fe4cf Author: Kaijie Chen <c...@apache.org> AuthorDate: Mon Jul 17 16:15:40 2023 +0800 [feature](profile) backport stream load profile into 1.2-lts (#21784) Backport stream load profile into 1.2-lts, see #18364. The commented-out fields in the Thrift definitions are reserved for future backports. --- be/src/http/action/stream_load.cpp | 7 +++++++ be/src/http/http_common.h | 1 + be/src/runtime/fragment_mgr.cpp | 1 + .../main/java/org/apache/doris/planner/StreamLoadPlanner.java | 1 + .../src/main/java/org/apache/doris/qe/QeProcessorImpl.java | 11 +++++++++-- .../src/main/java/org/apache/doris/task/LoadTaskInfo.java | 4 ++++ .../src/main/java/org/apache/doris/task/StreamLoadTask.java | 9 +++++++++ gensrc/thrift/FrontendService.thrift | 6 ++++++ 8 files changed, 38 insertions(+), 2 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 5f036374c2..b9f2700968 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -572,6 +572,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_trim_double_quotes(false); } } + if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) { + if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) { + request.__set_enable_profile(true); + } else { + request.__set_enable_profile(false); + } + } #ifndef BE_TEST // plan this load diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index e61c828fb4..9c9dbf9296 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -52,6 +52,7 @@ static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism"; static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns"; static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; +static const std::string HTTP_ENABLE_PROFILE = "enable_profile"; static const std::string
HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 2b069d78a2..3bed90a59e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -327,6 +327,7 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil RuntimeState* runtime_state = _executor.runtime_state(); DCHECK(runtime_state != nullptr); + params.__set_query_type(runtime_state->query_type()); if (runtime_state->query_type() == TQueryType::LOAD && !done && status.ok()) { // this is a load plan, and load is not finished, just make a brief report params.__set_loaded_rows(runtime_state->num_rows_load_total()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 634379ed7d..c61fd96976 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -259,6 +259,7 @@ public class StreamLoadPlanner { queryOptions.setLoadMemLimit(taskInfo.getMemLimit()); queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load); queryOptions.setBeExecVersion(Config.be_exec_version); + queryOptions.setIsReportSuccess(taskInfo.getEnableProfile()); params.setQueryOptions(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 1bf11ece97..7e79fc47ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -24,6 +24,7 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.metric.MetricRepo; import 
org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import org.apache.doris.thrift.TStatus; @@ -188,8 +189,14 @@ public final class QeProcessorImpl implements QeProcessor { final TReportExecStatusResult result = new TReportExecStatusResult(); final QueryInfo info = coordinatorMap.get(params.query_id); if (info == null) { - result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR)); - LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id)); + // There is no QueryInfo for StreamLoad, so we return OK + if (params.query_type == TQueryType.LOAD) { + result.setStatus(new TStatus(TStatusCode.OK)); + } else { + result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR)); + } + LOG.info("ReportExecStatus() runtime error, query {} with type {} does not exist", + DebugUtil.printId(params.query_id), params.query_type); return result; } try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index e384394653..47d18ef205 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -101,6 +101,10 @@ public interface LoadTaskInfo { return false; } + default boolean getEnableProfile() { + return false; + } + class ImportColumnDescs { public List<ImportColumnDesc> descs = Lists.newArrayList(); public boolean isColumnDescsRewrited = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 2a80ceef90..cae47eca28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -83,6 +83,7 @@ public class StreamLoadTask 
implements LoadTaskInfo { private String headerType = ""; private List<String> hiddenColumns; private boolean trimDoubleQuotes = false; + private boolean enableProfile = false; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType, TFileCompressType compressType) { @@ -257,6 +258,11 @@ public class StreamLoadTask implements LoadTaskInfo { return trimDoubleQuotes; } + @Override + public boolean getEnableProfile() { + return enableProfile; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType(), @@ -359,6 +365,9 @@ public class StreamLoadTask implements LoadTaskInfo { if (request.isSetTrimDoubleQuotes()) { trimDoubleQuotes = request.isTrimDoubleQuotes(); } + if (request.isSetEnableProfile()) { + enableProfile = request.isEnableProfile(); + } } // used for stream load diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 32c73a5663..3634a11333 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -407,6 +407,10 @@ struct TReportExecStatusParams { 17: optional i64 loaded_bytes 18: optional list<Types.TErrorTabletInfo> errorTabletInfos + + // 19: optional i32 fragment_id + + 20: optional PaloInternalService.TQueryType query_type } struct TFeResult { @@ -547,6 +551,8 @@ struct TStreamLoadPutRequest { 40: optional PlanNodes.TFileCompressType compress_type 41: optional i64 file_size // only for stream load with parquet or orc 42: optional bool trim_double_quotes // trim double quotes for csv + // 43: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set. 
+ 44: optional bool enable_profile } struct TStreamLoadPutResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org