This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 00d81c8d6e3 branch-3.1: [fix](scanner) Make Scanner close() method thread-safe using atomic operations #57436 (#57644)
00d81c8d6e3 is described below

commit 00d81c8d6e3b126219a921119e0c697cfb395d09
Author: Socrates <[email protected]>
AuthorDate: Wed Nov 5 14:10:47 2025 +0800

    branch-3.1: [fix](scanner) Make Scanner close() method thread-safe using atomic operations #57436 (#57644)
    
    bp: #57436
---
 be/src/vec/exec/scan/new_es_scanner.cpp   |  2 +-
 be/src/vec/exec/scan/new_jdbc_scanner.cpp |  3 +++
 be/src/vec/exec/scan/new_olap_scanner.cpp |  2 +-
 be/src/vec/exec/scan/vfile_scanner.cpp    |  2 +-
 be/src/vec/exec/scan/vmeta_scanner.cpp    |  5 ++++-
 be/src/vec/exec/scan/vscan_node.h         |  2 +-
 be/src/vec/exec/scan/vscanner.cpp         | 12 +++++++-----
 be/src/vec/exec/scan/vscanner.h           |  8 +++++++-
 8 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
index b19b009b314..6ef27491870 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -197,7 +197,7 @@ Status NewEsScanner::_get_next(std::vector<vectorized::MutableColumnPtr>& column
 }
 
 Status NewEsScanner::close(RuntimeState* state) {
-    if (_is_closed) {
+    if (!_try_close()) {
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 480b34b3bf4..d90633ca07b 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -195,6 +195,9 @@ void NewJdbcScanner::_update_profile() {
 }
 
 Status NewJdbcScanner::close(RuntimeState* state) {
+    if (!_try_close()) {
+        return Status::OK();
+    }
     RETURN_IF_ERROR(VScanner::close(state));
     RETURN_IF_ERROR(_jdbc_connector->close());
     return Status::OK();
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 33a7e54f8b7..df69c2f345b 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -538,7 +538,7 @@ Status NewOlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
 }
 
 Status NewOlapScanner::close(RuntimeState* state) {
-    if (_is_closed) {
+    if (!_try_close()) {
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 61f32af56e6..e2e180c61fc 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -1540,7 +1540,7 @@ Status VFileScanner::_init_expr_ctxes() {
 }
 
 Status VFileScanner::close(RuntimeState* state) {
-    if (_is_closed) {
+    if (!_try_close()) {
         return Status::OK();
     }
 
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 0860d2bb979..c444e125021 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -527,7 +527,10 @@ Status VMetaScanner::_build_partition_values_metadata_request(
 }
 
 Status VMetaScanner::close(RuntimeState* state) {
-    VLOG_CRITICAL << "VMetaScanner::close";
+    VLOG_CRITICAL << "MetaScanner::close";
+    if (!_try_close()) {
+        return Status::OK();
+    }
     if (_reader) {
         RETURN_IF_ERROR(_reader->close());
     }
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 70ac5970274..54193dc8ff3 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -25,7 +25,7 @@ class VScanner;
 class VSlotRef;
 
 // We want to close scanner automatically, so using a delegate class
-// and call close method in the delegate class's dctor.
+// and call close method in the delegate class's destructor.
 class ScannerDelegate {
 public:
     VScannerSPtr _scanner;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 96a384177de..3a6ba20acec 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -232,15 +232,17 @@ Status VScanner::try_append_late_arrival_runtime_filter() {
 }
 
 Status VScanner::close(RuntimeState* state) {
-    if (_is_closed) {
-        return Status::OK();
-    }
-
+#ifndef BE_TEST
     COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, _scanner_wait_worker_timer);
-    _is_closed = true;
+#endif
     return Status::OK();
 }
 
+bool VScanner::_try_close() {
+    bool expected = false;
+    return _is_closed.compare_exchange_strong(expected, true);
+}
+
 void VScanner::_collect_profile_before_close() {
     COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
     COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index ac6371c3abc..72c4958482a 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -20,6 +20,7 @@
 #include <stdint.h>
 
 #include <algorithm>
+#include <atomic>
 #include <vector>
 
 #include "common/status.h"
@@ -95,6 +96,11 @@ protected:
     // Update the counters before closing this scanner
     virtual void _collect_profile_before_close();
 
+    // Check if scanner is already closed, if not, mark it as closed.
+    // Returns true if the scanner was successfully marked as closed (first time).
+    // Returns false if the scanner was already closed.
+    bool _try_close();
+
     // Filter the output block finally.
     Status _filter_output_block(Block* block);
 
@@ -187,7 +193,7 @@ protected:
     Block _input_block;
 
     bool _is_open = false;
-    bool _is_closed = false;
+    std::atomic<bool> _is_closed {false};
     bool _need_to_close = false;
     Status _status;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to