This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new fa40c1589ff branch-4.1: [fix](load) Use atomic operations for
_try_close flag and remove unused _close_wait #61593 (#61624)
fa40c1589ff is described below
commit fa40c1589ff284c69df6cbc72b34a4b8f7172579
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Mar 25 15:39:19 2026 +0800
branch-4.1: [fix](load) Use atomic operations for _try_close flag and
remove unused _close_wait #61593 (#61624)
Cherry-picked from #61593
Co-authored-by: zclllyybb <[email protected]>
---
be/src/exec/sink/writer/vtablet_writer.cpp | 5 ++---
be/src/exec/sink/writer/vtablet_writer.h | 5 ++---
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp
b/be/src/exec/sink/writer/vtablet_writer.cpp
index 934446e18c3..013da8e58c3 100644
--- a/be/src/exec/sink/writer/vtablet_writer.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer.cpp
@@ -1385,7 +1385,7 @@ void VTabletWriter::_send_batch_process() {
// we must RECHECK opened_nodes below, after got closed signal,
because it may changed. Think of this:
// checked opened_nodes = 0 ---> new block arrived ---> task
finished, close() was called ---> we got _try_close here
// if we don't check again, we may lose the last package.
- if (_try_close) {
+ if (_try_close.load(std::memory_order_acquire)) {
opened_nodes = 0;
std::ranges::for_each(_channels,
[&opened_nodes](const
std::shared_ptr<IndexChannel>& ich) {
@@ -1775,7 +1775,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
status = _send_new_partition_batch();
}
- _try_close = true; // will stop periodic thread
+ _try_close.store(true, std::memory_order_release); // will stop periodic
thread
if (status.ok()) {
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
@@ -1856,7 +1856,6 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
if (!status.ok()) {
_cancel_all_channel(status);
_close_status = status;
- _close_wait = true;
}
}
diff --git a/be/src/exec/sink/writer/vtablet_writer.h
b/be/src/exec/sink/writer/vtablet_writer.h
index d9e3869e68e..d3e6e8da0f1 100644
--- a/be/src/exec/sink/writer/vtablet_writer.h
+++ b/be/src/exec/sink/writer/vtablet_writer.h
@@ -752,9 +752,8 @@ private:
// Save the status of try_close() and close() method
Status _close_status;
// if we called try_close(), for auto partition the periodic send thread
should stop if it's still waiting for node channels first-time open.
- bool _try_close = false;
- // for non-pipeline, if close() did something, close_wait() should wait it.
- bool _close_wait = false;
+ // atomic: written by pthread (_do_try_close), read by bthread
(_send_batch_process)
+ std::atomic<bool> _try_close {false};
bool _inited = false;
bool _write_file_cache = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]