BiteTheDDDDt commented on code in PR #22562: URL: https://github.com/apache/doris/pull/22562#discussion_r1299679279
########## be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp: ########## @@ -0,0 +1,585 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline_x_fragment_context.h" + +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/PaloInternalService_types.h> +#include <gen_cpp/PlanNodes_types.h> +#include <gen_cpp/Planner_types.h> +#include <opentelemetry/nostd/shared_ptr.h> +#include <opentelemetry/trace/span.h> +#include <opentelemetry/trace/span_context.h> +#include <opentelemetry/trace/tracer.h> +#include <pthread.h> +#include <stdlib.h> +// IWYU pragma: no_include <bits/chrono.h> +#include <chrono> // IWYU pragma: keep +#include <map> +#include <ostream> +#include <typeinfo> +#include <utility> + +#include "common/config.h" +#include "common/logging.h" +#include "exec/data_sink.h" +#include "exec/exec_node.h" +#include "exec/scan_node.h" +#include "io/fs/stream_load_pipe.h" +#include "pipeline/exec/aggregation_sink_operator.h" +#include "pipeline/exec/aggregation_source_operator.h" +#include "pipeline/exec/data_queue.h" +#include "pipeline/exec/datagen_operator.h" +#include "pipeline/exec/exchange_sink_operator.h" +#include "pipeline/exec/exchange_source_operator.h" +#include "pipeline/exec/olap_scan_operator.h" +#include "pipeline/exec/result_sink_operator.h" +#include "pipeline/exec/scan_operator.h" +#include "pipeline/task_scheduler.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "runtime/runtime_filter_mgr.h" +#include "runtime/runtime_state.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/thread_context.h" +#include "service/backend_options.h" +#include "util/container_util.hpp" +#include "util/debug_util.h" +#include "util/telemetry/telemetry.h" +#include "util/uid_util.h" +#include "vec/common/assert_cast.h" +#include "vec/exec/join/vhash_join_node.h" +#include "vec/exec/scan/new_es_scan_node.h" +#include "vec/exec/scan/new_file_scan_node.h" +#include "vec/exec/scan/new_odbc_scan_node.h" +#include "vec/exec/scan/new_olap_scan_node.h" +#include "vec/exec/scan/vmeta_scan_node.h" +#include "vec/exec/scan/vscan_node.h" +#include "vec/exec/vaggregation_node.h" +#include "vec/exec/vexchange_node.h" +#include "vec/exec/vunion_node.h" +#include "vec/runtime/vdata_stream_mgr.h" + +namespace doris::pipeline { + +#define FOR_EACH_RUNTIME_STATE(stmt) \ + for (auto& runtime_state : _runtime_states) { \ + stmt \ + } + +PipelineXFragmentContext::PipelineXFragmentContext( + const TUniqueId& query_id, const int fragment_id, std::shared_ptr<QueryContext> query_ctx, + ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back, + const report_status_callback& report_status_cb) + : PipelineFragmentContext(query_id, TUniqueId(), fragment_id, -1, query_ctx, exec_env, + call_back, report_status_cb) {} + +PipelineXFragmentContext::~PipelineXFragmentContext() { + if (!_runtime_states.empty()) { + // The memory released by the query end is recorded in the query mem tracker, main memory in _runtime_state. + SCOPED_ATTACH_TASK(_runtime_state.get()); + FOR_EACH_RUNTIME_STATE(_call_back(runtime_state.get(), &_exec_status); + runtime_state.reset();) + } else { + _call_back(nullptr, &_exec_status); + } + _runtime_state.reset(); + DCHECK(!_report_thread_active); +} + +void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason, + const std::string& msg) { + if (!_runtime_state->is_cancelled()) { + std::lock_guard<std::mutex> l(_status_lock); + if (_runtime_state->is_cancelled()) { + return; + } + if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { + _exec_status = Status::Cancelled(msg); + } + + for (auto& rs : _runtime_states) { + rs->set_is_cancelled(true, msg); + rs->set_process_status(_exec_status); + _exec_env->vstream_mgr()->cancel(rs->fragment_instance_id()); + } + + LOG(WARNING) << "PipelineFragmentContext Canceled. reason=" << msg; + + // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe + // For stream load the fragment's query_id == load id, it is set in FE. + auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); + if (stream_load_ctx != nullptr) { + stream_load_ctx->pipe->cancel(msg); + } + _cancel_reason = reason; + _cancel_msg = msg; + // To notify wait_for_start() + _query_ctx->set_ready_to_execute(true); + + // must close stream_mgr to avoid dead lock in Exchange Node + // + // Cancel the result queue manager used by spark doris connector + // TODO pipeline incomp + // _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg)); + } +} + +Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& request) { + if (_prepared) { + return Status::InternalError("Already prepared"); + } + _runtime_profile.reset(new RuntimeProfile("PipelineContext")); + _start_timer = ADD_TIMER(_runtime_profile, "StartTime"); + COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time()); + _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime"); + SCOPED_TIMER(_prepare_timer); + + auto* fragment_context = this; + OpentelemetryTracer tracer = telemetry::get_noop_tracer(); + if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) { + tracer = telemetry::get_tracer(print_id(_query_id)); + } + + LOG_INFO("PipelineXFragmentContext::prepare") + .tag("query_id", _query_id) + .tag("fragment_id", _fragment_id) + .tag("pthread_id", (uintptr_t)pthread_self()); + + if (request.query_options.__isset.is_report_success) { + fragment_context->set_is_report_success(request.query_options.is_report_success); + } + + // 1. Set up the global runtime state. + _runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id, + request.query_options, _query_ctx->query_globals, + _exec_env); + _runtime_state->set_query_ctx(_query_ctx.get()); + _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); + _runtime_state->set_tracer(std::move(tracer)); + + SCOPED_ATTACH_TASK(get_runtime_state()); + if (request.__isset.backend_id) { + _runtime_state->set_backend_id(request.backend_id); + } + if (request.__isset.import_label) { + _runtime_state->set_import_label(request.import_label); + } + if (request.__isset.db_name) { + _runtime_state->set_db_name(request.db_name); + } + if (request.__isset.load_job_id) { + _runtime_state->set_load_job_id(request.load_job_id); + } + + auto* desc_tbl = _query_ctx->desc_tbl; + _runtime_state->set_desc_tbl(desc_tbl); + _runtime_state->set_num_per_fragment_instances(request.num_senders); + + // 2. Build pipelines with operators in this fragment. + auto root_pipeline = add_pipeline(); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines( + _runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, &_root_op, root_pipeline)); + + // 3. Create sink operator + if (request.fragment.__isset.output_sink) { + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink( + _runtime_state->obj_pool(), request.fragment.output_sink, + request.fragment.output_exprs, request, root_pipeline->output_row_desc(), + _runtime_state.get(), *desc_tbl)); + RETURN_IF_ERROR(_sink->init(request.fragment.output_sink)); + root_pipeline->set_sink(_sink); + } + + // 4. Initialize global states in pipelines. + for (PipelinePtr& pipeline : _pipelines) { + pipeline->sink_x()->set_child(pipeline->operator_xs().back()); + RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); + } + + // 5. Build pipeline tasks and initialize local state. + RETURN_IF_ERROR(_build_pipeline_tasks(request)); + _runtime_state->runtime_profile()->add_child(_root_op->get_runtime_profile(), true, nullptr); + _runtime_state->runtime_profile()->add_child(_sink->get_runtime_profile(), true, nullptr); + _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr); + + _prepared = true; + return Status::OK(); +} + +Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, + const std::vector<TExpr>& output_exprs, + const TPipelineFragmentParams& params, + const RowDescriptor& row_desc, + RuntimeState* state, DescriptorTbl& desc_tbl) { + switch (thrift_sink.type) { + case TDataSinkType::DATA_STREAM_SINK: { + if (!thrift_sink.__isset.stream_sink) { + return Status::InternalError("Missing data stream sink."); + } + bool send_query_statistics_with_every_batch = + params.__isset.send_query_statistics_with_every_batch + ? params.send_query_statistics_with_every_batch + : false; + _sink.reset(new ExchangeSinkOperatorX( + _sink_idx++, state, pool, row_desc, thrift_sink.stream_sink, params.destinations, + 16 * 1024, send_query_statistics_with_every_batch, this)); + break; + } + case TDataSinkType::RESULT_SINK: { + if (!thrift_sink.__isset.result_sink) { + return Status::InternalError("Missing data buffer sink."); + } + + // TODO: figure out good buffer size based on size of output row + _sink.reset(new ResultSinkOperatorX(_sink_idx++, row_desc, output_exprs, + thrift_sink.result_sink, 4096)); Review Comment: ditto of 4096 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org