This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 1aedb81b23925eb0424acc425a373cd43c7fe4cf
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Mon Jul 17 16:15:40 2023 +0800

    [feature](profile) backport stream load profile into 1.2-lts (#21784)
    
    Backport stream load profile into 1.2-lts, see #18364.
    
    The commented-out fields in the Thrift definitions are reserved for future backports.
---
 be/src/http/action/stream_load.cpp                            |  7 +++++++
 be/src/http/http_common.h                                     |  1 +
 be/src/runtime/fragment_mgr.cpp                               |  1 +
 .../main/java/org/apache/doris/planner/StreamLoadPlanner.java |  1 +
 .../src/main/java/org/apache/doris/qe/QeProcessorImpl.java    | 11 +++++++++--
 .../src/main/java/org/apache/doris/task/LoadTaskInfo.java     |  4 ++++
 .../src/main/java/org/apache/doris/task/StreamLoadTask.java   |  9 +++++++++
 gensrc/thrift/FrontendService.thrift                          |  6 ++++++
 8 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 5f036374c2..b9f2700968 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -572,6 +572,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
             request.__set_trim_double_quotes(false);
         }
     }
+    if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) {
+        if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) {
+            request.__set_enable_profile(true);
+        } else {
+            request.__set_enable_profile(false);
+        }
+    }
 
 #ifndef BE_TEST
     // plan this load
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index e61c828fb4..9c9dbf9296 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -52,6 +52,7 @@ static const std::string HTTP_SEND_BATCH_PARALLELISM = "send_batch_parallelism";
 static const std::string HTTP_LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
 static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns";
 static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+static const std::string HTTP_ENABLE_PROFILE = "enable_profile";
 
 static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit";
 static const std::string HTTP_TXN_ID_KEY = "txn_id";
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2b069d78a2..3bed90a59e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -327,6 +327,7 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
 
     RuntimeState* runtime_state = _executor.runtime_state();
     DCHECK(runtime_state != nullptr);
+    params.__set_query_type(runtime_state->query_type());
     if (runtime_state->query_type() == TQueryType::LOAD && !done && 
status.ok()) {
         // this is a load plan, and load is not finished, just make a brief 
report
         params.__set_loaded_rows(runtime_state->num_rows_load_total());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 634379ed7d..c61fd96976 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -259,6 +259,7 @@ public class StreamLoadPlanner {
         queryOptions.setLoadMemLimit(taskInfo.getMemLimit());
         queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load);
         queryOptions.setBeExecVersion(Config.be_exec_version);
+        queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
 
         params.setQueryOptions(queryOptions);
         TQueryGlobals queryGlobals = new TQueryGlobals();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 1bf11ece97..7e79fc47ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileWriter;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TReportExecStatusResult;
 import org.apache.doris.thrift.TStatus;
@@ -188,8 +189,14 @@ public final class QeProcessorImpl implements QeProcessor {
         final TReportExecStatusResult result = new TReportExecStatusResult();
         final QueryInfo info = coordinatorMap.get(params.query_id);
         if (info == null) {
-            result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
-            LOG.info("ReportExecStatus() runtime error, query {} does not 
exist", DebugUtil.printId(params.query_id));
+            // There is no QueryInfo for StreamLoad, so we return OK
+            if (params.query_type == TQueryType.LOAD) {
+                result.setStatus(new TStatus(TStatusCode.OK));
+            } else {
+                result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
+            }
+            LOG.info("ReportExecStatus() runtime error, query {} with type {} 
does not exist",
+                    DebugUtil.printId(params.query_id), params.query_type);
             return result;
         }
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index e384394653..47d18ef205 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -101,6 +101,10 @@ public interface LoadTaskInfo {
         return false;
     }
 
+    default boolean getEnableProfile() {
+        return false;
+    }
+
     class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
         public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 2a80ceef90..cae47eca28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -83,6 +83,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private String headerType = "";
     private List<String> hiddenColumns;
     private boolean trimDoubleQuotes = false;
+    private boolean enableProfile = false;
 
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, 
TFileFormatType formatType,
             TFileCompressType compressType) {
@@ -257,6 +258,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return trimDoubleQuotes;
     }
 
+    @Override
+    public boolean getEnableProfile() {
+        return enableProfile;
+    }
+
     public static StreamLoadTask 
fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
         StreamLoadTask streamLoadTask = new 
StreamLoadTask(request.getLoadId(), request.getTxnId(),
                 request.getFileType(), request.getFormatType(),
@@ -359,6 +365,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetTrimDoubleQuotes()) {
             trimDoubleQuotes = request.isTrimDoubleQuotes();
         }
+        if (request.isSetEnableProfile()) {
+            enableProfile = request.isEnableProfile();
+        }
     }
 
     // used for stream load
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 32c73a5663..3634a11333 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -407,6 +407,10 @@ struct TReportExecStatusParams {
   17: optional i64 loaded_bytes
 
   18: optional list<Types.TErrorTabletInfo> errorTabletInfos
+
+  // 19: optional i32 fragment_id
+
+  20: optional PaloInternalService.TQueryType query_type
 }
 
 struct TFeResult {
@@ -547,6 +551,8 @@ struct TStreamLoadPutRequest {
     40: optional PlanNodes.TFileCompressType compress_type
     41: optional i64 file_size // only for stream load with parquet or orc
     42: optional bool trim_double_quotes // trim double quotes for csv
+    // 43: optional i32 skip_lines // csv skip line num, only used when csv 
header_type is not set.
+    44: optional bool enable_profile
 }
 
 struct TStreamLoadPutResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to