This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 11971eddb49 [atomicstatus](be) add atomic status to share state 
between multi thread (#35002)
11971eddb49 is described below

commit 11971eddb49fe670c3ef11bcb77f407d3b97661d
Author: yiguolei <676222...@qq.com>
AuthorDate: Fri May 17 22:47:00 2024 +0800

    [atomicstatus](be) add atomic status to share state between multi thread 
(#35002)
---
 be/src/common/status.h                         | 44 ++++++++++++++++++++++++++
 be/src/vec/sink/writer/async_result_writer.cpp | 16 ++++------
 be/src/vec/sink/writer/async_result_writer.h   |  8 ++---
 3 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index 39cb205ce89..cf9b42a3c69 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -542,6 +542,50 @@ private:
     }
 };
 
+// There are many thread using status to indicate the cancel state, one thread 
may update it and
+// the other thread will read it. Status is not thread safe, for example, if 
one thread is update it
+// and another thread is call to_string method, it may core, because the 
_err_msg is an unique ptr and
+// it is deconstructed during copy method.
+// And also we could not use lock, because we need get status frequently to 
check if it is cancelled.
+// The defaule value is ok.
+class AtomicStatus {
+public:
+    AtomicStatus() : error_st_(Status::OK()) {}
+
+    bool ok() const { return error_code_.load() == 0; }
+
+    bool update(const Status& new_status) {
+        // If new status is normal, or the old status is abnormal, then not 
need update
+        if (new_status.ok() || error_code_.load() != 0) {
+            return false;
+        }
+        int16_t expected_error_code = 0;
+        if (error_code_.compare_exchange_strong(expected_error_code, 
new_status.code(),
+                                                std::memory_order_acq_rel)) {
+            // lock here for read status, to avoid core during return error_st_
+            std::lock_guard l(mutex_);
+            error_st_ = new_status;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // will copy a new status object to avoid concurrency
+    Status status() {
+        std::lock_guard l(mutex_);
+        return error_st_;
+    }
+
+private:
+    std::atomic_int16_t error_code_ = 0;
+    Status error_st_;
+    std::mutex mutex_;
+
+    AtomicStatus(const AtomicStatus&) = delete;
+    void operator=(const AtomicStatus&) = delete;
+};
+
 inline std::ostream& operator<<(std::ostream& ostr, const Status& status) {
     ostr << '[' << status.code_as_string() << ']';
     ostr << status.msg();
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp 
b/be/src/vec/sink/writer/async_result_writer.cpp
index 9a84f374464..4ed878a4634 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -55,7 +55,7 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
     // if io task failed, just return error status to
     // end the query
     if (!_writer_status.ok()) {
-        return _writer_status;
+        return _writer_status.status();
     }
 
     if (_dependency && _is_finished()) {
@@ -143,7 +143,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, 
RuntimeProfile* profi
             auto status = write(*block);
             if (!status.ok()) [[unlikely]] {
                 std::unique_lock l(_m);
-                _writer_status = status;
+                _writer_status.update(status);
                 if (_dependency && _is_finished()) {
                     _dependency->set_ready();
                 }
@@ -172,14 +172,10 @@ void AsyncResultWriter::process_block(RuntimeState* 
state, RuntimeProfile* profi
         // Should not call finish in lock because it may hang, and it will 
lock _m too long.
         // And get_writer_status will also need this lock, it will block 
pipeline exec thread.
         Status st = finish(state);
-        std::lock_guard l(_m);
-        _writer_status = st;
+        _writer_status.update(st);
     }
     Status st = Status::OK();
-    {
-        std::lock_guard l(_m);
-        st = _writer_status;
-    }
+    { st = _writer_status.status(); }
 
     Status close_st = close(st);
     {
@@ -187,7 +183,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, 
RuntimeProfile* profi
         // the real reason.
         std::lock_guard l(_m);
         if (_writer_status.ok()) {
-            _writer_status = close_st;
+            _writer_status.update(close_st);
         }
         _writer_thread_closed = true;
     }
@@ -215,7 +211,7 @@ Status 
AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc
 
 void AsyncResultWriter::force_close(Status s) {
     std::lock_guard l(_m);
-    _writer_status = s;
+    _writer_status.update(s);
     if (_dependency && _is_finished()) {
         _dependency->set_ready();
     }
diff --git a/be/src/vec/sink/writer/async_result_writer.h 
b/be/src/vec/sink/writer/async_result_writer.h
index 7f9700486da..1fd0fc280f0 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -78,10 +78,7 @@ public:
     // Add the IO thread task process block() to thread pool to dispose the IO
     Status start_writer(RuntimeState* state, RuntimeProfile* profile);
 
-    Status get_writer_status() {
-        std::lock_guard l(_m);
-        return _writer_status;
-    }
+    Status get_writer_status() { return _writer_status.status(); }
 
 protected:
     Status _projection_block(Block& input_block, Block* output_block);
@@ -103,7 +100,8 @@ private:
     std::mutex _m;
     std::condition_variable _cv;
     std::deque<std::unique_ptr<Block>> _data_queue;
-    Status _writer_status = Status::OK();
+    // Default value is ok
+    AtomicStatus _writer_status;
     bool _eos = false;
     // The writer is not started at the beginning. If prepare failed but not 
open, the the writer
     // is not started, so should not pending finish on it.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to