This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 513784188ad [PipelineX](improvement) Prepare tasks in parallel (#40844) (#40874)
513784188ad is described below

commit 513784188adf88879dc4c23602527fbbd5fb8e96
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Wed Sep 18 10:02:58 2024 +0800

    [PipelineX](improvement) Prepare tasks in parallel (#40844) (#40874)

    pick #40844
---
 be/src/common/config.cpp                           |  2 +-
 be/src/exprs/runtime_filter.cpp                    |  1 +
 be/src/pipeline/pipeline.h                         |  3 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 98 ++++++++++++++++------
 be/src/pipeline/pipeline_fragment_context.h        |  9 +-
 be/src/runtime/fragment_mgr.cpp                    |  3 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  8 ++
 .../java/org/apache/doris/qe/SessionVariable.java  |  9 +-
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 9 files changed, 101 insertions(+), 33 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 75f4bb7c4ea..d6154c491d5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -511,7 +511,7 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
 DEFINE_mBool(enable_bthread_transmit_block, "true");
 
 // The maximum amount of data that can be processed by a stream load
-DEFINE_mInt64(streaming_load_max_mb, "10240");
+DEFINE_mInt64(streaming_load_max_mb, "102400");
 // Some data formats, such as JSON, cannot be streamed.
 // Therefore, it is necessary to limit the maximum number of
 // such data when using stream load to prevent excessive memory consumption.
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 8d7281e9426..456637fb2aa 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1235,6 +1235,7 @@ void IRuntimeFilter::signal() {
 }
 
 void IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> timer) {
+    std::unique_lock lock(_inner_mutex);
     _filter_timer.push_back(timer);
 }
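The one-line lock above is load-bearing: once prepare runs in parallel, several fragment instances can register timers on the same IRuntimeFilter concurrently, so the previously unguarded push_back becomes a data race. A minimal self-contained sketch of the pattern; FilterLike, Timer, and _inner_mutex here are illustrative stand-ins, not the Doris classes:

    #include <memory>
    #include <mutex>
    #include <vector>

    struct Timer {};

    // Simplified stand-in for IRuntimeFilter. With parallel prepare, several
    // threads may call set_filter_timer() at once, so the push_back into the
    // shared vector must be serialized.
    class FilterLike {
    public:
        void set_filter_timer(std::shared_ptr<Timer> timer) {
            std::unique_lock lock(_inner_mutex); // same guard the patch adds
            _filter_timer.push_back(std::move(timer));
        }

    private:
        std::mutex _inner_mutex;
        std::vector<std::shared_ptr<Timer>> _filter_timer;
    };

    int main() {
        FilterLike f;
        f.set_filter_timer(std::make_shared<Timer>());
        return 0;
    }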
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index e7584dccd4a..ae20c760110 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -105,7 +105,6 @@ public:
     void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
 
     void incr_created_tasks() { _num_tasks_created++; }
-    bool need_to_create_task() const { return _num_tasks > _num_tasks_created; }
     void set_num_tasks(int num_tasks) {
         _num_tasks = num_tasks;
         for (auto& op : operatorXs) {
@@ -160,7 +159,7 @@ private:
     // How many tasks should be created ?
     int _num_tasks = 1;
     // How many tasks are already created?
-    int _num_tasks_created = 0;
+    std::atomic<int> _num_tasks_created = 0;
 };
 
 } // namespace doris::pipeline
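A recurring theme in this patch: state that a single prepare thread used to build incrementally (push_back into flat vectors, plain int counters) is either pre-sized so that each prepare thread writes only its own slot, or turned into an atomic. A self-contained sketch of that strategy; the Pipeline and RuntimeState types below are simplified stand-ins, not the Doris classes:

    #include <atomic>
    #include <memory>
    #include <vector>

    struct RuntimeState {};                  // stand-in for doris::RuntimeState
    struct Pipeline { int num_tasks = 4; };  // stand-in for doris::pipeline::Pipeline

    int main() {
        std::vector<Pipeline> pipelines(3);

        // states[pip_idx][instance_idx]: sized up front, so concurrent prepare
        // threads write disjoint slots and no push_back (hence no lock) is needed.
        std::vector<std::vector<std::unique_ptr<RuntimeState>>> states(pipelines.size());
        for (size_t p = 0; p < pipelines.size(); p++) {
            states[p].resize(pipelines[p].num_tasks);
        }

        // Counters bumped from any prepare thread become atomics, matching the
        // _num_tasks_created and _total_tasks changes in this patch.
        std::atomic<int> total_tasks{0};
        states[0][0] = std::make_unique<RuntimeState>();
        int task_id = total_tasks++;
        return task_id == 0 ? 0 : 1;
    }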
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 4ca3e20ed24..19eb023e27e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -137,8 +137,10 @@ PipelineFragmentContext::~PipelineFragmentContext() {
         }
     }
     _tasks.clear();
-    for (auto& runtime_state : _task_runtime_states) {
-        runtime_state.reset();
+    for (auto& runtime_states : _task_runtime_states) {
+        for (auto& runtime_state : runtime_states) {
+            runtime_state.reset();
+        }
     }
     _pipelines.clear();
     _sink.reset();
@@ -229,7 +231,8 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
 
-Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {
+Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
+                                        ThreadPool* thread_pool) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
@@ -346,7 +349,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     {
         SCOPED_TIMER(_build_tasks_timer);
         // 5. Build pipeline tasks and initialize local state.
-        RETURN_IF_ERROR(_build_pipeline_tasks(request));
+        RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
     }
 
     _init_next_report_time();
@@ -355,17 +358,23 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     return Status::OK();
 }
 
-Status PipelineFragmentContext::_build_pipeline_tasks(
-        const doris::TPipelineFragmentParams& request) {
+Status PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
+                                                      ThreadPool* thread_pool) {
     _total_tasks = 0;
-    int target_size = request.local_params.size();
+    const auto target_size = request.local_params.size();
     _tasks.resize(target_size);
+    _fragment_instance_ids.resize(target_size);
+    _runtime_filter_states.resize(target_size);
+    _task_runtime_states.resize(_pipelines.size());
+    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+        _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+    }
     auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
-    for (size_t i = 0; i < target_size; i++) {
+    auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
         const auto& local_params = request.local_params[i];
         auto fragment_instance_id = local_params.fragment_instance_id;
-        _fragment_instance_ids.push_back(fragment_instance_id);
+        _fragment_instance_ids[i] = fragment_instance_id;
         std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
         auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& runtime_state) {
             runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -424,7 +433,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
 
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
 
-        _runtime_filter_states.push_back(std::move(filterparams));
+        _runtime_filter_states[i] = std::move(filterparams);
         std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
         auto get_local_exchange_state = [&](PipelinePtr pipeline)
                 -> std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -447,13 +456,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
 
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto& pipeline = _pipelines[pip_idx];
-            if (pipeline->need_to_create_task()) {
-                // build task runtime state
-                _task_runtime_states.push_back(RuntimeState::create_unique(
+            if (pipeline->num_tasks() > 1 || i == 0) {
+                DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
+                        << print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
+                        << pipeline->debug_string();
+                _task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
                         this, local_params.fragment_instance_id, request.query_id,
                         request.fragment_id, request.query_options, _query_ctx->query_globals,
-                        _exec_env, _query_ctx.get()));
-                auto& task_runtime_state = _task_runtime_states.back();
+                        _exec_env, _query_ctx.get());
+                auto& task_runtime_state = _task_runtime_states[pip_idx][i];
                 init_runtime_state(task_runtime_state);
                 auto cur_task_id = _total_tasks++;
                 task_runtime_state->set_task_id(cur_task_id);
@@ -527,6 +538,39 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
         std::lock_guard<std::mutex> l(_state_map_lock);
         _runtime_filter_mgr_map[fragment_instance_id] = std::move(runtime_filter_mgr);
     }
+        return Status::OK();
+    };
+    if (target_size > 1 &&
+        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
+         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
+        std::vector<Status> prepare_status(target_size);
+        std::mutex m;
+        std::condition_variable cv;
+        int prepare_done = 0;
+        for (size_t i = 0; i < target_size; i++) {
+            RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+                SCOPED_ATTACH_TASK(_query_ctx.get());
+                prepare_status[i] = pre_and_submit(i, this);
+                std::unique_lock<std::mutex> lock(m);
+                prepare_done++;
+                if (prepare_done == target_size) {
+                    cv.notify_one();
+                }
+            }));
+        }
+        std::unique_lock<std::mutex> lock(m);
+        if (prepare_done != target_size) {
+            cv.wait(lock);
+            for (size_t i = 0; i < target_size; i++) {
+                if (!prepare_status[i].ok()) {
+                    return prepare_status[i];
+                }
+            }
+        }
+    } else {
+        for (size_t i = 0; i < target_size; i++) {
+            RETURN_IF_ERROR(pre_and_submit(i, this));
+        }
     }
     _pipeline_parent_map.clear();
     _dag.clear();
@@ -1683,8 +1727,12 @@ Status PipelineFragmentContext::send_report(bool done) {
 
     std::vector<RuntimeState*> runtime_states;
 
-    for (auto& task_state : _task_runtime_states) {
-        runtime_states.push_back(task_state.get());
+    for (auto& task_states : _task_runtime_states) {
+        for (auto& task_state : task_states) {
+            if (task_state) {
+                runtime_states.push_back(task_state.get());
+            }
+        }
     }
 
     ReportStatusRequest req {exec_status,
@@ -1755,15 +1803,17 @@ PipelineFragmentContext::collect_realtime_load_channel_profile_x() const {
         return nullptr;
     }
 
-    for (auto& runtime_state : _task_runtime_states) {
-        if (runtime_state->runtime_profile() == nullptr) {
-            continue;
-        }
+    for (auto& runtime_states : _task_runtime_states) {
+        for (auto& runtime_state : runtime_states) {
+            if (runtime_state->runtime_profile() == nullptr) {
+                continue;
+            }
 
-        auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
+            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
 
-        runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
-        this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+            runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
+            this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+        }
     }
 
     auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
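The heart of the change is the fan-out/fan-in block at the end of _build_pipeline_tasks: one prepare job per fragment instance is pushed onto a shared thread pool, each job records its Status in a pre-sized slot, and the last finisher signals a condition variable. Below is a self-contained sketch of the same shape using std::thread in place of Doris's ThreadPool; the names and the predicate-based wait are illustrative simplifications, not the production code:

    #include <condition_variable>
    #include <mutex>
    #include <thread>
    #include <vector>

    int main() {
        const int target_size = 8;                        // number of fragment instances
        std::vector<int> prepare_status(target_size, 0);  // 0 = OK, stands in for Status
        std::mutex m;
        std::condition_variable cv;
        int prepare_done = 0;

        // Fan out: one prepare job per instance, each writing only its own slot.
        std::vector<std::thread> pool;
        for (int i = 0; i < target_size; i++) {
            pool.emplace_back([&, i] {
                prepare_status[i] = 0; // per-instance prepare work goes here
                std::lock_guard<std::mutex> lock(m);
                if (++prepare_done == target_size) {
                    cv.notify_one(); // last finisher wakes the caller
                }
            });
        }

        // Fan in: block until every job has reported, then check all statuses.
        {
            std::unique_lock<std::mutex> lock(m);
            cv.wait(lock, [&] { return prepare_done == target_size; });
        }
        for (auto& t : pool) {
            t.join();
        }
        for (int i = 0; i < target_size; i++) {
            if (prepare_status[i] != 0) {
                return prepare_status[i];
            }
        }
        return 0;
    }

The predicate overload of wait used here also covers the case where every job finishes before the caller reaches the wait, and it is robust against spurious wakeups.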
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 06c88267441..e0e4c12ef0d 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -88,7 +88,7 @@ public:
     // should be protected by lock?
     [[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }
 
-    Status prepare(const doris::TPipelineFragmentParams& request);
+    Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool* thread_pool);
 
     Status submit();
 
@@ -187,7 +187,8 @@ private:
 
     bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); }
 
-    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
+    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
+                                 ThreadPool* thread_pool);
     void _close_fragment_instance();
     void _init_next_report_time();
 
@@ -206,7 +207,7 @@ private:
     int _closed_tasks = 0;
     // After prepared, `_total_tasks` is equal to the size of `_tasks`.
     // When submit fail, `_total_tasks` is equal to the number of tasks submitted.
-    int _total_tasks = 0;
+    std::atomic<int> _total_tasks = 0;
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
     bool _is_report_success = false;
@@ -303,7 +304,7 @@ private:
     std::vector<TUniqueId> _fragment_instance_ids;
     // Local runtime states for each task
-    std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+    std::vector<std::vector<std::unique_ptr<RuntimeState>>> _task_runtime_states;
 
     std::vector<std::unique_ptr<RuntimeFilterParamsContext>> _runtime_filter_states;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3d4f072ed85..5bb01dc9ba5 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -820,7 +820,8 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
     {
         SCOPED_RAW_TIMER(&duration_ns);
         Status prepare_st = Status::OK();
-        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params), prepare_st);
+        ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params, _thread_pool.get()),
+                                         prepare_st);
         if (!prepare_st.ok()) {
             query_ctx->cancel(prepare_st, params.fragment_id);
             query_ctx->set_execution_dependency_ready();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index dd221c6aaa3..f18467fbad9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -92,6 +92,14 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
 
 template <typename Parent>
 Status Channel<Parent>::open(RuntimeState* state) {
+    if (_is_local) {
+        auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
+                _fragment_instance_id, _dest_node_id, &_local_recvr);
+        if (!st.ok()) {
+            // Recvr not found. Maybe downstream task is finished already.
+            LOG(INFO) << "Recvr is not found : " << st.to_string();
+        }
+    }
     _be_number = state->be_number();
 
     _brpc_request = std::make_shared<PTransmitDataParams>();
     // initialize brpc request
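The early find_recvr in Channel::open above is deliberately non-fatal: with tasks now prepared (and possibly finished) out of order, a missing local receiver is an expected state rather than an error. A tiny sketch of that tolerant-lookup shape; Status, Recvr, and find_recvr below are illustrative stand-ins, not the Doris API:

    #include <iostream>
    #include <memory>
    #include <string>

    // Stand-ins for Doris's Status and receiver types.
    struct Status {
        bool ok_flag = true;
        std::string msg;
        bool ok() const { return ok_flag; }
        const std::string& to_string() const { return msg; }
    };
    struct Recvr {};

    Status find_recvr(std::shared_ptr<Recvr>* out) {
        out->reset(); // pretend the downstream task already finished
        return Status{false, "Recvr not found"};
    }

    int main() {
        std::shared_ptr<Recvr> local_recvr;
        // A missing receiver is expected when the downstream task finished
        // first, so log and continue instead of failing open().
        if (auto st = find_recvr(&local_recvr); !st.ok()) {
            std::cout << "Recvr is not found : " << st.to_string() << "\n";
        }
        return 0;
    }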
+ LOG(INFO) << "Recvr is not found : " << st.to_string(); + } + } _be_number = state->be_number(); _brpc_request = std::make_shared<PTransmitDataParams>(); // initialize brpc request diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7e70576145c..3eba30ebf41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -290,6 +290,8 @@ public class SessionVariable implements Serializable, Writable { public static final String AUTO_BROADCAST_JOIN_THRESHOLD = "auto_broadcast_join_threshold"; + public static final String PARALLEL_PREPARE_THRESHOLD = "parallel_prepare_threshold"; + public static final String ENABLE_PROJECTION = "enable_projection"; public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_query"; @@ -1010,7 +1012,7 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy = true, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) - private long parallelScanMinRowsPerScanner = 16384; // 16K + private long parallelScanMinRowsPerScanner = 2097152; // 16K @VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, needForward = true) @@ -1053,6 +1055,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD) public double autoBroadcastJoinThreshold = 0.8; + @VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD, fuzzy = true) + public int parallelPrepareThreshold = 32; + @VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER) private boolean enableJoinReorderBasedCost = false; @@ -2109,6 +2114,7 @@ public class SessionVariable implements Serializable, Writable { Random random = new SecureRandom(); this.parallelExecInstanceNum = random.nextInt(8) + 1; this.parallelPipelineTaskNum = random.nextInt(8); + this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); // This will cause be dead loop, disable it first @@ -3529,6 +3535,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setNumScannerThreads(numScannerThreads); tResult.setScannerScaleUpRatio(scannerScaleUpRatio); tResult.setMaxColumnReaderNum(maxColumnReaderNum); + tResult.setParallelPrepareThreshold(parallelPrepareThreshold); // TODO chenhao, reservation will be calculated by cost tResult.setMinReservation(0); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index b3000b66ea9..d3558b1f0f3 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -332,6 +332,7 @@ struct TQueryOptions { 125: optional bool enable_segment_cache = true; + 132: optional i32 parallel_prepare_threshold = 0; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org