This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 00d81c8d6e3 branch-3.1: [fix](scanner) Make Scanner close() method
thread-safe using atomic operations #57436 (#57644)
00d81c8d6e3 is described below
commit 00d81c8d6e3b126219a921119e0c697cfb395d09
Author: Socrates <[email protected]>
AuthorDate: Wed Nov 5 14:10:47 2025 +0800
branch-3.1: [fix](scanner) Make Scanner close() method thread-safe using
atomic operations #57436 (#57644)
bp: #57436
---
be/src/vec/exec/scan/new_es_scanner.cpp | 2 +-
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 3 +++
be/src/vec/exec/scan/new_olap_scanner.cpp | 2 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 2 +-
be/src/vec/exec/scan/vmeta_scanner.cpp | 5 ++++-
be/src/vec/exec/scan/vscan_node.h | 2 +-
be/src/vec/exec/scan/vscanner.cpp | 12 +++++++-----
be/src/vec/exec/scan/vscanner.h | 8 +++++++-
8 files changed, 25 insertions(+), 11 deletions(-)
diff --git a/be/src/vec/exec/scan/new_es_scanner.cpp b/be/src/vec/exec/scan/new_es_scanner.cpp
index b19b009b314..6ef27491870 100644
--- a/be/src/vec/exec/scan/new_es_scanner.cpp
+++ b/be/src/vec/exec/scan/new_es_scanner.cpp
@@ -197,7 +197,7 @@ Status NewEsScanner::_get_next(std::vector<vectorized::MutableColumnPtr>& column
}
Status NewEsScanner::close(RuntimeState* state) {
- if (_is_closed) {
+ if (!_try_close()) {
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 480b34b3bf4..d90633ca07b 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -195,6 +195,9 @@ void NewJdbcScanner::_update_profile() {
}
Status NewJdbcScanner::close(RuntimeState* state) {
+ if (!_try_close()) {
+ return Status::OK();
+ }
RETURN_IF_ERROR(VScanner::close(state));
RETURN_IF_ERROR(_jdbc_connector->close());
return Status::OK();
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 33a7e54f8b7..df69c2f345b 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -538,7 +538,7 @@ Status NewOlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
}
Status NewOlapScanner::close(RuntimeState* state) {
- if (_is_closed) {
+ if (!_try_close()) {
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 61f32af56e6..e2e180c61fc 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -1540,7 +1540,7 @@ Status VFileScanner::_init_expr_ctxes() {
}
Status VFileScanner::close(RuntimeState* state) {
- if (_is_closed) {
+ if (!_try_close()) {
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 0860d2bb979..c444e125021 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -527,7 +527,10 @@ Status VMetaScanner::_build_partition_values_metadata_request(
}
Status VMetaScanner::close(RuntimeState* state) {
- VLOG_CRITICAL << "VMetaScanner::close";
+ VLOG_CRITICAL << "MetaScanner::close";
+ if (!_try_close()) {
+ return Status::OK();
+ }
if (_reader) {
RETURN_IF_ERROR(_reader->close());
}
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 70ac5970274..54193dc8ff3 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -25,7 +25,7 @@ class VScanner;
class VSlotRef;
// We want to close scanner automatically, so using a delegate class
-// and call close method in the delegate class's dctor.
+// and call close method in the delegate class's destructor.
class ScannerDelegate {
public:
VScannerSPtr _scanner;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 96a384177de..3a6ba20acec 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -232,15 +232,17 @@ Status VScanner::try_append_late_arrival_runtime_filter() {
}
Status VScanner::close(RuntimeState* state) {
- if (_is_closed) {
- return Status::OK();
- }
-
+#ifndef BE_TEST
COUNTER_UPDATE(_local_state->_scanner_wait_worker_timer, _scanner_wait_worker_timer);
- _is_closed = true;
+#endif
return Status::OK();
}
+bool VScanner::_try_close() {
+ bool expected = false;
+ return _is_closed.compare_exchange_strong(expected, true);
+}
+
void VScanner::_collect_profile_before_close() {
COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index ac6371c3abc..72c4958482a 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -20,6 +20,7 @@
#include <stdint.h>
#include <algorithm>
+#include <atomic>
#include <vector>
#include "common/status.h"
@@ -95,6 +96,11 @@ protected:
// Update the counters before closing this scanner
virtual void _collect_profile_before_close();
+ // Check if scanner is already closed, if not, mark it as closed.
+ // Returns true if the scanner was successfully marked as closed (first time).
+ // Returns false if the scanner was already closed.
+ bool _try_close();
+
// Filter the output block finally.
Status _filter_output_block(Block* block);
@@ -187,7 +193,7 @@ protected:
Block _input_block;
bool _is_open = false;
- bool _is_closed = false;
+ std::atomic<bool> _is_closed {false};
bool _need_to_close = false;
Status _status;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]