This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d450f5ae90 [Chore](runtime-filter) add lock for all runtime filter producer/consumer's public methods (#50110)
5d450f5ae90 is described below

commit 5d450f5ae90cefc42acf31e5defa65028eecbb11
Author: Pxl <x...@selectdb.com>
AuthorDate: Sat Apr 19 08:11:26 2025 +0800

    [Chore](runtime-filter) add lock for all runtime filter producer/consumer's public methods (#50110)
---
 be/src/runtime_filter/runtime_filter.h             |  6 +-
 be/src/runtime_filter/runtime_filter_consumer.cpp  |  5 +-
 be/src/runtime_filter/runtime_filter_consumer.h    |  7 +-
 be/src/runtime_filter/runtime_filter_merger.h      |  2 +-
 be/src/runtime_filter/runtime_filter_producer.cpp  |  6 +-
 be/src/runtime_filter/runtime_filter_producer.h    | 30 ++++----
 .../runtime_filter_producer_helper_set_test.cpp    | 81 ++++++++++++++++++++++
 7 files changed, 114 insertions(+), 23 deletions(-)

diff --git a/be/src/runtime_filter/runtime_filter.h 
b/be/src/runtime_filter/runtime_filter.h
index e5c833d8f67..8b42576c7b4 100644
--- a/be/src/runtime_filter/runtime_filter.h
+++ b/be/src/runtime_filter/runtime_filter.h
@@ -46,11 +46,13 @@ public:
 
     template <class T>
     Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         return _wrapper->assign(request, data);
     }
 
     template <class T>
     Status serialize(T* request, void** data, int* len) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         auto real_runtime_filter_type = _wrapper->get_real_type();
 
         request->set_filter_type(get_type(real_runtime_filter_type));
@@ -81,7 +83,7 @@ public:
         return Status::OK();
     }
 
-    virtual std::string debug_string() const = 0;
+    virtual std::string debug_string() = 0;
 
 protected:
     RuntimeFilter(const TRuntimeFilterDesc* desc)
@@ -118,6 +120,8 @@ protected:
     friend class RuntimeFilterProducer;
     friend class RuntimeFilterConsumer;
     friend class RuntimeFilterMerger;
+
+    std::recursive_mutex _rmtx; // lock all member function of runtime filter 
producer/consumer
 };
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp 
b/be/src/runtime_filter/runtime_filter_consumer.cpp
index 4b1842c8170..acc25ac46cc 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer.cpp
@@ -45,6 +45,7 @@ Status RuntimeFilterConsumer::_apply_ready_expr(
 }
 
 Status 
RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     if (_rf_state == State::READY) {
         RETURN_IF_ERROR(_apply_ready_expr(push_exprs));
     }
@@ -55,6 +56,7 @@ Status 
RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilte
 }
 
 void RuntimeFilterConsumer::signal(RuntimeFilter* other) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) 
* NANOS_PER_MILLIS));
     _set_state(State::READY, other->_wrapper);
     if (!_filter_timer.empty()) {
@@ -66,6 +68,7 @@ void RuntimeFilterConsumer::signal(RuntimeFilter* other) {
 
 std::shared_ptr<pipeline::RuntimeFilterTimer> 
RuntimeFilterConsumer::create_filter_timer(
         std::shared_ptr<pipeline::Dependency> dependencies) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     auto timer = 
std::make_shared<pipeline::RuntimeFilterTimer>(_registration_time,
                                                                 
_rf_wait_time_ms, dependencies);
     _filter_timer.push_back(timer);
@@ -211,13 +214,13 @@ Status 
RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
 }
 
 void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile* 
parent_operator_profile) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     DCHECK(parent_operator_profile != nullptr);
     int filter_id = -1;
     {
         // since debug_string will read from  RuntimeFilter::_wrapper
         // and it is a shared_ptr, instead of a atomic_shared_ptr
         // so it is not thread safe
-        std::unique_lock<std::mutex> l(_mtx);
         filter_id = _wrapper->filter_id();
         parent_operator_profile->add_description(fmt::format("RF{} Info", 
filter_id),
                                                  debug_string(), 
"RuntimeFilterInfo");
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h 
b/be/src/runtime_filter/runtime_filter_consumer.h
index 3fb72ef8881..e0e42e509d4 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -58,7 +58,8 @@ public:
     // Called after `State` is ready (e.g. signaled)
     Status acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs);
 
-    std::string debug_string() const override {
+    std::string debug_string() override {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         return fmt::format("Consumer: ({}, state: {}, reached_timeout: {}, 
timeout_limit: {}ms)",
                            _debug_string(), to_string(_rf_state),
                            _reached_timeout ? "true" : "false", 
std::to_string(_rf_wait_time_ms));
@@ -112,7 +113,6 @@ private:
     }
 
     void _set_state(State rf_state, std::shared_ptr<RuntimeFilterWrapper> 
other = nullptr) {
-        std::unique_lock<std::mutex> l(_mtx);
         if (rf_state == State::TIMEOUT) {
             
DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1);
             _reached_timeout = true;
@@ -154,9 +154,6 @@ private:
     const int64_t _registration_time;
 
     std::atomic<State> _rf_state;
-    // only used to lock _set_state() to make _wrapper and _rf_state is 
protected
-    // signal and acquire_expr may called in different threads at the same time
-    std::mutex _mtx;
 
     bool _reached_timeout = false;
 
diff --git a/be/src/runtime_filter/runtime_filter_merger.h 
b/be/src/runtime_filter/runtime_filter_merger.h
index 63dca0a39ae..bfce64e204a 100644
--- a/be/src/runtime_filter/runtime_filter_merger.h
+++ b/be/src/runtime_filter/runtime_filter_merger.h
@@ -46,7 +46,7 @@ public:
         return Status::OK();
     }
 
-    std::string debug_string() const override {
+    std::string debug_string() override {
         return fmt::format(
                 "Merger: ({}, expected_producer_num: {}, 
received_producer_num: {}, "
                 "received_rf_size_num: {}, received_sum_size: {})",
diff --git a/be/src/runtime_filter/runtime_filter_producer.cpp 
b/be/src/runtime_filter/runtime_filter_producer.cpp
index 788aea3b5c0..85d55f9f5c7 100644
--- a/be/src/runtime_filter/runtime_filter_producer.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer.cpp
@@ -46,6 +46,7 @@ Status 
RuntimeFilterProducer::_send_to_local_targets(RuntimeState* state, Runtim
 };
 
 Status RuntimeFilterProducer::publish(RuntimeState* state, bool 
build_hash_table) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     _check_state({State::READY_TO_PUBLISH});
 
     auto do_merge = [&]() {
@@ -141,6 +142,7 @@ public:
 
 void RuntimeFilterProducer::latch_dependency(
         const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
         _check_state({State::WAITING_FOR_DATA});
         return;
@@ -151,6 +153,7 @@ void RuntimeFilterProducer::latch_dependency(
 }
 
 Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t 
local_filter_size) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     if (_rf_state != State::WAITING_FOR_SEND_SIZE) {
         _check_state({State::WAITING_FOR_DATA});
         return Status::OK();
@@ -166,7 +169,7 @@ Status RuntimeFilterProducer::send_size(RuntimeState* 
state, uint64_t local_filt
         LocalMergeContext* merger_context = nullptr;
         
RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
                 _wrapper->filter_id(), &merger_context));
-        std::lock_guard l(merger_context->mtx);
+        std::lock_guard merger_lock(merger_context->mtx);
         if (merger_context->merger->add_rf_size(local_filter_size)) {
             if (!_has_remote_target) {
                 for (auto filter : merger_context->producers) {
@@ -230,6 +233,7 @@ Status RuntimeFilterProducer::send_size(RuntimeState* 
state, uint64_t local_filt
 }
 
 void RuntimeFilterProducer::set_synced_size(uint64_t global_size) {
+    std::unique_lock<std::recursive_mutex> l(_rmtx);
     if (!set_state(State::WAITING_FOR_DATA)) {
         _check_wrapper_state({RuntimeFilterWrapper::State::DISABLED});
     }
diff --git a/be/src/runtime_filter/runtime_filter_producer.h 
b/be/src/runtime_filter/runtime_filter_producer.h
index 620262f6051..ea013625462 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -61,6 +61,7 @@ public:
 
     // insert data to build filter
     Status insert(vectorized::ColumnPtr column, size_t start) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         if (!_wrapper->is_valid() || _rf_state == State::READY_TO_PUBLISH ||
             _rf_state == State::PUBLISHED) {
             return Status::OK();
@@ -71,7 +72,8 @@ public:
 
     Status publish(RuntimeState* state, bool build_hash_table);
 
-    std::string debug_string() const override {
+    std::string debug_string() override {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         auto result =
                 fmt::format("Producer: ({}, state: {}", _debug_string(), 
to_string(_rf_state));
         if (_need_sync_filter_size) {
@@ -85,6 +87,7 @@ public:
 
     void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State 
state,
                                                 std::string reason = "") {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         if (_rf_state == State::PUBLISHED || _rf_state == 
State::READY_TO_PUBLISH) {
             return;
         }
@@ -110,7 +113,7 @@ public:
     }
 
     bool set_state(State state) {
-        std::unique_lock<std::mutex> l(_mtx);
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         if (_rf_state == State::PUBLISHED ||
             (state != State::PUBLISHED && _rf_state == 
State::READY_TO_PUBLISH)) {
             return false;
@@ -119,10 +122,17 @@ public:
         return true;
     }
 
-    std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; }
-    void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper 
= wrapper; }
+    std::shared_ptr<RuntimeFilterWrapper> wrapper() {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
+        return _wrapper;
+    }
+    void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
+        _wrapper = wrapper;
+    }
 
     void collect_realtime_profile(RuntimeProfile* parent_operator_profile) {
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
         DCHECK(parent_operator_profile != nullptr);
         if (parent_operator_profile == nullptr) {
             return;
@@ -131,12 +141,8 @@ public:
         RuntimeFilterInfo:
             - RF0 Info: xxxx
         */
-        {
-            std::unique_lock<std::mutex> l(_mtx);
-            parent_operator_profile->add_description(
-                    fmt::format("RF{} Info", _wrapper->filter_id()), 
debug_string(),
-                    "RuntimeFilterInfo");
-        }
+        parent_operator_profile->add_description(fmt::format("RF{} Info", 
_wrapper->filter_id()),
+                                                 debug_string(), 
"RuntimeFilterInfo");
     }
 
 private:
@@ -168,10 +174,6 @@ private:
     std::shared_ptr<pipeline::CountedFinishDependency> _dependency;
 
     std::atomic<State> _rf_state;
-
-    // only used to lock set_state() to make _rf_state is protected
-    // set_synced_size and RuntimeFilterProducerHelper::terminate may called 
in different threads at the same time
-    std::mutex _mtx;
 };
 #include "common/compile_check_end.h"
 } // namespace doris
diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp 
b/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp
new file mode 100644
index 00000000000..a3bf632c13f
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_producer_helper_set.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/mock_operator.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime_filter/runtime_filter_test_utils.h"
+#include "vec/columns/columns_number.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/exprs/vslot_ref.h"
+
+namespace doris {
+
+class RuntimeFilterProducerHelperSetTest : public RuntimeFilterTest {
+    void SetUp() override {
+        RuntimeFilterTest::SetUp();
+        _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, 
INSTANCE_NUM);
+        _op.reset(new pipeline::MockOperatorX());
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2));
+
+        _sink.reset(new pipeline::HashJoinBuildSinkOperatorX(
+                &_pool, 0, _op->operator_id(),
+                TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), 
_tbl));
+        FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink));
+
+        _task.reset(new pipeline::PipelineTask(_pipeline, 0, 
_runtime_states[0].get(), nullptr,
+                                               &_profile, {}, 0));
+    }
+
+    pipeline::OperatorPtr _op;
+    pipeline::DataSinkOperatorPtr _sink;
+    pipeline::PipelinePtr _pipeline;
+    std::shared_ptr<pipeline::PipelineTask> _task;
+    ObjectPool _pool;
+};
+
+TEST_F(RuntimeFilterProducerHelperSetTest, basic) {
+    auto helper = RuntimeFilterProducerHelperSet();
+
+    vectorized::VExprContextSPtr ctx;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree(
+            TRuntimeFilterDescBuilder::get_default_expr(), ctx));
+    ctx->_last_result_column_id = 0;
+
+    vectorized::VExprContextSPtrs build_expr_ctxs = {ctx};
+    std::vector<TRuntimeFilterDesc> runtime_filter_descs = 
{TRuntimeFilterDescBuilder().build()};
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+            helper.init(_runtime_states[0].get(), build_expr_ctxs, 
runtime_filter_descs));
+
+    vectorized::Block block;
+    auto column = vectorized::ColumnInt32::create();
+    column->insert(1);
+    column->insert(2);
+    block.insert({std::move(column), 
std::make_shared<vectorized::DataTypeInt32>(), "col1"});
+
+    std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters;
+    FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.process(_runtime_states[0].get(), 
&block, 2));
+}
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to