This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e010fa8d4f [Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593) e010fa8d4f is described below commit e010fa8d4ff17e0800c9b71fd0b1701b7d205afb Author: Pxl <pxl...@qq.com> AuthorDate: Tue Jun 13 11:20:49 2023 +0800 [Chore](runtime filter) remove runtime filter ready_for_publish/publish_finally (#20593) --- be/src/common/config.cpp | 4 --- be/src/common/config.h | 4 --- be/src/exprs/runtime_filter.cpp | 29 ------------------- be/src/exprs/runtime_filter.h | 15 +++++----- be/src/exprs/runtime_filter_rpc.cpp | 35 +++++++++++++---------- be/src/exprs/runtime_filter_slots.h | 37 ++++++++++++++----------- be/src/exprs/runtime_filter_slots_cross.h | 10 +++---- be/src/pipeline/exec/hashjoin_build_sink.h | 1 + be/src/runtime/runtime_filter_mgr.cpp | 8 +++--- be/src/vec/exec/join/vhash_join_node.cpp | 17 +++++++----- be/src/vec/exec/join/vhash_join_node.h | 9 +++++- be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +- 12 files changed, 77 insertions(+), 94 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f23bbffe99..f3ea211a8e 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -732,10 +732,6 @@ DEFINE_mInt32(mem_tracker_consume_min_size_bytes, "1048576"); // In most cases, it does not need to be modified. DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1"); -// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method -// else we will call sync method -DEFINE_mBool(runtime_filter_use_async_rpc, "true"); - // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job diff --git a/be/src/common/config.h b/be/src/common/config.h index 7b61e13c92..4f6a608929 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -768,10 +768,6 @@ DECLARE_mInt32(mem_tracker_consume_min_size_bytes); // In most cases, it does not need to be modified. DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio); -// if set runtime_filter_use_async_rpc true, publish runtime filter will be a async method -// else we will call sync method -DECLARE_mBool(runtime_filter_use_async_rpc); - // max send batch parallelism for OlapTableSink // The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job, // if exceed, the value of send_batch_parallelism would be max_send_batch_parallelism_per_job diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index ef8543e136..1a4b183e5d 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1019,26 +1019,6 @@ public: PrimitiveType column_type() { return _column_return_type; } - void ready_for_publish() { - if (_filter_type == RuntimeFilterType::MINMAX_FILTER) { - switch (_column_return_type) { - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_STRING: { - StringRef* min_value = static_cast<StringRef*>(_context.minmax_func->get_min()); - StringRef* max_value = static_cast<StringRef*>(_context.minmax_func->get_max()); - auto min_val_ptr = _pool->add(new std::string(min_value->data)); - auto max_val_ptr = _pool->add(new std::string(max_value->data)); - StringRef min_val(min_val_ptr->c_str(), min_val_ptr->length()); - StringRef max_val(max_val_ptr->c_str(), max_val_ptr->length()); - _context.minmax_func->assign(&min_val, &max_val); - } - default: - break; - } - } - } - bool is_bloomfilter() const { return _is_bloomfilter; } bool is_ignored_in_filter() const { return _is_ignored_in_filter; } @@ -1164,11 +1144,6 @@ Status IRuntimeFilter::publish() { } } -void IRuntimeFilter::publish_finally() { - DCHECK(is_producer()); - join_rpc(); -} - Status IRuntimeFilter::get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs) { DCHECK(is_consumer()); if (!_is_ignored) { @@ -1531,10 +1506,6 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { } } -void IRuntimeFilter::ready_for_publish() { - _wrapper->ready_for_publish(); -} - Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { if (!_is_ignored && wrapper->is_ignored_in_filter()) { set_ignored(); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 8389e8416b..6ff64a9af3 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -217,8 +217,6 @@ public: // push filter to remote node or push down it to scan_node Status publish(); - void publish_finally(); - RuntimeFilterType type() const { return _runtime_filter_type; } Status get_push_expr_ctxs(std::vector<vectorized::VExprSPtr>* push_exprs); @@ -293,9 +291,12 @@ public: // consumer should call before released Status consumer_close(); + bool is_finish_rpc(); + + Status join_rpc(); + // async push runtimefilter to remote node Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf); - Status join_rpc(); void init_profile(RuntimeProfile* parent_profile); @@ -303,8 +304,6 @@ public: void update_runtime_filter_type_to_profile(); - void ready_for_publish(); - static bool enable_use_batch(bool use_batch, PrimitiveType type) { return use_batch && (is_int_or_bool(type) || is_float_or_double(type)); } @@ -389,9 +388,9 @@ protected: std::vector<doris::vectorized::VExprSPtr> _push_down_vexprs; - struct rpc_context; + struct RPCContext; - std::shared_ptr<rpc_context> _rpc_context; + std::shared_ptr<RPCContext> _rpc_context; // parent profile // only effect on consumer @@ -403,7 +402,7 @@ protected: const bool _enable_pipeline_exec; bool _profile_init = false; - doris::Mutex _profile_mutex; + std::mutex _profile_mutex; std::string _name; bool _opt_remote_rf; }; diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp index 829224f3f2..4d613e3a57 100644 --- a/be/src/exprs/runtime_filter_rpc.cpp +++ b/be/src/exprs/runtime_filter_rpc.cpp @@ -38,11 +38,14 @@ namespace doris { -struct IRuntimeFilter::rpc_context { +struct IRuntimeFilter::RPCContext { PMergeFilterRequest request; PMergeFilterResponse response; brpc::Controller cntl; brpc::CallId cid; + bool is_finished = false; + + static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = true; } }; Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr, @@ -54,10 +57,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress if (!stub) { std::string msg = fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port); - LOG(WARNING) << msg; return Status::InternalError(msg); } - _rpc_context = std::make_shared<IRuntimeFilter::rpc_context>(); + _rpc_context = std::make_shared<IRuntimeFilter::RPCContext>(); void* data = nullptr; int len = 0; @@ -72,7 +74,7 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress _rpc_context->request.set_filter_id(_filter_id); _rpc_context->request.set_opt_remote_rf(opt_remote_rf); _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec()); - _rpc_context->cntl.set_timeout_ms(1000); + _rpc_context->cntl.set_timeout_ms(state->runtime_filter_wait_time_ms()); _rpc_context->cid = _rpc_context->cntl.call_id(); Status serialize_status = serialize(&_rpc_context->request, &data, &len); @@ -83,14 +85,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress DCHECK(data != nullptr); _rpc_context->cntl.request_attachment().append(data, len); } - if (config::runtime_filter_use_async_rpc) { - stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response, - brpc::DoNothing()); - } else { - stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response, - nullptr); - _rpc_context.reset(); - } + + stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response, + brpc::NewCallback(RPCContext::finish, _rpc_context)); } else { // we should reset context @@ -99,15 +96,25 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress return serialize_status; } +bool IRuntimeFilter::is_finish_rpc() { + if (_rpc_context == nullptr) { + return true; + } + return _rpc_context->is_finished; +} + Status IRuntimeFilter::join_rpc() { - DCHECK(is_producer()); + if (!is_producer()) { + return Status::InternalError("RuntimeFilter::join_rpc only called when rf is producer."); + } if (_rpc_context != nullptr) { brpc::Join(_rpc_context->cid); if (_rpc_context->cntl.Failed()) { - LOG(WARNING) << "runtimefilter rpc err:" << _rpc_context->cntl.ErrorText(); // reset stub cache ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( _rpc_context->cntl.remote_side()); + return Status::InternalError("RuntimeFilter::join_rpc meet rpc error, msg={}.", + _rpc_context->cntl.ErrorText()); } } return Status::OK(); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index b1f4b125b4..d6a5866cae 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -17,6 +17,7 @@ #pragma once +#include "common/status.h" #include "exprs/runtime_filter.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" @@ -60,8 +61,8 @@ public: auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, std::string& msg) { runtime_filter->set_ignored(); runtime_filter->set_ignored_msg(msg); - runtime_filter->publish(); - runtime_filter->publish_finally(); + RETURN_IF_ERROR(runtime_filter->publish()); + return Status::OK(); }; // ordered vector: IN, IN_OR_BLOOM, others. @@ -142,9 +143,9 @@ public: "in_num({}) >= max_in_num({})", print_id(state->fragment_instance_id()), filter_desc.filter_id, hash_table_size, max_in_num); - ignore_remote_filter(runtime_filter, msg); + RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg)); #else - ignore_remote_filter(runtime_filter, "ignored"); + RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, "ignored")); #endif continue; } @@ -196,29 +197,33 @@ public: } } - // should call this method after insert - void ready_for_publish() { + bool ready_finish_publish() { for (auto& pair : _runtime_filters) { for (auto filter : pair.second) { - filter->ready_for_publish(); + if (!filter->is_finish_rpc()) { + return false; + } } } + return true; } - // publish runtime filter - void publish() { - for (int i = 0; i < _probe_expr_context.size(); ++i) { - auto iter = _runtime_filters.find(i); - if (iter != _runtime_filters.end()) { - for (auto filter : iter->second) { - filter->publish(); - } + + void finish_publish() { + for (auto& pair : _runtime_filters) { + for (auto filter : pair.second) { + filter->join_rpc(); } } + } + + // publish runtime filter + Status publish() { for (auto& pair : _runtime_filters) { for (auto filter : pair.second) { - filter->publish_finally(); + RETURN_IF_ERROR(filter->publish()); } } + return Status::OK(); } void copy_to_shared_context(vectorized::SharedHashTableContextPtr& context) { diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h index 1e8c15e713..184ba46591 100644 --- a/be/src/exprs/runtime_filter_slots_cross.h +++ b/be/src/exprs/runtime_filter_slots_cross.h @@ -19,6 +19,7 @@ #include <vector> +#include "common/status.h" #include "exprs/runtime_filter.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" @@ -88,14 +89,11 @@ public: return Status::OK(); } - void publish() { + Status publish() { for (auto& filter : _runtime_filters) { - filter->publish(); - } - for (auto& filter : _runtime_filters) { - // todo: cross join may not need publish_finally() - filter->publish_finally(); + RETURN_IF_ERROR(filter->publish()); } + return Status::OK(); } bool empty() { return !_runtime_filters.size(); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index dfa077b83f..c1e67512dd 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -39,6 +39,7 @@ class HashJoinBuildSink final : public StreamingOperator<HashJoinBuildSinkBuilde public: HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node); bool can_write() override { return _node->can_sink_write(); } + bool is_pending_finish() const override { return !_node->ready_for_finish(); } }; } // namespace pipeline diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index b6dfa0a625..a566018c11 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -43,7 +43,7 @@ namespace doris { template <class RPCRequest, class RPCResponse> -struct async_rpc_context { +struct AsyncRPCContext { RPCRequest request; RPCResponse response; brpc::Controller cntl; @@ -55,7 +55,7 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx) : _query_ctx(query_ctx) {} -RuntimeFilterMgr::~RuntimeFilterMgr() {} +RuntimeFilterMgr::~RuntimeFilterMgr() = default; Status RuntimeFilterMgr::init() { _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr", @@ -315,7 +315,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ // 2. FE has been upgraded (e.g. cntVal->targetv2_info.size() > 0) // 3. This filter is bloom filter (only bloom filter should be used for merging) using PPublishFilterRpcContext = - async_rpc_context<PPublishFilterRequestV2, PPublishFilterResponse>; + AsyncRPCContext<PPublishFilterRequestV2, PPublishFilterResponse>; std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts; rpc_contexts.reserve(cntVal->targetv2_info.size()); @@ -380,7 +380,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ } else { // prepare rpc context using PPublishFilterRpcContext = - async_rpc_context<PPublishFilterRequest, PPublishFilterResponse>; + AsyncRPCContext<PPublishFilterRequest, PPublishFilterResponse>; std::vector<std::unique_ptr<PPublishFilterRpcContext>> rpc_contexts; rpc_contexts.reserve(cntVal->target_info.size()); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 822b99db58..9b410f56ae 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -285,9 +285,9 @@ struct ProcessRuntimeFilterBuild { if (_join_node->_runtime_filter_descs.empty()) { return Status::OK(); } - _join_node->_runtime_filter_slots = _join_node->_pool->add( - new VRuntimeFilterSlots(_join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs, - _join_node->_runtime_filter_descs)); + _join_node->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>( + _join_node->_probe_expr_ctxs, _join_node->_build_expr_ctxs, + _join_node->_runtime_filter_descs); RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init( state, hash_table_ctx.hash_table.get_size(), _join_node->_build_bf_cardinality)); @@ -300,7 +300,7 @@ struct ProcessRuntimeFilterBuild { } { SCOPED_TIMER(_join_node->_push_down_timer); - _join_node->_runtime_filter_slots->publish(); + RETURN_IF_ERROR(_join_node->_runtime_filter_slots->publish()); } return Status::OK(); @@ -917,15 +917,15 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc if (_runtime_filter_descs.empty()) { return Status::OK(); } - _runtime_filter_slots = _pool->add(new VRuntimeFilterSlots( + _runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>( _probe_expr_ctxs, _build_expr_ctxs, - _runtime_filter_descs)); + _runtime_filter_descs); RETURN_IF_ERROR(_runtime_filter_slots->init( state, arg.hash_table.get_size(), 0)); RETURN_IF_ERROR(_runtime_filter_slots->copy_from_shared_context( _shared_hash_table_context)); - _runtime_filter_slots->publish(); + RETURN_IF_ERROR(_runtime_filter_slots->publish()); return Status::OK(); }}, *_hash_table_variants); @@ -1261,6 +1261,9 @@ HashJoinNode::~HashJoinNode() { // signal at here is abnormal _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor")); } + if (_runtime_filter_slots != nullptr) { + _runtime_filter_slots->finish_publish(); + } } void HashJoinNode::_release_mem() { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 36398ba4ad..4e9cd40e9a 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -252,6 +252,13 @@ public: bool should_build_hash_table() const { return _should_build_hash_table; } + bool ready_for_finish() { + if (_runtime_filter_slots == nullptr) { + return true; + } + return _runtime_filter_slots->ready_finish_publish(); + } + private: void _init_short_circuit_for_probe() override { _short_circuit_for_probe = @@ -341,7 +348,7 @@ private: bool _is_broadcast_join = false; bool _should_build_hash_table = true; std::shared_ptr<SharedHashTableController> _shared_hashtable_controller = nullptr; - VRuntimeFilterSlots* _runtime_filter_slots = nullptr; + std::shared_ptr<VRuntimeFilterSlots> _runtime_filter_slots = nullptr; std::vector<SlotId> _hash_output_slot_ids; std::vector<bool> _left_output_slot_flags; diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 57ad81f120..491a9c6854 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -84,7 +84,7 @@ struct RuntimeFilterBuild { } { SCOPED_TIMER(_join_node->_push_down_timer); - runtime_filter_slots.publish(); + RETURN_IF_ERROR(runtime_filter_slots.publish()); } return Status::OK(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org