This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c7b447a3d20 [refactor](fragment) Use fragment ID to manage fragment context (#42048)
c7b447a3d20 is described below

commit c7b447a3d206f101454f73ea51e510d0023639ca
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Oct 18 11:44:20 2024 +0800

    [refactor](fragment) Use fragment ID to manage fragment context (#42048)

    Use fragment ID to manage fragment context
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  9 +--
 be/src/pipeline/pipeline_fragment_context.h        | 14 -----
 be/src/runtime/fragment_mgr.cpp                    | 68 +++++-----------------
 be/src/runtime/fragment_mgr.h                      |  8 +--
 be/src/runtime/query_context.h                     |  2 -
 be/src/runtime/runtime_filter_mgr.cpp              | 16 +++--
 be/src/runtime/runtime_filter_mgr.h                |  1 -
 be/src/service/backend_service.cpp                 |  7 ---
 be/src/service/backend_service.h                   |  2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 24 +++++---
 gensrc/proto/internal_service.proto                |  1 +
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 12 files changed, 51 insertions(+), 102 deletions(-)
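
At its core, this change re-keys FragmentMgr's _pipeline_map from one entry per fragment
instance (a TUniqueId) to one entry per (query ID, fragment ID) pair, so a
PipelineFragmentContext is inserted, looked up, and erased exactly once per fragment rather
than once per instance. A minimal self-contained sketch of that keying, using stand-in types
and an assumed hasher (std::unordered_map has no built-in hash for std::pair, so Doris must
supply its own; the real types appear in the diffs below):

    #include <cstddef>
    #include <cstdint>
    #include <functional>
    #include <memory>
    #include <unordered_map>
    #include <utility>

    // Stand-in for the Thrift TUniqueId; only equality and hashing matter here.
    struct TUniqueIdStandIn { int64_t hi = 0, lo = 0; };
    inline bool operator==(const TUniqueIdStandIn& a, const TUniqueIdStandIn& b) {
        return a.hi == b.hi && a.lo == b.lo;
    }
    struct PipelineFragmentContextStandIn {};

    // (query_id, fragment_id) composite key with an assumed hash combiner.
    using Key = std::pair<TUniqueIdStandIn, int>;
    struct KeyHash {
        std::size_t operator()(const Key& k) const {
            std::size_t h = std::hash<int64_t>()(k.first.hi) * 31 + std::hash<int64_t>()(k.first.lo);
            return h * 31 + std::hash<int>()(k.second);
        }
    };

    int main() {
        std::unordered_map<Key, std::shared_ptr<PipelineFragmentContextStandIn>, KeyHash> pipeline_map;
        TUniqueIdStandIn query_id {0x1234, 0x5678};

        // exec_plan_fragment: one insert per fragment (previously one per instance).
        pipeline_map.insert({{query_id, /*fragment_id=*/3},
                             std::make_shared<PipelineFragmentContextStandIn>()});
        // apply_filterv2: one lookup per published fragment id.
        bool found = pipeline_map.find({query_id, 3}) != pipeline_map.end();
        // remove_pipeline_context: one erase, no loop over instance ids.
        pipeline_map.erase({query_id, 3});
        return found ? 0 : 1;
    }

The payoff is visible throughout the diff: the per-instance loops in exec_plan_fragment and
remove_pipeline_context collapse into single map operations.
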
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 11898952276..28cfefbf6c1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -572,10 +572,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
 void PipelineFragmentContext::_init_next_report_time() {
     auto interval_s = config::pipeline_status_report_interval;
     if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
-        std::vector<string> ins_ids;
-        instance_ids(ins_ids);
-        VLOG_FILE << "enable period report: instance_id="
-                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
         uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
         // We don't want to wait longer than it takes to run the entire fragment.
         _previous_report_time =
@@ -613,11 +610,9 @@ void PipelineFragmentContext::trigger_report_if_necessary() {
         return;
     }
     if (VLOG_FILE_IS_ON) {
-        std::vector<string> ins_ids;
-        instance_ids(ins_ids);
         VLOG_FILE << "Reporting "
                   << "profile for query_id " << print_id(_query_id)
-                  << ", instance ids: " << fmt::format("{}", fmt::join(ins_ids, ", "));
+                  << ", fragment id: " << _fragment_id;
         std::stringstream ss;
         _runtime_state->runtime_profile()->compute_time_in_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index bcef1271b60..0749729789e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -115,20 +115,6 @@ public:

     [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id--; }

-    void instance_ids(std::vector<TUniqueId>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = _fragment_instance_ids[i];
-        }
-    }
-
-    void instance_ids(std::vector<string>& ins_ids) const {
-        ins_ids.resize(_fragment_instance_ids.size());
-        for (size_t i = 0; i < _fragment_instance_ids.size(); i++) {
-            ins_ids[i] = print_id(_fragment_instance_ids[i]);
-        }
-    }
-
     void clear_finished_tasks() {
         for (size_t j = 0; j < _tasks.size(); j++) {
             for (size_t i = 0; i < _tasks[j].size(); i++) {
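
The period-report path touched above keeps an unchanged detail worth noting: the first
status report is delayed by a random offset below the configured interval, which staggers
reporting across concurrently running fragments. A small illustrative sketch of that
computation (NANOS_PER_SEC and the offset expression come from the diff; the function
wrapper and names are assumed):

    #include <cstdint>
    #include <cstdlib>

    constexpr uint64_t NANOS_PER_SEC = 1000000000ULL;

    // First report fires after a random offset in [0, interval_s) seconds, so
    // fragments started together do not all report at the same instant.
    // interval_s must be > 0, which the if-condition in the diff guarantees.
    uint64_t first_report_deadline_ns(uint64_t now_ns, int interval_s) {
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
        return now_ns + report_fragment_offset;
    }
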
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 008f92cfef8..26fb098c76d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -106,7 +106,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
 bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
-bvar::Adder<int64_t> g_pipeline_fragment_instances_count("doris_pipeline_fragment_instances_count");

 bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
 bvar::Status<uint64_t> g_fragment_last_active_time(
@@ -598,18 +597,13 @@ void FragmentMgr::remove_pipeline_context(
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto query_id = f_context->get_query_id();
-        std::vector<TUniqueId> ins_ids;
-        f_context->instance_ids(ins_ids);
         int64 now = duration_cast<std::chrono::milliseconds>(
                             std::chrono::system_clock::now().time_since_epoch())
                             .count();
         g_fragment_executing_count << -1;
         g_fragment_last_active_time.set_value(now);
-        for (const auto& ins_id : ins_ids) {
-            LOG_INFO("Removing query {} instance {}", print_id(query_id), print_id(ins_id));
-            _pipeline_map.erase(ins_id);
-            g_pipeline_fragment_instances_count << -1;
-        }
+        LOG_INFO("Removing query {} fragment {}", print_id(query_id), f_context->get_fragment_id());
+        _pipeline_map.erase({query_id, f_context->get_fragment_id()});
     }
 }

@@ -743,11 +737,10 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
                 continue;
             }
             auto timeout_second = it.second->timeout_second();
-            fmt::format_to(debug_string_buffer,
-                           "No.{} (elapse_second={}s, query_timeout_second={}s, instance_id="
-                           "{}, is_timeout={}) : {}\n",
-                           i, elapsed, timeout_second, print_id(it.first),
-                           it.second->is_timeout(now), it.second->debug_string());
+            fmt::format_to(
+                    debug_string_buffer,
+                    "No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}) : {}\n", i,
+                    elapsed, timeout_second, it.second->is_timeout(now), it.second->debug_string());
             i++;
         }
     }
@@ -807,11 +800,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     for (const auto& local_param : params.local_params) {
         const TUniqueId& fragment_instance_id = local_param.fragment_instance_id;
         std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(fragment_instance_id);
+        auto iter = _pipeline_map.find({params.query_id, params.fragment_id});
         if (iter != _pipeline_map.end()) {
-            return Status::InternalError(
-                    "exec_plan_fragment input duplicated fragment_instance_id({})",
-                    UniqueId(fragment_instance_id).to_string());
+            return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})",
+                                         params.fragment_id);
         }
         query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
     }
@@ -827,12 +819,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
         g_fragment_executing_count << 1;
         g_fragment_last_active_time.set_value(now);
         std::lock_guard<std::mutex> lock(_lock);
-        std::vector<TUniqueId> ins_ids;
-        context->instance_ids(ins_ids);
         // TODO: simplify this mapping
-        for (const auto& ins_id : ins_ids) {
-            _pipeline_map.insert({ins_id, context});
-        }
+        _pipeline_map.insert({{params.query_id, params.fragment_id}, context});
     }
     query_ctx->set_pipeline_context(params.fragment_id, context);

@@ -877,31 +865,6 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
               << " is cancelled and removed. Reason: " << reason.to_string();
 }

-void FragmentMgr::cancel_instance(const TUniqueId instance_id, const Status reason) {
-    std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_ctx;
-    {
-        std::lock_guard<std::mutex> state_lock(_lock);
-        DCHECK(!_pipeline_map.contains(instance_id))
-                << " Pipeline tasks should be canceled by query instead of instance! Query ID: "
-                << print_id(_pipeline_map[instance_id]->get_query_id());
-        const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
-        if (is_pipeline_instance) {
-            auto itr = _pipeline_map.find(instance_id);
-            if (itr != _pipeline_map.end()) {
-                pipeline_ctx = itr->second;
-            } else {
-                LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
-                             << " to cancel";
-                return;
-            }
-        }
-    }
-
-    if (pipeline_ctx != nullptr) {
-        pipeline_ctx->cancel(reason);
-    }
-}
-
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";

@@ -1167,15 +1130,16 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,

     RuntimeFilterMgr* runtime_filter_mgr = nullptr;

-    const auto& fragment_instance_ids = request->fragment_instance_ids();
+    const auto& fragment_ids = request->fragment_ids();
     {
         std::unique_lock<std::mutex> lock(_lock);
-        for (UniqueId fragment_instance_id : fragment_instance_ids) {
-            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
+        for (auto fragment_id : fragment_ids) {
             if (is_pipeline) {
-                auto iter = _pipeline_map.find(tfragment_instance_id);
+                auto iter = _pipeline_map.find(
+                        {UniqueId(request->query_id()).to_thrift(), fragment_id});
                 if (iter == _pipeline_map.end()) {
+                    LOG(WARNING) << "No pipeline fragment is found: Query-ID = "
+                                 << request->query_id() << " fragment_id = " << fragment_id;
                     continue;
                 }
                 pip_context = iter->second;
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index bc066066f7b..41b63db0b23 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -100,9 +100,6 @@ public:
     Status trigger_pipeline_context_report(const ReportStatusRequest,
                                            std::shared_ptr<pipeline::PipelineFragmentContext>&&);

-    // Cancel instance (pipeline or nonpipeline).
-    void cancel_instance(const TUniqueId instance_id, const Status reason);
-
     // Can be used in both version.
     void cancel_query(const TUniqueId query_id, const Status reason);

@@ -169,7 +166,10 @@ private:
     // call _lock, so that there is dead lock.
     std::mutex _lock;

-    std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
+    // (QueryID, FragmentID) -> PipelineFragmentContext
+    std::unordered_map<std::pair<TUniqueId, int>,
+                       std::shared_ptr<pipeline::PipelineFragmentContext>>
+            _pipeline_map;

     // query id -> QueryContext
     std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctx_map;
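
The retained comment above ("call _lock, so that there is dead lock") hints at the access
discipline around _pipeline_map: hold _lock only long enough to copy the shared_ptr out of
the map, then operate on the context after releasing the lock. A sketch of that pattern with
simplified key and context types (illustrative only, not the Doris implementation):

    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <mutex>
    #include <unordered_map>
    #include <utility>

    struct FragmentContext {
        void cancel() { /* tear down pipeline tasks */ }
    };

    class FragmentMgrSketch {
    public:
        void cancel_fragment(int query_id, int fragment_id) {
            std::shared_ptr<FragmentContext> ctx;
            {
                // Hold _lock only for the lookup and pointer copy.
                std::lock_guard<std::mutex> lock(_lock);
                auto it = _pipeline_map.find({query_id, fragment_id});
                if (it == _pipeline_map.end()) {
                    return;
                }
                ctx = it->second;
            }
            // Potentially slow work runs after _lock is released, avoiding the
            // re-entrant locking the comment in fragment_mgr.h warns about.
            ctx->cancel();
        }

    private:
        struct KeyHash {
            std::size_t operator()(const std::pair<int, int>& k) const {
                return std::hash<int>()(k.first) * 31 + std::hash<int>()(k.second);
            }
        };
        std::mutex _lock;
        std::unordered_map<std::pair<int, int>, std::shared_ptr<FragmentContext>, KeyHash>
                _pipeline_map;
    };
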
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d1d78573923..9d499f3487e 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -193,8 +193,6 @@ public:

     ThreadPool* get_memtable_flush_pool();

-    std::vector<TUniqueId> get_fragment_instance_ids() const { return fragment_instance_ids; }
-
     int64_t mem_limit() const { return _bytes_limit; }

     void set_merge_controller_handler(
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 5b865f28dce..08a229c0ecf 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -228,7 +228,6 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     // so we need to copy to cnt_val
     cnt_val->producer_size = producer_size;
     cnt_val->runtime_filter_desc = *runtime_filter_desc;
-    cnt_val->target_info = *target_info;
     cnt_val->pool.reset(new ObjectPool());
     cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));

@@ -460,10 +459,17 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);
             closure->cntl_->ignore_eovercrowded();
             // set fragment-id
-            for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
-                PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
-                cur_id->set_hi(target_fragment_instance_id.hi);
-                cur_id->set_lo(target_fragment_instance_id.lo);
+            if (target.__isset.target_fragment_ids) {
+                for (auto& target_fragment_id : target.target_fragment_ids) {
+                    closure->request_->add_fragment_ids(target_fragment_id);
+                }
+            } else {
+                // FE not upgraded yet.
+                for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
+                    PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
+                    cur_id->set_hi(target_fragment_instance_id.hi);
+                    cur_id->set_lo(target_fragment_instance_id.lo);
+                }
             }

             std::shared_ptr<PBackendService_Stub> stub(
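
The merge() change above is written for rolling upgrades: the new optional Thrift field is
trusted only when __isset shows the FE actually populated it, and the legacy per-instance
IDs are forwarded otherwise. A compact sketch of that dual-path idiom with stand-in types
(field names follow the diff; everything else is assumed):

    #include <cstdint>
    #include <vector>

    // Stand-ins for the generated Thrift struct; only the fields used here.
    struct TUniqueIdStandIn { int64_t hi = 0, lo = 0; };
    struct Isset { bool target_fragment_ids = false; };
    struct TargetParamsV2 {
        Isset __isset;
        std::vector<int32_t> target_fragment_ids;                    // new optional field
        std::vector<TUniqueIdStandIn> target_fragment_instance_ids;  // legacy required field
    };

    // Prefer the new fragment IDs; fall back to instance IDs for an old FE,
    // which leaves the optional field unset.
    void fill_request(const TargetParamsV2& target,
                      std::vector<int32_t>* fragment_ids_out,
                      std::vector<TUniqueIdStandIn>* instance_ids_out) {
        if (target.__isset.target_fragment_ids) {
            *fragment_ids_out = target.target_fragment_ids;
        } else {
            // FE not upgraded yet.
            *instance_ids_out = target.target_fragment_instance_ids;
        }
    }
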
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index d89a3b9f1b1..b0aea7568cf 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -168,7 +168,6 @@ public:
         int producer_size;
         uint64_t global_size;
         TRuntimeFilterDesc runtime_filter_desc;
-        std::vector<doris::TRuntimeFilterTargetParams> target_info;
         std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
         IRuntimeFilter* filter = nullptr;
         std::unordered_set<UniqueId> arrive_id;
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index aa29661da02..d56aa49b19b 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -657,13 +657,6 @@ Status BaseBackendService::start_plan_fragment_execution(
                                                QuerySource::INTERNAL_FRONTEND);
 }

-void BaseBackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                                              const TCancelPlanFragmentParams& params) {
-    LOG(INFO) << "cancel_plan_fragment(): instance_id=" << print_id(params.fragment_instance_id);
-    _exec_env->fragment_mgr()->cancel_instance(
-            params.fragment_instance_id, Status::InternalError("cancel message received from FE"));
-}
-
 void BaseBackendService::transmit_data(TTransmitDataResult& return_val,
                                        const TTransmitDataParams& params) {
     VLOG_ROW << "transmit_data(): instance_id=" << params.dest_fragment_instance_id
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 4d01107ba8a..1d4219e2191 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -90,7 +90,7 @@ public:
                                      const TExecPlanFragmentParams& params) override;

     void cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
-                              const TCancelPlanFragmentParams& params) override;
+                              const TCancelPlanFragmentParams& params) override {};

     void transmit_data(TTransmitDataResult& return_val, const TTransmitDataParams& params) override;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5b977a87f1f..8e580c549df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1997,7 +1997,8 @@ public class Coordinator implements CoordInterface {
                 List<FRuntimeFilterTargetParam> targetFragments =
                         ridToTargetParam.computeIfAbsent(rid, k -> new ArrayList<>());
                 for (final FInstanceExecParam instance : params.instanceExecParams) {
-                    targetFragments.add(new FRuntimeFilterTargetParam(instance.instanceId, toBrpcHost(instance.host)));
+                    targetFragments.add(new FRuntimeFilterTargetParam(instance.fragment().getFragmentId().asInt(),
+                            toBrpcHost(instance.host)));
                 }
             }

@@ -3181,8 +3182,8 @@ public class Coordinator implements CoordInterface {
                 for (FRuntimeFilterTargetParam targetParam : fParams) {
                     if (targetParamsV2.containsKey(targetParam.targetFragmentInstanceAddr)) {
                         targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                .target_fragment_instance_ids
-                                .add(targetParam.targetFragmentInstanceId);
+                                .target_fragment_ids
+                                .add(targetParam.targetFragmentId);
                     } else {
                         targetParamsV2.put(targetParam.targetFragmentInstanceAddr,
                                 new TRuntimeFilterTargetParamsV2());
@@ -3190,11 +3191,15 @@ public class Coordinator implements CoordInterface {
                                 .target_fragment_instance_addr = targetParam.targetFragmentInstanceAddr;
                         targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
-                                .target_fragment_instance_ids
+                                .target_fragment_ids
                                 = new ArrayList<>();
+                        targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
+                                .target_fragment_ids
+                                .add(targetParam.targetFragmentId);
+                        // `target_fragment_instance_ids` is a required field
                         targetParamsV2.get(targetParam.targetFragmentInstanceAddr)
                                 .target_fragment_instance_ids
-                                .add(targetParam.targetFragmentInstanceId);
+                                = new ArrayList<>();
                     }
                 }

@@ -3203,7 +3208,8 @@ public class Coordinator implements CoordInterface {
             } else {
                 List<TRuntimeFilterTargetParams> targetParams = Lists.newArrayList();
                 for (FRuntimeFilterTargetParam targetParam : fParams) {
-                    targetParams.add(new TRuntimeFilterTargetParams(targetParam.targetFragmentInstanceId,
+                    // Instance id make no sense if this runtime filter doesn't have remote targets.
+                    targetParams.add(new TRuntimeFilterTargetParams(new TUniqueId(),
                             targetParam.targetFragmentInstanceAddr));
                 }
                 localParams.runtime_filter_params.putToRidToTargetParam(rf.getFilterId().asInt(),
@@ -3373,12 +3379,12 @@ public class Coordinator implements CoordInterface {

     // Runtime filter target fragment instance param
     static class FRuntimeFilterTargetParam {
-        public TUniqueId targetFragmentInstanceId;
+        public int targetFragmentId;

         public TNetworkAddress targetFragmentInstanceAddr;

-        public FRuntimeFilterTargetParam(TUniqueId id, TNetworkAddress host) {
-            this.targetFragmentInstanceId = id;
+        public FRuntimeFilterTargetParam(int id, TNetworkAddress host) {
+            this.targetFragmentId = id;
             this.targetFragmentInstanceAddr = host;
         }
     }
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 9abf9d7ea65..f3764cea233 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -613,6 +613,7 @@ message PPublishFilterRequestV2 {
     optional int64 merge_time = 9;
     optional bool contain_null = 10;
     optional bool ignored = 11;
+    repeated int32 fragment_ids = 12;
 };

 message PPublishFilterResponse {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index b560059819f..62a45260f80 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -374,6 +374,7 @@ struct TRuntimeFilterTargetParamsV2 {
   1: required list<Types.TUniqueId> target_fragment_instance_ids
   // The address of the instance where the fragment is expected to run
   2: required Types.TNetworkAddress target_fragment_instance_addr
+  3: optional list<i32> target_fragment_ids
 }

 struct TRuntimeFilterParams {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org