This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 09b0066817f [fix](runtime filter) Fix debug string in RuntimeFilterWrapper (#49562)
09b0066817f is described below

commit 09b0066817f95629c783d993a07e17183d33d7e6
Author: Gabriel <liwenqi...@selectdb.com>
AuthorDate: Mon Mar 31 14:00:03 2025 +0800

    [fix](runtime filter) Fix debug string in RuntimeFilterWrapper (#49562)
    
    *** SIGABRT unknown detail explain (@0x1e11) received by PID 7697 (TID
    10382 OR 0x7fe74c2e9640) from PID 7697; stack trace: ***
    0# doris::signal::(anonymous namespace)::FailureSignalHandler(int,
    siginfo_t*, void*) at
    /home/zcp/repo_center/doris_master/doris/be/src/common/signal_handler.h:421
     1# 0x00007FE875042520 in /lib/x86_64-linux-gnu/libc.so.6
     2# pthread_kill at ./nptl/pthread_kill.c:89
     3# raise at ../sysdeps/posix/raise.c:27
     4# abort at ./stdlib/abort.c:81
    5# __gnu_cxx::__verbose_terminate_handler() [clone .cold] at
    ../../../../libstdc++-v3/libsupc++/vterminate.cc:75
    6# __cxxabiv1::__terminate(void (*)()) at
    ../../../../libstdc++-v3/libsupc++/eh_terminate.cc:48
    7# 0x00005628EF52D5A1 in
    /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    8# 0x00005628EF52D6F4 in
    /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    9# 0x00005628EF52DAE6 in
    /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    10# void fmt::v7::detail::buffer::append(char const*, char const*) in
    /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    11# char const* fmt::v7::detail::parse_replacement_field, char,
    fmt::v7::basic_format_context, char> >&>(char const*, char const*,
    fmt::v7::detail::format_handler, char, fmt::v7::basic_format_context,
    char> >&) in /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    12# void fmt::v7::detail::vformat_to(fmt::v7::detail::buffer&,
    fmt::v7::basic_string_view, fmt::v7::basic_format_args::type>,
    fmt::v7::type_identity::type> >, fmt::v7::detail::locale_ref) in
    /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    13# fmt::v7::detail::vformat[abi:cxx11](fmt::v7::basic_string_view,
    fmt::v7::format_args) in
    /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    14# doris::RuntimeFilterWrapper::debug_string[abi:cxx11]() const at
    /home/zcp/repo_center/doris_master/doris/be/src/runtime_filter/runtime_filter_wrapper.cpp:602
    15# doris::RuntimeFilter::_debug_string[abi:cxx11]() const at
    /home/zcp/repo_center/doris_master/doris/be/src/runtime_filter/runtime_filter.cpp:128
    16# doris::RuntimeFilterConsumer::debug_string[abi:cxx11]() const at
    /home/zcp/repo_center/doris_master/doris/be/src/runtime_filter/runtime_filter_consumer.h:61
    17# doris::RuntimeFilterConsumer::_set_state(doris::RuntimeFilterConsumer::State)
    in /mnt/hdd01/ci/doris-deploy-master-local/be/lib/doris_be
    18# doris::RuntimeFilterConsumer::acquire_expr(std::vector,
    std::allocator > >&) at
    /home/zcp/repo_center/doris_master/doris/be/src/runtime_filter/runtime_filter_consumer.cpp:59
    19# doris::RuntimeFilterConsumerHelper::acquire_runtime_filter(std::vector,
    std::allocator > >&) at
    /home/zcp/repo_center/doris_master/doris/be/src/runtime_filter/runtime_filter_consumer_helper.cpp:90
    20# doris::pipeline::ScanLocalState::open(doris::RuntimeState*) at
    /home/zcp/repo_center/doris_master/doris/be/src/pipeline/exec/scan_operator.cpp:100
---
 be/src/runtime_filter/runtime_filter_wrapper.cpp |  3 ++-
 be/src/runtime_filter/runtime_filter_wrapper.h   | 21 ++++++++++++++-------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime_filter/runtime_filter_wrapper.cpp b/be/src/runtime_filter/runtime_filter_wrapper.cpp
index 3ce5b7a4733..55bdcce2a49 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.cpp
+++ b/be/src/runtime_filter/runtime_filter_wrapper.cpp
@@ -574,11 +574,12 @@ bool RuntimeFilterWrapper::contain_null() const {
     return false;
 }
 
-std::string RuntimeFilterWrapper::debug_string() const {
+std::string RuntimeFilterWrapper::debug_string() {
     auto type_string = _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER
                                ? fmt::format("{}({})", filter_type_to_string(_filter_type),
                                              filter_type_to_string(get_real_type()))
                                : filter_type_to_string(_filter_type);
+    std::shared_lock<std::shared_mutex> rlock(_rwlock);
     auto result = fmt::format("[id: {}, state: {}, type: {}, column_type: {}", _filter_id,
                               states_to_string<RuntimeFilterWrapper>({_state}), type_string,
                               type_to_string(_column_return_type));
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h b/be/src/runtime_filter/runtime_filter_wrapper.h
index 7857cf94bc4..25f0f4c769b 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -49,8 +49,8 @@ public:
             : _column_return_type(column_type),
               _filter_type(type),
               _filter_id(filter_id),
-              _state(state),
-              _max_in_num(max_in_num) {}
+              _max_in_num(max_in_num),
+              _state(state) {}
 
     Status init(const size_t runtime_size);
     Status insert(const vectorized::ColumnPtr& column, size_t start);
@@ -85,19 +85,21 @@ public:
 
     bool contain_null() const;
 
-    std::string debug_string() const;
+    std::string debug_string();
 
     void set_state(State state, std::string reason = "") {
         if (_state == State::DISABLED) {
             return;
-        } else if (state == State::DISABLED) {
-            _reason = reason;
+        }
+        std::unique_lock<std::shared_mutex> wlock(_rwlock);
+        if (_state == State::DISABLED) {
+            return;
         }
         _state = state;
         _reason = reason;
     }
     State get_state() const { return _state; }
-    void check_state(std::vector<State> assumed_states) const {
+    void check_state(std::vector<State> assumed_states) {
         if (!check_state_impl<RuntimeFilterWrapper>(_state, assumed_states)) {
             throw Exception(ErrorCode::INTERNAL_ERROR,
                             "wrapper meet invalid state, {}, assumed_states is {}", debug_string(),
@@ -131,14 +133,19 @@ private:
     const PrimitiveType _column_return_type; // column type
     const RuntimeFilterType _filter_type;
     const uint32_t _filter_id;
-    std::atomic<State> _state;
     const int32_t _max_in_num;
 
     std::shared_ptr<MinMaxFuncBase> _minmax_func;
     std::shared_ptr<HybridSetBase> _hybrid_set;
     std::shared_ptr<BloomFilterFuncBase> _bloom_filter_func;
     std::shared_ptr<BitmapFilterFuncBase> _bitmap_filter_func;
+
+    // Wrapper is the core structure of runtime filter. If filter is local, wrapper may be shared
+    // by producer and consumer. To avoid read-write conflict, we need a rwlock to ensure operations
+    // on state is thread-safe.
+    std::atomic<State> _state;
     std::string _reason;
+    std::shared_mutex _rwlock;
 };
 #include "common/compile_check_end.h"
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to