github-actions[bot] commented on code in PR #44014: URL: https://github.com/apache/doris/pull/44014#discussion_r1843262771
########## be/src/pipeline/exec/multi_cast_data_streamer.cpp: ########## @@ -30,37 +47,115 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_ block->clear(); } -Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) { +Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block, Review Comment: warning: function 'pull' has cognitive complexity of 61 (threshold 50) [readability-function-cognitive-complexity] ```cpp Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block, ^ ``` <details> <summary>Additional context</summary> **be/src/pipeline/exec/multi_cast_data_streamer.cpp:58:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!_cached_blocks[sender_idx].empty()) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:73:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (!_spill_readers[sender_idx].empty()) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:75:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (!reader_item->stream->ready_for_reading()) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:80:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(reader->open()); ^ ``` **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:80:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(reader->open()); ^ ``` **be/src/common/status.h:641:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:81:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp if (reader_item->block_offset != 0) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:86:** 
nesting level increased to 2 ```cpp auto spill_func = [this, reader_item, sender_idx]() { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:90:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp while (!spill_eos) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:91:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp RETURN_IF_ERROR(reader_item->reader->read(&block, &spill_eos)); ^ ``` **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:91:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_ERROR(reader_item->reader->read(&block, &spill_eos)); ^ ``` **be/src/common/status.h:641:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:92:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp if (!block.empty()) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:96:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp if (_cached_blocks[sender_idx].size() >= 32 || ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:103:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp if (spill_eos || !_cached_blocks[sender_idx].empty()) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:110:** nesting level increased to 2 ```cpp auto catch_exception_func = [spill_func = std::move(spill_func)]() { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:111:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_CATCH_EXCEPTION(return spill_func();); ^ ``` **be/src/common/exception.h:79:** expanded from macro 'RETURN_IF_CATCH_EXCEPTION' ```cpp do { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:111:** +4, including nesting penalty of 3, nesting level increased to 4 ```cpp 
RETURN_IF_CATCH_EXCEPTION(return spill_func();); ^ ``` **be/src/common/exception.h:84:** expanded from macro 'RETURN_IF_CATCH_EXCEPTION' ```cpp } catch (const doris::Exception& e) { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:111:** +5, including nesting penalty of 4, nesting level increased to 5 ```cpp RETURN_IF_CATCH_EXCEPTION(return spill_func();); ^ ``` **be/src/common/exception.h:85:** expanded from macro 'RETURN_IF_CATCH_EXCEPTION' ```cpp if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:120:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable))); ^ ``` **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:120:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable))); ^ ``` **be/src/common/status.h:641:** expanded from macro 'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:126:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (pos_to_pull == end) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:143:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (pos_to_pull == end) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:146:** +2, including nesting penalty of 1, nesting level increased to 2 ```cpp RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled)); ^ ``` **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR' ```cpp do { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:146:** +3, including nesting penalty of 2, nesting level increased to 3 ```cpp RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled)); ^ ``` **be/src/common/status.h:641:** expanded from macro 
'RETURN_IF_ERROR' ```cpp if (UNLIKELY(!_status_.ok())) { \ ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:149:** +1, including nesting penalty of 0, nesting level increased to 1 ```cpp if (use_count == 0) { ^ ``` **be/src/pipeline/exec/multi_cast_data_streamer.cpp:155:** +1, nesting level increased to 1 ```cpp } else { ^ ``` </details> ########## be/src/pipeline/exec/multi_cast_data_streamer.h: ########## @@ -31,30 +42,53 @@ struct MultiCastBlock { size_t _mem_size; }; +struct SpillingReader { + vectorized::SpillReaderUPtr reader; + vectorized::SpillStreamSPtr stream; + int64_t block_offset {0}; + bool all_data_read {false}; +}; + // TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and refactor the // code class MultiCastDataStreamer { public: - MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count, + MultiCastDataStreamer(const RowDescriptor& row_desc, MultiCastSharedState* shared_state, + ObjectPool* pool, int cast_sender_count, int32_t node_id, bool with_dependencies = false) : _row_desc(row_desc), + _shared_state(shared_state), _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))), - _cast_sender_count(cast_sender_count) { + _cached_blocks(cast_sender_count), + _cast_sender_count(cast_sender_count), + _node_id(node_id), + _spill_readers(cast_sender_count), + _source_profiles(cast_sender_count) { _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end()); if (with_dependencies) { _dependencies.resize(cast_sender_count, nullptr); } + _spill_dependency = Dependency::create_shared(_node_id, _node_id, + "MultiCastDataStreamerDependency", true); + + for (int i = 0; i != cast_sender_count; ++i) { + _spill_read_dependencies.emplace_back(Dependency::create_shared( + node_id, node_id, "MultiCastReadSpillDependency", true)); + } _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES); _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT); }; - 
~MultiCastDataStreamer() = default;
+ ~MultiCastDataStreamer() {

Review Comment:
warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
```cpp
~MultiCastDataStreamer() {
^
```

##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -30,37 +47,115 @@
 block->clear();
 }
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,

Review Comment:
warning: function 'pull' exceeds recommended size/complexity thresholds [readability-function-size]
```cpp
Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
^
```
<details>
<summary>Additional context</summary>

**be/src/pipeline/exec/multi_cast_data_streamer.cpp:49:** 111 lines including whitespace and comments (threshold 80)
```cpp
Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
^
```

</details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at: users@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org