This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 549bc3e288e [fix](pipelinex) fix fragment instance progress reports (#40325) (#40987) 549bc3e288e is described below commit 549bc3e288e003856e9c233f0d38855b621e6e2e Author: Kaijie Chen <c...@apache.org> AuthorDate: Thu Sep 19 23:58:38 2024 +0800 [fix](pipelinex) fix fragment instance progress reports (#40325) (#40987) backport #40325 --- be/src/runtime/fragment_mgr.cpp | 14 ++++++++++---- .../main/java/org/apache/doris/qe/Coordinator.java | 21 ++++++++++++++++----- gensrc/thrift/FrontendService.thrift | 7 +++++++ 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 829440f339e..e5cd7c7cb8d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -402,7 +402,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { int64_t num_rows_load_success = 0; int64_t num_rows_load_filtered = 0; int64_t num_rows_load_unselected = 0; - int64_t num_finished_ranges = 0; if (req.runtime_state->num_rows_load_total() > 0 || req.runtime_state->num_rows_load_filtered() > 0 || req.runtime_state->num_finished_range() > 0) { @@ -411,7 +410,11 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { num_rows_load_success = req.runtime_state->num_rows_load_success(); num_rows_load_filtered = req.runtime_state->num_rows_load_filtered(); num_rows_load_unselected = req.runtime_state->num_rows_load_unselected(); - num_finished_ranges = req.runtime_state->num_finished_range(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id()); + t.__set_num_finished_range(req.runtime_state->num_finished_range()); + params.fragment_instance_reports.push_back(t); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 || @@ -420,11 +423,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { num_rows_load_success += rs->num_rows_load_success(); num_rows_load_filtered += rs->num_rows_load_filtered(); num_rows_load_unselected += rs->num_rows_load_unselected(); - num_finished_ranges += rs->num_finished_range(); + params.__isset.fragment_instance_reports = true; + TFragmentInstanceReport t; + t.__set_fragment_instance_id(rs->fragment_instance_id()); + t.__set_num_finished_range(rs->num_finished_range()); + params.fragment_instance_reports.push_back(t); } } } - params.__set_finished_scan_ranges(num_finished_ranges); params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered)); params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index ef67084741b..cc1b36ac941 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -93,6 +93,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList; import org.apache.doris.thrift.TExternalScanRange; import org.apache.doris.thrift.TFileScanRange; import org.apache.doris.thrift.TFileScanRangeParams; +import org.apache.doris.thrift.TFragmentInstanceReport; import org.apache.doris.thrift.THivePartitionUpdate; import org.apache.doris.thrift.TIcebergCommitData; import org.apache.doris.thrift.TNetworkAddress; @@ -2821,11 +2822,21 @@ public class Coordinator implements CoordInterface { } if (params.isSetLoadedRows() && jobId != -1) { - Env.getCurrentEnv().getLoadManager().updateJobProgress( - jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(), - params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); - Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), - params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges()); + if (params.isSetFragmentInstanceReports()) { + for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) { + Env.getCurrentEnv().getLoadManager().updateJobProgress( + jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(), + params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); + Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), + params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange()); + } + } else { + Env.getCurrentEnv().getLoadManager().updateJobProgress( + jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(), + params.getLoadedRows(), params.getLoadedBytes(), params.isDone()); + Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId), + params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges()); + } } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 61dce73400b..5480a84cf69 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -417,6 +417,11 @@ struct TReportWorkloadRuntimeStatusParams { 2: optional map<string, TQueryStatistics> query_statistics_map } +struct TFragmentInstanceReport { + 1: optional Types.TUniqueId fragment_instance_id; + 2: optional i32 num_finished_range; +} + // The results of an INSERT query, sent to the coordinator as part of // TReportExecStatusParams struct TReportExecStatusParams { @@ -487,6 +492,8 @@ struct TReportExecStatusParams { 26: optional list<DataSinks.THivePartitionUpdate> hive_partition_updates 28: optional list<DataSinks.TIcebergCommitData> iceberg_commit_datas + + 31: optional list<TFragmentInstanceReport> fragment_instance_reports; } struct TFeResult { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org