This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e5b0d7a5cd [CTE](eof) Support cte reuse reduce counter by eof status and pipeline task mem can release (#20056) e5b0d7a5cd is described below commit e5b0d7a5cd8c0dfd5492b6e0a2ecc3ef812a7b6e Author: HappenLee <happen...@hotmail.com> AuthorDate: Fri May 26 22:03:29 2023 +0800 [CTE](eof) Support cte reuse reduce counter by eof status and pipeline task mem can release (#20056) --- .../exec/multi_cast_data_stream_source.cpp | 6 ++++++ .../pipeline/exec/multi_cast_data_stream_source.h | 2 ++ be/src/pipeline/exec/multi_cast_data_streamer.cpp | 25 ++++++++++++++++++++-- be/src/pipeline/exec/multi_cast_data_streamer.h | 6 ++++-- be/src/pipeline/pipeline_task.cpp | 4 +++- be/src/vec/sink/vdata_stream_sender.cpp | 5 ----- 6 files changed, 38 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 5334d2a669..9854d63120 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -61,4 +61,10 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto } return Status::OK(); } + +Status MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) { + _multi_cast_data_streamer->close_sender(_consumer_id); + return OperatorBase::close(state); +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index c44b37ee2e..3198c4a408 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -69,6 +69,8 @@ public: bool can_read() override; + Status close(doris::RuntimeState* state) override; + private: const int _consumer_id; std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer; diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 2262fb3930..3929c6ced0 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -48,13 +48,33 @@ void MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block *eos = _eos and pos_to_pull == _multi_cast_blocks.end(); } -void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) { +void MultiCastDataStreamer::close_sender(int sender_idx) { + std::lock_guard l(_mutex); + auto& pos_to_pull = _sender_pos_to_read[sender_idx]; + while (pos_to_pull != _multi_cast_blocks.end()) { + if (pos_to_pull->_used_count == 1) { + DCHECK(pos_to_pull == _multi_cast_blocks.begin()); + _cumulative_mem_size -= pos_to_pull->_mem_size; + pos_to_pull++; + _multi_cast_blocks.pop_front(); + } else { + pos_to_pull->_used_count--; + pos_to_pull++; + } + } + _closed_sender_count++; +} + +Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) { auto rows = block->rows(); COUNTER_UPDATE(_process_rows, rows); auto block_mem_size = block->allocated_bytes(); std::lock_guard l(_mutex); - int need_process_count = _cast_sender_count - _opened_sender_count; + int need_process_count = _cast_sender_count - _closed_sender_count; + if (need_process_count == 0) { + return Status::EndOfFile("All data streamer is EOF"); + } // TODO: if the [queue back block rows + block->rows()] < batch_size, better // do merge block. but need check the need_process_count and used_count whether // equal @@ -70,6 +90,7 @@ void MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* } } _eos = eos; + return Status::OK(); } } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index 022761bf3d..92c0e24079 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -46,7 +46,9 @@ public: void pull(int sender_idx, vectorized::Block* block, bool* eos); - void push(RuntimeState* state, vectorized::Block* block, bool eos); + void close_sender(int sender_idx); + + Status push(RuntimeState* state, vectorized::Block* block, bool eos); // use sink to check can_write, now always true after we support spill to disk bool can_write() { return true; } @@ -73,7 +75,7 @@ private: std::mutex _mutex; bool _eos = false; int _cast_sender_count = 0; - int _opened_sender_count = 0; + int _closed_sender_count = 0; int64_t _cumulative_mem_size = 0; RuntimeProfile::Counter* _process_rows; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 41c4e6b549..853cd8ec0d 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -218,10 +218,12 @@ Status PipelineTask::execute(bool* eos) { *eos = _data_state == SourceState::FINISHED; if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); - RETURN_IF_ERROR(_sink->sink(_state, block, _data_state)); + auto status = _sink->sink(_state, block, _data_state); + *eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos; if (*eos) { // just return, the scheduler will do finish work break; } + RETURN_IF_ERROR(status); } } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index a72c86d44d..6b03753c84 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -17,12 +17,10 @@ #include "vec/sink/vdata_stream_sender.h" -#include <butil/iobuf_inl.h> #include <fmt/format.h> #include <fmt/ranges.h> // IWYU pragma: keep #include <gen_cpp/DataSinks_types.h> #include <gen_cpp/Metrics_types.h> -#include <gen_cpp/PaloInternalService_types.h> #include <gen_cpp/data.pb.h> #include <gen_cpp/internal_service.pb.h> #include <opentelemetry/nostd/shared_ptr.h> @@ -36,13 +34,10 @@ #include "common/object_pool.h" #include "common/status.h" #include "runtime/descriptors.h" -#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" -#include "runtime/query_statistics.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "runtime/types.h" -#include "util/brpc_client_cache.h" #include "util/proto_util.h" #include "util/telemetry/telemetry.h" #include "vec/common/sip_hash.h" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org