This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e5b0d7a5cd [CTE](eof) Support cte reuse reduce counter by eof status and pipeline task mem can release (#20056)
e5b0d7a5cd is described below

commit e5b0d7a5cd8c0dfd5492b6e0a2ecc3ef812a7b6e
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Fri May 26 22:03:29 2023 +0800

    [CTE](eof) Support cte reuse reduce counter by eof status and pipeline task mem can release (#20056)
---
 .../exec/multi_cast_data_stream_source.cpp         |  6 ++++++
 .../pipeline/exec/multi_cast_data_stream_source.h  |  2 ++
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  | 25 ++++++++++++++++++++--
 be/src/pipeline/exec/multi_cast_data_streamer.h    |  6 ++++--
 be/src/pipeline/pipeline_task.cpp                  |  4 +++-
 be/src/vec/sink/vdata_stream_sender.cpp            |  5 -----
 6 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 5334d2a669..9854d63120 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -61,4 +61,10 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
     }
     return Status::OK();
 }
+
+Status MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) {
+    _multi_cast_data_streamer->close_sender(_consumer_id);
+    return OperatorBase::close(state);
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index c44b37ee2e..3198c4a408 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -69,6 +69,8 @@ public:
 
     bool can_read() override;
 
+    Status close(doris::RuntimeState* state) override;
+
 private:
     const int _consumer_id;
     std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 2262fb3930..3929c6ced0 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -48,13 +48,33 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block
     *eos = _eos and pos_to_pull == _multi_cast_blocks.end();
 }
 
-void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
+void MultiCastDataStreamer::close_sender(int sender_idx) {
+    std::lock_guard l(_mutex);
+    auto& pos_to_pull = _sender_pos_to_read[sender_idx];
+    while (pos_to_pull != _multi_cast_blocks.end()) {
+        if (pos_to_pull->_used_count == 1) {
+            DCHECK(pos_to_pull == _multi_cast_blocks.begin());
+            _cumulative_mem_size -= pos_to_pull->_mem_size;
+            pos_to_pull++;
+            _multi_cast_blocks.pop_front();
+        } else {
+            pos_to_pull->_used_count--;
+            pos_to_pull++;
+        }
+    }
+    _closed_sender_count++;
+}
+
+Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
     auto rows = block->rows();
     COUNTER_UPDATE(_process_rows, rows);
 
     auto block_mem_size = block->allocated_bytes();
     std::lock_guard l(_mutex);
-    int need_process_count = _cast_sender_count - _opened_sender_count;
+    int need_process_count = _cast_sender_count - _closed_sender_count;
+    if (need_process_count == 0) {
+        return Status::EndOfFile("All data streamer is EOF");
+    }
     // TODO: if the [queue back block rows + block->rows()] < batch_size, better
     // do merge block. but need check the need_process_count and used_count whether
     // equal
@@ -70,6 +90,7 @@ void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block*
         }
     }
     _eos = eos;
+    return Status::OK();
 }
 
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 022761bf3d..92c0e24079 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -46,7 +46,9 @@ public:
 
     void pull(int sender_idx, vectorized::Block* block, bool* eos);
 
-    void push(RuntimeState* state, vectorized::Block* block, bool eos);
+    void close_sender(int sender_idx);
+
+    Status push(RuntimeState* state, vectorized::Block* block, bool eos);
 
     // use sink to check can_write, now always true after we support spill to disk
     bool can_write() { return true; }
@@ -73,7 +75,7 @@ private:
     std::mutex _mutex;
     bool _eos = false;
     int _cast_sender_count = 0;
-    int _opened_sender_count = 0;
+    int _closed_sender_count = 0;
     int64_t _cumulative_mem_size = 0;
 
     RuntimeProfile::Counter* _process_rows;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 41c4e6b549..853cd8ec0d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -218,10 +218,12 @@ Status PipelineTask::execute(bool* eos) {
         *eos = _data_state == SourceState::FINISHED;
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
-            RETURN_IF_ERROR(_sink->sink(_state, block, _data_state));
+            auto status = _sink->sink(_state, block, _data_state);
+            *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;
             if (*eos) { // just return, the scheduler will do finish work
                 break;
             }
+            RETURN_IF_ERROR(status);
         }
     }
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index a72c86d44d..6b03753c84 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -17,12 +17,10 @@
 
 #include "vec/sink/vdata_stream_sender.h"
 
-#include <butil/iobuf_inl.h>
 #include <fmt/format.h>
 #include <fmt/ranges.h> // IWYU pragma: keep
 #include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/Metrics_types.h>
-#include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <opentelemetry/nostd/shared_ptr.h>
@@ -36,13 +34,10 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/query_statistics.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
-#include "util/brpc_client_cache.h"
 #include "util/proto_util.h"
 #include "util/telemetry/telemetry.h"
 #include "vec/common/sip_hash.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to