github-actions[bot] commented on code in PR #34421:
URL: https://github.com/apache/doris/pull/34421#discussion_r1593359912
##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -236,6 +236,97 @@ PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
+Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
+                                        const TQueryOptions& query_options) {
+    if (_prepared) {
+        return Status::InternalError("Already prepared");
+    }
+    _num_instances = request.local_params.size();
+    _total_instances = request.__isset.total_instances ? request.total_instances : _num_instances;
+    _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
+    _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
+    SCOPED_TIMER(_prepare_timer);
+
+    auto* fragment_context = this;
+
+    LOG_INFO("PipelineFragmentContext::prepare")
+            .tag("query_id", print_id(_query_id))
+            .tag("fragment_id", _fragment_id)
+            .tag("pthread_id", (uintptr_t)pthread_self());
+
+    if (query_options.__isset.is_report_success) {
+        fragment_context->set_is_report_success(query_options.is_report_success);
+    }
+
+    // 1. Set up the global runtime state.
+    _runtime_state =
+            RuntimeState::create_unique(request.query_id, request.fragment_id, query_options,
+                                        _query_ctx->query_globals, _exec_env, _query_ctx.get());
+
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
+    if (request.__isset.backend_id) {
+        _runtime_state->set_backend_id(request.backend_id);
+    }
+    if (request.__isset.import_label) {
+        _runtime_state->set_import_label(request.import_label);
+    }
+    if (request.__isset.db_name) {
+        _runtime_state->set_db_name(request.db_name);
+    }
+    if (request.__isset.load_job_id) {
+        _runtime_state->set_load_job_id(request.load_job_id);
+    }
+
+    _desc_tbl = _query_ctx->desc_tbl;
+    _runtime_state->set_desc_tbl(_desc_tbl);
+    _runtime_state->set_num_per_fragment_instances(request.num_senders);
+    _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+    _runtime_state->set_total_load_streams(request.total_load_streams);
+    _runtime_state->set_num_local_sink(request.num_local_sink);
+
+    _need_local_merge = request.__isset.parallel_instances;
+
+    // 2. Build pipelines with operators in this fragment.
+    auto root_pipeline = add_pipeline();
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
+            _runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, &_root_op, root_pipeline));
+
+    // 3. Create sink operator
+    if (!request.fragment.__isset.output_sink) {
+        return Status::InternalError("No output sink in this fragment!");
+    }
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
+            _runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
+            request, root_pipeline->output_row_desc(), _runtime_state.get(), *_desc_tbl,
+            root_pipeline->id()));
+    RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
+    RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
+
+    for (PipelinePtr& pipeline : _pipelines) {
+        DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size();
+        RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
+    }
+    if (_enable_local_shuffle()) {
+        RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
+                                             request.bucket_seq_to_instance_idx,
+                                             request.shuffle_idx_to_instance_idx));
+    }
+
+    // 4. Initialize global states in pipelines.
+    for (PipelinePtr& pipeline : _pipelines) {
+        pipeline->children().clear();
+        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
+    }
+
+    // 5. Build pipeline tasks and initialize local state.
+    RETURN_IF_ERROR(_build_pipeline_tasks(request, query_options));
+
+    _init_next_report_time();
+
+    _prepared = true;
+    return Status::OK();
+}
+
 Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request) {

Review Comment:
   warning: function '_build_pipeline_tasks' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   ^
   ```

   <details>
   <summary>Additional context</summary>

   **be/src/pipeline/pipeline_fragment_context.cpp:329:** 177 lines including whitespace and comments (threshold 80)
   ```cpp
   ^
   ```

   </details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org