This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3fb4a9f7b9c branch-4.0: [fix](load) Use atomic operations for
_try_close flag and remove unused _close_wait #61593 (#61623)
3fb4a9f7b9c is described below
commit 3fb4a9f7b9cedc69e86821aadb4da0bd8ead2b15
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Mar 25 15:14:34 2026 +0800
branch-4.0: [fix](load) Use atomic operations for _try_close flag and
remove unused _close_wait #61593 (#61623)
Cherry-picked from #61593
Co-authored-by: zclllyybb <[email protected]>
---
be/src/vec/sink/writer/vtablet_writer.cpp | 5 ++---
be/src/vec/sink/writer/vtablet_writer.h | 5 ++---
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index c8cb3496d2c..3c8133afaa1 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1387,7 +1387,7 @@ void VTabletWriter::_send_batch_process() {
// we must RECHECK opened_nodes below, after got closed signal,
because it may changed. Think of this:
// checked opened_nodes = 0 ---> new block arrived ---> task
finished, close() was called ---> we got _try_close here
// if we don't check again, we may lose the last package.
- if (_try_close) {
+ if (_try_close.load(std::memory_order_acquire)) {
opened_nodes = 0;
std::ranges::for_each(_channels,
[&opened_nodes](const
std::shared_ptr<IndexChannel>& ich) {
@@ -1777,7 +1777,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
status = _send_new_partition_batch();
}
- _try_close = true; // will stop periodic thread
+ _try_close.store(true, std::memory_order_release); // will stop periodic
thread
if (status.ok()) {
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
@@ -1858,7 +1858,6 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
if (!status.ok()) {
_cancel_all_channel(status);
_close_status = status;
- _close_wait = true;
}
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 6249febf8ad..32c55cff6db 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -756,9 +756,8 @@ private:
// Save the status of try_close() and close() method
Status _close_status;
// if we called try_close(), for auto partition the periodic send thread
should stop if it's still waiting for node channels first-time open.
- bool _try_close = false;
- // for non-pipeline, if close() did something, close_wait() should wait it.
- bool _close_wait = false;
+ // atomic: written by pthread (_do_try_close), read by bthread
(_send_batch_process)
+ std::atomic<bool> _try_close {false};
bool _inited = false;
bool _write_file_cache = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]