This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dc524a5f564 [refactor](cleancode) remove unused code from be (#35756)
dc524a5f564 is described below

commit dc524a5f564f180f2514bf19821a22af0afdcbcd
Author: yiguolei <676222...@qq.com>
AuthorDate: Sat Jun 1 22:20:25 2024 +0800

    [refactor](cleancode) remove unused code from be (#35756)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 be/src/exec/data_sink.h                         |  4 ----
 be/src/exec/exec_node.h                         |  2 --
 be/src/pipeline/exec/exchange_sink_buffer.cpp   | 10 ----------
 be/src/pipeline/exec/exchange_sink_buffer.h     |  1 -
 be/src/pipeline/exec/multi_cast_data_streamer.h |  8 --------
 be/src/pipeline/pipeline_fragment_context.cpp   |  3 ---
 be/src/pipeline/pipeline_task.cpp               | 17 +++++++++--------
 be/src/pipeline/pipeline_task.h                 |  6 ++----
 be/src/vec/exec/vanalytic_eval_node.cpp         | 13 -------------
 be/src/vec/exec/vanalytic_eval_node.h           |  2 --
 be/src/vec/exec/vpartition_sort_node.cpp        |  5 -----
 be/src/vec/exec/vpartition_sort_node.h          |  1 -
 be/src/vec/sink/async_writer_sink.h             |  4 ----
 be/src/vec/sink/multi_cast_data_stream_sink.h   |  3 ---
 be/src/vec/sink/vdata_stream_sender.cpp         | 16 ----------------
 be/src/vec/sink/vdata_stream_sender.h           | 13 -------------
 be/src/vec/sink/vmemory_scratch_sink.cpp        |  4 ----
 be/src/vec/sink/vmemory_scratch_sink.h          |  2 --
 be/src/vec/sink/writer/async_result_writer.h    |  7 -------
 19 files changed, 11 insertions(+), 110 deletions(-)

diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index 5258929ba79..2d76078e7e5 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -71,8 +71,6 @@ public:
         return send(state, block, eos);
     }
 
-    [[nodiscard]] virtual bool is_pending_finish() const { return false; }
-
     // Releases all resources that were allocated in prepare()/send().
     // Further send() calls are illegal after calling close().
     // It must be okay to call this multiple times. Subsequent calls should
@@ -102,8 +100,6 @@ public:
 
     const RowDescriptor& row_desc() { return _row_desc; }
 
-    virtual bool can_write() { return true; }
-
     std::shared_ptr<QueryStatistics> get_query_statistics_ptr();
 
 protected:
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 10b035835d7..2dedee61ba5 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -142,8 +142,6 @@ public:
         return Status::OK();
     }
 
-    bool can_read() const { return _can_read; }
-
     [[nodiscard]] virtual bool can_terminate_early() { return false; }
 
     // Sink Data to ExecNode to do some stock work, both need impl with 
method: get_result
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 8893db54cc5..e29991890f5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -106,16 +106,6 @@ void ExchangeSinkBuffer::close() {
     //_instance_to_request.clear();
 }
 
-bool ExchangeSinkBuffer::can_write() const {
-    size_t max_package_size =
-            config::exchg_buffer_queue_capacity_factor * 
_instance_to_package_queue.size();
-    size_t total_package_size = 0;
-    for (auto& [_, q] : _instance_to_package_queue) {
-        total_package_size += q.size();
-    }
-    return total_package_size <= max_package_size;
-}
-
 void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
     if (_finish_dependency && _should_stop && all_done) {
         _finish_dependency->set_ready();
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 8eed559e712..683a485f2ca 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -202,7 +202,6 @@ public:
 
     Status add_block(TransmitInfo&& request);
     Status add_block(BroadcastTransmitInfo&& request);
-    bool can_write() const;
     void close();
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t 
receive_rpc_time);
     void update_profile(RuntimeProfile* profile);
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h 
b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 2078a729227..e812067e52c 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -56,14 +56,6 @@ public:
 
     Status push(RuntimeState* state, vectorized::Block* block, bool eos);
 
-    // use sink to check can_write, now always true after we support spill to 
disk
-    bool can_write() { return true; }
-
-    bool can_read(int sender_idx) {
-        std::lock_guard l(_mutex);
-        return _sender_pos_to_read[sender_idx] != _multi_cast_blocks.end() || 
_eos;
-    }
-
     const RowDescriptor& row_desc() { return _row_desc; }
 
     RuntimeProfile* profile() { return _profile; }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index dbfdaba6d91..8347892c6bf 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,9 +189,6 @@ void PipelineFragmentContext::cancel(const Status reason) {
     // _exec_env->result_queue_mgr()->update_queue_status(id, 
Status::Aborted(msg));
     for (auto& tasks : _tasks) {
         for (auto& task : tasks) {
-            if (task->is_finished()) {
-                continue;
-            }
             task->clear_blocking_state();
         }
     }
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 52a76828804..c43410e68a4 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -429,7 +429,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState* 
state, int64_t revocable_m
 
 void PipelineTask::finalize() {
     std::unique_lock<std::mutex> lc(_dependency_lock);
-    _finished = true;
+    _finalized = true;
     _sink_shared_state.reset();
     _op_shared_states.clear();
     _le_state_map.clear();
@@ -475,17 +475,18 @@ std::string PipelineTask::debug_string() {
             debug_string_buffer,
             "PipelineTask[this = {}, open = {}, eos = {}, finish = {}, dry run 
= {}, elapse time "
             "= {}s], block dependency = {}, is running = {}\noperators: ",
-            (void*)this, _opened, _eos, _finished, _dry_run, elapsed,
-            cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : 
"NULL", is_running());
+            (void*)this, _opened, _eos, _finalized, _dry_run, elapsed,
+            cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : 
"NULL",
+            is_running());
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}",
-                       _opened && !_finished ? 
_operators[i]->debug_string(_state, i)
-                                             : _operators[i]->debug_string(i));
+                       _opened && !_finalized ? 
_operators[i]->debug_string(_state, i)
+                                              : 
_operators[i]->debug_string(i));
     }
     fmt::format_to(debug_string_buffer, "\n{}\n",
-                   _opened && !_finished ? _sink->debug_string(_state, 
_operators.size())
-                                         : 
_sink->debug_string(_operators.size()));
-    if (_finished) {
+                   _opened && !_finalized ? _sink->debug_string(_state, 
_operators.size())
+                                          : 
_sink->debug_string(_operators.size()));
+    if (_finalized) {
         return fmt::to_string(debug_string_buffer);
     }
 
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 6bc65905be6..20c83f6a97e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -84,8 +84,6 @@ public:
 
     void finalize();
 
-    bool is_finished() const { return _finished.load(); }
-
     std::string debug_string();
 
     bool is_pending_finish() {
@@ -142,7 +140,7 @@ public:
     void clear_blocking_state() {
         // We use a lock to assure all dependencies are not deconstructed here.
         std::unique_lock<std::mutex> lc(_dependency_lock);
-        if (!_finished) {
+        if (!_finalized) {
             _execution_dep->set_always_ready();
             for (auto* dep : _filter_dependencies) {
                 dep->set_always_ready();
@@ -303,7 +301,7 @@ private:
 
     Dependency* _execution_dep = nullptr;
 
-    std::atomic<bool> _finished {false};
+    std::atomic<bool> _finalized {false};
     std::mutex _dependency_lock;
 
     std::atomic<bool> _running {false};
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp 
b/be/src/vec/exec/vanalytic_eval_node.cpp
index fbd49aa145a..410964c1969 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -298,19 +298,6 @@ void VAnalyticEvalNode::release_resource(RuntimeState* 
state) {
     return ExecNode::release_resource(state);
 }
 
-//TODO: maybe could have better strategy, not noly when need data to sink data
-//even could get some resources in advance as soon as possible
-bool VAnalyticEvalNode::can_write() {
-    return _need_more_input;
-}
-
-bool VAnalyticEvalNode::can_read() {
-    if (_need_more_input) {
-        return false;
-    }
-    return true;
-}
-
 Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/vec/exec/vanalytic_eval_node.h 
b/be/src/vec/exec/vanalytic_eval_node.h
index 45f7ce5b1e8..9b302b32ed6 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -83,8 +83,6 @@ public:
     void release_resource(RuntimeState* state) override;
     Status sink(doris::RuntimeState* state, vectorized::Block* input_block, 
bool eos) override;
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block, 
bool* eos) override;
-    bool can_read();
-    bool can_write();
 
 protected:
     using ExecNode::debug_string;
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp 
b/be/src/vec/exec/vpartition_sort_node.cpp
index 15d8124c653..a8b986d130e 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -317,11 +317,6 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* 
state) {
     return Status::OK();
 }
 
-bool VPartitionSortNode::can_read() {
-    std::lock_guard<std::mutex> lock(_buffer_mutex);
-    return !_blocks_buffer.empty() || _can_read;
-}
-
 Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* 
output_block,
                                 bool* eos) {
     SCOPED_TIMER(_exec_timer);
diff --git a/be/src/vec/exec/vpartition_sort_node.h 
b/be/src/vec/exec/vpartition_sort_node.h
index 481a99719fb..a9edca80df9 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -233,7 +233,6 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) 
override;
 
     void debug_profile();
-    bool can_read();
 
 private:
     Status _init_hash_method();
diff --git a/be/src/vec/sink/async_writer_sink.h 
b/be/src/vec/sink/async_writer_sink.h
index 150ebeaf4f2..526064404a4 100644
--- a/be/src/vec/sink/async_writer_sink.h
+++ b/be/src/vec/sink/async_writer_sink.h
@@ -94,8 +94,6 @@ public:
         return _writer->sink(block, eos);
     }
 
-    bool can_write() override { return _writer->can_write(); }
-
     Status close(RuntimeState* state, Status exec_status) override {
         // if the init failed, the _writer may be nullptr. so here need check
         if (_writer) {
@@ -104,8 +102,6 @@ public:
         return DataSink::close(state, exec_status);
     }
 
-    [[nodiscard]] bool is_pending_finish() const override { return 
_writer->is_pending_finish(); }
-
 protected:
     const std::vector<TExpr>& _t_output_expr;
     VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h 
b/be/src/vec/sink/multi_cast_data_stream_sink.h
index 7cc057013aa..d6b85010c5a 100644
--- a/be/src/vec/sink/multi_cast_data_stream_sink.h
+++ b/be/src/vec/sink/multi_cast_data_stream_sink.h
@@ -41,9 +41,6 @@ public:
 
     Status open(doris::RuntimeState* state) override { return Status::OK(); };
 
-    // use sink to check can_write, now always true after we support spill to 
disk
-    bool can_write() override { return _multi_cast_data_streamer->can_write(); 
}
-
     std::shared_ptr<pipeline::MultiCastDataStreamer>& 
get_multi_cast_data_streamer() {
         return _multi_cast_data_streamer;
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 63f2aa19515..529a8256e77 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -925,22 +925,6 @@ void 
VDataStreamSender::register_pipeline_channels(pipeline::ExchangeSinkBuffer*
     }
 }
 
-bool VDataStreamSender::channel_all_can_write() {
-    if ((_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) 
&&
-        !_only_local_exchange) {
-        // This condition means we need use broadcast buffer, so we should 
make sure
-        // there are available buffer before running pipeline
-        return !_broadcast_pb_blocks->empty();
-    } else {
-        for (auto channel : _channels) {
-            if (!channel->can_write()) {
-                return false;
-            }
-        }
-        return true;
-    }
-}
-
 template class Channel<pipeline::ExchangeSinkLocalState>;
 template class Channel<VDataStreamSender>;
 template class Channel<pipeline::ResultFileSinkLocalState>;
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 5ca31bcbe44..b6346787ebb 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -132,8 +132,6 @@ public:
 
     void register_pipeline_channels(pipeline::ExchangeSinkBuffer* buffer);
 
-    bool channel_all_can_write();
-
     int sender_id() const { return _sender_id; }
 
     RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
@@ -327,17 +325,6 @@ public:
 
     virtual void ch_roll_pb_block();
 
-    bool can_write() {
-        if (!is_local()) {
-            return true;
-        }
-
-        // if local recvr queue mem over the exchange node mem limit, we must 
ensure each queue
-        // has one block to do merge sort in exchange node to prevent the 
logic dead lock
-        return !_local_recvr || _local_recvr->is_closed() || 
!_local_recvr->exceeds_limit(0) ||
-               _local_recvr->sender_queue_empty(_parent->sender_id());
-    }
-
     bool is_receiver_eof() const { return 
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
 
     void set_receiver_eof(Status st) { _receiver_status = st; }
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp 
b/be/src/vec/sink/vmemory_scratch_sink.cpp
index eca9e65ab49..95266ba6de0 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -99,10 +99,6 @@ Status MemoryScratchSink::open(RuntimeState* state) {
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
-bool MemoryScratchSink::can_write() {
-    return _queue->size() < 10;
-}
-
 Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
diff --git a/be/src/vec/sink/vmemory_scratch_sink.h 
b/be/src/vec/sink/vmemory_scratch_sink.h
index 3a1dd9991d4..7e2d042cb8d 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.h
+++ b/be/src/vec/sink/vmemory_scratch_sink.h
@@ -62,8 +62,6 @@ public:
 
     Status close(RuntimeState* state, Status exec_status) override;
 
-    bool can_write() override;
-
 private:
     Status _prepare_vexpr(RuntimeState* state);
     cctz::time_zone _timezone_obj;
diff --git a/be/src/vec/sink/writer/async_result_writer.h 
b/be/src/vec/sink/writer/async_result_writer.h
index b1426a48806..5e21dc13e12 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -65,13 +65,6 @@ public:
 
     virtual Status open(RuntimeState* state, RuntimeProfile* profile) = 0;
 
-    bool can_write() {
-        std::lock_guard l(_m);
-        return _data_queue_is_available() || _is_finished();
-    }
-
-    [[nodiscard]] bool is_pending_finish() const { return 
!_writer_thread_closed; }
-
     // sink the block date to date queue, it is async
     Status sink(Block* block, bool eos);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to