This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3fb4a9f7b9c branch-4.0: [fix](load) Use atomic operations for 
_try_close flag and remove unused _close_wait #61593 (#61623)
3fb4a9f7b9c is described below

commit 3fb4a9f7b9cedc69e86821aadb4da0bd8ead2b15
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Mar 25 15:14:34 2026 +0800

    branch-4.0: [fix](load) Use atomic operations for _try_close flag and 
remove unused _close_wait #61593 (#61623)
    
    Cherry-picked from #61593
    
    Co-authored-by: zclllyybb <[email protected]>
---
 be/src/vec/sink/writer/vtablet_writer.cpp | 5 ++---
 be/src/vec/sink/writer/vtablet_writer.h   | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index c8cb3496d2c..3c8133afaa1 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1387,7 +1387,7 @@ void VTabletWriter::_send_batch_process() {
         // we must RECHECK opened_nodes below, after got closed signal, 
because it may changed. Think of this:
         //      checked opened_nodes = 0 ---> new block arrived ---> task 
finished, close() was called ---> we got _try_close here
         // if we don't check again, we may lose the last package.
-        if (_try_close) {
+        if (_try_close.load(std::memory_order_acquire)) {
             opened_nodes = 0;
             std::ranges::for_each(_channels,
                                   [&opened_nodes](const 
std::shared_ptr<IndexChannel>& ich) {
@@ -1777,7 +1777,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
         status = _send_new_partition_batch();
     }
 
-    _try_close = true; // will stop periodic thread
+    _try_close.store(true, std::memory_order_release); // will stop periodic 
thread
     if (status.ok()) {
         // BE id -> add_batch method counter
         std::unordered_map<int64_t, AddBatchCounter> 
node_add_batch_counter_map;
@@ -1858,7 +1858,6 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
     if (!status.ok()) {
         _cancel_all_channel(status);
         _close_status = status;
-        _close_wait = true;
     }
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 6249febf8ad..32c55cff6db 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -756,9 +756,8 @@ private:
     // Save the status of try_close() and close() method
     Status _close_status;
     // if we called try_close(), for auto partition the periodic send thread 
should stop if it's still waiting for node channels first-time open.
-    bool _try_close = false;
-    // for non-pipeline, if close() did something, close_wait() should wait it.
-    bool _close_wait = false;
+    // atomic: written by pthread (_do_try_close), read by bthread 
(_send_batch_process)
+    std::atomic<bool> _try_close {false};
     bool _inited = false;
     bool _write_file_cache = false;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to