This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 3838b6fbaeb [refine](pipelineX) refine some code in pipelineX (#27472) 3838b6fbaeb is described below commit 3838b6fbaeb2034b0bebb04cf605951c23a2e5ed Author: Mryange <59914473+mrya...@users.noreply.github.com> AuthorDate: Mon Nov 27 11:04:16 2023 +0800 [refine](pipelineX) refine some code in pipelineX (#27472) --- be/src/pipeline/pipeline_x/operator.cpp | 5 -- be/src/pipeline/pipeline_x/operator.h | 1 - .../pipeline_x/pipeline_x_fragment_context.cpp | 56 ++++++++++++---------- .../pipeline_x/pipeline_x_fragment_context.h | 12 +++-- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 20 ++++---- be/src/runtime/runtime_state.cpp | 11 +++-- be/src/runtime/runtime_state.h | 2 +- be/src/vec/sink/writer/async_result_writer.cpp | 6 --- be/src/vec/sink/writer/async_result_writer.h | 2 - 9 files changed, 54 insertions(+), 61 deletions(-) diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index c0f3a6f029a..6f305d86efa 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -528,11 +528,6 @@ Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Bl return _writer->sink(block, source_state == SourceState::FINISHED); } -template <typename Writer, typename Parent> -Dependency* AsyncWriterSink<Writer, Parent>::write_blocked_by(PipelineXTask* task) { - return _writer->write_blocked_by(task); -} - template <typename Writer, typename Parent> Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) { if (_closed) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 28f156a15a9..2dc71dec96a 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -640,7 +640,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state); - Dependency* 
write_blocked_by(PipelineXTask* task); Dependency* dependency() override { return _async_writer_dependency.get(); } Status close(RuntimeState* state, Status exec_status) override; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 3d7b8ea42b9..6a40f86dede 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -261,7 +261,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData params.__isset.send_query_statistics_with_every_batch ? params.send_query_statistics_with_every_batch : false; - _sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_operator_id(), + _sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink, params.destinations, send_query_statistics_with_every_batch)); break; @@ -272,18 +272,18 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData } // TODO: figure out good buffer size based on size of output row - _sink.reset(new ResultSinkOperatorX(next_operator_id(), row_desc, output_exprs, + _sink.reset(new ResultSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs, thrift_sink.result_sink)); break; } case TDataSinkType::OLAP_TABLE_SINK: { if (state->query_options().enable_memtable_on_sink_node && !_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) { - _sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), row_desc, + _sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc, output_exprs, false)); } else { - _sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs, - false)); + _sink.reset(new OlapTableSinkOperatorX(pool, next_sink_operator_id(), row_desc, + output_exprs, false)); } break; } @@ -292,7 +292,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, 
const TData return Status::InternalError("Missing data jdbc sink."); } if (config::enable_java_support) { - _sink.reset(new JdbcTableSinkOperatorX(row_desc, next_operator_id(), output_exprs)); + _sink.reset( + new JdbcTableSinkOperatorX(row_desc, next_sink_operator_id(), output_exprs)); } else { return Status::InternalError( "Jdbc table sink is not enabled, you can change be config " @@ -313,10 +314,12 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { _sink.reset(new ResultFileSinkOperatorX( - next_operator_id(), row_desc, thrift_sink.result_file_sink, params.destinations, - send_query_statistics_with_every_batch, output_exprs, desc_tbl)); + next_sink_operator_id(), row_desc, thrift_sink.result_file_sink, + params.destinations, send_query_statistics_with_every_batch, output_exprs, + desc_tbl)); } else { - _sink.reset(new ResultFileSinkOperatorX(next_operator_id(), row_desc, output_exprs)); + _sink.reset( + new ResultFileSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs)); } break; } @@ -324,7 +327,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData DCHECK(thrift_sink.__isset.multi_cast_stream_sink); DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0); // TODO: figure out good buffer size based on size of output row - auto sink_id = next_operator_id(); + auto sink_id = next_sink_operator_id(); auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size(); // one sink has multiple sources. 
std::vector<int> sources; @@ -359,7 +362,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData DataSinkOperatorXPtr sink_op; sink_op.reset(new ExchangeSinkOperatorX( - state, *_row_desc, next_operator_id(), + state, *_row_desc, next_sink_operator_id(), thrift_sink.multi_cast_stream_sink.sinks[i], thrift_sink.multi_cast_stream_sink.destinations[i], false)); @@ -421,7 +424,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( _runtime_states[i]->set_desc_tbl(_desc_tbl); _runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id); _runtime_states[i]->set_num_per_fragment_instances(request.num_senders); - _runtime_states[i]->resize_op_id_to_local_state(max_operator_id()); + _runtime_states[i]->resize_op_id_to_local_state(max_operator_id(), max_sink_operator_id()); _runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node); _runtime_states[i]->set_total_load_streams(request.total_load_streams); _runtime_states[i]->set_num_local_sink(request.num_local_sink); @@ -675,8 +678,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN cur_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset( - new DistinctStreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink.reset(new DistinctStreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode, + descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -692,7 +695,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN cur_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new StreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink.reset(new StreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); 
sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -708,7 +711,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AggSinkOperatorX<>(pool, next_operator_id(), tnode, descs)); + sink.reset(new AggSinkOperatorX<>(pool, next_sink_operator_id(), tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -733,7 +736,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new HashJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -753,7 +756,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink.reset( + new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -774,7 +778,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN PipelinePtr build_side_pipe = add_pipeline(); _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorXPtr sink; - 
sink.reset(new UnionSinkOperatorX(i, next_operator_id(), pool, tnode, descs)); + sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -795,7 +799,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new SortSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -813,7 +817,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new PartitionSortSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink.reset(new PartitionSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -831,7 +835,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AnalyticSinkOperatorX(pool, next_operator_id(), tnode, descs)); + sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -912,11 +916,11 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node( DataSinkOperatorXPtr sink; if (child_id == 0) 
{ - sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_operator_id(), pool, tnode, - descs)); + sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(), pool, + tnode, descs)); } else { - sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id, next_operator_id(), pool, - tnode, descs)); + sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(), + pool, tnode, descs)); } sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(probe_side_pipe->set_sink(sink)); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 23ff08fcb0a..f579265ab63 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -104,7 +104,7 @@ public: RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override { std::lock_guard<std::mutex> l(_state_map_lock); - if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) { + if (_instance_id_to_runtime_state.contains(fragment_instance_id)) { return _instance_id_to_runtime_state[fragment_instance_id]; } else { return _runtime_state.get(); @@ -115,6 +115,10 @@ public: [[nodiscard]] int max_operator_id() const { return _operator_id; } + [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id++; } + + [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; } + std::string debug_string() override; private: @@ -203,11 +207,9 @@ private: std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state; std::mutex _state_map_lock; - // We can guarantee that a plan node ID can correspond to an operator ID, - // but some operators do not have a corresponding plan node ID. - // We set these IDs as negative numbers, which are not visible to the user. 
- int _operator_id = 0; + int _operator_id = 0; + int _sink_operator_id = 0; std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state; }; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index aaf9c2a16f1..c96f34b213b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -83,19 +83,15 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams std::vector<TScanRangeParams> no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->node_id(), no_scan_ranges); - + auto* parent_profile = _parent_profile; for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { - auto& deps = get_upstream_dependency(_operators[op_idx]->operator_id()); - LocalStateInfo info { - op_idx == _operators.size() - 1 - ? _parent_profile - : state->get_local_state(_operators[op_idx + 1]->operator_id())->profile(), - scan_ranges, - deps, - _local_exchange_state, - _task_idx, - _source_dependency[_operators[op_idx]->operator_id()]}; - RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info)); + auto& op = _operators[op_idx]; + auto& deps = get_upstream_dependency(op->operator_id()); + LocalStateInfo info {parent_profile, scan_ranges, + deps, _local_exchange_state, + _task_idx, _source_dependency[op->operator_id()]}; + RETURN_IF_ERROR(op->setup_local_state(state, info)); + parent_profile = state->get_local_state(op->operator_id())->profile(); } _block = doris::vectorized::Block::create_unique(); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index daf3e1bc75e..a8027d0d61b 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -23,6 +23,7 @@ #include <fmt/format.h> #include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/Types_types.h> +#include <glog/logging.h> #include <string> @@ -424,13 
+425,15 @@ int64_t RuntimeState::get_load_mem_limit() { } } -void RuntimeState::resize_op_id_to_local_state(int size) { - _op_id_to_local_state.resize(size); - _op_id_to_sink_local_state.resize(size); +void RuntimeState::resize_op_id_to_local_state(int operator_size, int sink_size) { + _op_id_to_local_state.resize(operator_size); + _op_id_to_sink_local_state.resize(sink_size); } void RuntimeState::emplace_local_state( int id, std::unique_ptr<doris::pipeline::PipelineXLocalStateBase> state) { + DCHECK(id < _op_id_to_local_state.size()); + DCHECK(!_op_id_to_local_state[id]); _op_id_to_local_state[id] = std::move(state); } @@ -451,6 +454,8 @@ Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id) { void RuntimeState::emplace_sink_local_state( int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> state) { + DCHECK(id < _op_id_to_sink_local_state.size()); + DCHECK(!_op_id_to_sink_local_state[id]); _op_id_to_sink_local_state[id] = std::move(state); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 835bd582894..8c7b3bed9fc 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -500,7 +500,7 @@ public: Result<SinkLocalState*> get_sink_local_state_result(int id); - void resize_op_id_to_local_state(int size); + void resize_op_id_to_local_state(int operator_size, int sink_size); private: Status create_error_log_file(); diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 8edde60adb4..422dc2efef4 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -172,11 +172,5 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo return b; } -pipeline::Dependency* AsyncResultWriter::write_blocked_by(pipeline::PipelineXTask* task) { - std::lock_guard l(_m); - DCHECK(_dependency != nullptr); - return _dependency->is_blocked_by(task); -} - 
} // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 0a217b34e6b..e91ff1a0701 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -78,8 +78,6 @@ public: return _data_queue_is_available() || _is_finished(); } - pipeline::Dependency* write_blocked_by(pipeline::PipelineXTask* task); - [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; } void process_block(RuntimeState* state, RuntimeProfile* profile); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org