This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 877935442f3 [feature](pipelineX)use markFragments instead of markInstances in pipelineX (#27829) 877935442f3 is described below commit 877935442f3a0592737b5e11dd1bc46eba5d5849 Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Mon Dec 11 17:59:53 2023 +0800 [feature](pipelineX)use markFragments instead of markInstances in pipelineX (#27829) --- be/src/exprs/runtime_filter.cpp | 38 ++-- be/src/exprs/runtime_filter.h | 34 ++-- be/src/exprs/runtime_filter_rpc.cpp | 16 +- be/src/pipeline/pipeline_fragment_context.h | 7 +- .../pipeline_x/pipeline_x_fragment_context.cpp | 208 ++++++++++++--------- .../pipeline_x/pipeline_x_fragment_context.h | 41 ++-- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 9 +- be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 + be/src/runtime/fragment_mgr.cpp | 132 ++++++++----- be/src/runtime/fragment_mgr.h | 8 + be/src/runtime/query_context.h | 5 + be/src/runtime/runtime_filter_mgr.cpp | 13 +- be/src/runtime/runtime_filter_mgr.h | 13 +- be/src/runtime/runtime_state.cpp | 63 ++++++- be/src/runtime/runtime_state.h | 43 ++++- be/src/service/internal_service.cpp | 17 +- be/test/exprs/runtime_filter_test.cpp | 5 +- .../doris/common/profile/ExecutionProfile.java | 135 ++++++++++++- .../main/java/org/apache/doris/qe/Coordinator.java | 161 +++++++++++----- .../org/apache/doris/rpc/BackendServiceProxy.java | 18 ++ gensrc/proto/internal_service.proto | 2 + 21 files changed, 710 insertions(+), 261 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index de3f2ad4fd5..efdb8af7029 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -279,20 +279,20 @@ Status create_vbin_predicate(const TypeDescriptor& type, TExprOpcode::type opcod // This class is a wrapper of runtime predicate function class RuntimePredicateWrapper { public: - RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, + RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* pool, const RuntimeFilterParams* params) : _state(state), - _be_exec_version(_state->be_exec_version()), + _be_exec_version(_state->be_exec_version), _pool(pool), _column_return_type(params->column_return_type), _filter_type(params->filter_type), _filter_id(params->filter_id) {} // for a 'tmp' runtime predicate wrapper // only could called assign method or as a param for merge - RuntimePredicateWrapper(RuntimeState* state, ObjectPool* pool, PrimitiveType column_type, - RuntimeFilterType type, uint32_t filter_id) + RuntimePredicateWrapper(RuntimeFilterParamsContext* state, ObjectPool* pool, + PrimitiveType column_type, RuntimeFilterType type, uint32_t filter_id) : _state(state), - _be_exec_version(_state->be_exec_version()), + _be_exec_version(_state->be_exec_version), _pool(pool), _column_return_type(column_type), _filter_type(type), @@ -945,7 +945,7 @@ public: } private: - RuntimeState* _state; + RuntimeFilterParamsContext* _state; QueryContext* _query_ctx; int _be_exec_version; ObjectPool* _pool; @@ -962,9 +962,10 @@ private: uint32_t _filter_id; }; -Status IRuntimeFilter::create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, - const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, IRuntimeFilter** res, bool build_bf_exactly) { +Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool, + const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, + const RuntimeFilterRole 
role, int node_id, IRuntimeFilter** res, + bool build_bf_exactly) { *res = pool->add(new IRuntimeFilter(state, pool, desc)); (*res)->set_role(role); return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly); @@ -1003,7 +1004,7 @@ Status IRuntimeFilter::publish() { DCHECK(is_producer()); if (_has_local_target) { std::vector<IRuntimeFilter*> filters; - RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filters(_filter_id, filters)); + RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); // push down for (auto filter : filters) { filter->_wrapper = _wrapper; @@ -1014,7 +1015,7 @@ Status IRuntimeFilter::publish() { } else { TNetworkAddress addr; DCHECK(_state != nullptr); - RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_merge_addr(&addr)); + RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); return push_to_remote(_state, &addr, _opt_remote_rf); } } @@ -1036,9 +1037,9 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr bool IRuntimeFilter::await() { DCHECK(is_consumer()); auto execution_timeout = _state == nullptr ? _query_ctx->execution_timeout() * 1000 - : _state->execution_timeout() * 1000; + : _state->execution_timeout * 1000; auto runtime_filter_wait_time_ms = _state == nullptr ? _query_ctx->runtime_filter_wait_time_ms() - : _state->runtime_filter_wait_time_ms(); + : _state->runtime_filter_wait_time_ms; // bitmap filter is precise filter and only filter once, so it must be applied. int64_t wait_times_ms = _wrapper->get_real_type() == RuntimeFilterType::BITMAP_FILTER ? execution_timeout @@ -1234,14 +1235,14 @@ Status IRuntimeFilter::serialize(PPublishFilterRequestV2* request, void** data, return serialize_impl(request, data, len); } -Status IRuntimeFilter::create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param, - ObjectPool* pool, +Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state, + const MergeRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper) { return _create_wrapper(state, param, pool, wrapper); } -Status IRuntimeFilter::create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param, - ObjectPool* pool, +Status IRuntimeFilter::create_wrapper(RuntimeFilterParamsContext* state, + const UpdateRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper) { return _create_wrapper(state, param, pool, wrapper); } @@ -1290,7 +1291,8 @@ Status IRuntimeFilter::init_bloom_filter(const size_t build_bf_cardinality) { } template <class T> -Status IRuntimeFilter::_create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool, +Status IRuntimeFilter::_create_wrapper(RuntimeFilterParamsContext* state, const T* param, + ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper) { int filter_type = param->request->filter_type(); PrimitiveType column_type = PrimitiveType::INVALID_TYPE; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 97078c11757..cc47d590e6b 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -185,7 +185,8 @@ enum RuntimeFilterState { /// that can be pushed down to node based on the results of the right table. 
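/// A producer now holds a RuntimeFilterParamsContext snapshot (wait settings,
/// timeouts, ids, ExecEnv) rather than a RuntimeState*, so under pipelineX it
/// does not depend on any single instance-level runtime state.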
class IRuntimeFilter { public: - IRuntimeFilter(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc) + IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool, + const TRuntimeFilterDesc* desc) : _state(state), _pool(pool), _filter_id(desc->filter_id), @@ -199,9 +200,9 @@ public: _always_true(false), _is_ignored(false), registration_time_(MonotonicMillis()), - _wait_infinitely(_state->runtime_filter_wait_infinitely()), - _rf_wait_time_ms(_state->runtime_filter_wait_time_ms()), - _enable_pipeline_exec(_state->enable_pipeline_exec()), + _wait_infinitely(_state->runtime_filter_wait_infinitely), + _rf_wait_time_ms(_state->runtime_filter_wait_time_ms), + _enable_pipeline_exec(_state->enable_pipeline_exec), _runtime_filter_type(get_runtime_filter_type(desc)), _name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id, to_string(_runtime_filter_type))), @@ -231,9 +232,10 @@ public: ~IRuntimeFilter() = default; - static Status create(RuntimeState* state, ObjectPool* pool, const TRuntimeFilterDesc* desc, - const TQueryOptions* query_options, const RuntimeFilterRole role, - int node_id, IRuntimeFilter** res, bool build_bf_exactly = false); + static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool, + const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, + const RuntimeFilterRole role, int node_id, IRuntimeFilter** res, + bool build_bf_exactly = false); static Status create(QueryContext* query_ctx, ObjectPool* pool, const TRuntimeFilterDesc* desc, const TQueryOptions* query_options, const RuntimeFilterRole role, @@ -299,11 +301,11 @@ public: Status merge_from(const RuntimePredicateWrapper* wrapper); // for ut - static Status create_wrapper(RuntimeState* state, const MergeRuntimeFilterParams* param, - ObjectPool* pool, + static Status create_wrapper(RuntimeFilterParamsContext* state, + const MergeRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper); - static Status create_wrapper(RuntimeState* state, const UpdateRuntimeFilterParams* param, - ObjectPool* pool, + static Status create_wrapper(RuntimeFilterParamsContext* state, + const UpdateRuntimeFilterParams* param, ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper); static Status create_wrapper(QueryContext* query_ctx, const UpdateRuntimeFilterParamsV2* param, ObjectPool* pool, @@ -325,7 +327,8 @@ public: Status join_rpc(); // async push runtimefilter to remote node - Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr, bool opt_remote_rf); + Status push_to_remote(RuntimeFilterParamsContext* state, const TNetworkAddress* addr, + bool opt_remote_rf); void init_profile(RuntimeProfile* parent_profile); @@ -367,7 +370,7 @@ public: int32_t wait_time_ms() const { int32_t res = 0; if (wait_infinitely()) { - res = _state == nullptr ? _query_ctx->execution_timeout() : _state->execution_timeout(); + res = _state == nullptr ? 
_query_ctx->execution_timeout() : _state->execution_timeout; // Convert to ms res *= 1000; } else { @@ -391,7 +394,8 @@ protected: Status serialize_impl(T* request, void** data, int* len); template <class T> - static Status _create_wrapper(RuntimeState* state, const T* param, ObjectPool* pool, + static Status _create_wrapper(RuntimeFilterParamsContext* state, const T* param, + ObjectPool* pool, std::unique_ptr<RuntimePredicateWrapper>* wrapper); void _set_push_down() { _is_push_down = true; } @@ -419,7 +423,7 @@ protected: } } - RuntimeState* _state = nullptr; + RuntimeFilterParamsContext* _state = nullptr; QueryContext* _query_ctx = nullptr; ObjectPool* _pool = nullptr; // _wrapper is a runtime filter function wrapper diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp index 00540b8382c..a9aa7944625 100644 --- a/be/src/exprs/runtime_filter_rpc.cpp +++ b/be/src/exprs/runtime_filter_rpc.cpp @@ -48,12 +48,12 @@ struct IRuntimeFilter::RPCContext { static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = true; } }; -Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr, - bool opt_remote_rf) { +Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state, + const TNetworkAddress* addr, bool opt_remote_rf) { DCHECK(is_producer()); DCHECK(_rpc_context == nullptr); std::shared_ptr<PBackendService_Stub> stub( - state->exec_env()->brpc_internal_client_cache()->get_client(*addr)); + state->exec_env->brpc_internal_client_cache()->get_client(*addr)); if (!stub) { std::string msg = fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port); @@ -64,16 +64,16 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress int len = 0; auto pquery_id = _rpc_context->request.mutable_query_id(); - pquery_id->set_hi(_state->query_id().hi); - pquery_id->set_lo(_state->query_id().lo); + pquery_id->set_hi(_state->query_id.hi()); + pquery_id->set_lo(_state->query_id.lo()); auto pfragment_instance_id = _rpc_context->request.mutable_fragment_instance_id(); - pfragment_instance_id->set_hi(state->fragment_instance_id().hi); - pfragment_instance_id->set_lo(state->fragment_instance_id().lo); + pfragment_instance_id->set_hi(state->fragment_instance_id.hi()); + pfragment_instance_id->set_lo(state->fragment_instance_id.lo()); _rpc_context->request.set_filter_id(_filter_id); _rpc_context->request.set_opt_remote_rf(opt_remote_rf); - _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec()); + _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec); _rpc_context->cntl.set_timeout_ms(wait_time_ms()); _rpc_context->cid = _rpc_context->cntl.call_id(); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index e95bef870a3..0800da22ad4 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -74,10 +74,15 @@ public: TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; } - virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) { + RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) { return _runtime_state.get(); } + virtual RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId /*fragment_instance_id*/) { + return _runtime_state->runtime_filter_mgr(); + } + + QueryContext* get_query_ctx() { return _runtime_state->get_query_ctx(); } // should be protected by lock? 
[[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ac19c92ff55..724d50f55e4 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -100,11 +100,6 @@ namespace doris::pipeline { -#define FOR_EACH_RUNTIME_STATE(stmt) \ - for (auto& runtime_state : _runtime_states) { \ - stmt \ - } - PipelineXFragmentContext::PipelineXFragmentContext( const TUniqueId& query_id, const int fragment_id, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back, @@ -114,10 +109,13 @@ PipelineXFragmentContext::PipelineXFragmentContext( PipelineXFragmentContext::~PipelineXFragmentContext() { auto st = _query_ctx->exec_status(); - if (!_runtime_states.empty()) { + if (!_task_runtime_states.empty()) { // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker()); - FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &st); runtime_state.reset();) + for (auto& runtime_state : _task_runtime_states) { + _call_back(runtime_state.get(), &st); + runtime_state.reset(); + } } else { _call_back(nullptr, &st); } @@ -136,8 +134,9 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, } if (_query_ctx->cancel(true, msg, Status::Cancelled(msg), _fragment_id)) { if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { - FOR_EACH_RUNTIME_STATE(LOG(WARNING) << "PipelineXFragmentContext cancel instance: " - << print_id(runtime_state->fragment_instance_id());) + for (auto& id : _fragment_instance_ids) { + LOG(WARNING) << "PipelineXFragmentContext cancel instance: " << print_id(id); + } } else { _set_is_report_on_cancel(false); // TODO bug llj } @@ -229,7 +228,10 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r RETURN_IF_ERROR(_sink->init(request.fragment.output_sink)); static_cast<void>(root_pipeline->set_sink(_sink)); - RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx)); + if (_enable_local_shuffle()) { + RETURN_IF_ERROR( + _plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx)); + } // 4. Initialize global states in pipelines. 
for (PipelinePtr& pipeline : _pipelines) { @@ -451,43 +453,84 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( const doris::TPipelineFragmentParams& request) { _total_tasks = 0; int target_size = request.local_params.size(); - _runtime_states.resize(target_size); _tasks.resize(target_size); + auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile(); + DCHECK(pipeline_id_to_profile.empty()); + pipeline_id_to_profile.resize(_pipelines.size()); + { + size_t pip_idx = 0; + for (auto& pipeline_profile : pipeline_id_to_profile) { + pipeline_profile = + std::make_unique<RuntimeProfile>("Pipeline : " + std::to_string(pip_idx)); + pip_idx++; + } + } + for (size_t i = 0; i < target_size; i++) { const auto& local_params = request.local_params[i]; + auto fragment_instance_id = local_params.fragment_instance_id; + _fragment_instance_ids.push_back(fragment_instance_id); + std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr; + auto set_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) { + runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); - _runtime_states[i] = RuntimeState::create_unique( - local_params.fragment_instance_id, request.query_id, request.fragment_id, - request.query_options, _query_ctx->query_globals, _exec_env); - if (local_params.__isset.runtime_filter_params) { - _runtime_states[i]->set_runtime_filter_params(local_params.runtime_filter_params); - } - _runtime_states[i]->set_query_ctx(_query_ctx.get()); - _runtime_states[i]->set_query_mem_tracker(_query_ctx->query_mem_tracker); + runtime_state->set_query_ctx(_query_ctx.get()); + runtime_state->set_be_number(local_params.backend_num); - static_cast<void>(_runtime_states[i]->runtime_filter_mgr()->init()); - _runtime_states[i]->set_be_number(local_params.backend_num); + if (request.__isset.backend_id) { + runtime_state->set_backend_id(request.backend_id); + } + if (request.__isset.import_label) { + runtime_state->set_import_label(request.import_label); + } + if (request.__isset.db_name) { + runtime_state->set_db_name(request.db_name); + } + if (request.__isset.load_job_id) { + runtime_state->set_load_job_id(request.load_job_id); + } - if (request.__isset.backend_id) { - _runtime_states[i]->set_backend_id(request.backend_id); - } - if (request.__isset.import_label) { - _runtime_states[i]->set_import_label(request.import_label); - } - if (request.__isset.db_name) { - _runtime_states[i]->set_db_name(request.db_name); + runtime_state->set_desc_tbl(_desc_tbl); + runtime_state->set_per_fragment_instance_idx(local_params.sender_id); + runtime_state->set_num_per_fragment_instances(request.num_senders); + runtime_state->resize_op_id_to_local_state(max_operator_id(), max_sink_operator_id()); + runtime_state->set_load_stream_per_node(request.load_stream_per_node); + runtime_state->set_total_load_streams(request.total_load_streams); + runtime_state->set_num_local_sink(request.num_local_sink); + DCHECK(runtime_filter_mgr); + runtime_state->set_pipeline_x_runtime_filter_mgr(runtime_filter_mgr.get()); + }; + + auto filterparams = std::make_unique<RuntimeFilterParamsContext>(); + + { + filterparams->runtime_filter_wait_infinitely = + _runtime_state->runtime_filter_wait_infinitely(); + filterparams->runtime_filter_wait_time_ms = + _runtime_state->runtime_filter_wait_time_ms(); + filterparams->enable_pipeline_exec = _runtime_state->enable_pipeline_exec(); + filterparams->execution_timeout = _runtime_state->execution_timeout(); + + filterparams->exec_env = ExecEnv::GetInstance(); + 
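// Copy the remaining per-query identifiers into the per-instance context;
// the RuntimeFilterMgr built just below is wired back through
// filterparams->runtime_filter_mgr.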
filterparams->query_id.set_hi(_runtime_state->query_id().hi); + filterparams->query_id.set_lo(_runtime_state->query_id().lo); + + filterparams->fragment_instance_id.set_hi(fragment_instance_id.hi); + filterparams->fragment_instance_id.set_lo(fragment_instance_id.lo); + filterparams->be_exec_version = _runtime_state->be_exec_version(); + filterparams->query_ctx = _query_ctx.get(); } - if (request.__isset.load_job_id) { - _runtime_states[i]->set_load_job_id(request.load_job_id); + + // build runtime_filter_mgr for each instance + runtime_filter_mgr = + std::make_unique<RuntimeFilterMgr>(request.query_id, filterparams.get()); + if (local_params.__isset.runtime_filter_params) { + runtime_filter_mgr->set_runtime_filter_params(local_params.runtime_filter_params); } + RETURN_IF_ERROR(runtime_filter_mgr->init()); + filterparams->runtime_filter_mgr = runtime_filter_mgr.get(); - _runtime_states[i]->set_desc_tbl(_desc_tbl); - _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id); - _runtime_states[i]->set_num_per_fragment_instances(request.num_senders); - _runtime_states[i]->resize_op_id_to_local_state(max_operator_id(), max_sink_operator_id()); - _runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node); - _runtime_states[i]->set_total_load_streams(request.total_load_streams); - _runtime_states[i]->set_num_local_sink(request.num_local_sink); + _runtime_filter_states.push_back(std::move(filterparams)); std::map<PipelineId, PipelineXTask*> pipeline_id_to_task; auto get_local_exchange_state = [&](PipelinePtr pipeline) -> std::map<int, std::shared_ptr<LocalExchangeSharedState>> { @@ -504,12 +547,25 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( } return le_state_map; }; - for (auto& pipeline : _pipelines) { + auto get_task_runtime_state = [&](int task_id) -> RuntimeState* { + DCHECK(_task_runtime_states[task_id]); + return _task_runtime_states[task_id].get(); + }; + for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { + auto& pipeline = _pipelines[pip_idx]; if (pipeline->need_to_create_task()) { - auto task = std::make_unique<PipelineXTask>(pipeline, _total_tasks++, - _runtime_states[i].get(), this, - _runtime_states[i]->runtime_profile(), - get_local_exchange_state(pipeline), i); + // build task runtime state + _task_runtime_states.push_back(RuntimeState::create_unique( + this, local_params.fragment_instance_id, request.query_id, + request.fragment_id, request.query_options, _query_ctx->query_globals, + _exec_env)); + auto& task_runtime_state = _task_runtime_states.back(); + set_runtime_state(task_runtime_state); + auto cur_task_id = _total_tasks++; + auto task = std::make_unique<PipelineXTask>( + pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this, + pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), + i); pipeline_id_to_task.insert({pipeline->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); } @@ -533,32 +589,18 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( * and JoinProbeOperator2. */ - // First, set up the parent profile, - // then prepare the task profile and add it to operator_id_to_task_profile. 
- std::vector<RuntimeProfile*> operator_id_to_task_profile( - max_operator_id(), _runtime_states[i]->runtime_profile()); - auto prepare_and_set_parent_profile = [&](PipelineXTask* task) { - auto sink = task->sink(); - const auto& dests_id = sink->dests_id(); - int dest_id = dests_id.front(); - DCHECK(dest_id < operator_id_to_task_profile.size()); - task->set_parent_profile(operator_id_to_task_profile[dest_id]); - - RETURN_IF_ERROR(task->prepare(_runtime_states[i].get(), local_params, - request.fragment.output_sink)); + // First, set up the parent profile,task runtime state - for (auto o : task->operatorXs()) { - int id = o->operator_id(); - DCHECK(id < operator_id_to_task_profile.size()); - auto* op_local_state = _runtime_states[i].get()->get_local_state(o->operator_id()); - operator_id_to_task_profile[id] = op_local_state->profile(); - } + auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) { + DCHECK(pipeline_id_to_profile[pip_idx]); + RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params, + request.fragment.output_sink)); return Status::OK(); }; for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) { - auto task = pipeline_id_to_task[_pipelines[pip_idx]->id()]; + auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()]; DCHECK(task != nullptr); // if this task has upstream dependency, then record them. @@ -571,15 +613,12 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( } } } - RETURN_IF_ERROR(prepare_and_set_parent_profile(task)); + RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx)); } } - { std::lock_guard<std::mutex> l(_state_map_lock); - _instance_id_to_runtime_state.insert( - {UniqueId(_runtime_states[i]->fragment_instance_id()), - _runtime_states[i].get()}); + _runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr); } } _pipeline_parent_map.clear(); @@ -692,7 +731,8 @@ Status PipelineXFragmentContext::_add_local_exchange( int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe, const std::vector<TExpr>& texprs, ExchangeType exchange_type, bool* do_local_exchange, int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) { - if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) { + DCHECK(_enable_local_shuffle()); + if (_num_instances <= 1) { return Status::OK(); } @@ -1144,17 +1184,17 @@ Status PipelineXFragmentContext::submit() { } void PipelineXFragmentContext::close_sink() { - FOR_EACH_RUNTIME_STATE(static_cast<void>(_sink->close( - runtime_state.get(), - _prepared ? Status::RuntimeError("prepare failed") : Status::OK()));); + for (auto& tasks : _tasks) { + auto& root_task = *tasks.begin(); + auto st = root_task->close_sink(_prepared ? 
Status::RuntimeError("prepare failed") + : Status::OK()); + if (!st.ok()) { + LOG_WARNING("PipelineXFragmentContext::close_sink() error").tag("msg", st.msg()); + } + } } void PipelineXFragmentContext::close_if_prepare_failed() { - if (_tasks.empty()) { - FOR_EACH_RUNTIME_STATE( - static_cast<void>(_root_op->close(runtime_state.get())); static_cast<void>( - _sink->close(runtime_state.get(), Status::RuntimeError("prepare failed")));) - } for (auto& task : _tasks) { for (auto& t : task) { DCHECK(!t->is_pending_finish()); @@ -1196,15 +1236,15 @@ Status PipelineXFragmentContext::send_report(bool done) { return Status::NeedSendAgain(""); } - std::vector<RuntimeState*> runtime_states(_runtime_states.size()); - for (size_t i = 0; i < _runtime_states.size(); i++) { - runtime_states[i] = _runtime_states[i].get(); - } + std::vector<RuntimeState*> runtime_states; + for (auto& task_state : _task_runtime_states) { + runtime_states.push_back(task_state.get()); + } return _report_status_cb( - {true, exec_status, runtime_states, nullptr, nullptr, done || !exec_status.ok(), - _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num, - _runtime_state.get(), + {true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(), + done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, + TUniqueId(), _backend_num, _runtime_state.get(), std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, std::placeholders::_2), diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index a95a90e356d..326f1f84254 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -19,6 +19,7 @@ #include <gen_cpp/Types_types.h> #include <gen_cpp/types.pb.h> +#include <glog/logging.h> #include <stddef.h> #include <stdint.h> @@ -69,16 +70,16 @@ public: ~PipelineXFragmentContext() override; void instance_ids(std::vector<TUniqueId>& ins_ids) const override { - ins_ids.resize(_runtime_states.size()); - for (size_t i = 0; i < _runtime_states.size(); i++) { - ins_ids[i] = _runtime_states[i]->fragment_instance_id(); + ins_ids.resize(_fragment_instance_ids.size()); + for (size_t i = 0; i < _fragment_instance_ids.size(); i++) { + ins_ids[i] = _fragment_instance_ids[i]; } } void instance_ids(std::vector<string>& ins_ids) const override { - ins_ids.resize(_runtime_states.size()); - for (size_t i = 0; i < _runtime_states.size(); i++) { - ins_ids[i] = print_id(_runtime_states[i]->fragment_instance_id()); + ins_ids.resize(_fragment_instance_ids.size()); + for (size_t i = 0; i < _fragment_instance_ids.size(); i++) { + ins_ids[i] = print_id(_fragment_instance_ids[i]); } } @@ -102,13 +103,9 @@ public: Status send_report(bool) override; - RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override { - std::lock_guard<std::mutex> l(_state_map_lock); - if (_instance_id_to_runtime_state.contains(fragment_instance_id)) { - return _instance_id_to_runtime_state[fragment_instance_id]; - } else { - return _runtime_state.get(); - } + RuntimeFilterMgr* get_runtime_filter_mgr(UniqueId fragment_instance_id) override { + DCHECK(_runtime_filter_mgr_map.contains(fragment_instance_id)); + return _runtime_filter_mgr_map[fragment_instance_id].get(); } [[nodiscard]] int next_operator_id() { return _operator_id++; } @@ -162,13 +159,12 @@ private: bool 
_has_inverted_index_or_partial_update(TOlapTableSink sink); + bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); } + OperatorXPtr _root_op = nullptr; // this is a [n * m] matrix. n is parallelism of pipeline engine and m is the number of pipelines. std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks; - // Local runtime states for each pipeline task. - std::vector<std::unique_ptr<RuntimeState>> _runtime_states; - // It is used to manage the lifecycle of RuntimeFilterMergeController std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> _merge_controller_handlers; @@ -219,6 +215,19 @@ private: int _operator_id = 0; int _sink_operator_id = 0; std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state; + + // UniqueId -> runtime mgr + std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map; + + //Here are two types of runtime states: + // - _runtime state is at the Fragment level. + // - _task_runtime_states is at the task level, unique to each task. + + std::vector<TUniqueId> _fragment_instance_ids; + // Local runtime states for each task + std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states; + + std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states; }; } // namespace pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index cfe859b2e7c..5f07b79bebf 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -73,10 +73,11 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); + DCHECK_EQ(state, _state); { // set sink local state - LocalSinkStateInfo info {_parent_profile, local_params.sender_id, + LocalSinkStateInfo info {_task_profile.get(), local_params.sender_id, get_downstream_dependency(), _le_state_map, tsink}; RETURN_IF_ERROR(_sink->setup_local_state(state, info)); } @@ -84,7 +85,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams std::vector<TScanRangeParams> no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->node_id(), no_scan_ranges); - auto* parent_profile = _parent_profile; + auto* parent_profile = _task_profile.get(); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; auto& deps = get_upstream_dependency(op->operator_id()); @@ -330,6 +331,10 @@ Status PipelineXTask::close(Status exec_status) { return s; } +Status PipelineXTask::close_sink(Status exec_status) { + return _sink->close(_state, exec_status); +} + std::string PipelineXTask::debug_string() { std::unique_lock<std::mutex> lc(_release_lock); fmt::memory_buffer debug_string_buffer; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 261ecb54f24..0bb3e16fc98 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -71,6 +71,7 @@ public: // must be call after all pipeline task is finish to release resource Status close(Status exec_status) override; + Status close_sink(Status exec_status); bool source_can_read() override { if (_dry_run) { return true; @@ -123,6 +124,8 @@ public: OperatorXs operatorXs() { return _operators; } + int task_id() const { return _index; }; + void 
clear_blocking_state() { if (!is_final_state(get_state()) && get_state() != PipelineTaskState::PENDING_FINISH && _blocked_dep) { diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 57074bc629c..3c795273a77 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -236,32 +236,44 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } if (req.is_pipeline_x) { params.__isset.detailed_report = true; - for (auto* rs : req.runtime_states) { - TDetailedReportParams detailed_param; - detailed_param.__set_fragment_instance_id(rs->fragment_instance_id()); - detailed_param.__isset.fragment_instance_id = true; - - if (rs->enable_profile()) { - detailed_param.__isset.profile = true; - detailed_param.__isset.loadChannelProfile = true; - - rs->runtime_profile()->to_thrift(&detailed_param.profile); + DCHECK(!req.runtime_states.empty()); + const bool enable_profile = (*req.runtime_states.begin())->enable_profile(); + if (enable_profile) { + params.__isset.profile = true; + params.__isset.loadChannelProfile = false; + for (auto* rs : req.runtime_states) { + DCHECK(req.load_channel_profile); + TDetailedReportParams detailed_param; rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile); + // merge all runtime_states.loadChannelProfile to req.load_channel_profile + req.load_channel_profile->update(detailed_param.loadChannelProfile); } - - params.detailed_report.push_back(detailed_param); + req.load_channel_profile->to_thrift(¶ms.loadChannelProfile); + } else { + params.__isset.profile = false; } - } - if (req.profile != nullptr) { - req.profile->to_thrift(¶ms.profile); - if (req.load_channel_profile) { - req.load_channel_profile->to_thrift(¶ms.loadChannelProfile); + if (enable_profile) { + for (auto& pipeline_profile : req.runtime_state->pipeline_id_to_profile()) { + TDetailedReportParams detailed_param; + detailed_param.__isset.fragment_instance_id = false; + detailed_param.__isset.profile = true; + detailed_param.__isset.loadChannelProfile = false; + pipeline_profile->to_thrift(&detailed_param.profile); + params.detailed_report.push_back(detailed_param); + } } - params.__isset.profile = true; - params.__isset.loadChannelProfile = true; } else { - params.__isset.profile = false; + if (req.profile != nullptr) { + req.profile->to_thrift(¶ms.profile); + if (req.load_channel_profile) { + req.load_channel_profile->to_thrift(¶ms.loadChannelProfile); + } + params.__isset.profile = true; + params.__isset.loadChannelProfile = true; + } else { + params.__isset.profile = false; + } } if (!req.runtime_state->output_files().empty()) { @@ -770,8 +782,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, g_fragmentmgr_prepare_latency << (duration_ns / 1000); std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; // TODO need check the status, but when I add return_if_error the P0 will not pass - static_cast<void>(_runtimefilter_controller.add_entity(params, &handler, - fragment_executor->runtime_state())); + static_cast<void>(_runtimefilter_controller.add_entity( + params, &handler, + RuntimeFilterParamsContext::create(fragment_executor->runtime_state()))); fragment_executor->set_merge_controller_handler(handler); { std::lock_guard<std::mutex> lock(_lock); @@ -852,9 +865,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, for (size_t i = 0; i < params.local_params.size(); i++) { std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; - 
static_cast<void>( - _runtimefilter_controller.add_entity(params, params.local_params[i], &handler, - context->get_runtime_state(UniqueId()))); + static_cast<void>(_runtimefilter_controller.add_entity( + params, params.local_params[i], &handler, + RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId())))); context->set_merge_controller_handler(handler); const TUniqueId& fragment_instance_id = params.local_params[i].fragment_instance_id; { @@ -933,7 +946,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, std::shared_ptr<RuntimeFilterMergeControllerEntity> handler; static_cast<void>(_runtimefilter_controller.add_entity( - params, local_params, &handler, context->get_runtime_state(UniqueId()))); + params, local_params, &handler, + RuntimeFilterParamsContext::create(context->get_runtime_state(UniqueId())))); context->set_merge_controller_handler(handler); { @@ -1010,9 +1024,15 @@ void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id, LOG(WARNING) << "Query " << print_id(query_id) << " does not exists, failed to cancel it"; return; } + if (ctx->second->enable_pipeline_x_exec()) { + for (auto& [f_id, f_context] : ctx->second->fragment_id_to_pipeline_ctx) { + cancel_fragment_unlocked(query_id, f_id, reason, state_lock, msg); + } - for (auto it : ctx->second->fragment_instance_ids) { - cancel_instance_unlocked(it, reason, state_lock, msg); + } else { + for (auto it : ctx->second->fragment_instance_ids) { + cancel_instance_unlocked(it, reason, state_lock, msg); + } } ctx->second->cancel(true, msg, Status::Cancelled(msg)); @@ -1054,23 +1074,50 @@ void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id, } } +void FragmentMgr::cancel_fragment(const TUniqueId& query_id, int32_t fragment_id, + const PPlanFragmentCancelReason& reason, const std::string& msg) { + std::unique_lock<std::mutex> state_lock(_lock); + return cancel_fragment_unlocked(query_id, fragment_id, reason, state_lock, msg); +} + +void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& query_id, int32_t fragment_id, + const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg) { + auto q_ctx = _query_ctx_map.find(query_id)->second; + auto f_context = q_ctx->fragment_id_to_pipeline_ctx.find(fragment_id); + if (f_context != q_ctx->fragment_id_to_pipeline_ctx.end()) { + f_context->second->cancel(reason, msg); + } else { + LOG(WARNING) << "Could not find the pipeline query id:" << print_id(query_id) + << " fragment id:" << fragment_id << " to cancel"; + } +} + bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) { std::lock_guard<std::mutex> lock(_lock); auto ctx = _query_ctx_map.find(query_id); if (ctx != _query_ctx_map.end()) { const bool is_pipeline_version = ctx->second->enable_pipeline_exec(); - for (auto itr : ctx->second->fragment_instance_ids) { - if (is_pipeline_version) { - auto pipeline_ctx_iter = _pipeline_map.find(itr); - if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) { - return pipeline_ctx_iter->second->is_canceled(); - } - } else { - auto fragment_instance_itr = _fragment_instance_map.find(itr); - if (fragment_instance_itr != _fragment_instance_map.end() && - fragment_instance_itr->second) { - return fragment_instance_itr->second->is_canceled(); + const bool is_pipeline_x = ctx->second->enable_pipeline_x_exec(); + if (is_pipeline_x) { + for (auto& [id, f_context] : ctx->second->fragment_id_to_pipeline_ctx) { + return f_context->is_canceled(); + } + } 
else { + for (auto itr : ctx->second->fragment_instance_ids) { + if (is_pipeline_version) { + auto pipeline_ctx_iter = _pipeline_map.find(itr); + if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) { + return pipeline_ctx_iter->second->is_canceled(); + } + } else { + auto fragment_instance_itr = _fragment_instance_map.find(itr); + if (fragment_instance_itr != _fragment_instance_map.end() && + fragment_instance_itr->second) { + return fragment_instance_itr->second->is_canceled(); + } } } } @@ -1306,8 +1353,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, pip_context = iter->second; DCHECK(pip_context != nullptr); - runtime_filter_mgr = - pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr(); + runtime_filter_mgr = pip_context->get_runtime_filter_mgr(fragment_instance_id); } else { std::unique_lock<std::mutex> lock(_lock); auto iter = _fragment_instance_map.find(tfragment_instance_id); @@ -1349,9 +1395,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, pip_context = iter->second; DCHECK(pip_context != nullptr); - runtime_filter_mgr = pip_context->get_runtime_state(fragment_instance_id) - ->get_query_ctx() - ->runtime_filter_mgr(); + runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); pool = &pip_context->get_query_context()->obj_pool; } else { std::unique_lock<std::mutex> lock(_lock); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index a20da9387af..21d85503803 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -102,6 +102,14 @@ public: const PPlanFragmentCancelReason& reason, const std::unique_lock<std::mutex>& state_lock, const std::string& msg = ""); + // Cancel fragment (only pipelineX). + // {query id fragment} -> PipelineXFragmentContext + void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id, + const PPlanFragmentCancelReason& reason, const std::string& msg = ""); + void cancel_fragment_unlocked(const TUniqueId& query_id, int32_t fragment_id, + const PPlanFragmentCancelReason& reason, + const std::unique_lock<std::mutex>& state_lock, + const std::string& msg = ""); // Can be used in both version. 
void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 0e3a04f8998..6d392c56175 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -172,6 +172,11 @@ public: _query_options.enable_pipeline_engine; } + bool enable_pipeline_x_exec() const { + return _query_options.__isset.enable_pipeline_x_engine && + _query_options.enable_pipeline_x_engine; + } + int be_exec_version() const { if (!_query_options.__isset.be_exec_version) { return 0; diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index bca128c652a..a2120c92389 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -51,7 +51,10 @@ struct AsyncRPCContext { brpc::CallId cid; }; -RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state) : _state(state) {} +RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state) { + _state = state; + _state->runtime_filter_mgr = this; +} RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx) : _query_ctx(query_ctx) {} @@ -133,8 +136,6 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc build_bf_exactly)); _consumer_map[key].emplace_back(node_id, filter); } else { - DCHECK(_state != nullptr); - if (iter != _consumer_map.end()) { for (auto holder : iter->second) { if (holder.node_id == node_id) { @@ -475,7 +476,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ Status RuntimeFilterMergeController::add_entity( const TExecPlanFragmentParams& params, - std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, RuntimeState* state) { + std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, + RuntimeFilterParamsContext* state) { if (!params.params.__isset.runtime_filter_params || params.params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { return Status::OK(); @@ -506,7 +508,8 @@ Status RuntimeFilterMergeController::add_entity( Status RuntimeFilterMergeController::add_entity( const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params, - std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, RuntimeState* state) { + std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, + RuntimeFilterParamsContext* state) { if (!local_params.__isset.runtime_filter_params || local_params.runtime_filter_params.rid_to_runtime_filter.size() == 0) { return Status::OK(); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 939ee2c8139..5f9ee46d656 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -50,6 +50,7 @@ class RuntimeState; enum class RuntimeFilterRole; class RuntimePredicateWrapper; class QueryContext; +struct RuntimeFilterParamsContext; /// producer: /// Filter filter; @@ -65,7 +66,7 @@ class QueryContext; // RuntimeFilterMgr will be destroyed when RuntimeState is destroyed class RuntimeFilterMgr { public: - RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state); + RuntimeFilterMgr(const UniqueId& query_id, RuntimeFilterParamsContext* state); RuntimeFilterMgr(const UniqueId& query_id, QueryContext* query_ctx); @@ -106,7 +107,7 @@ private: std::map<int32_t, std::vector<ConsumerFilterHolder>> _consumer_map; std::map<int32_t, IRuntimeFilter*> _producer_map; - RuntimeState* _state = nullptr; + 
RuntimeFilterParamsContext* _state = nullptr; QueryContext* _query_ctx = nullptr; std::unique_ptr<MemTracker> _tracker; ObjectPool _pool; @@ -123,7 +124,7 @@ private: // the class is destroyed with the last fragment_exec. class RuntimeFilterMergeControllerEntity { public: - RuntimeFilterMergeControllerEntity(RuntimeState* state) + RuntimeFilterMergeControllerEntity(RuntimeFilterParamsContext* state) : _query_id(0, 0), _fragment_instance_id(0, 0), _state(state) {} ~RuntimeFilterMergeControllerEntity() = default; @@ -172,7 +173,7 @@ private: using CntlValwithLock = std::pair<std::shared_ptr<RuntimeFilterCntlVal>, std::unique_ptr<std::mutex>>; std::map<int, CntlValwithLock> _filter_map; - RuntimeState* _state = nullptr; + RuntimeFilterParamsContext* _state = nullptr; bool _opt_remote_rf = true; }; @@ -188,11 +189,11 @@ public: // add_entity will return a exists entity Status add_entity(const TExecPlanFragmentParams& params, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, - RuntimeState* state); + RuntimeFilterParamsContext* state); Status add_entity(const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params, std::shared_ptr<RuntimeFilterMergeControllerEntity>* handle, - RuntimeState* state); + RuntimeFilterParamsContext* state); // thread safe // increase a reference count // if a query-id is not exist diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 43eff466019..9082a5a322d 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -54,7 +54,6 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id, : _profile("Fragment " + print_id(fragment_instance_id)), _load_channel_profile("<unnamed>"), _obj_pool(new ObjectPool()), - _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), _is_cancelled(false), @@ -71,6 +70,8 @@ RuntimeState::RuntimeState(const TUniqueId& fragment_instance_id, _error_log_file(nullptr) { Status status = init(fragment_instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); + _runtime_filter_mgr.reset( + new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this))); } RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, @@ -79,7 +80,6 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, : _profile("Fragment " + print_id(fragment_exec_params.fragment_instance_id)), _load_channel_profile("<unnamed>"), _obj_pool(new ObjectPool()), - _runtime_filter_mgr(new RuntimeFilterMgr(fragment_exec_params.query_id, this)), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), _query_id(fragment_exec_params.query_id), @@ -94,12 +94,14 @@ RuntimeState::RuntimeState(const TPlanFragmentExecParams& fragment_exec_params, _normal_row_number(0), _error_row_number(0), _error_log_file(nullptr) { - if (fragment_exec_params.__isset.runtime_filter_params) { - _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params); - } Status status = init(fragment_exec_params.fragment_instance_id, query_options, query_globals, exec_env); DCHECK(status.ok()); + _runtime_filter_mgr.reset(new RuntimeFilterMgr(fragment_exec_params.query_id, + RuntimeFilterParamsContext::create(this))); + if (fragment_exec_params.__isset.runtime_filter_params) { + _runtime_filter_mgr->set_runtime_filter_params(fragment_exec_params.runtime_filter_params); + } } RuntimeState::RuntimeState(const TUniqueId& 
instance_id, const TUniqueId& query_id, @@ -108,7 +110,36 @@ RuntimeState::RuntimeState(const TUniqueId& instance_id, const TUniqueId& query_ : _profile("Fragment " + print_id(instance_id)), _load_channel_profile("<unnamed>"), _obj_pool(new ObjectPool()), - _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)), + _data_stream_recvrs_pool(new ObjectPool()), + _unreported_error_idx(0), + _query_id(query_id), + _fragment_id(fragment_id), + _is_cancelled(false), + _per_fragment_instance_idx(0), + _num_rows_load_total(0), + _num_rows_load_filtered(0), + _num_rows_load_unselected(0), + _num_rows_filtered_in_strict_mode_partial_update(0), + _num_print_error_rows(0), + _num_bytes_load_total(0), + _num_finished_scan_range(0), + _normal_row_number(0), + _error_row_number(0), + _error_log_file(nullptr) { + [[maybe_unused]] auto status = init(instance_id, query_options, query_globals, exec_env); + DCHECK(status.ok()); + _runtime_filter_mgr.reset( + new RuntimeFilterMgr(query_id, RuntimeFilterParamsContext::create(this))); +} + +RuntimeState::RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& instance_id, + const TUniqueId& query_id, int32_t fragment_id, + const TQueryOptions& query_options, const TQueryGlobals& query_globals, + ExecEnv* exec_env) + : _profile("Fragment " + print_id(instance_id)), + _load_channel_profile("<unnamed>"), + _obj_pool(new ObjectPool()), + _runtime_filter_mgr(nullptr), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), _query_id(query_id), @@ -135,7 +166,6 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, : _profile("PipelineX " + std::to_string(fragment_id)), _load_channel_profile("<unnamed>"), _obj_pool(new ObjectPool()), - _runtime_filter_mgr(new RuntimeFilterMgr(query_id, this)), _data_stream_recvrs_pool(new ObjectPool()), _unreported_error_idx(0), _query_id(query_id), @@ -155,6 +185,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id, // TODO: do we really need instance id? 
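// This constructor builds the fragment-level state and so has no single
// instance id; per-task states use the PipelineXFragmentContext* overload
// above and get their RuntimeFilterMgr injected later via
// set_pipeline_x_runtime_filter_mgr().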
Status status = init(TUniqueId(), query_options, query_globals, exec_env); DCHECK(status.ok()); + _runtime_filter_mgr.reset( + new RuntimeFilterMgr(query_id, RuntimeFilterParamsContext::create(this))); } RuntimeState::RuntimeState(const TQueryGlobals& query_globals) @@ -485,4 +517,21 @@ bool RuntimeState::enable_page_cache() const { (_query_options.__isset.enable_page_cache && _query_options.enable_page_cache); } +RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) { + RuntimeFilterParamsContext* params = state->obj_pool()->add(new RuntimeFilterParamsContext()); + params->runtime_filter_wait_infinitely = state->runtime_filter_wait_infinitely(); + params->runtime_filter_wait_time_ms = state->runtime_filter_wait_time_ms(); + params->enable_pipeline_exec = state->enable_pipeline_exec(); + params->execution_timeout = state->execution_timeout(); + params->runtime_filter_mgr = state->runtime_filter_mgr(); + params->exec_env = state->exec_env(); + params->query_id.set_hi(state->query_id().hi); + params->query_id.set_lo(state->query_id().lo); + + params->fragment_instance_id.set_hi(state->fragment_instance_id().hi); + params->fragment_instance_id.set_lo(state->fragment_instance_id().lo); + params->be_exec_version = state->be_exec_version(); + params->query_ctx = state->get_query_ctx(); + return params; +} } // end namespace doris diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index e37883abbe1..e064e6e7610 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -47,6 +47,7 @@ namespace doris { namespace pipeline { class PipelineXLocalStateBase; class PipelineXSinkLocalStateBase; +class PipelineXFragmentContext; } // namespace pipeline class DescriptorTbl; @@ -74,6 +75,11 @@ public: const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env); + // for only use in pipelineX + RuntimeState(pipeline::PipelineXFragmentContext*, const TUniqueId& instance_id, + const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, + const TQueryGlobals& query_globals, ExecEnv* exec_env); + // Used by pipelineX. This runtime state is only used for setup. RuntimeState(const TUniqueId& query_id, int32 fragment_id, const TQueryOptions& query_options, const TQueryGlobals& query_globals, ExecEnv* exec_env); @@ -437,7 +443,17 @@ public: // if load mem limit is not set, or is zero, using query mem limit instead. 
int64_t get_load_mem_limit(); - RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } + RuntimeFilterMgr* runtime_filter_mgr() { + if (_pipeline_x_runtime_filter_mgr) { + return _pipeline_x_runtime_filter_mgr; + } else { + return _runtime_filter_mgr.get(); + } + } + + void set_pipeline_x_runtime_filter_mgr(RuntimeFilterMgr* pipeline_x_runtime_filter_mgr) { + _pipeline_x_runtime_filter_mgr = pipeline_x_runtime_filter_mgr; + } void set_query_ctx(QueryContext* ctx) { _query_ctx = ctx; } @@ -513,6 +529,8 @@ public: void resize_op_id_to_local_state(int operator_size, int sink_size); + auto& pipeline_id_to_profile() { return _pipeline_id_to_profile; } + private: Status create_error_log_file(); @@ -531,6 +549,9 @@ private: // runtime filter std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; + // owned by PipelineXFragmentContext + RuntimeFilterMgr* _pipeline_x_runtime_filter_mgr = nullptr; + // Protects _data_stream_recvrs_pool std::mutex _data_stream_recvrs_lock; @@ -623,10 +644,30 @@ private: // true if max_filter_ratio is 0 bool _load_zero_tolerance = false; + std::vector<std::unique_ptr<RuntimeProfile>> _pipeline_id_to_profile; + // prohibit copies RuntimeState(const RuntimeState&); }; +// from runtime state +struct RuntimeFilterParamsContext { + RuntimeFilterParamsContext() = default; + static RuntimeFilterParamsContext* create(RuntimeState* state); + + bool runtime_filter_wait_infinitely; + int32_t runtime_filter_wait_time_ms; + bool enable_pipeline_exec; + int32_t execution_timeout; + RuntimeFilterMgr* runtime_filter_mgr; + ExecEnv* exec_env; + PUniqueId query_id; + PUniqueId fragment_instance_id; + int be_exec_version; + QueryContext* query_ctx; + QueryContext* get_query_ctx() const { return query_ctx; } +}; + #define RETURN_IF_CANCELLED(state) \ do { \ if (UNLIKELY((state)->is_cancelled())) return Status::Cancelled("Cancelled"); \ diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 987a2106894..24152c67089 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -593,10 +593,19 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* has_cancel_reason ? PPlanFragmentCancelReason_Name(request->cancel_reason()) : "INTERNAL_ERROR"); - - _exec_env->fragment_mgr()->cancel_instance( - tid, has_cancel_reason ? request->cancel_reason() - : PPlanFragmentCancelReason::INTERNAL_ERROR); + if (request->has_fragment_id()) { + TUniqueId query_id; + query_id.__set_hi(request->query_id().hi()); + query_id.__set_lo(request->query_id().lo()); + _exec_env->fragment_mgr()->cancel_fragment( + query_id, request->fragment_id(), + has_cancel_reason ? request->cancel_reason() + : PPlanFragmentCancelReason::INTERNAL_ERROR); + } else { + _exec_env->fragment_mgr()->cancel_instance( + tid, has_cancel_reason ? request->cancel_reason() + : PPlanFragmentCancelReason::INTERNAL_ERROR); + } // TODO: the logic seems useless, cancel only return Status::OK. 
remove it st.to_protobuf(result->mutable_status()); diff --git a/be/test/exprs/runtime_filter_test.cpp b/be/test/exprs/runtime_filter_test.cpp index b8cce3fbf7d..9739c3930ef 100644 --- a/be/test/exprs/runtime_filter_test.cpp +++ b/be/test/exprs/runtime_filter_test.cpp @@ -103,8 +103,9 @@ IRuntimeFilter* create_runtime_filter(TRuntimeFilterType::type type, TQueryOptio } IRuntimeFilter* runtime_filter = nullptr; - Status status = IRuntimeFilter::create(_runtime_stat, _obj_pool, &desc, options, - RuntimeFilterRole::PRODUCER, -1, &runtime_filter); + Status status = IRuntimeFilter::create(RuntimeFilterParamsContext::create(_runtime_stat), + _obj_pool, &desc, options, RuntimeFilterRole::PRODUCER, + -1, &runtime_filter); EXPECT_TRUE(status.ok()) << status.to_string(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index 465bb977226..ecee299f104 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -63,9 +63,14 @@ public class ExecutionProfile { // Profile for load channels. Only for load job. private RuntimeProfile loadChannelProfile; // A countdown latch to mark the completion of each instance. + // use for old pipeline // instance id -> dummy value private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal; + // A countdown latch to mark the completion of each fragment. use for pipelineX + // fragmentId -> dummy value + private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal; + public ExecutionProfile(TUniqueId queryId, int fragmentNum) { executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); @@ -79,7 +84,35 @@ public class ExecutionProfile { executionProfile.addChild(loadChannelProfile); } - public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) { + private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) { + RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); + for (int i = 0; i < fragmentProfiles.size(); ++i) { + RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i); + RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i); + fragmentsProfile.addChild(newFragmentProfile); + List<RuntimeProfile> allPipelines = new ArrayList<RuntimeProfile>(); + for (Pair<RuntimeProfile, Boolean> runtimeProfile : oldFragmentProfile.getChildList()) { + allPipelines.add(runtimeProfile.first); + } + int pipelineIdx = 0; + for (RuntimeProfile pipeline : allPipelines) { + List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>(); + for (Pair<RuntimeProfile, Boolean> runtimeProfile : pipeline.getChildList()) { + allPipelineTask.add(runtimeProfile.first); + } + RuntimeProfile mergedpipelineProfile = new RuntimeProfile( + "Pipeline : " + pipelineIdx + "(instance_num=" + + allPipelineTask.size() + ")", + allPipelines.get(0).nodeId()); + RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap); + newFragmentProfile.addChild(mergedpipelineProfile); + pipelineIdx++; + } + } + return fragmentsProfile; + } + + private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) { RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); for (int i = 0; i < fragmentProfiles.size(); ++i) { 
+    private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
         for (int i = 0; i < fragmentProfiles.size(); ++i) {
             RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
@@ -97,6 +130,54 @@ public class ExecutionProfile {
         return fragmentsProfile;
     }
 
+    public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
+        if (enablePipelineX()) {
+            /*
+             * Fragment 0
+             * ---Pipeline 0
+             * ------pipelineTask 0
+             * ------pipelineTask 0
+             * ------pipelineTask 0
+             * ---Pipeline 1
+             * ------pipelineTask 1
+             * ---Pipeline 2
+             * ------pipelineTask 2
+             * ------pipelineTask 2
+             * Fragment 1
+             * ---Pipeline 0
+             * ------......
+             * ---Pipeline 1
+             * ------......
+             * ---Pipeline 2
+             * ------......
+             * ......
+             */
+            return getPipelineXAggregatedProfile(planNodeMap);
+        } else {
+            /*
+             * Fragment 0
+             * ---Instance 0
+             * ------pipelineTask 0
+             * ------pipelineTask 1
+             * ------pipelineTask 2
+             * ---Instance 1
+             * ------pipelineTask 0
+             * ------pipelineTask 1
+             * ------pipelineTask 2
+             * ---Instance 2
+             * ------pipelineTask 0
+             * ------pipelineTask 1
+             * ------pipelineTask 2
+             * Fragment 1
+             * ---Instance 0
+             * ---Instance 1
+             * ---Instance 2
+             * ......
+             */
+            return getNonPipelineXAggregatedProfile(planNodeMap);
+        }
+    }
+
     public RuntimeProfile getExecutionProfile() {
         return executionProfile;
     }
@@ -120,6 +201,17 @@ public class ExecutionProfile {
         }
     }
 
+    private boolean enablePipelineX() {
+        return profileFragmentDoneSignal != null;
+    }
+
+    public void markFragments(int fragments) {
+        profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
+        for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
+            profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is meaningless */);
+        }
+    }
+
     public void update(long startTime, boolean isFinished) {
         if (startTime > 0) {
             executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime));
@@ -133,6 +225,14 @@ public class ExecutionProfile {
             }
         }
 
+        if (isFinished && profileFragmentDoneSignal != null) {
+            try {
+                profileFragmentDoneSignal.await(2, TimeUnit.SECONDS);
+            } catch (InterruptedException e1) {
+                LOG.warn("signal await error", e1);
+            }
+        }
+
         for (RuntimeProfile fragmentProfile : fragmentProfiles) {
             fragmentProfile.sortChildren();
         }
@@ -143,6 +243,9 @@ public class ExecutionProfile {
             // count down to zero to notify all objects waiting for this
             profileDoneSignal.countDownToZero(new Status());
         }
+        if (profileFragmentDoneSignal != null) {
+            profileFragmentDoneSignal.countDownToZero(new Status());
+        }
     }
 
     public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
@@ -153,6 +256,14 @@ public class ExecutionProfile {
         }
     }
 
+    public void markOneFragmentDone(int fragmentId) {
+        if (profileFragmentDoneSignal != null) {
+            if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
+                LOG.warn("Mark fragment {} done failed", fragmentId);
+            }
+        }
+    }
+
     public boolean awaitAllInstancesDone(long waitTimeS) throws InterruptedException {
         if (profileDoneSignal == null) {
             return true;
@@ -160,6 +271,13 @@ public class ExecutionProfile {
         return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
     }
 
+    public boolean awaitAllFragmentsDone(long waitTimeS) throws InterruptedException {
+        if (profileFragmentDoneSignal == null) {
+            return true;
+        }
+        return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
+    }
+
     public boolean isAllInstancesDone() {
         if (profileDoneSignal == null) {
            return true;
@@ -167,9 +285,16 @@ public class ExecutionProfile {
         return profileDoneSignal.getCount() == 0;
     }
 
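
Note: the markFragments/markOneFragmentDone pair behaves like a keyed countdown latch: each fragment id may be counted down at most once. A minimal sketch of that idea, assuming nothing about Doris's actual MarkedCountDownLatch beyond the addMark/markedCountDown/await calls the patch uses (countDownToZero is omitted):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Minimal keyed latch: each key may be counted down at most once.
    class KeyedLatch<K> {
        private final Set<K> marks = ConcurrentHashMap.newKeySet();
        private final CountDownLatch latch;

        KeyedLatch(int count) { latch = new CountDownLatch(count); }

        void addMark(K key) { marks.add(key); }

        // Returns false if the key is unknown or was already counted down.
        boolean markedCountDown(K key) {
            if (!marks.remove(key)) {
                return false;
            }
            latch.countDown();
            return true;
        }

        boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }

With this shape, a duplicate fragment-done report counts down only once, which is why markOneFragmentDone can safely log a warning and ignore a false return.
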
-    public void addInstanceProfile(int instanceIdx, RuntimeProfile instanceProfile) {
-        Preconditions.checkArgument(instanceIdx < fragmentProfiles.size(),
-                instanceIdx + " vs. " + fragmentProfiles.size());
-        fragmentProfiles.get(instanceIdx).addChild(instanceProfile);
+    public boolean isAllFragmentsDone() {
+        if (profileFragmentDoneSignal == null) {
+            return true;
+        }
+        return profileFragmentDoneSignal.getCount() == 0;
+    }
+
+    public void addInstanceProfile(int fragmentId, RuntimeProfile instanceProfile) {
+        Preconditions.checkArgument(fragmentId < fragmentProfiles.size(),
+                fragmentId + " vs. " + fragmentProfiles.size());
+        fragmentProfiles.get(fragmentId).addChild(instanceProfile);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5188412bd3a..ba1499fae11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -148,6 +148,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class Coordinator implements CoordInterface {
     private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@@ -677,7 +678,12 @@ public class Coordinator implements CoordInterface {
             Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum);
             LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
         }
-        executionProfile.markInstances(instanceIds);
+        if (enablePipelineXEngine) {
+            executionProfile.markFragments(fragments.size());
+        } else {
+            executionProfile.markInstances(instanceIds);
+        }
+
         if (enablePipelineEngine) {
             sendPipelineCtx();
         } else {
@@ -894,7 +900,8 @@ public class Coordinator implements CoordInterface {
                 Long backendId = this.addressToBackendID.get(entry.getKey());
                 PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
                         profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap,
-                        executionProfile.getLoadChannelProfile());
+                        executionProfile.getLoadChannelProfile(), this.enablePipelineXEngine,
+                        this.executionProfile);
                 // Each tParam will set the total number of Fragments that need to be executed on the same BE,
                 // and the BE will determine whether all Fragments have been executed based on this information.
                 // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
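
Note: the coordinator now sizes its completion latch per fragment under pipelineX and per fragment instance otherwise. A hedged usage sketch of that distinction, reusing the KeyedLatch stand-in defined above; all counts and ids are made up for illustration:

    public class LatchGranularityDemo {
        public static void main(String[] args) {
            // pipelineX: one mark per fragment (3 fragments assumed).
            KeyedLatch<Integer> fragmentLatch = new KeyedLatch<>(3);
            for (int fragmentId = 0; fragmentId < 3; fragmentId++) {
                fragmentLatch.addMark(fragmentId);
            }
            // Old path: one mark per fragment instance (12 assumed), keyed by instance id.
            KeyedLatch<String> instanceLatch = new KeyedLatch<>(12);
            for (int i = 0; i < 12; i++) {
                instanceLatch.addMark("instance-" + i);
            }
            // A BE report counts down exactly one mark:
            System.out.println(fragmentLatch.markedCountDown(0)); // true
            System.out.println(fragmentLatch.markedCountDown(0)); // false: duplicate report
        }
    }
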
@@ -2459,7 +2466,7 @@ public class Coordinator implements CoordInterface {
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (enablePipelineXEngine) {
             PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
-            if (!ctx.updateProfile(params, true)) {
+            if (!ctx.updateProfile(params)) {
                 return;
             }
 
@@ -2503,16 +2510,14 @@ public class Coordinator implements CoordInterface {
             }
 
             Preconditions.checkArgument(params.isSetDetailedReport());
-            for (TDetailedReportParams param : params.detailed_report) {
-                if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
-                    LOG.debug("Query {} instance {} is marked done",
-                            DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
-                    executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
-                }
+            if (ctx.done) {
+                LOG.debug("Query {} fragment {} is marked done",
+                        DebugUtil.printId(queryId), ctx.profileFragmentId);
+                executionProfile.markOneFragmentDone(ctx.profileFragmentId);
             }
         } else if (enablePipelineEngine) {
             PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
-            if (!ctx.updateProfile(params, false)) {
+            if (!ctx.updateProfile(params)) {
                 return;
             }
 
@@ -2657,7 +2662,11 @@ public class Coordinator implements CoordInterface {
             long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
             boolean awaitRes = false;
             try {
-                awaitRes = executionProfile.awaitAllInstancesDone(waitTime);
+                if (enablePipelineXEngine) {
+                    awaitRes = executionProfile.awaitAllFragmentsDone(waitTime);
+                } else {
+                    awaitRes = executionProfile.awaitAllInstancesDone(waitTime);
+                }
             } catch (InterruptedException e) {
                 // Do nothing
             }
@@ -2700,7 +2709,11 @@ public class Coordinator implements CoordInterface {
     }
 
     public boolean isDone() {
-        return executionProfile.isAllInstancesDone();
+        if (enablePipelineXEngine) {
+            return executionProfile.isAllFragmentsDone();
+        } else {
+            return executionProfile.isAllInstancesDone();
+        }
     }
 
     // map from a BE host address to the per-node assigned scan ranges;
@@ -3092,9 +3105,13 @@ public class Coordinator implements CoordInterface {
         boolean initiated;
         volatile boolean done;
         boolean hasCanceled;
+        // used for pipeline
         Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
+        // used for pipelineX
+        List<RuntimeProfile> taskProfile;
+
+        boolean enablePipelineX;
         RuntimeProfile loadChannelProfile;
-        int cancelProgress = 0;
         int profileFragmentId;
         TNetworkAddress brpcAddress;
         TNetworkAddress address;
@@ -3103,16 +3120,18 @@ public class Coordinator implements CoordInterface {
         long profileReportProgress = 0;
         long beProcessEpoch = 0;
         private final int numInstances;
+        final ExecutionProfile executionProfile;
 
         public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
                 TPipelineFragmentParams rpcParams, Long backendId,
                 Map<TUniqueId, RuntimeProfile> fragmentInstancesMap,
-                RuntimeProfile loadChannelProfile) {
+                RuntimeProfile loadChannelProfile, boolean enablePipelineX, final ExecutionProfile executionProfile) {
             this.profileFragmentId = profileFragmentId;
             this.fragmentId = fragmentId;
             this.rpcParams = rpcParams;
             this.numInstances = rpcParams.local_params.size();
             this.fragmentInstancesMap = fragmentInstancesMap;
+            this.taskProfile = new ArrayList<RuntimeProfile>();
             this.loadChannelProfile = loadChannelProfile;
 
             this.initiated = false;
@@ -3125,12 +3144,27 @@ public class Coordinator implements CoordInterface {
             this.hasCanceled = false;
             this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
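
Note: the awaitAllFragmentsDone call above sits inside a loop that waits in capped slices (Math.min(leftTimeoutS, fixedMaxWaitTime)), so cancellation and timeout can be re-checked between waits. A minimal, self-contained sketch of that pattern using a plain CountDownLatch; the names are illustrative, not Doris API:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class BoundedAwaitDemo {
        // Wait on `latch` for up to `leftTimeoutS` seconds total, in slices of at
        // most `fixedMaxWaitTime` seconds, so the caller can re-check state between slices.
        static boolean awaitInSlices(CountDownLatch latch, long leftTimeoutS, long fixedMaxWaitTime)
                throws InterruptedException {
            while (leftTimeoutS > 0) {
                long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
                if (latch.await(waitTime, TimeUnit.SECONDS)) {
                    return true;          // all fragments reported done
                }
                leftTimeoutS -= waitTime; // slice expired; loop to re-check and wait again
            }
            return false;                 // overall timeout
        }
    }
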
+            this.enablePipelineX = enablePipelineX;
+            this.executionProfile = executionProfile;
+        }
+
+        public Stream<RuntimeProfile> profileStream() {
+            if (enablePipelineX) {
+                return taskProfile.stream();
+            }
+            return fragmentInstancesMap.values().stream();
+        }
+
+        private void attachInstanceProfileToFragmentProfile() {
+            profileStream()
+                    .forEach(p -> executionProfile.addInstanceProfile(this.profileFragmentId, p));
         }
 
         /**
          * Some information common to all Fragments does not need to be sent repeatedly.
          * Therefore, when we confirm that a certain BE has accepted the information,
-         * we will delete the information in the subsequent Fragment to avoid repeated sending.
+         * we will delete the information in the subsequent Fragment to avoid repeated
+         * sending.
          * This information can be obtained from the cache of BE.
          */
         public void unsetFields() {
@@ -3144,29 +3178,31 @@ public class Coordinator implements CoordInterface {
 
         // update profile.
         // return true if profile is updated. Otherwise, return false.
-        public synchronized boolean updateProfile(TReportExecStatusParams params, boolean isPipelineX) {
-            if (isPipelineX) {
+        public synchronized boolean updateProfile(TReportExecStatusParams params) {
+            if (enablePipelineX) {
+                taskProfile.clear();
+                int pipelineIdx = 0;
                 for (TDetailedReportParams param : params.detailed_report) {
-                    RuntimeProfile profile = fragmentInstancesMap.get(param.fragment_instance_id);
-                    if (params.done && profile.getIsDone()) {
-                        continue;
-                    }
-
+                    String name = "Pipeline :" + pipelineIdx + " (host=" + address + ")";
+                    RuntimeProfile profile = new RuntimeProfile(name);
+                    taskProfile.add(profile);
                     if (param.isSetProfile()) {
                         profile.update(param.profile);
                     }
-                    if (params.isSetLoadChannelProfile()) {
-                        loadChannelProfile.update(params.loadChannelProfile);
-                    }
                     if (params.done) {
                         profile.setIsDone(true);
-                        profileReportProgress++;
                     }
+                    pipelineIdx++;
                 }
-                if (profileReportProgress == numInstances) {
-                    this.done = true;
+                if (params.isSetLoadChannelProfile()) {
+                    loadChannelProfile.update(params.loadChannelProfile);
                 }
-                return true;
+                this.done = params.done;
+                if (this.done) {
+                    attachInstanceProfileToFragmentProfile();
+                }
+                return this.done;
             } else {
                 RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
                 if (params.done && profile.getIsDone()) {
@@ -3192,7 +3228,7 @@ public class Coordinator implements CoordInterface {
         }
 
         public synchronized void printProfile(StringBuilder builder) {
-            this.fragmentInstancesMap.values().stream().forEach(p -> {
+            this.profileStream().forEach(p -> {
                 p.computeTimeInProfile();
                 p.prettyPrint(builder, "");
             });
@@ -3200,23 +3236,41 @@ public class Coordinator implements CoordInterface {
 
         // cancel all fragment instances.
         // return true if cancel success. Otherwise, return false.
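
Note: under pipelineX, updateProfile discards and rebuilds the per-pipeline profile list on every report instead of mutating per-instance profiles in place, so a repeated or out-of-order report cannot double-append state. A minimal sketch of that rebuild-on-report pattern; Report and the field types are hypothetical stand-ins, not Doris API:

    import java.util.ArrayList;
    import java.util.List;

    public class RebuildOnReportDemo {
        // Hypothetical per-report payload: one entry per pipeline, plus a done flag.
        record Report(List<String> pipelineCounters, boolean done) {}

        private final List<String> taskProfile = new ArrayList<>();
        private boolean done;

        // Rebuild state from scratch on every report: the latest report wins,
        // so duplicate reports cannot double-append profiles.
        synchronized boolean updateProfile(Report report) {
            taskProfile.clear();
            taskProfile.addAll(report.pipelineCounters());
            done = report.done();
            return done;
        }
    }
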
-        public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
-            if (!this.initiated) {
-                LOG.warn("Query {}, ccancel before initiated", DebugUtil.printId(queryId));
+
+        private synchronized boolean cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
+            if (!this.hasCanceled) {
                 return false;
             }
-            // don't cancel if it is already finished
-            if (this.done) {
-                LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId));
-                return false;
+            for (RuntimeProfile profile : taskProfile) {
+                profile.setIsCancel(true);
             }
-            if (this.hasCanceled) {
-                LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {},"
+                        + " fragment id={} query={}, reason: {}",
+                        this.initiated, this.done, this.hasCanceled, backend.getId(),
+                        this.profileFragmentId,
+                        DebugUtil.printId(queryId), cancelReason.name());
+            }
+            try {
+                try {
+                    BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
+                            this.profileFragmentId, queryId, cancelReason);
+                } catch (RpcException e) {
+                    LOG.warn("cancel plan fragment got an exception, address={}:{}", brpcAddress.getHostname(),
+                            brpcAddress.getPort());
+                    SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage());
+                }
+            } catch (Exception e) {
+                LOG.warn("caught an exception", e);
                 return false;
             }
+            return true;
+        }
+
+        private synchronized boolean cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
             for (TPipelineInstanceParams localParam : rpcParams.local_params) {
                 LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={} backend:{},"
-                        + " fragment instance id={} query={}, reason: {}",
+                        + " fragment instance id={} query={}, reason: {}",
                         this.initiated, this.done, this.hasCanceled, backend.getId(),
                         DebugUtil.printId(localParam.fragment_instance_id), DebugUtil.printId(queryId), cancelReason.name());
@@ -3244,14 +3298,35 @@ public class Coordinator implements CoordInterface {
             if (!this.hasCanceled) {
                 return false;
             }
-
             for (int i = 0; i < this.numInstances; i++) {
                 fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
             }
-            cancelProgress = numInstances;
             return true;
         }
 
+        /// TODO: refactor rpcParams
+        public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
+            if (!this.initiated) {
+                LOG.warn("Query {}, cancel before initiated", DebugUtil.printId(queryId));
+                return false;
+            }
+            // don't cancel if it is already finished
+            if (this.done) {
+                LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId));
+                return false;
+            }
+            if (this.hasCanceled) {
+                LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId));
+                return false;
+            }
+
+            if (this.enablePipelineX) {
+                return cancelFragment(cancelReason);
+            } else {
+                return cancelInstance(cancelReason);
+            }
+        }
+
         public synchronized boolean computeTimeInProfile(int maxFragmentId) {
             if (this.profileFragmentId < 0 || this.profileFragmentId > maxFragmentId) {
                 LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId);
@@ -3843,7 +3918,7 @@ public class Coordinator implements CoordInterface {
     private void attachInstanceProfileToFragmentProfile() {
         if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
-                ctx.fragmentInstancesMap.values().stream()
+                ctx.profileStream()
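
Note: after this refactor, cancelFragmentInstance is a thin set of guards plus a dispatch on the engine flag. A condensed control-flow sketch with the RPC and profile bookkeeping stubbed out; everything here is illustrative, not Doris API, and the elided code that raises the cancel flag is assumed to run before the sub-methods:

    public class CancelDispatchDemo {
        boolean initiated;
        volatile boolean done;
        boolean hasCanceled;
        boolean enablePipelineX;

        synchronized boolean cancelFragmentInstance() {
            if (!initiated) {
                return false;   // nothing was ever sent to the BE
            }
            if (done) {
                return false;   // already finished; nothing to cancel
            }
            if (hasCanceled) {
                return false;   // cancel already in flight
            }
            // Dispatch on engine: pipelineX cancels per (query, fragment),
            // the old engines cancel per fragment instance.
            return enablePipelineX ? cancelFragment() : cancelInstance();
        }

        private boolean cancelFragment() { return true; } // stub: one RPC per fragment
        private boolean cancelInstance() { return true; } // stub: one RPC per instance
    }
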
                        .forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
             }
         } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 02245c83ced..52350d805bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -240,6 +240,24 @@ public class BackendServiceProxy {
         }
     }
 
+    public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
+            int fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
+        final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
+                .newBuilder()
+                .setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
+                .setCancelReason(cancelReason)
+                .setFragmentId(fragmentId)
+                .setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
+        try {
+            final BackendServiceClient client = getProxy(address);
+            return client.cancelPlanFragmentAsync(pRequest);
+        } catch (Throwable e) {
+            LOG.warn("Cancel plan fragment caught an exception, address={}:{}", address.getHostname(), address.getPort(),
+                    e);
+            throw new RpcException(address.hostname, e.getMessage());
+        }
+    }
+
     public Future<InternalService.PFetchDataResult> fetchDataAsync(
             TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
         try {
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index ec3714d618a..46e2e194f06 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -229,6 +229,8 @@ message PExecPlanFragmentResult {
 message PCancelPlanFragmentRequest {
     required PUniqueId finst_id = 1;
     optional PPlanFragmentCancelReason cancel_reason = 2;
+    optional PUniqueId query_id = 3;
+    optional int32 fragment_id = 4;
 };
 
 message PCancelPlanFragmentResult {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org