This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d0d7a51d3f5 branch-3.1: [fix](load) update scanned rows and loaded bytes progressively #54606 (#54787)
d0d7a51d3f5 is described below

commit d0d7a51d3f5086b094e11ae8a16be38bb94907c9
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 15 17:49:29 2025 +0800

    branch-3.1: [fix](load) update scanned rows and loaded bytes progressively #54606 (#54787)
    
    Backport #54606
    
    ### What problem does this PR solve?
    
    Issue Number: DORIS-20541
    
    Related PR: #29802
    
    Problem Summary:
    
    Fix an issue where the scanned rows and loaded bytes metrics were not
    updated progressively in the FE.
    Although the BE periodically reports execution status, the FE was
    ignoring these reports due to a change introduced in PR #29802, which
    skips processing reports without the isDone flag.
    
    This fix ensures that intermediate execution reports are processed,
    allowing progressive updates of scanned rows and loaded bytes during
    query execution.
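
    Illustrative sketch of the control-flow change in
    Coordinator.updateFragmentExecStatus() (simplified Java, not the
    literal patch; updateJobProgress(params) is shorthand for the
    LoadManager/ProgressManager calls shown in the diff below):

        // Before: reports without isDone hit the early return first,
        // so load progress only advanced on the final report.
        if (ctx == null || !ctx.updatePipelineStatus(params)) {
            return;
        }
        if (params.isSetLoadedRows() && jobId != -1) {
            updateJobProgress(params);  // reached only when done
        }

        // After: progress is updated before the early return, so each
        // periodic BE report refreshes scanned rows and loaded bytes.
        if (params.isSetLoadedRows() && jobId != -1) {
            updateJobProgress(params);  // runs for intermediate reports too
        }
        if (ctx == null || !ctx.updatePipelineStatus(params)) {
            return;
        }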
---
 be/src/vec/sink/writer/vtablet_writer.cpp          |   5 +
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   5 +
 .../apache/doris/load/loadv2/LoadStatistic.java    |  17 +++-
 .../main/java/org/apache/doris/qe/Coordinator.java |  38 ++++----
 .../test_s3_load_progressive_scanned_rows.groovy   | 107 +++++++++++++++++++++
 5 files changed, 152 insertions(+), 20 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index fdb75a31d87..812cc7cc773 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1791,6 +1791,11 @@ Status VTabletWriter::close(Status exec_status) {
     _do_try_close(_state, exec_status);
     TEST_INJECTION_POINT("VOlapTableSink::close");
 
+    DBUG_EXECUTE_IF("VTabletWriter.close.sleep", {
+        auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "VTabletWriter.close.sleep", "sleep_sec", 1);
+        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
+    });
     DBUG_EXECUTE_IF("VTabletWriter.close.close_status_not_ok",
                     { _close_status = Status::InternalError("injected close status not ok"); });
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 2c89975d142..187f6160aff 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -613,6 +613,11 @@ Status VTabletWriterV2::close(Status exec_status) {
         status = _send_new_partition_batch();
     }
 
+    DBUG_EXECUTE_IF("VTabletWriterV2.close.sleep", {
+        auto sleep_sec = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "VTabletWriterV2.close.sleep", "sleep_sec", 1);
+        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
+    });
     DBUG_EXECUTE_IF("VTabletWriterV2.close.cancel",
                     { status = Status::InternalError("load cancel"); });
     if (status.ok()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
index 0c65aa27851..43c67098bfd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadStatistic.java
@@ -25,6 +25,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Table;
 import com.google.gson.Gson;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.HashMap;
 import java.util.List;
@@ -32,6 +34,8 @@ import java.util.Map;
 import java.util.Set;
 
 public class LoadStatistic {
+    private static final Logger LOG = LogManager.getLogger(LoadStatistic.class);
+
     // number of rows processed on BE, this number will be updated periodically by query report.
     // A load job may has several load tasks(queries), and each task has several fragments.
     // each fragment will report independently.
@@ -78,15 +82,24 @@ public class LoadStatistic {
     public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
                                                 long rows, long bytes, boolean isDone) {
         if (counterTbl.contains(loadId, fragmentId)) {
-            counterTbl.put(loadId, fragmentId, rows);
+            if (counterTbl.get(loadId, fragmentId) < rows) {
+                counterTbl.put(loadId, fragmentId, rows);
+            }
         }
 
         if (loadBytes.contains(loadId, fragmentId)) {
-            loadBytes.put(loadId, fragmentId, bytes);
+            if (loadBytes.get(loadId, fragmentId) < bytes) {
+                loadBytes.put(loadId, fragmentId, bytes);
+            }
         }
         if (isDone && unfinishedBackendIds.containsKey(loadId)) {
             unfinishedBackendIds.get(loadId).remove(backendId);
         }
+
+        LOG.debug("updateLoadProgress: loadId={}, fragmentId={}, backendId={}, "
+                + "rows={}, bytes={}, isDone={}, scannedRows={}, loadBytes={}",
+                DebugUtil.printId(loadId), DebugUtil.printId(fragmentId), backendId,
+                rows, bytes, isDone, getScannedRows(), getLoadBytes());
     }
 
     public synchronized long getScannedRows() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 94e322e1153..452afb20847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2381,8 +2381,28 @@ public class Coordinator implements CoordInterface {
 
     // update job progress from BE
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
+        if (params.isSetLoadedRows() && jobId != -1) {
+            if (params.isSetFragmentInstanceReports()) {
+                for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
+                    Env.getCurrentEnv().getLoadManager().updateJobProgress(
+                            jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(),
+                            report.getLoadedRows(), report.getLoadedBytes(), params.isDone());
+                    Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
+                            params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange());
+                }
+            } else {
+                Env.getCurrentEnv().getLoadManager().updateJobProgress(
+                        jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
+                        params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
+                Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
+                        params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
+            }
+        }
+
         PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
         if (ctx == null || !ctx.updatePipelineStatus(params)) {
+            LOG.debug("Fragment {} is not done, ignore report status: {}",
+                    params.getFragmentId(), params.toString());
             return;
         }
 
@@ -2448,24 +2468,6 @@ public class Coordinator implements CoordInterface {
             }
             fragmentsDoneLatch.markedCountDown(params.getFragmentId(), params.getBackendId());
         }
-
-        if (params.isSetLoadedRows() && jobId != -1) {
-            if (params.isSetFragmentInstanceReports()) {
-                for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
-                    Env.getCurrentEnv().getLoadManager().updateJobProgress(
-                            jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(),
-                            report.getLoadedRows(), report.getLoadedBytes(), params.isDone());
-                    Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
-                            params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange());
-                }
-            } else {
-                Env.getCurrentEnv().getLoadManager().updateJobProgress(
-                        jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
-                        params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
-                Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
-                        params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
-            }
-        }
     }
 
     /*
diff --git a/regression-test/suites/fault_injection_p0/test_s3_load_progressive_scanned_rows.groovy b/regression-test/suites/fault_injection_p0/test_s3_load_progressive_scanned_rows.groovy
new file mode 100644
index 00000000000..590000dae92
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_s3_load_progressive_scanned_rows.groovy
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_s3_load_progressive_scanned_rows", "nonConcurrent,p0") {
+    def tableName = "segcompaction_correctness_test"
+    def create_table_sql = """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+                    `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+                    `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+                    `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+                    `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+                    `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+                    `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+                    `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+                    `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+                    `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+                    )
+                DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+                PROPERTIES ( "replication_num" = "1" );
+            """
+    def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+    def runLoadWithSleep = {
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = getS3BucketName()
+        try {
+
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql "${create_table_sql}"
+
+            def uuid = UUID.randomUUID().toString().replace("-", "0")
+            String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+            sql """
+            LOAD LABEL $uuid (
+                DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+                INTO TABLE ${tableName}
+                FORMAT AS "ORC"
+                $columns_str
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "$endpoint",
+                "AWS_REGION" = "$region",
+                "provider" = "${getS3Provider()}"
+            )
+            """
+
+            def max_try_milli_secs = 120000
+            def scannedRows = 0
+            def loadBytes   = 0
+            String [][] result = ''
+            while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+                logger.info("SHOW LOAD result: ${result}")
+
+                scannedRows = (result =~ /"ScannedRows":(\d+)/)[0][1] as long
+                loadBytes   = (result =~ /"LoadBytes":(\d+)/)[0][1] as long
+
+                if (scannedRows > 0 && loadBytes > 0) {
+                    break;
+                }
+                Thread.sleep(1000)
+                max_try_milli_secs -= 1000
+                if(max_try_milli_secs <= 0) {
+                    assertTrue(1 == 2, "load Timeout: $uuid")
+                }
+            }
+            try_sql(""" cancel load where label="$uuid"; """)
+            assertTrue(scannedRows >= 0, "ScannedRows should be >= 0 but was ${scannedRows}")
+            assertTrue(loadBytes >= 0, "LoadBytes should be >= 0 but was ${loadBytes}")
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${tableName}")
+        }
+    }
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("VTabletWriter.close.sleep", [sleep_sec:150]);
+        GetDebugPoint().enableDebugPointForAllBEs("VTabletWriterV2.close.sleep", [sleep_sec:150]);
+        runLoadWithSleep()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("VTabletWriter.close.sleep")
+        GetDebugPoint().disableDebugPointForAllBEs("VTabletWriterV2.close.sleep")
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
