github-actions[bot] commented on code in PR #44014:
URL: https://github.com/apache/doris/pull/44014#discussion_r1843262771


##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -30,37 +47,115 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,

Review Comment:
   warning: function 'pull' has cognitive complexity of 61 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:58:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (!_cached_blocks[sender_idx].empty()) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:73:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (!_spill_readers[sender_idx].empty()) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:75:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (!reader_item->stream->ready_for_reading()) {
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:80:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(reader->open());
               ^
   ```
   **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:80:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(reader->open());
               ^
   ```
   **be/src/common/status.h:641:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:81:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
               if (reader_item->block_offset != 0) {
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:86:** nesting level increased to 2
   ```cpp
               auto spill_func = [this, reader_item, sender_idx]() {
                                 ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:90:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   while (!spill_eos) {
                   ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:91:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(reader_item->reader->read(&block, 
&spill_eos));
                       ^
   ```
   **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:91:** +5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(reader_item->reader->read(&block, 
&spill_eos));
                       ^
   ```
   **be/src/common/status.h:641:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:92:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       if (!block.empty()) {
                       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:96:** +5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                           if (_cached_blocks[sender_idx].size() >= 32 ||
                           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:103:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   if (spill_eos || !_cached_blocks[sender_idx].empty()) {
                   ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:110:** nesting level increased to 2
   ```cpp
               auto catch_exception_func = [spill_func = 
std::move(spill_func)]() {
                                           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:111:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:111:** +4, including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:111:** +5, including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_CATCH_EXCEPTION(return spill_func(););
                   ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:120:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
               ^
   ```
   **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:120:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
               ^
   ```
   **be/src/common/status.h:641:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:126:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (pos_to_pull == end) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:143:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (pos_to_pull == end) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:146:** +2, including nesting penalty of 1, nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
               ^
   ```
   **be/src/common/status.h:639:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:146:** +3, including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
               ^
   ```
   **be/src/common/status.h:641:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:149:** +1, including nesting penalty of 0, nesting level increased to 1
   ```cpp
           if (use_count == 0) {
           ^
   ```
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:155:** +1, nesting level increased to 1
   ```cpp
           } else {
             ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.h:
##########
@@ -31,30 +42,53 @@ struct MultiCastBlock {
     size_t _mem_size;
 };
 
+struct SpillingReader {
+    vectorized::SpillReaderUPtr reader;
+    vectorized::SpillStreamSPtr stream;
+    int64_t block_offset {0};
+    bool all_data_read {false};
+};
+
 // TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and refactor the
 // code
 class MultiCastDataStreamer {
 public:
-    MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count,
+    MultiCastDataStreamer(const RowDescriptor& row_desc, MultiCastSharedState* shared_state,
+                          ObjectPool* pool, int cast_sender_count, int32_t node_id,
                           bool with_dependencies = false)
             : _row_desc(row_desc),
+              _shared_state(shared_state),
               _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))),
-              _cast_sender_count(cast_sender_count) {
+              _cached_blocks(cast_sender_count),
+              _cast_sender_count(cast_sender_count),
+              _node_id(node_id),
+              _spill_readers(cast_sender_count),
+              _source_profiles(cast_sender_count) {
         _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end());
         if (with_dependencies) {
             _dependencies.resize(cast_sender_count, nullptr);
         }
 
+        _spill_dependency = Dependency::create_shared(_node_id, _node_id,
+                                                      "MultiCastDataStreamerDependency", true);
+
+        for (int i = 0; i != cast_sender_count; ++i) {
+            _spill_read_dependencies.emplace_back(Dependency::create_shared(
+                    node_id, node_id, "MultiCastReadSpillDependency", true));
+        }
         _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
         _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
     };
 
-    ~MultiCastDataStreamer() = default;
+    ~MultiCastDataStreamer() {

Review Comment:
   warning: use '= default' to define a trivial destructor [modernize-use-equals-default]
   ```cpp
       ~MultiCastDataStreamer() {
       ^
   ```
   



##########
be/src/pipeline/exec/multi_cast_data_streamer.cpp:
##########
@@ -30,37 +47,115 @@
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,

Review Comment:
   warning: function 'pull' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
                                 ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/multi_cast_data_streamer.cpp:49:** 111 lines including whitespace and comments (threshold 80)
   ```cpp
   Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
                                 ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to