This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch remove_code in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3d70d7ab90435a7919e49503c88e5bd0a6a7a86a Author: yiguolei <yiguo...@gmail.com> AuthorDate: Sat Jun 1 18:02:03 2024 +0800 [chore](cleancode) remove unused code from be --- be/src/exec/data_sink.h | 2 -- be/src/exec/exec_node.h | 2 -- be/src/vec/exec/vanalytic_eval_node.cpp | 13 ------------- be/src/vec/exec/vanalytic_eval_node.h | 2 -- be/src/vec/exec/vpartition_sort_node.cpp | 5 ----- be/src/vec/exec/vpartition_sort_node.h | 1 - be/src/vec/sink/async_writer_sink.h | 4 ---- be/src/vec/sink/multi_cast_data_stream_sink.h | 3 --- be/src/vec/sink/vdata_stream_sender.cpp | 16 ---------------- be/src/vec/sink/vdata_stream_sender.h | 13 ------------- be/src/vec/sink/vmemory_scratch_sink.cpp | 4 ---- be/src/vec/sink/vmemory_scratch_sink.h | 2 -- be/src/vec/sink/writer/async_result_writer.h | 7 ------- 13 files changed, 74 deletions(-) diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 5258929ba79..97842f5acb4 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -71,8 +71,6 @@ public: return send(state, block, eos); } - [[nodiscard]] virtual bool is_pending_finish() const { return false; } - // Releases all resources that were allocated in prepare()/send(). // Further send() calls are illegal after calling close(). // It must be okay to call this multiple times. Subsequent calls should diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 10b035835d7..2dedee61ba5 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -142,8 +142,6 @@ public: return Status::OK(); } - bool can_read() const { return _can_read; } - [[nodiscard]] virtual bool can_terminate_early() { return false; } // Sink Data to ExecNode to do some stock work, both need impl with method: get_result diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index fbd49aa145a..410964c1969 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -298,19 +298,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState* state) { return ExecNode::release_resource(state); } -//TODO: maybe could have better strategy, not noly when need data to sink data -//even could get some resources in advance as soon as possible -bool VAnalyticEvalNode::can_write() { - return _need_more_input; -} - -bool VAnalyticEvalNode::can_read() { - if (_need_more_input) { - return false; - } - return true; -} - Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_CANCELLED(state); diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h index 45f7ce5b1e8..9b302b32ed6 100644 --- a/be/src/vec/exec/vanalytic_eval_node.h +++ b/be/src/vec/exec/vanalytic_eval_node.h @@ -83,8 +83,6 @@ public: void release_resource(RuntimeState* state) override; Status sink(doris::RuntimeState* state, vectorized::Block* input_block, bool eos) override; Status pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) override; - bool can_read(); - bool can_write(); protected: using ExecNode::debug_string; diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index 15d8124c653..a8b986d130e 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -317,11 +317,6 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* state) { return Status::OK(); } -bool VPartitionSortNode::can_read() { - std::lock_guard<std::mutex> lock(_buffer_mutex); - return !_blocks_buffer.empty() || _can_read; -} - Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { SCOPED_TIMER(_exec_timer); diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 481a99719fb..a9edca80df9 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -233,7 +233,6 @@ public: Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; void debug_profile(); - bool can_read(); private: Status _init_hash_method(); diff --git a/be/src/vec/sink/async_writer_sink.h b/be/src/vec/sink/async_writer_sink.h index 150ebeaf4f2..526064404a4 100644 --- a/be/src/vec/sink/async_writer_sink.h +++ b/be/src/vec/sink/async_writer_sink.h @@ -94,8 +94,6 @@ public: return _writer->sink(block, eos); } - bool can_write() override { return _writer->can_write(); } - Status close(RuntimeState* state, Status exec_status) override { // if the init failed, the _writer may be nullptr. so here need check if (_writer) { @@ -104,8 +102,6 @@ public: return DataSink::close(state, exec_status); } - [[nodiscard]] bool is_pending_finish() const override { return _writer->is_pending_finish(); } - protected: const std::vector<TExpr>& _t_output_expr; VExprContextSPtrs _output_vexpr_ctxs; diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h b/be/src/vec/sink/multi_cast_data_stream_sink.h index 7cc057013aa..d6b85010c5a 100644 --- a/be/src/vec/sink/multi_cast_data_stream_sink.h +++ b/be/src/vec/sink/multi_cast_data_stream_sink.h @@ -41,9 +41,6 @@ public: Status open(doris::RuntimeState* state) override { return Status::OK(); }; - // use sink to check can_write, now always true after we support spill to disk - bool can_write() override { return _multi_cast_data_streamer->can_write(); } - std::shared_ptr<pipeline::MultiCastDataStreamer>& get_multi_cast_data_streamer() { return _multi_cast_data_streamer; } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 63f2aa19515..529a8256e77 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -925,22 +925,6 @@ void VDataStreamSender::register_pipeline_channels(pipeline::ExchangeSinkBuffer* } } -bool VDataStreamSender::channel_all_can_write() { - if ((_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) && - !_only_local_exchange) { - // This condition means we need use broadcast buffer, so we should make sure - // there are available buffer before running pipeline - return !_broadcast_pb_blocks->empty(); - } else { - for (auto channel : _channels) { - if (!channel->can_write()) { - return false; - } - } - return true; - } -} - template class Channel<pipeline::ExchangeSinkLocalState>; template class Channel<VDataStreamSender>; template class Channel<pipeline::ResultFileSinkLocalState>; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 5ca31bcbe44..b6346787ebb 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -132,8 +132,6 @@ public: void register_pipeline_channels(pipeline::ExchangeSinkBuffer* buffer); - bool channel_all_can_write(); - int sender_id() const { return _sender_id; } RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; } @@ -327,17 +325,6 @@ public: virtual void ch_roll_pb_block(); - bool can_write() { - if (!is_local()) { - return true; - } - - // if local recvr queue mem over the exchange node mem limit, we must ensure each queue - // has one block to do merge sort in exchange node to prevent the logic dead lock - return !_local_recvr || _local_recvr->is_closed() || !_local_recvr->exceeds_limit(0) || - _local_recvr->sender_queue_empty(_parent->sender_id()); - } - bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } void set_receiver_eof(Status st) { _receiver_status = st; } diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp b/be/src/vec/sink/vmemory_scratch_sink.cpp index eca9e65ab49..95266ba6de0 100644 --- a/be/src/vec/sink/vmemory_scratch_sink.cpp +++ b/be/src/vec/sink/vmemory_scratch_sink.cpp @@ -99,10 +99,6 @@ Status MemoryScratchSink::open(RuntimeState* state) { return VExpr::open(_output_vexpr_ctxs, state); } -bool MemoryScratchSink::can_write() { - return _queue->size() < 10; -} - Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); diff --git a/be/src/vec/sink/vmemory_scratch_sink.h b/be/src/vec/sink/vmemory_scratch_sink.h index 3a1dd9991d4..7e2d042cb8d 100644 --- a/be/src/vec/sink/vmemory_scratch_sink.h +++ b/be/src/vec/sink/vmemory_scratch_sink.h @@ -62,8 +62,6 @@ public: Status close(RuntimeState* state, Status exec_status) override; - bool can_write() override; - private: Status _prepare_vexpr(RuntimeState* state); cctz::time_zone _timezone_obj; diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index b1426a48806..5e21dc13e12 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -65,13 +65,6 @@ public: virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0; - bool can_write() { - std::lock_guard l(_m); - return _data_queue_is_available() || _is_finished(); - } - - [[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; } - // sink the block date to date queue, it is async Status sink(Block* block, bool eos); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org