This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 549bc3e288e [fix](pipelinex) fix fragment instance progress reports (#40325) (#40987)
549bc3e288e is described below

commit 549bc3e288e003856e9c233f0d38855b621e6e2e
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Sep 19 23:58:38 2024 +0800

    [fix](pipelinex) fix fragment instance progress reports (#40325) (#40987)
    
    backport #40325
---
 be/src/runtime/fragment_mgr.cpp                     | 14 ++++++++++----
 .../main/java/org/apache/doris/qe/Coordinator.java  | 21 ++++++++++++++++-----
 gensrc/thrift/FrontendService.thrift                |  7 +++++++
 3 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 829440f339e..e5cd7c7cb8d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -402,7 +402,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
         int64_t num_rows_load_success = 0;
         int64_t num_rows_load_filtered = 0;
         int64_t num_rows_load_unselected = 0;
-        int64_t num_finished_ranges = 0;
         if (req.runtime_state->num_rows_load_total() > 0 ||
             req.runtime_state->num_rows_load_filtered() > 0 ||
             req.runtime_state->num_finished_range() > 0) {
@@ -411,7 +410,11 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
             num_rows_load_success = req.runtime_state->num_rows_load_success();
             num_rows_load_filtered = 
req.runtime_state->num_rows_load_filtered();
             num_rows_load_unselected = 
req.runtime_state->num_rows_load_unselected();
-            num_finished_ranges = req.runtime_state->num_finished_range();
+            params.__isset.fragment_instance_reports = true;
+            TFragmentInstanceReport t;
+            
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
+            
t.__set_num_finished_range(req.runtime_state->num_finished_range());
+            params.fragment_instance_reports.push_back(t);
         } else if (!req.runtime_states.empty()) {
             for (auto* rs : req.runtime_states) {
                 if (rs->num_rows_load_total() > 0 || 
rs->num_rows_load_filtered() > 0 ||
@@ -420,11 +423,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
                     num_rows_load_success += rs->num_rows_load_success();
                     num_rows_load_filtered += rs->num_rows_load_filtered();
                     num_rows_load_unselected += rs->num_rows_load_unselected();
-                    num_finished_ranges += rs->num_finished_range();
+                    params.__isset.fragment_instance_reports = true;
+                    TFragmentInstanceReport t;
+                    t.__set_fragment_instance_id(rs->fragment_instance_id());
+                    t.__set_num_finished_range(rs->num_finished_range());
+                    params.fragment_instance_reports.push_back(t);
                 }
             }
         }
-        params.__set_finished_scan_ranges(num_finished_ranges);
         params.load_counters.emplace(s_dpp_normal_all, 
std::to_string(num_rows_load_success));
         params.load_counters.emplace(s_dpp_abnormal_all, 
std::to_string(num_rows_load_filtered));
         params.load_counters.emplace(s_unselected_rows, 
std::to_string(num_rows_load_unselected));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ef67084741b..cc1b36ac941 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -93,6 +93,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TExternalScanRange;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.TFragmentInstanceReport;
 import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TIcebergCommitData;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -2821,11 +2822,21 @@ public class Coordinator implements CoordInterface {
         }
 
         if (params.isSetLoadedRows() && jobId != -1) {
-            Env.getCurrentEnv().getLoadManager().updateJobProgress(
-                    jobId, params.getBackendId(), params.getQueryId(), 
params.getFragmentInstanceId(),
-                    params.getLoadedRows(), params.getLoadedBytes(), 
params.isDone());
-            
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
-                    params.getQueryId(), params.getFragmentInstanceId(), 
params.getFinishedScanRanges());
+            if (params.isSetFragmentInstanceReports()) {
+                for (TFragmentInstanceReport report : 
params.getFragmentInstanceReports()) {
+                    Env.getCurrentEnv().getLoadManager().updateJobProgress(
+                            jobId, params.getBackendId(), params.getQueryId(), 
report.getFragmentInstanceId(),
+                            params.getLoadedRows(), params.getLoadedBytes(), 
params.isDone());
+                    
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
+                            params.getQueryId(), 
report.getFragmentInstanceId(), report.getNumFinishedRange());
+                }
+            } else {
+                Env.getCurrentEnv().getLoadManager().updateJobProgress(
+                        jobId, params.getBackendId(), params.getQueryId(), 
params.getFragmentInstanceId(),
+                        params.getLoadedRows(), params.getLoadedBytes(), 
params.isDone());
+                
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
+                        params.getQueryId(), params.getFragmentInstanceId(), 
params.getFinishedScanRanges());
+            }
         }
     }
 
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 61dce73400b..5480a84cf69 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -417,6 +417,11 @@ struct TReportWorkloadRuntimeStatusParams {
     2: optional map<string, TQueryStatistics> query_statistics_map
 }
 
+struct TFragmentInstanceReport {
+  1: optional Types.TUniqueId fragment_instance_id;
+  2: optional i32 num_finished_range;
+}
+
 // The results of an INSERT query, sent to the coordinator as part of
 // TReportExecStatusParams
 struct TReportExecStatusParams {
@@ -487,6 +492,8 @@ struct TReportExecStatusParams {
   26: optional list<DataSinks.THivePartitionUpdate> hive_partition_updates
 
   28: optional list<DataSinks.TIcebergCommitData> iceberg_commit_datas
+
+  31: optional list<TFragmentInstanceReport> fragment_instance_reports;
 }
 
 struct TFeResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to