This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new dc524a5f564 [refactor](cleancode) remove unused code from be (#35756)
dc524a5f564 is described below

commit dc524a5f564f180f2514bf19821a22af0afdcbcd
Author: yiguolei <676222...@qq.com>
AuthorDate: Sat Jun 1 22:20:25 2024 +0800

    [refactor](cleancode) remove unused code from be (#35756)

    ## Proposed changes

    Issue Number: close #xxx

    <!--Describe your changes.-->

    ---------

    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/exec/data_sink.h                         |  4 ----
 be/src/exec/exec_node.h                         |  2 --
 be/src/pipeline/exec/exchange_sink_buffer.cpp   | 10 ----------
 be/src/pipeline/exec/exchange_sink_buffer.h     |  1 -
 be/src/pipeline/exec/multi_cast_data_streamer.h |  8 --------
 be/src/pipeline/pipeline_fragment_context.cpp   |  3 ---
 be/src/pipeline/pipeline_task.cpp               | 17 +++++++++--------
 be/src/pipeline/pipeline_task.h                 |  6 ++----
 be/src/vec/exec/vanalytic_eval_node.cpp         | 13 -------------
 be/src/vec/exec/vanalytic_eval_node.h           |  2 --
 be/src/vec/exec/vpartition_sort_node.cpp        |  5 -----
 be/src/vec/exec/vpartition_sort_node.h          |  1 -
 be/src/vec/sink/async_writer_sink.h             |  4 ----
 be/src/vec/sink/multi_cast_data_stream_sink.h   |  3 ---
 be/src/vec/sink/vdata_stream_sender.cpp         | 16 ----------------
 be/src/vec/sink/vdata_stream_sender.h           | 13 -------------
 be/src/vec/sink/vmemory_scratch_sink.cpp        |  4 ----
 be/src/vec/sink/vmemory_scratch_sink.h          |  2 --
 be/src/vec/sink/writer/async_result_writer.h    |  7 -------
 19 files changed, 11 insertions(+), 110 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 5258929ba79..2d76078e7e5 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -71,8 +71,6 @@ public:
         return send(state, block, eos);
     }
 
-    [[nodiscard]] virtual bool is_pending_finish() const { return false; }
-
     // Releases all resources that were allocated in prepare()/send().
     // Further send() calls are illegal after calling close().
     // It must be okay to call this multiple times. Subsequent calls should
@@ -102,8 +100,6 @@ public:
 
     const RowDescriptor& row_desc() { return _row_desc; }
 
-    virtual bool can_write() { return true; }
-
     std::shared_ptr<QueryStatistics> get_query_statistics_ptr();
 
 protected:
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 10b035835d7..2dedee61ba5 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -142,8 +142,6 @@ public:
         return Status::OK();
     }
 
-    bool can_read() const { return _can_read; }
-
     [[nodiscard]] virtual bool can_terminate_early() { return false; }
 
     // Sink Data to ExecNode to do some stock work, both need impl with method: get_result
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 8893db54cc5..e29991890f5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -106,16 +106,6 @@ void ExchangeSinkBuffer::close() {
     //_instance_to_request.clear();
 }
 
-bool ExchangeSinkBuffer::can_write() const {
-    size_t max_package_size =
-            config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size();
-    size_t total_package_size = 0;
-    for (auto& [_, q] : _instance_to_package_queue) {
-        total_package_size += q.size();
-    }
-    return total_package_size <= max_package_size;
-}
-
 void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
     if (_finish_dependency && _should_stop && all_done) {
         _finish_dependency->set_ready();
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 8eed559e712..683a485f2ca 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -202,7 +202,6 @@ public:
 
     Status add_block(TransmitInfo&& request);
     Status add_block(BroadcastTransmitInfo&& request);
-    bool can_write() const;
     void close();
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 2078a729227..e812067e52c 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -56,14 +56,6 @@ public:
 
     Status push(RuntimeState* state, vectorized::Block* block, bool eos);
 
-    // use sink to check can_write, now always true after we support spill to disk
-    bool can_write() { return true; }
-
-    bool can_read(int sender_idx) {
-        std::lock_guard l(_mutex);
-        return _sender_pos_to_read[sender_idx] != _multi_cast_blocks.end() || _eos;
-    }
-
     const RowDescriptor& row_desc() { return _row_desc; }
 
     RuntimeProfile* profile() { return _profile; }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index dbfdaba6d91..8347892c6bf 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,9 +189,6 @@ void PipelineFragmentContext::cancel(const Status reason) {
     //  _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
-            if (task->is_finished()) {
-                continue;
-            }
             task->clear_blocking_state();
         }
     }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 52a76828804..c43410e68a4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -429,7 +429,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* state, int64_t revocable_m
 
 void PipelineTask::finalize() {
     std::unique_lock<std::mutex> lc(_dependency_lock);
-    _finished = true;
+    _finalized = true;
     _sink_shared_state.reset();
     _op_shared_states.clear();
     _le_state_map.clear();
@@ -475,17 +475,18 @@ std::string PipelineTask::debug_string() {
             debug_string_buffer,
             "PipelineTask[this = {}, open = {}, eos = {}, finish = {}, dry run = {}, elapse time "
             "= {}s], block dependency = {}, is running = {}\noperators: ",
-            (void*)this, _opened, _eos, _finished, _dry_run, elapsed,
-            cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", is_running());
+            (void*)this, _opened, _eos, _finalized, _dry_run, elapsed,
+            cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
+            is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
-                       _opened && !_finished ? _operators[i]->debug_string(_state, i)
-                                             : _operators[i]->debug_string(i));
+                       _opened && !_finalized ? _operators[i]->debug_string(_state, i)
+                                              : _operators[i]->debug_string(i));
     }
     fmt::format_to(debug_string_buffer, "\n{}\n",
-                   _opened && !_finished ? _sink->debug_string(_state, _operators.size())
-                                         : _sink->debug_string(_operators.size()));
-    if (_finished) {
+                   _opened && !_finalized ? _sink->debug_string(_state, _operators.size())
+                                          : _sink->debug_string(_operators.size()));
+    if (_finalized) {
         return fmt::to_string(debug_string_buffer);
     }
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 6bc65905be6..20c83f6a97e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -84,8 +84,6 @@ public:
 
     void finalize();
 
-    bool is_finished() const { return _finished.load(); }
-
     std::string debug_string();
 
     bool is_pending_finish() {
@@ -142,7 +140,7 @@ public:
     void clear_blocking_state() {
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
-        if (!_finished) {
+        if (!_finalized) {
             _execution_dep->set_always_ready();
             for (auto* dep : _filter_dependencies) {
                 dep->set_always_ready();
@@ -303,7 +301,7 @@ private:
 
     Dependency* _execution_dep = nullptr;
 
-    std::atomic<bool> _finished {false};
+    std::atomic<bool> _finalized {false};
     std::mutex _dependency_lock;
 
     std::atomic<bool> _running {false};
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index fbd49aa145a..410964c1969 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -298,19 +298,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState* state) {
     return ExecNode::release_resource(state);
 }
 
-//TODO: maybe could have better strategy, not noly when need data to sink data
-//even could get some resources in advance as soon as possible
-bool VAnalyticEvalNode::can_write() {
-    return _need_more_input;
-}
-
-bool VAnalyticEvalNode::can_read() {
-    if (_need_more_input) {
-        return false;
-    }
-    return true;
-}
-
 Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h
index 45f7ce5b1e8..9b302b32ed6 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -83,8 +83,6 @@ public:
     void release_resource(RuntimeState* state) override;
     Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override;
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
-    bool can_read();
-    bool can_write();
 
 protected:
     using ExecNode::debug_string;
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp
index 15d8124c653..a8b986d130e 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -317,11 +317,6 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* state) {
     return Status::OK();
 }
 
-bool VPartitionSortNode::can_read() {
-    std::lock_guard<std::mutex> lock(_buffer_mutex);
-    return !_blocks_buffer.empty() || _can_read;
-}
-
 Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block,
                                 bool* eos) {
     SCOPED_TIMER(_exec_timer);
diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h
index 481a99719fb..a9edca80df9 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -233,7 +233,6 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
     void debug_profile();
-    bool can_read();
 
 private:
     Status _init_hash_method();
diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h
index 150ebeaf4f2..526064404a4 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -94,8 +94,6 @@ public:
         return _writer->sink(block, eos);
     }
 
-    bool can_write() override { return _writer->can_write(); }
-
     Status close(RuntimeState* state, Status exec_status) override {
         // if the init failed, the _writer may be nullptr. so here need check
         if (_writer) {
@@ -104,8 +102,6 @@ public:
         return DataSink::close(state, exec_status);
     }
 
-    [[nodiscard]] bool is_pending_finish() const override { return _writer->is_pending_finish(); }
-
 protected:
     const std::vector<TExpr>& _t_output_expr;
     VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h b/be/src/vec/sink/multi_cast_data_stream_sink.h
index 7cc057013aa..d6b85010c5a 100644
--- a/be/src/vec/sink/multi_cast_data_stream_sink.h
+++ b/be/src/vec/sink/multi_cast_data_stream_sink.h
@@ -41,9 +41,6 @@ public:
 
     Status open(doris::RuntimeState* state) override { return Status::OK(); };
 
-    // use sink to check can_write, now always true after we support spill to disk
-    bool can_write() override { return _multi_cast_data_streamer->can_write(); }
-
     std::shared_ptr<pipeline::MultiCastDataStreamer>& get_multi_cast_data_streamer() {
         return _multi_cast_data_streamer;
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 63f2aa19515..529a8256e77 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -925,22 +925,6 @@ void VDataStreamSender::register_pipeline_channels(pipeline::ExchangeSinkBuffer*
     }
 }
 
-bool VDataStreamSender::channel_all_can_write() {
-    if ((_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) &&
-        !_only_local_exchange) {
-        // This condition means we need use broadcast buffer, so we should make sure
-        // there are available buffer before running pipeline
-        return !_broadcast_pb_blocks->empty();
-    } else {
-        for (auto channel : _channels) {
-            if (!channel->can_write()) {
-                return false;
-            }
-        }
-        return true;
-    }
-}
-
 template class Channel<pipeline::ExchangeSinkLocalState>;
 template class Channel<VDataStreamSender>;
 template class Channel<pipeline::ResultFileSinkLocalState>;
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 5ca31bcbe44..b6346787ebb 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -132,8 +132,6 @@ public:
 
     void register_pipeline_channels(pipeline::ExchangeSinkBuffer* buffer);
 
-    bool channel_all_can_write();
-
     int sender_id() const { return _sender_id; }
 
     RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
@@ -327,17 +325,6 @@ public:
 
     virtual void ch_roll_pb_block();
 
-    bool can_write() {
-        if (!is_local()) {
-            return true;
-        }
-
-        // if local recvr queue mem over the exchange node mem limit, we must ensure each queue
-        // has one block to do merge sort in exchange node to prevent the logic dead lock
-        return !_local_recvr || _local_recvr->is_closed() || !_local_recvr->exceeds_limit(0) ||
-               _local_recvr->sender_queue_empty(_parent->sender_id());
-    }
-
     bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp b/be/src/vec/sink/vmemory_scratch_sink.cpp
index eca9e65ab49..95266ba6de0 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -99,10 +99,6 @@ Status MemoryScratchSink::open(RuntimeState* state) {
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
-bool MemoryScratchSink::can_write() {
-    return _queue->size() < 10;
-}
-
 Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
diff --git a/be/src/vec/sink/vmemory_scratch_sink.h b/be/src/vec/sink/vmemory_scratch_sink.h
index 3a1dd9991d4..7e2d042cb8d 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.h
+++ b/be/src/vec/sink/vmemory_scratch_sink.h
@@ -62,8 +62,6 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
 
-    bool can_write() override;
-
 private:
     Status _prepare_vexpr(RuntimeState* state);
     cctz::time_zone _timezone_obj;
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index b1426a48806..5e21dc13e12 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -65,13 +65,6 @@ public:
 
     virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
 
-    bool can_write() {
-        std::lock_guard l(_m);
-        return _data_queue_is_available() || _is_finished();
-    }
-
-    [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }
-
     // sink the block date to date queue, it is async
     Status sink(Block* block, bool eos);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org