This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c8a793ad6ae [Exec](RF) Support merge remote rf local first (#31067) c8a793ad6ae is described below commit c8a793ad6ae68c6ba0584f3ed216f9f6716f7ee7 Author: HappenLee <happen...@hotmail.com> AuthorDate: Wed Feb 21 22:38:33 2024 +0800 [Exec](RF) Support merge remote rf local first (#31067) --- be/src/agent/be_exec_version_manager.h | 1 + be/src/exprs/runtime_filter.cpp | 77 +++++++++--------- be/src/exprs/runtime_filter.h | 24 ++---- be/src/exprs/runtime_filter_slots.h | 11 ++- be/src/pipeline/exec/datagen_operator.cpp | 9 +-- be/src/pipeline/exec/hashjoin_build_sink.cpp | 23 +++--- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- .../exec/nested_loop_join_build_operator.cpp | 4 +- be/src/pipeline/pipeline_fragment_context.cpp | 5 +- be/src/pipeline/pipeline_fragment_context.h | 2 +- .../pipeline_x/pipeline_x_fragment_context.cpp | 14 ++-- .../pipeline_x/pipeline_x_fragment_context.h | 2 +- be/src/runtime/fragment_mgr.cpp | 5 +- be/src/runtime/query_context.h | 3 + be/src/runtime/runtime_filter_mgr.cpp | 91 ++++++++++++++++++---- be/src/runtime/runtime_filter_mgr.h | 32 ++++---- be/src/runtime/runtime_state.cpp | 36 +++++++-- be/src/runtime/runtime_state.h | 22 ++++-- be/src/vec/exec/join/vhash_join_node.cpp | 6 +- be/src/vec/exec/join/vnested_loop_join_node.cpp | 4 +- be/src/vec/exec/runtime_filter_consumer.cpp | 23 ++---- be/src/vec/exec/runtime_filter_consumer.h | 4 +- be/src/vec/exec/vdata_gen_scan_node.cpp | 9 +-- .../main/java/org/apache/doris/qe/Coordinator.java | 3 +- 24 files changed, 243 insertions(+), 169 deletions(-) diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h index f5213c54089..afe738684aa 100644 --- a/be/src/agent/be_exec_version_manager.h +++ b/be/src/agent/be_exec_version_manager.h @@ -65,6 +65,7 @@ private: * d. unix_timestamp function support timestamp with float for datetimev2, and change nullable mode. * e. 
change shuffle serialize/deserialize way * f. shrink some function's nullable mode. + * g. do local merge of remote runtime filter */ constexpr inline int BeExecVersionManager::max_be_exec_version = 3; constexpr inline int BeExecVersionManager::min_be_exec_version = 0; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 2ac9ad40c5a..1ec66bf2a87 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -938,11 +938,11 @@ private: Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, - bool build_bf_exactly, bool is_global, int parallel_tasks) { - *res = pool->add(new IRuntimeFilter(state, pool, desc, is_global, parallel_tasks)); + bool build_bf_exactly, bool need_local_merge) { + *res = pool->add(new IRuntimeFilter(state, pool, desc, need_local_merge)); (*res)->set_role(role); return (*res)->init_with_desc(desc, query_options, node_id, - is_global ? false : build_bf_exactly); + need_local_merge ? 
false : build_bf_exactly); } vectorized::SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() { @@ -954,47 +954,53 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta _wrapper->insert_batch(column, start); } -Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num) { - SCOPED_TIMER(_merge_local_rf_timer); - std::unique_lock lock(_local_merge_mutex); - if (_merged_rf_num == 0) { - _wrapper = wrapper; - } else { - RETURN_IF_ERROR(merge_from(wrapper)); - } - *merged_num = ++_merged_rf_num; - return Status::OK(); -} - Status IRuntimeFilter::publish(bool publish_local) { DCHECK(is_producer()); - if (_is_global && _has_local_target) { - std::vector<IRuntimeFilter*> filters; - RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters( - _filter_id, filters)); - // push down - for (auto filter : filters) { - int merged_num = 0; - RETURN_IF_ERROR(filter->merge_local_filter(_wrapper, &merged_num)); - if (merged_num == _parallel_build_tasks) { - filter->update_runtime_filter_type_to_profile(); - filter->signal(); - } - } - } else if (_has_local_target) { + auto send_to_remote = [&](IRuntimeFilter* filter) { + TNetworkAddress addr; + DCHECK(_state != nullptr); + RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); + return filter->push_to_remote(&addr, _opt_remote_rf); + }; + auto send_to_local = [&](RuntimePredicateWrapper* wrapper) { std::vector<IRuntimeFilter*> filters; RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); + DCHECK(!filters.empty()); // push down for (auto filter : filters) { - filter->_wrapper = _wrapper; + filter->_wrapper = wrapper; filter->update_runtime_filter_type_to_profile(); filter->signal(); } + return Status::OK(); + }; + auto do_local_merge = [&]() { + LocalMergeFilters* local_merge_filters = nullptr; + RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters( + 
_filter_id, &local_merge_filters)); + std::lock_guard l(*local_merge_filters->lock); + RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper)); + local_merge_filters->merge_time--; + if (local_merge_filters->merge_time == 0) { + if (_has_local_target) { + RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper)); + } else { + RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0])); + } + } + return Status::OK(); + }; + + if (_need_local_merge && _has_local_target) { + RETURN_IF_ERROR(do_local_merge()); + } else if (_has_local_target) { + RETURN_IF_ERROR(send_to_local(_wrapper)); } else if (!publish_local) { - TNetworkAddress addr; - DCHECK(_state != nullptr); - RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); - return push_to_remote(&addr, _opt_remote_rf); + if (_is_broadcast_join || _state->be_exec_version < 3) { + RETURN_IF_ERROR(send_to_remote(this)); + } else { + RETURN_IF_ERROR(do_local_merge()); + } } else { // remote broadcast join only push onetime in build shared hash table // publish_local only set true on copy shared hash table @@ -1366,9 +1372,6 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) { _profile_init = true; parent_profile->add_child(_profile.get(), true, nullptr); _profile->add_info_string("Info", _format_status()); - if (_is_global) { - _merge_local_rf_timer = ADD_TIMER(_profile.get(), "MergeLocalRuntimeFilterTime"); - } if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) { update_runtime_filter_type_to_profile(); } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index ae8063e5610..d853493889c 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -185,7 +185,7 @@ enum RuntimeFilterState { class IRuntimeFilter { public: IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool, - const TRuntimeFilterDesc* desc, bool is_global = false, int parallel_tasks = -1) + const TRuntimeFilterDesc* 
desc, bool need_local_merge = false) : _state(state), _pool(pool), _filter_id(desc->filter_id), @@ -204,16 +204,14 @@ public: _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, to_string(_runtime_filter_type))), _profile(new RuntimeProfile(_name)), - _is_global(is_global), - _parallel_build_tasks(parallel_tasks) {} + _need_local_merge(need_local_merge) {} ~IRuntimeFilter() = default; static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, - bool build_bf_exactly = false, bool is_global = false, - int parallel_tasks = 0); + bool build_bf_exactly = false, bool need_local_merge = false); vectorized::SharedRuntimeFilterContext& get_shared_context_ref(); @@ -349,8 +347,6 @@ public: void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>); - Status merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num); - protected: // serialize _wrapper to protobuf void to_protobuf(PInFilter* filter); @@ -426,18 +422,10 @@ protected: // parent profile // only effect on consumer std::unique_ptr<RuntimeProfile> _profile; - RuntimeProfile::Counter* _merge_local_rf_timer = nullptr; bool _opt_remote_rf; - // `_is_global` indicates whether this runtime filter is global on this BE. - // All runtime filters should be merged on each BE if it is global. - // This is improvement for pipelineX. - const bool _is_global = false; - std::mutex _local_merge_mutex; - // There are `_parallel_build_tasks` pipeline tasks to build runtime filter. - // We should call `signal` once all runtime filters are done and merged to one - // (e.g. `_merged_rf_num` is equal to `_parallel_build_tasks`). - int _merged_rf_num = 0; - const int _parallel_build_tasks = -1; + // `_need_local_merge` indicates whether this runtime filter is global on this BE. 
+ // All runtime filters should be merged on each BE before push_to_remote or publish. + const bool _need_local_merge = false; std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer; }; diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 7a738b8c06d..7f34bf7f2c9 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -34,25 +34,24 @@ class VRuntimeFilterSlots { public: VRuntimeFilterSlots( const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs, - const std::vector<IRuntimeFilter*>& runtime_filters, bool is_global = false) + const std::vector<IRuntimeFilter*>& runtime_filters, bool need_local_merge = false) : _build_expr_context(build_expr_ctxs), _runtime_filters(runtime_filters), - _is_global(is_global) {} + _need_local_merge(need_local_merge) {} Status init(RuntimeState* state, int64_t hash_table_size) { // runtime filter effect strategy // 1. we will ignore IN filter when hash_table_size is too big // 2. 
we will ignore BLOOM filter and MinMax filter when hash_table_size // is too small and IN filter has effect - std::map<int, bool> has_in_filter; auto ignore_local_filter = [&](int filter_id) { // Now pipeline x have bug in ignore, after fix the problem enable ignore logic in pipeline x - if (_is_global) { + if (_need_local_merge) { return Status::OK(); } - auto runtime_filter_mgr = state->runtime_filter_mgr(); + auto runtime_filter_mgr = state->local_runtime_filter_mgr(); std::vector<IRuntimeFilter*> filters; RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, filters)); @@ -217,7 +216,7 @@ public: private: const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context; std::vector<IRuntimeFilter*> _runtime_filters; - const bool _is_global = false; + const bool _need_local_merge = false; // prob_contition index -> [IRuntimeFilter] std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map; }; diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 916ce62aa26..418068ef9d8 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -100,13 +100,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. 
for (const auto& filter_desc : p._runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; - if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { - RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), p.node_id(), &runtime_filter, false)); - } else { - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), p.node_id(), &runtime_filter, false)); - } + RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(), + &runtime_filter)); runtime_filter->init_profile(_runtime_profile.get()); } return Status::OK(); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 19de8fb7bdc..9c6eff4cda8 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -99,9 +99,9 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo _hash_table_init(state); _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - p._runtime_filter_descs[i], state->query_options(), &_runtime_filters[i], - _build_expr_ctxs.size() == 1, p._use_global_rf, p._child_x->parallel_tasks())); + RETURN_IF_ERROR(state->register_producer_runtime_filter( + p._runtime_filter_descs[i], p._need_local_merge, &_runtime_filters[i], + _build_expr_ctxs.size() == 1)); } return Status::OK(); @@ -370,7 +370,7 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs, - bool use_global_rf) + bool need_local_merge) : JoinBuildSinkOperatorX(pool, operator_id, tnode, descs), 
_join_distribution(tnode.hash_join_node.__isset.dist_type ? tnode.hash_join_node.dist_type : TJoinDistributionType::NONE), @@ -379,7 +379,7 @@ HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int ope _partition_exprs(tnode.__isset.distribute_expr_lists && !_is_broadcast_join ? tnode.distribute_expr_lists[1] : std::vector<TExpr> {}), - _use_global_rf(use_global_rf) {} + _need_local_merge(need_local_merge) {} Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) { if (_is_broadcast_join) { @@ -475,10 +475,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_block = std::make_shared<vectorized::Block>( local_state._build_side_mutable_block.to_block()); - const bool use_global_rf = - local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf; + const bool need_local_merge = + local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge; RETURN_IF_ERROR(vectorized::process_runtime_filter_build( - state, local_state._shared_state->build_block.get(), &local_state, use_global_rf)); + state, local_state._shared_state->build_block.get(), &local_state, + need_local_merge)); RETURN_IF_ERROR( local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { @@ -528,8 +529,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_block = _shared_hash_table_context->block; local_state._shared_state->build_indexes_null = _shared_hash_table_context->build_indexes_null; - const bool use_global_rf = - local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf; + const bool need_local_merge = + local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge; if (!_shared_hash_table_context->runtime_filters.empty()) { auto ret = std::visit( @@ -545,7 +546,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* local_state._runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>( _build_expr_ctxs, local_state._runtime_filters, - use_global_rf); + need_local_merge); RETURN_IF_ERROR(local_state._runtime_filter_slots->init( state, arg.hash_table->size())); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 56a651e4210..134aba69a48 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -190,7 +190,7 @@ private: vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr; const std::vector<TExpr> _partition_exprs; - const bool _use_global_rf; + const bool _need_local_merge; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index c30bb5ad67c..8485e2c0b24 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -42,8 +42,8 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta } _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - p._runtime_filter_descs[i], state->query_options(), &_runtime_filters[i])); + RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i], false, + &_runtime_filters[i], false)); } return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e251b203794..358086e94eb 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -230,8 +230,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re _runtime_state = RuntimeState::create_unique( local_params.fragment_instance_id, 
request.query_id, request.fragment_id, request.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); - if (local_params.__isset.runtime_filter_params) { - _runtime_state->set_runtime_filter_params(local_params.runtime_filter_params); + if (idx == 0 && local_params.__isset.runtime_filter_params) { + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_params.runtime_filter_params); } _runtime_state->set_task_execution_context(shared_from_this()); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 9ffcb40038c..38db8cbe8ff 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -78,7 +78,7 @@ public: RuntimeState* get_runtime_state() { return _runtime_state.get(); } virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId /*fragment_instance_id*/) { - return _runtime_state->runtime_filter_mgr(); + return _runtime_state->local_runtime_filter_mgr(); } QueryContext* get_query_ctx() { return _query_ctx.get(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 9087f94db5e..693cb4d5c30 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -210,8 +210,9 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r _runtime_state->set_total_load_streams(request.total_load_streams); _runtime_state->set_num_local_sink(request.num_local_sink); - _use_global_rf = request.__isset.parallel_instances && (request.__isset.per_node_shared_scans && - !request.per_node_shared_scans.empty()); + _need_local_merge = + request.__isset.parallel_instances && + (request.__isset.per_node_shared_scans && !request.per_node_shared_scans.empty()); // 2. Build pipelines with operators in this fragment. 
auto root_pipeline = add_pipeline(); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines( @@ -523,11 +524,12 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( filterparams->query_ctx = _query_ctx.get(); } - // build runtime_filter_mgr for each instance + // build local_runtime_filter_mgr for each instance runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(request.query_id, filterparams.get()); - if (local_params.__isset.runtime_filter_params) { - runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params); + if (i == 0 && local_params.__isset.runtime_filter_params) { + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + local_params.runtime_filter_params); } filterparams->runtime_filter_mgr = runtime_filter_mgr.get(); @@ -986,7 +988,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, - _use_global_rf)); + _need_local_merge)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 439b0072d72..54714dd4665 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -177,7 +177,7 @@ private: // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. 
std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks; - bool _use_global_rf = false; + bool _need_local_merge = false; // It is used to manage the lifecycle of RuntimeFilterMergeController std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> _merge_controller_handlers; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index af7370a4c58..64f5d6416a8 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1304,7 +1304,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, pip_context = iter->second; DCHECK(pip_context != nullptr); - runtime_filter_mgr = pip_context->get_runtime_filter_mgr(fragment_instance_id); + runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); } else { std::unique_lock<std::mutex> lock(_lock); auto iter = _fragment_instance_map.find(tfragment_instance_id); @@ -1315,7 +1315,8 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, fragment_executor = iter->second; DCHECK(fragment_executor != nullptr); - runtime_filter_mgr = fragment_executor->runtime_state()->runtime_filter_mgr(); + runtime_filter_mgr = + fragment_executor->runtime_state()->get_query_ctx()->runtime_filter_mgr(); } return runtime_filter_mgr->update_filter(request, attach_data); diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index cf4321beab3..3db91ba2824 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -187,6 +187,9 @@ public: return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0; } + // global runtime filter mgr, the runtime filter have remote target or + // need local merge should regist here. 
before publish() or push_to_remote() + // the runtime filter should do the local merge work RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } TUniqueId query_id() const { return _query_id; } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 799348ef1d3..95f65c5fc32 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -74,7 +74,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int filter_id, Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, IRuntimeFilter** consumer_filter, - bool build_bf_exactly, bool is_global) { + bool build_bf_exactly, bool need_local_merge) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; bool has_exist = false; @@ -89,28 +89,72 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc } } - // TODO: union the remote opt and global two case as one case to one judge - bool remote_opt_or_global = (desc.__isset.opt_remote_rf && desc.opt_remote_rf) || is_global; - if (!has_exist) { IRuntimeFilter* filter; - RETURN_IF_ERROR(IRuntimeFilter::create( - _state, remote_opt_or_global ? 
_state->obj_pool() : &_pool, &desc, &options, - RuntimeFilterRole::CONSUMER, node_id, &filter, build_bf_exactly, is_global)); + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::CONSUMER, node_id, &filter, + build_bf_exactly, need_local_merge)); _consumer_map[key].emplace_back(node_id, filter); *consumer_filter = filter; - } else if (!remote_opt_or_global) { + } else if (!need_local_merge) { return Status::InvalidArgument("filter has registered"); } return Status::OK(); } +Status RuntimeFilterMgr::register_local_merge_producer_filter( + const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options, + doris::IRuntimeFilter** producer_filter, bool build_bf_exactly) { + SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); + int32_t key = desc.filter_id; + + decltype(_local_merge_producer_map.end()) iter; + { + std::lock_guard<std::mutex> l(_lock); + iter = _local_merge_producer_map.find(key); + if (iter == _local_merge_producer_map.end()) { + auto [new_iter, _] = _local_merge_producer_map.emplace(key, LocalMergeFilters {}); + iter = new_iter; + } + } + + DCHECK(_state != nullptr); + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::PRODUCER, -1, producer_filter, + build_bf_exactly, true)); + { + std::lock_guard<std::mutex> l(*iter->second.lock); + if (iter->second.filters.empty()) { + IRuntimeFilter* merge_filter = nullptr; + RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, + RuntimeFilterRole::PRODUCER, -1, &merge_filter, + build_bf_exactly, true)); + iter->second.filters.emplace_back(merge_filter); + } + iter->second.merge_time++; + iter->second.filters.emplace_back(*producer_filter); + } + return Status::OK(); +} + +Status RuntimeFilterMgr::get_local_merge_producer_filters( + int filter_id, doris::LocalMergeFilters** local_merge_filters) { + std::lock_guard<std::mutex> l(_lock); + auto iter = _local_merge_producer_map.find(filter_id); + if (iter == 
_local_merge_producer_map.end()) { + return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", filter_id); + } + *local_merge_filters = &iter->second; + DCHECK(!iter->second.filters.empty()); + DCHECK_GT(iter->second.merge_time, 0); + return Status::OK(); +} + Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, IRuntimeFilter** producer_filter, - bool build_bf_exactly, bool is_global, - int parallel_tasks) { + bool build_bf_exactly) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; std::lock_guard<std::mutex> l(_lock); @@ -122,7 +166,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc } RETURN_IF_ERROR(IRuntimeFilter::create(_state, &_pool, &desc, &options, RuntimeFilterRole::PRODUCER, -1, producer_filter, - build_bf_exactly, is_global, parallel_tasks)); + build_bf_exactly)); _producer_map.emplace(key, *producer_filter); return Status::OK(); } @@ -133,7 +177,19 @@ Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, UpdateRuntimeFilterParams params(request, data, &_pool); int filter_id = request->filter_id(); std::vector<IRuntimeFilter*> filters; - RETURN_IF_ERROR(get_consume_filters(filter_id, filters)); + // The code is organized for upgrade compatibility to prevent infinite waiting + // old way update filter the code should be deleted after the upgrade is complete. 
+ { + std::lock_guard<std::mutex> l(_lock); + auto iter = _consumer_map.find(filter_id); + if (iter == _consumer_map.end()) { + return Status::InvalidArgument("unknown filter: {}, role: CONSUMER.", filter_id); + } + for (auto& holder : iter->second) { + filters.emplace_back(holder.filter); + } + iter->second.clear(); + } for (auto filter : filters) { RETURN_IF_ERROR(filter->update_filter(&params)); } @@ -143,8 +199,11 @@ Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, void RuntimeFilterMgr::set_runtime_filter_params( const TRuntimeFilterParams& runtime_filter_params) { - this->_merge_addr = runtime_filter_params.runtime_filter_merge_addr; - this->_has_merge_addr = true; + std::lock_guard l(_lock); + if (!_has_merge_addr) { + _merge_addr = runtime_filter_params.runtime_filter_merge_addr; + _has_merge_addr = true; + } } Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) { @@ -454,7 +513,7 @@ RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* sta params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms(); params->enable_pipeline_exec = state->enable_pipeline_exec(); params->execution_timeout = state->execution_timeout(); - params->runtime_filter_mgr = state->runtime_filter_mgr(); + params->runtime_filter_mgr = state->local_runtime_filter_mgr(); params->exec_env = state->exec_env(); params->query_id.set_hi(state->query_id().hi); params->query_id.set_lo(state->query_id().lo); @@ -479,8 +538,6 @@ RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(QueryContext* que params->be_exec_version = query_ctx->be_exec_version(); params->query_ctx = query_ctx; - params->_obj_pool = &query_ctx->obj_pool; - params->_is_global = true; return params; } diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 7caea8011d2..c9b455bc107 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -53,6 +53,12 @@ class QueryContext; 
struct RuntimeFilterParamsContext; class ExecEnv; +struct LocalMergeFilters { + std::unique_ptr<std::mutex> lock = std::make_unique<std::mutex>(); + int merge_time = 0; + std::vector<IRuntimeFilter*> filters; +}; + /// producer: /// Filter filter; /// get_filter(filter_id, &filter); @@ -76,10 +82,18 @@ public: // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, IRuntimeFilter** consumer_filter, - bool build_bf_exactly = false, bool is_global = false); + bool build_bf_exactly = false, bool need_local_merge = false); + + Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc, + const TQueryOptions& options, + IRuntimeFilter** producer_filter, + bool build_bf_exactly = false); + + Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters); + Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - IRuntimeFilter** producer_filter, bool build_bf_exactly = false, - bool is_global = false, int parallel_tasks = 0); + IRuntimeFilter** producer_filter, + bool build_bf_exactly = false); // update filter by remote Status update_filter(const PPublishFilterRequest* request, @@ -100,6 +114,7 @@ private: /// TODO: should it need protected by a mutex? 
std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map; std::map<int32_t, IRuntimeFilter*> _producer_map; + std::map<int32_t, LocalMergeFilters> _local_merge_producer_map; RuntimeFilterParamsContext* _state = nullptr; std::unique_ptr<MemTracker> _tracker; @@ -257,15 +272,6 @@ struct RuntimeFilterParamsContext { int be_exec_version; QueryContext* query_ctx; QueryContext* get_query_ctx() const { return query_ctx; } - ObjectPool* _obj_pool; - bool _is_global = false; - PUniqueId fragment_instance_id() const { - DCHECK(!_is_global); - return _fragment_instance_id; - } - ObjectPool* obj_pool() const { - DCHECK(_is_global); - return _obj_pool; - } + PUniqueId fragment_instance_id() const { return _fragment_instance_id; } }; } // namespace doris diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index d25d914147b..8762366fe04 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -101,7 +101,8 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _runtime_filter_mgr.reset(new RuntimeFilterMgr(fragment_exec_params.query_id, RuntimeFilterParamsContext::create(this))); if (fragment_exec_params.__isset.runtime_filter_params) { - _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params); + _query_ctx->runtime_filter_mgr()->set_runtime_filter_params( + fragment_exec_params.runtime_filter_params); } } @@ -305,11 +306,6 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt return Status::OK(); } -void RuntimeState::set_runtime_filter_params( - const TRuntimeFilterParams& runtime_filter_params) const { - _runtime_filter_mgr->set_runtime_filter_params(runtime_filter_params); -} - void RuntimeState::init_mem_trackers(const TUniqueId& id, const std::string& name) { _query_mem_tracker = std::make_shared<MemTrackerLimiter>( MemTrackerLimiter::Type::EXPERIMENTAL, fmt::format("{}#Id={}", name, print_id(id))); @@ -506,4 
+502,32 @@ bool RuntimeState::enable_page_cache() const { (_query_options.__isset.enable_page_cache && _query_options.enable_page_cache); } +RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() { + return _query_ctx->runtime_filter_mgr(); +} + +Status RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc, + bool need_local_merge, + doris::IRuntimeFilter** producer_filter, + bool build_bf_exactly) { + if (desc.has_remote_targets || need_local_merge) { + return global_runtime_filter_mgr()->register_local_merge_producer_filter( + desc, query_options(), producer_filter, build_bf_exactly); + } else { + return local_runtime_filter_mgr()->register_producer_filter( + desc, query_options(), producer_filter, build_bf_exactly); + } +} + +Status RuntimeState::register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc, + bool need_local_merge, int node_id, + doris::IRuntimeFilter** consumer_filter) { + if (desc.has_remote_targets || need_local_merge) { + return global_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, + consumer_filter, false, true); + } else { + return local_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id, + consumer_filter, false, false); + } +} } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index cdc7e83f042..03b518e5c34 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -44,6 +44,7 @@ #include "util/runtime_profile.h" namespace doris { +class IRuntimeFilter; namespace pipeline { class PipelineXLocalStateBase; @@ -98,8 +99,6 @@ public: Status init(const TUniqueId& fragment_instance_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env); - void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params) const; - // for ut and non-query. 
void set_exec_env(ExecEnv* exec_env) { _exec_env = exec_env; } void init_mem_trackers(const TUniqueId& id = TUniqueId(), const std::string& name = "unknown"); @@ -454,7 +453,10 @@ public: // if load mem limit is not set, or is zero, using query mem limit instead. int64_t get_load_mem_limit(); - RuntimeFilterMgr* runtime_filter_mgr() { + // local runtime filter mgr, the runtime filter do not have remote target or + // not need local merge should regist here. the instance exec finish, the local + // runtime filter mgr can release the memory of local runtime filter + RuntimeFilterMgr* local_runtime_filter_mgr() { if (_pipeline_x_runtime_filter_mgr) { return _pipeline_x_runtime_filter_mgr; } else { @@ -462,6 +464,8 @@ public: } } + RuntimeFilterMgr* global_runtime_filter_mgr(); + void set_pipeline_x_runtime_filter_mgr(RuntimeFilterMgr* pipeline_x_runtime_filter_mgr) { _pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr; } @@ -567,6 +571,15 @@ public: return _task_execution_context; } + Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc, + bool need_local_merge, + doris::IRuntimeFilter** producer_filter, + bool build_bf_exactly); + + Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc, + bool need_local_merge, int node_id, + doris::IRuntimeFilter** producer_filter); + private: Status create_error_log_file(); @@ -595,9 +608,6 @@ private: // owned by PipelineXFragmentContext RuntimeFilterMgr* _pipeline_x_runtime_filter_mgr = nullptr; - // Protects _data_stream_recvrs_pool - std::mutex _data_stream_recvrs_lock; - // Data stream receivers created by a plan fragment are gathered here to make sure // they are destroyed before _obj_pool (class members are destroyed in reverse order). 
// Receivers depend on the descriptor table and we need to guarantee that their control diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 67dffa4b203..de3b63371ee 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -180,9 +180,9 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { #endif for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - _runtime_filter_descs[i], state->query_options(), &_runtime_filters[i], - _probe_expr_ctxs.size() == 1)); + RETURN_IF_ERROR(state->register_producer_runtime_filter(_runtime_filter_descs[i], false, + &_runtime_filters[i], + _probe_expr_ctxs.size() == 1)); } // init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 150068096ba..ad168ba9c86 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -118,8 +118,8 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { std::vector<TExpr> filter_src_exprs; for (size_t i = 0; i < _runtime_filter_descs.size(); i++) { filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr); - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter( - _runtime_filter_descs[i], state->query_options(), &_runtime_filters[i])); + RETURN_IF_ERROR(state->register_producer_runtime_filter(_runtime_filter_descs[i], false, + &_runtime_filters[i], false)); } RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, _filter_src_expr_ctxs)); return Status::OK(); diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index eba11dac45d..e683c4f2be0 100644 --- 
a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -30,9 +30,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, _blocked_by_rf = std::make_shared<std::atomic_bool>(false); } -Status RuntimeFilterConsumer::init(RuntimeState* state, bool is_global) { +Status RuntimeFilterConsumer::init(RuntimeState* state, bool need_local_merge) { _state = state; - RETURN_IF_ERROR(_register_runtime_filter(is_global)); + RETURN_IF_ERROR(_register_runtime_filter(need_local_merge)); return Status::OK(); } @@ -45,28 +45,15 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) { profile->add_info_string("RuntimeFilters: ", ss.str()); } -Status RuntimeFilterConsumer::_register_runtime_filter(bool is_global) { +Status RuntimeFilterConsumer::_register_runtime_filter(bool need_local_merge) { int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.reserve(filter_size); _runtime_filter_ready_flag.reserve(filter_size); for (int i = 0; i < filter_size; ++i) { IRuntimeFilter* runtime_filter = nullptr; const auto& filter_desc = _runtime_filter_descs[i]; - if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { - DCHECK(filter_desc.has_remote_targets); - RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, &runtime_filter, false, - is_global)); - } else if (is_global) { - // For pipelineX engine, runtime filter is global iff data distribution is ignored. 
- RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, &runtime_filter, false, - is_global)); - } else { - RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, _state->query_options(), _filter_id, &runtime_filter, false, - is_global)); - } + RETURN_IF_ERROR(_state->register_consumer_runtime_filter(filter_desc, need_local_merge, + _filter_id, &runtime_filter)); _runtime_filter_ctxs.emplace_back(runtime_filter); _runtime_filter_ready_flag.emplace_back(false); } diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index 15b9455ac56..b8513e666bc 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -30,7 +30,7 @@ public: const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts); ~RuntimeFilterConsumer() = default; - Status init(RuntimeState* state, bool is_global = false); + Status init(RuntimeState* state, bool need_local_merge = false); // Try to append late arrived runtime filters. // Return num of filters which are applied already. @@ -42,7 +42,7 @@ public: protected: // Register and get all runtime filters at Init phase. - Status _register_runtime_filter(bool is_global); + Status _register_runtime_filter(bool need_local_merge); // Get all arrived runtime filters at Open phase. Status _acquire_runtime_filter(); // Append late-arrival runtime filters to the vconjunct_ctx. diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp index 42f6250a030..13d19921b03 100644 --- a/be/src/vec/exec/vdata_gen_scan_node.cpp +++ b/be/src/vec/exec/vdata_gen_scan_node.cpp @@ -81,13 +81,8 @@ Status VDataGenFunctionScanNode::prepare(RuntimeState* state) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. 
for (const auto& filter_desc : _runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; - if (filter_desc.__isset.opt_remote_rf && filter_desc.opt_remote_rf) { - RETURN_IF_ERROR(state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), id(), &runtime_filter, false)); - } else { - RETURN_IF_ERROR(state->runtime_filter_mgr()->register_consumer_filter( - filter_desc, state->query_options(), id(), &runtime_filter, false)); - } + RETURN_IF_ERROR( + state->register_consumer_runtime_filter(filter_desc, false, id(), &runtime_filter)); runtime_filter->init_profile(_runtime_profile.get()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index ebd6b240028..1093600a485 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2153,7 +2153,8 @@ public class Coordinator implements CoordInterface { } for (RuntimeFilterId rid : fragment.getBuilderRuntimeFilterIds()) { - ridToBuilderNum.merge(rid, params.instanceExecParams.size(), Integer::sum); + ridToBuilderNum.merge(rid, + (int) params.instanceExecParams.stream().map(ins -> ins.host).distinct().count(), Integer::sum); } } // Use the uppermost fragment as a merged node, the uppermost fragment has one and only one instance --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org