This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push: new 5d450f5ae90 [Chore](runtime-filter) add lock for all runtime filter producer/consumer's public method (#50110) 5d450f5ae90 is described below commit 5d450f5ae90cefc42acf31e5defa65028eecbb11 Author: Pxl <x...@selectdb.com> AuthorDate: Sat Apr 19 08:11:26 2025 +0800 [Chore](runtime-filter) add lock for all runtime filter producer/consumer's public method (#50110) --- be/src/runtime_filter/runtime_filter.h | 6 +- be/src/runtime_filter/runtime_filter_consumer.cpp | 5 +- be/src/runtime_filter/runtime_filter_consumer.h | 7 +- be/src/runtime_filter/runtime_filter_merger.h | 2 +- be/src/runtime_filter/runtime_filter_producer.cpp | 6 +- be/src/runtime_filter/runtime_filter_producer.h | 30 ++++---- .../runtime_filter_producer_helper_set_test.cpp | 81 ++++++++++++++++++++++ 7 files changed, 114 insertions(+), 23 deletions(-) diff --git a/be/src/runtime_filter/runtime_filter.h b/be/src/runtime_filter/runtime_filter.h index e5c833d8f67..8b42576c7b4 100644 --- a/be/src/runtime_filter/runtime_filter.h +++ b/be/src/runtime_filter/runtime_filter.h @@ -46,11 +46,13 @@ public: template <class T> Status assign(const T& request, butil::IOBufAsZeroCopyInputStream* data) { + std::unique_lock<std::recursive_mutex> l(_rmtx); return _wrapper->assign(request, data); } template <class T> Status serialize(T* request, void** data, int* len) { + std::unique_lock<std::recursive_mutex> l(_rmtx); auto real_runtime_filter_type = _wrapper->get_real_type(); request->set_filter_type(get_type(real_runtime_filter_type)); @@ -81,7 +83,7 @@ public: return Status::OK(); } - virtual std::string debug_string() const = 0; + virtual std::string debug_string() = 0; protected: RuntimeFilter(const TRuntimeFilterDesc* desc) @@ -118,6 +120,8 @@ protected: friend class RuntimeFilterProducer; friend class RuntimeFilterConsumer; friend class RuntimeFilterMerger; + + std::recursive_mutex _rmtx; // lock all member function of runtime filter 
producer/consumer }; #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp b/be/src/runtime_filter/runtime_filter_consumer.cpp index 4b1842c8170..acc25ac46cc 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.cpp +++ b/be/src/runtime_filter/runtime_filter_consumer.cpp @@ -45,6 +45,7 @@ Status RuntimeFilterConsumer::_apply_ready_expr( } Status RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& push_exprs) { + std::unique_lock<std::recursive_mutex> l(_rmtx); if (_rf_state == State::READY) { RETURN_IF_ERROR(_apply_ready_expr(push_exprs)); } @@ -55,6 +56,7 @@ Status RuntimeFilterConsumer::acquire_expr(std::vector<vectorized::VRuntimeFilte } void RuntimeFilterConsumer::signal(RuntimeFilter* other) { + std::unique_lock<std::recursive_mutex> l(_rmtx); COUNTER_SET(_wait_timer, int64_t((MonotonicMillis() - _registration_time) * NANOS_PER_MILLIS)); _set_state(State::READY, other->_wrapper); if (!_filter_timer.empty()) { @@ -66,6 +68,7 @@ void RuntimeFilterConsumer::signal(RuntimeFilter* other) { std::shared_ptr<pipeline::RuntimeFilterTimer> RuntimeFilterConsumer::create_filter_timer( std::shared_ptr<pipeline::Dependency> dependencies) { + std::unique_lock<std::recursive_mutex> l(_rmtx); auto timer = std::make_shared<pipeline::RuntimeFilterTimer>(_registration_time, _rf_wait_time_ms, dependencies); _filter_timer.push_back(timer); @@ -211,13 +214,13 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi } void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile* parent_operator_profile) { + std::unique_lock<std::recursive_mutex> l(_rmtx); DCHECK(parent_operator_profile != nullptr); int filter_id = -1; { // since debug_string will read from RuntimeFilter::_wrapper // and it is a shared_ptr, instead of a atomic_shared_ptr // so it is not thread safe - std::unique_lock<std::mutex> l(_mtx); filter_id = _wrapper->filter_id(); 
parent_operator_profile->add_description(fmt::format("RF{} Info", filter_id), debug_string(), "RuntimeFilterInfo"); diff --git a/be/src/runtime_filter/runtime_filter_consumer.h b/be/src/runtime_filter/runtime_filter_consumer.h index 3fb72ef8881..e0e42e509d4 100644 --- a/be/src/runtime_filter/runtime_filter_consumer.h +++ b/be/src/runtime_filter/runtime_filter_consumer.h @@ -58,7 +58,8 @@ public: // Called after `State` is ready (e.g. signaled) Status acquire_expr(std::vector<vectorized::VRuntimeFilterPtr>& push_exprs); - std::string debug_string() const override { + std::string debug_string() override { + std::unique_lock<std::recursive_mutex> l(_rmtx); return fmt::format("Consumer: ({}, state: {}, reached_timeout: {}, timeout_limit: {}ms)", _debug_string(), to_string(_rf_state), _reached_timeout ? "true" : "false", std::to_string(_rf_wait_time_ms)); @@ -112,7 +113,6 @@ private: } void _set_state(State rf_state, std::shared_ptr<RuntimeFilterWrapper> other = nullptr) { - std::unique_lock<std::mutex> l(_mtx); if (rf_state == State::TIMEOUT) { DorisMetrics::instance()->runtime_filter_consumer_timeout_num->increment(1); _reached_timeout = true; @@ -154,9 +154,6 @@ private: const int64_t _registration_time; std::atomic<State> _rf_state; - // only used to lock _set_state() to make _wrapper and _rf_state is protected - // signal and acquire_expr may called in different threads at the same time - std::mutex _mtx; bool _reached_timeout = false; diff --git a/be/src/runtime_filter/runtime_filter_merger.h b/be/src/runtime_filter/runtime_filter_merger.h index 63dca0a39ae..bfce64e204a 100644 --- a/be/src/runtime_filter/runtime_filter_merger.h +++ b/be/src/runtime_filter/runtime_filter_merger.h @@ -46,7 +46,7 @@ public: return Status::OK(); } - std::string debug_string() const override { + std::string debug_string() override { return fmt::format( "Merger: ({}, expected_producer_num: {}, received_producer_num: {}, " "received_rf_size_num: {}, received_sum_size: {})", diff --git 
a/be/src/runtime_filter/runtime_filter_producer.cpp b/be/src/runtime_filter/runtime_filter_producer.cpp index 788aea3b5c0..85d55f9f5c7 100644 --- a/be/src/runtime_filter/runtime_filter_producer.cpp +++ b/be/src/runtime_filter/runtime_filter_producer.cpp @@ -46,6 +46,7 @@ Status RuntimeFilterProducer::_send_to_local_targets(RuntimeState* state, Runtim }; Status RuntimeFilterProducer::publish(RuntimeState* state, bool build_hash_table) { + std::unique_lock<std::recursive_mutex> l(_rmtx); _check_state({State::READY_TO_PUBLISH}); auto do_merge = [&]() { @@ -141,6 +142,7 @@ public: void RuntimeFilterProducer::latch_dependency( const std::shared_ptr<pipeline::CountedFinishDependency>& dependency) { + std::unique_lock<std::recursive_mutex> l(_rmtx); if (_rf_state != State::WAITING_FOR_SEND_SIZE) { _check_state({State::WAITING_FOR_DATA}); return; @@ -151,6 +153,7 @@ void RuntimeFilterProducer::latch_dependency( } Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filter_size) { + std::unique_lock<std::recursive_mutex> l(_rmtx); if (_rf_state != State::WAITING_FOR_SEND_SIZE) { _check_state({State::WAITING_FOR_DATA}); return Status::OK(); @@ -166,7 +169,7 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt LocalMergeContext* merger_context = nullptr; RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_local_merge_producer_filters( _wrapper->filter_id(), &merger_context)); - std::lock_guard l(merger_context->mtx); + std::lock_guard merger_lock(merger_context->mtx); if (merger_context->merger->add_rf_size(local_filter_size)) { if (!_has_remote_target) { for (auto filter : merger_context->producers) { @@ -230,6 +233,7 @@ Status RuntimeFilterProducer::send_size(RuntimeState* state, uint64_t local_filt } void RuntimeFilterProducer::set_synced_size(uint64_t global_size) { + std::unique_lock<std::recursive_mutex> l(_rmtx); if (!set_state(State::WAITING_FOR_DATA)) { 
_check_wrapper_state({RuntimeFilterWrapper::State::DISABLED}); } diff --git a/be/src/runtime_filter/runtime_filter_producer.h b/be/src/runtime_filter/runtime_filter_producer.h index 620262f6051..ea013625462 100644 --- a/be/src/runtime_filter/runtime_filter_producer.h +++ b/be/src/runtime_filter/runtime_filter_producer.h @@ -61,6 +61,7 @@ public: // insert data to build filter Status insert(vectorized::ColumnPtr column, size_t start) { + std::unique_lock<std::recursive_mutex> l(_rmtx); if (!_wrapper->is_valid() || _rf_state == State::READY_TO_PUBLISH || _rf_state == State::PUBLISHED) { return Status::OK(); @@ -71,7 +72,8 @@ public: Status publish(RuntimeState* state, bool build_hash_table); - std::string debug_string() const override { + std::string debug_string() override { + std::unique_lock<std::recursive_mutex> l(_rmtx); auto result = fmt::format("Producer: ({}, state: {}", _debug_string(), to_string(_rf_state)); if (_need_sync_filter_size) { @@ -85,6 +87,7 @@ public: void set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State state, std::string reason = "") { + std::unique_lock<std::recursive_mutex> l(_rmtx); if (_rf_state == State::PUBLISHED || _rf_state == State::READY_TO_PUBLISH) { return; } @@ -110,7 +113,7 @@ public: } bool set_state(State state) { - std::unique_lock<std::mutex> l(_mtx); + std::unique_lock<std::recursive_mutex> l(_rmtx); if (_rf_state == State::PUBLISHED || (state != State::PUBLISHED && _rf_state == State::READY_TO_PUBLISH)) { return false; @@ -119,10 +122,17 @@ public: return true; } - std::shared_ptr<RuntimeFilterWrapper> wrapper() const { return _wrapper; } - void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { _wrapper = wrapper; } + std::shared_ptr<RuntimeFilterWrapper> wrapper() { + std::unique_lock<std::recursive_mutex> l(_rmtx); + return _wrapper; + } + void set_wrapper(std::shared_ptr<RuntimeFilterWrapper> wrapper) { + std::unique_lock<std::recursive_mutex> l(_rmtx); + _wrapper = wrapper; + } void 
collect_realtime_profile(RuntimeProfile* parent_operator_profile) { + std::unique_lock<std::recursive_mutex> l(_rmtx); DCHECK(parent_operator_profile != nullptr); if (parent_operator_profile == nullptr) { return; @@ -131,12 +141,8 @@ public: RuntimeFilterInfo: - RF0 Info: xxxx */ - { - std::unique_lock<std::mutex> l(_mtx); - parent_operator_profile->add_description( - fmt::format("RF{} Info", _wrapper->filter_id()), debug_string(), - "RuntimeFilterInfo"); - } + parent_operator_profile->add_description(fmt::format("RF{} Info", _wrapper->filter_id()), + debug_string(), "RuntimeFilterInfo"); } private: @@ -168,10 +174,6 @@ private: std::shared_ptr<pipeline::CountedFinishDependency> _dependency; std::atomic<State> _rf_state; - - // only used to lock set_state() to make _rf_state is protected - // set_synced_size and RuntimeFilterProducerHelper::terminate may called in different threads at the same time - std::mutex _mtx; }; #include "common/compile_check_end.h" } // namespace doris diff --git a/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp b/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp new file mode 100644 index 00000000000..a3bf632c13f --- /dev/null +++ b/be/test/runtime_filter/runtime_filter_producer_helper_set_test.cpp @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime_filter/runtime_filter_producer_helper_set.h" + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "common/object_pool.h" +#include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/mock_operator.h" +#include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime_filter/runtime_filter_test_utils.h" +#include "vec/columns/columns_number.h" +#include "vec/data_types/data_type_number.h" +#include "vec/exprs/vslot_ref.h" + +namespace doris { + +class RuntimeFilterProducerHelperSetTest : public RuntimeFilterTest { + void SetUp() override { + RuntimeFilterTest::SetUp(); + _pipeline = std::make_shared<pipeline::Pipeline>(0, INSTANCE_NUM, INSTANCE_NUM); + _op.reset(new pipeline::MockOperatorX()); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->add_operator(_op, 2)); + + _sink.reset(new pipeline::HashJoinBuildSinkOperatorX( + &_pool, 0, _op->operator_id(), + TPlanNodeBuilder(0, TPlanNodeType::HASH_JOIN_NODE).build(), _tbl)); + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_pipeline->set_sink(_sink)); + + _task.reset(new pipeline::PipelineTask(_pipeline, 0, _runtime_states[0].get(), nullptr, + &_profile, {}, 0)); + } + + pipeline::OperatorPtr _op; + pipeline::DataSinkOperatorPtr _sink; + pipeline::PipelinePtr _pipeline; + std::shared_ptr<pipeline::PipelineTask> _task; + ObjectPool _pool; +}; + +TEST_F(RuntimeFilterProducerHelperSetTest, basic) { + auto helper = RuntimeFilterProducerHelperSet(); + + vectorized::VExprContextSPtr ctx; + 
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(vectorized::VExpr::create_expr_tree( + TRuntimeFilterDescBuilder::get_default_expr(), ctx)); + ctx->_last_result_column_id = 0; + + vectorized::VExprContextSPtrs build_expr_ctxs = {ctx}; + std::vector<TRuntimeFilterDesc> runtime_filter_descs = {TRuntimeFilterDescBuilder().build()}; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION( + helper.init(_runtime_states[0].get(), build_expr_ctxs, runtime_filter_descs)); + + vectorized::Block block; + auto column = vectorized::ColumnInt32::create(); + column->insert(1); + column->insert(2); + block.insert({std::move(column), std::make_shared<vectorized::DataTypeInt32>(), "col1"}); + + std::map<int, std::shared_ptr<RuntimeFilterWrapper>> runtime_filters; + FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.process(_runtime_states[0].get(), &block, 2)); +} + +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org