This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 09a7a355478 [fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload (#48948) (#49666)
09a7a355478 is described below

commit 09a7a355478bd5f18572da508b6780f28817c02b
Author: Xin Liao <liao...@selectdb.com>
AuthorDate: Mon Mar 31 11:18:42 2025 +0800

    [fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload (#48948) (#49666)
    
    Cherry-picked from #48948
    
    Co-authored-by: kang <35803862+ghkan...@users.noreply.github.com>
    Co-authored-by: lik40 <li...@chinatelecom.cn>
---
 be/src/vec/sink/writer/async_result_writer.cpp | 54 +++++++++++++-------------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index 29877cb268a..0cc37e3458b 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -108,41 +108,43 @@ void AsyncResultWriter::process_block(RuntimeState* 
state, RuntimeProfile* profi
     }
 
     DCHECK(_dependency);
-    if (_writer_status.ok()) {
-        while (true) {
-            ThreadCpuStopWatch cpu_time_stop_watch;
-            cpu_time_stop_watch.start();
-            Defer defer {[&]() {
-                if (state && state->get_query_ctx()) {
-                    
state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
-                }
-            }};
-            if (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                std::unique_lock l(_m);
-                while (!_eos && _data_queue.empty() && _writer_status.ok()) {
-                    // Add 1s to check to avoid lost signal
-                    _cv.wait_for(l, std::chrono::seconds(1));
-                }
+    while (_writer_status.ok()) {
+        ThreadCpuStopWatch cpu_time_stop_watch;
+        cpu_time_stop_watch.start();
+        Defer defer {[&]() {
+            if (state && state->get_query_ctx()) {
+                
state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
+            }
+        }};
+
+        //1) wait scan operator write data
+        {
+            std::unique_lock l(_m);
+            while (!_eos && _data_queue.empty() && _writer_status.ok()) {
+                // Add 1s to check to avoid lost signal
+                _cv.wait_for(l, std::chrono::seconds(1));
             }
 
+            //check if eos or writer error
             if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
                 _data_queue.clear();
                 break;
             }
+        }
 
-            auto block = _get_block_from_queue();
-            auto status = write(state, *block);
-            if (!status.ok()) [[unlikely]] {
-                std::unique_lock l(_m);
-                _writer_status.update(status);
-                if (_is_finished()) {
-                    _dependency->set_ready();
-                }
-                break;
+        //2) get the block from  data queue and write to downstream
+        auto block = _get_block_from_queue();
+        auto status = write(state, *block);
+        if (!status.ok()) [[unlikely]] {
+            std::unique_lock l(_m);
+            _writer_status.update(status);
+            if (_is_finished()) {
+                _dependency->set_ready();
             }
-
-            _return_free_block(std::move(block));
+            break;
         }
+
+        _return_free_block(std::move(block));
     }
 
     bool need_finish = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to