This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4948f0f90 [fix](load) fix DataSink prepared check in PlanFragmentExecutor (#27735)
5a4948f0f90 is described below

commit 5a4948f0f901afcc4197065a0e29e5616386f9bf
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Nov 30 15:24:04 2023 +0800

    [fix](load) fix DataSink prepared check in PlanFragmentExecutor (#27735)
---
 be/src/runtime/plan_fragment_executor.cpp | 13 +++++++++----
 be/src/runtime/plan_fragment_executor.h   |  3 +++
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index e6b979c3a99..437fe34fe74 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -89,6 +89,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _report_thread_active(false),
           _done(false),
           _prepared(false),
+          _opened(false),
           _closed(false),
           _is_report_success(false),
           _is_report_on_cancel(true),
@@ -318,6 +319,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
             return Status::OK();
         }
         RETURN_IF_ERROR(_sink->open(runtime_state()));
+        _opened = true;
         std::unique_ptr<doris::vectorized::Block> block =
                 _group_commit ? doris::vectorized::FutureBlock::create_unique()
                               : doris::vectorized::Block::create_unique();
@@ -659,16 +661,19 @@ void PlanFragmentExecutor::close() {
         }
 
         if (_sink != nullptr) {
-            if (_prepared) {
+            if (!_prepared) {
+                static_cast<void>(
+                        _sink->close(runtime_state(), Status::InternalError("prepare failed")));
+            } else if (!_opened) {
+                static_cast<void>(
+                        _sink->close(runtime_state(), Status::InternalError("open failed")));
+            } else {
                 Status status;
                 {
                     std::lock_guard<std::mutex> l(_status_lock);
                     status = _status;
                 }
                 static_cast<void>(_sink->close(runtime_state(), status));
-            } else {
-                static_cast<void>(
-                        _sink->close(runtime_state(), Status::InternalError("prepare failed")));
             }
         }
 
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index 5b37855a5b7..29309ccf501 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -183,6 +183,9 @@ private:
     // true if prepare() returned OK
     bool _prepared;
 
+    // true if open() returned OK
+    bool _opened;
+
     // true if close() has been called
     bool _closed;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to