This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c58a1b7491e [reafactor](runtime filter) Delete unused targets parameter (#45573) c58a1b7491e is described below commit c58a1b7491ee9118027b9a316289c1a28a9801c5 Author: Gabriel <liwenqi...@selectdb.com> AuthorDate: Wed Dec 18 14:34:32 2024 +0800 [reafactor](runtime filter) Delete unused targets parameter (#45573) `TRuntimeFilterTargetParams` will never be used since version 3.0. --- be/src/runtime/runtime_filter_mgr.cpp | 68 +++++----------------- be/src/runtime/runtime_filter_mgr.h | 7 +-- .../main/java/org/apache/doris/qe/Coordinator.java | 10 ---- .../qe/runtime/RuntimeFiltersThriftBuilder.java | 10 ---- gensrc/thrift/PaloInternalService.thrift | 4 +- 5 files changed, 19 insertions(+), 80 deletions(-) diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index c16db7c67d3..b4a38173d72 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -219,35 +219,14 @@ Status RuntimeFilterMgr::get_merge_addr(TNetworkAddress* addr) { Status RuntimeFilterMergeControllerEntity::_init_with_desc( const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, - const std::vector<doris::TRuntimeFilterTargetParams>* target_info, - const int producer_size) { - std::unique_lock<std::shared_mutex> guard(_filter_map_mutex); - std::shared_ptr<RuntimeFilterCntlVal> cnt_val = std::make_shared<RuntimeFilterCntlVal>(); - // runtime_filter_desc and target will be released, - // so we need to copy to cnt_val - cnt_val->producer_size = producer_size; - cnt_val->runtime_filter_desc = *runtime_filter_desc; - cnt_val->pool.reset(new ObjectPool()); - cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc)); - - auto filter_id = runtime_filter_desc->filter_id; - RETURN_IF_ERROR( - cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1)); - cnt_val->filter->set_ignored(); - _filter_map.emplace(filter_id, cnt_val); - return Status::OK(); -} - -Status RuntimeFilterMergeControllerEntity::_init_with_desc( - const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, - const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info, + const std::vector<doris::TRuntimeFilterTargetParamsV2>&& targetv2_info, const int producer_size) { std::shared_ptr<RuntimeFilterCntlVal> cnt_val = std::make_shared<RuntimeFilterCntlVal>(); // runtime_filter_desc and target will be released, // so we need to copy to cnt_val cnt_val->producer_size = producer_size; cnt_val->runtime_filter_desc = *runtime_filter_desc; - cnt_val->targetv2_info = *targetv2_info; + cnt_val->targetv2_info = targetv2_info; cnt_val->pool.reset(new ObjectPool()); cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc)); auto filter_id = runtime_filter_desc->filter_id; @@ -268,36 +247,21 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, if (runtime_filter_params.__isset.rid_to_runtime_filter) { for (const auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) { int filter_id = filterid_to_desc.first; - const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id); - if (target_iter == runtime_filter_params.rid_to_target_param.end() && - !runtime_filter_params.__isset.rid_to_target_paramv2) { - // This runtime filter has to target info - return Status::InternalError("runtime filter params meet error"); - } else if (target_iter == runtime_filter_params.rid_to_target_param.end()) { - const auto& targetv2_iter = - runtime_filter_params.rid_to_target_paramv2.find(filter_id); - if (targetv2_iter == runtime_filter_params.rid_to_target_paramv2.end()) { - // This runtime filter has to target info - return Status::InternalError("runtime filter params meet error"); - } - const auto& build_iter = - runtime_filter_params.runtime_filter_builder_num.find(filter_id); - if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { - // This runtime filter has to builder info - return Status::InternalError("runtime filter params meet error"); - } - - RETURN_IF_ERROR(_init_with_desc(&filterid_to_desc.second, &query_options, - &targetv2_iter->second, build_iter->second)); - } else { - const auto& build_iter = - runtime_filter_params.runtime_filter_builder_num.find(filter_id); - if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { - return Status::InternalError("runtime filter params meet error"); - } - RETURN_IF_ERROR(_init_with_desc(&filterid_to_desc.second, &query_options, - &target_iter->second, build_iter->second)); + const auto& targetv2_iter = runtime_filter_params.rid_to_target_paramv2.find(filter_id); + const auto& build_iter = + runtime_filter_params.runtime_filter_builder_num.find(filter_id); + if (build_iter == runtime_filter_params.runtime_filter_builder_num.end()) { + // This runtime filter has no builder info + return Status::InternalError( + "Runtime filter has a wrong parameter. Maybe FE version is mismatched."); } + + RETURN_IF_ERROR(_init_with_desc( + &filterid_to_desc.second, &query_options, + targetv2_iter == runtime_filter_params.rid_to_target_paramv2.end() + ? std::vector<doris::TRuntimeFilterTargetParamsV2> {} + : std::move(targetv2_iter->second), + build_iter->second)); } } return Status::OK(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 9f4cf5f4e22..c54be905f28 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -192,12 +192,7 @@ public: private: Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* query_options, - const std::vector<doris::TRuntimeFilterTargetParams>* target_info, - const int producer_size); - - Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc, - const TQueryOptions* query_options, - const std::vector<doris::TRuntimeFilterTargetParamsV2>* target_info, + const std::vector<doris::TRuntimeFilterTargetParamsV2>&& target_info, const int producer_size); UniqueId _query_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 1e327c469e7..472f2462e4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -111,7 +111,6 @@ import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReportExecStatusParams; import org.apache.doris.thrift.TResourceLimit; import org.apache.doris.thrift.TRuntimeFilterParams; -import org.apache.doris.thrift.TRuntimeFilterTargetParams; import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; @@ -3262,15 +3261,6 @@ public class Coordinator implements CoordInterface { localParams.runtime_filter_params.putToRidToTargetParamv2(rf.getFilterId().asInt(), new ArrayList<TRuntimeFilterTargetParamsV2>(targetParamsV2.values())); - } else { - List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList(); - for (FRuntimeFilterTargetParam targetParam : fParams) { - // Instance id make no sense if this runtime filter doesn't have remote targets. - targetParams.add(new TRuntimeFilterTargetParams(new TUniqueId(), - targetParam.targetFragmentInstanceAddr)); - } - localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(), - targetParams); } } for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java index 47c01ef8eb3..f9ab8e83f07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java @@ -27,11 +27,8 @@ import org.apache.doris.planner.RuntimeFilterId; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TRuntimeFilterParams; -import org.apache.doris.thrift.TRuntimeFilterTargetParams; import org.apache.doris.thrift.TRuntimeFilterTargetParamsV2; -import org.apache.doris.thrift.TUniqueId; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.ArrayList; @@ -94,13 +91,6 @@ public class RuntimeFiltersThriftBuilder { runtimeFilterParams.putToRidToTargetParamv2( rf.getFilterId().asInt(), new ArrayList<>(targetToParams.values()) ); - } else { - List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList(); - for (RuntimeFilterTarget target : targets) { - // Instance id make no sense if this runtime filter doesn't have remote targets. - targetParams.add(new TRuntimeFilterTargetParams(new TUniqueId(), target.address)); - } - runtimeFilterParams.putToRidToTargetParam(rf.getFilterId().asInt(), targetParams); } } for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 85abbf9b66d..0a1ea4a98fc 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -373,6 +373,7 @@ struct TScanRangeParams { 2: optional i32 volume_id = -1 } +// deprecated struct TRuntimeFilterTargetParams { 1: required Types.TUniqueId target_fragment_instance_id // The address of the instance where the fragment is expected to run @@ -390,8 +391,7 @@ struct TRuntimeFilterParams { // Runtime filter merge instance address 1: optional Types.TNetworkAddress runtime_filter_merge_addr - // Runtime filter ID to the instance address of the fragment, - // that is expected to use this runtime filter + // deprecated 2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param // Runtime filter ID to the runtime filter desc --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org