This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch doris-for-zhongjin in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5868083b0b60e3c2783338054afa4236d649c90b Author: gitccl <60637740+git...@users.noreply.github.com> AuthorDate: Wed Apr 5 01:01:46 2023 +0800 [fix](streamload) fix stream load failed when enable profile (#18364) #18015 enables stream load profile log, however be will encounter rpc fail when loading tpch data(see #18291). This is because when `is_report_success` is true, be will reportExecStatus to fe, but fe cannot find QueryInfo in `coordinatorMap`, thus it will return error to be. --- be/src/common/config.h | 2 -- be/src/http/action/stream_load.cpp | 13 +++++++------ be/src/http/http_common.h | 1 + be/src/runtime/fragment_mgr.cpp | 1 + docs/en/docs/admin-manual/config/be-config.md | 7 ------- .../data-operate/import/import-way/stream-load-manual.md | 6 ++++++ docs/zh-CN/docs/admin-manual/config/be-config.md | 7 ------- .../data-operate/import/import-way/stream-load-manual.md | 6 ++++++ .../java/org/apache/doris/planner/StreamLoadPlanner.java | 3 +-- .../src/main/java/org/apache/doris/qe/QeProcessorImpl.java | 11 +++++++++-- .../src/main/java/org/apache/doris/task/LoadTaskInfo.java | 4 ++++ .../src/main/java/org/apache/doris/task/StreamLoadTask.java | 10 +++++++++- gensrc/thrift/FrontendService.thrift | 3 +++ 13 files changed, 47 insertions(+), 27 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 8f34155280..63d785b41c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -420,8 +420,6 @@ CONF_mInt32(stream_load_record_batch_size, "50"); CONF_Int32(stream_load_record_expire_time_secs, "28800"); // time interval to clean expired stream load records CONF_mInt64(clean_stream_load_record_interval_secs, "1800"); -// Whether to enable stream load profile to be printed to the log, the default is false. -CONF_mBool(enable_stream_load_profile_log, "false"); // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. 
// You may need to lower the speed when the sink receiver bes are too busy. diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index e69d22fc45..2520b3a5f4 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -581,6 +581,13 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (!http_req->header(HTTP_SKIP_LINES).empty()) { request.__set_skip_lines(std::stoi(http_req->header(HTTP_SKIP_LINES))); } + if (!http_req->header(HTTP_ENABLE_PROFILE).empty()) { + if (iequal(http_req->header(HTTP_ENABLE_PROFILE), "true")) { + request.__set_enable_profile(true); + } else { + request.__set_enable_profile(false); + } + } #ifndef BE_TEST // plan this load @@ -601,12 +608,6 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, return plan_status; } - auto& query_options = ctx->put_result.params.query_options; - if (query_options.__isset.is_report_success && query_options.is_report_success && - !config::enable_stream_load_profile_log) { - query_options.is_report_success = false; - } - VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params); // if we not use streaming, we must download total content before we begin // to process this load diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 6dbf93609e..2295731d6a 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -54,6 +54,7 @@ static const std::string HTTP_HIDDEN_COLUMNS = "hidden_columns"; static const std::string HTTP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; static const std::string HTTP_SKIP_LINES = "skip_lines"; static const std::string HTTP_COMMENT = "comment"; +static const std::string HTTP_ENABLE_PROFILE = "enable_profile"; static const std::string HTTP_TWO_PHASE_COMMIT = "two_phase_commit"; static const std::string HTTP_TXN_ID_KEY = "txn_id"; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1558e9b855..f683828c5d 
100755 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -354,6 +354,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.__set_fragment_id(req.fragment_id); exec_status.set_t_status(&params); params.__set_done(req.done); + params.__set_query_type(req.runtime_state->query_type()); DCHECK(req.runtime_state != nullptr); if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) { diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md index 62904d50f8..62091324ac 100644 --- a/docs/en/docs/admin-manual/config/be-config.md +++ b/docs/en/docs/admin-manual/config/be-config.md @@ -769,13 +769,6 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in * Default value: 100 * Dynamically modifiable: Yes -#### `enable_stream_load_profile_log` - -* Type: bool -* Description: Whether to enable stream load profile to be printed to the log. -* Default value: false -* Dynamically modifiable: Yes - ### Thread #### `delete_worker_count` diff --git a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md index 91f696b3dd..4fa8494eae 100644 --- a/docs/en/docs/data-operate/import/import-way/stream-load-manual.md +++ b/docs/en/docs/data-operate/import/import-way/stream-load-manual.md @@ -181,6 +181,12 @@ The number of rows in the original file = `dpp.abnorm.ALL + dpp.norm.ALL` Stream load import can enable two-stage transaction commit mode: in the stream load process, the data is written and the information is returned to the user. At this time, the data is invisible and the transaction status is `PRECOMMITTED`. After the user manually triggers the commit operation, the data is visible. ++ enable_profile + <version since="1.2.4"> + </version> + + When `enable_profile` is true, the Stream Load profile will be printed to the log. 
Otherwise it won't print. + Example: 1. Initiate a stream load pre-commit operation diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md index f91c63ec8b..ac3f47043d 100644 --- a/docs/zh-CN/docs/admin-manual/config/be-config.md +++ b/docs/zh-CN/docs/admin-manual/config/be-config.md @@ -783,13 +783,6 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in * 默认值: 100 * 可动态修改:是 -#### `enable_stream_load_profile_log` - -* 类型:bool -* 描述:是否将 stream load profile 打印到日志。 -* 默认值: false -* 可动态修改:是 - ### 线程 #### `delete_worker_count` diff --git a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md index bf47e8c8f1..225d5b8544 100644 --- a/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md +++ b/docs/zh-CN/docs/data-operate/import/import-way/stream-load-manual.md @@ -191,6 +191,12 @@ Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的 Stream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为`PRECOMMITTED`,用户手动触发commit操作之后,数据才可见。 +- enable_profile + <version since="1.2.4"> + </version> + + 当 `enable_profile` 为 true 时,Stream Load profile将会打印到日志中。否则不会打印。 + 示例: 1. 
发起stream load预提交操作 diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 16b45d683c..f62c6eeee1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -47,7 +47,6 @@ import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.planner.external.ExternalFileScanNode; -import org.apache.doris.qe.VariableMgr; import org.apache.doris.service.FrontendOptions; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.PaloInternalServiceVersion; @@ -276,7 +275,7 @@ public class StreamLoadPlanner { queryOptions.setEnableVectorizedEngine(Config.enable_vectorized_load); queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load); queryOptions.setBeExecVersion(Config.be_exec_version); - queryOptions.setIsReportSuccess(VariableMgr.newSessionVariable().enableProfile()); + queryOptions.setIsReportSuccess(taskInfo.getEnableProfile()); params.setQueryOptions(queryOptions); TQueryGlobals queryGlobals = new TQueryGlobals(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index a5611ea4ca..9b20fe4f7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -24,6 +24,7 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.metric.MetricRepo; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TReportExecStatusResult; import 
org.apache.doris.thrift.TStatus; @@ -190,8 +191,14 @@ public final class QeProcessorImpl implements QeProcessor { final QueryInfo info = coordinatorMap.get(params.query_id); if (info == null) { - result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR)); - LOG.info("ReportExecStatus() runtime error, query {} does not exist", DebugUtil.printId(params.query_id)); + // There is no QueryInfo for StreamLoad, so we return OK + if (params.query_type == TQueryType.LOAD) { + result.setStatus(new TStatus(TStatusCode.OK)); + } else { + result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR)); + } + LOG.info("ReportExecStatus() runtime error, query {} with type {} does not exist", + DebugUtil.printId(params.query_id), params.query_type); return result; } try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index cb938e84cb..6e3133fbde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -105,6 +105,10 @@ public interface LoadTaskInfo { return 0; } + default boolean getEnableProfile() { + return false; + } + class ImportColumnDescs { public List<ImportColumnDesc> descs = Lists.newArrayList(); public boolean isColumnDescsRewrited = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 868835fff8..2d28cc3b5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -83,8 +83,8 @@ public class StreamLoadTask implements LoadTaskInfo { private String headerType = ""; private List<String> hiddenColumns; private boolean trimDoubleQuotes = false; - private int skipLines = 0; + private boolean enableProfile = false; public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, 
TFileFormatType formatType, TFileCompressType compressType) { @@ -263,6 +263,11 @@ public class StreamLoadTask implements LoadTaskInfo { return skipLines; } + @Override + public boolean getEnableProfile() { + return enableProfile; + } + public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException { StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(), request.getFileType(), request.getFormatType(), @@ -368,6 +373,9 @@ public class StreamLoadTask implements LoadTaskInfo { if (request.isSetSkipLines()) { skipLines = request.getSkipLines(); } + if (request.isSetEnableProfile()) { + enableProfile = request.isEnableProfile(); + } } // used for stream load diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index e8630b05cb..f1cc722723 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -429,6 +429,8 @@ struct TReportExecStatusParams { 18: optional list<Types.TErrorTabletInfo> errorTabletInfos 19: optional i32 fragment_id + + 20: optional PaloInternalService.TQueryType query_type } struct TFeResult { @@ -570,6 +572,7 @@ struct TStreamLoadPutRequest { 41: optional i64 file_size // only for stream load with parquet or orc 42: optional bool trim_double_quotes // trim double quotes for csv 43: optional i32 skip_lines // csv skip line num, only used when csv header_type is not set. + 44: optional bool enable_profile } struct TStreamLoadPutResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org