github-actions[bot] commented on code in PR #44469:
URL: https://github.com/apache/doris/pull/44469#discussion_r1883734068


##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +116,83 @@ size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
     return mem_size;
 }
 
-Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+    if (!_shared_state->need_to_spill) {
+        if (_shared_state->inner_shared_state) {
+            auto* inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto* inner_sink_state =
+                        assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                COUNTER_SET(_memory_used_counter, inner_sink_state->_memory_used_counter->value());
+            }
+        }
+        return;
+    }
+
+    int64_t mem_size = 0;
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    for (auto& block : partitioned_blocks) {
+        if (block) {
+            mem_size += block->allocated_bytes();
+        }
+    }
+    COUNTER_SET(_memory_used_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) {
+    size_t size_to_reserve = 0;
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    if (_shared_state->need_to_spill) {
+        size_to_reserve = p._partition_count * vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+    } else {
+        if (_shared_state->inner_runtime_state) {
+            size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+                    _shared_state->inner_runtime_state.get(), eos);
+        }
+    }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+    return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(

Review Comment:
   warning: function '_revoke_unpartitioned_block' has cognitive complexity of 
63 (threshold 50) [readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:161:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (auto* tmp_sink_state = _shared_state->inner_runtime_state->get_sink_local_state()) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:165:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (inner_sink_state) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:174:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (inner_sink_state) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:179:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (build_block.rows() <= 1) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:182:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (spill_context) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:188:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (build_block.columns() > num_slots) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:193:** 
nesting level increased to 1
   ```cpp
       auto spill_func = [build_block = std::move(build_block), state, this]() mutable {
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:201:** 
nesting level increased to 2
   ```cpp
                         [](std::vector<uint32_t>& indices) { indices.reserve(reserved_size); });
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:205:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           while (offset < total_rows) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:209:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (size_t i = 0; i != build_block.columns(); ++i) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:227:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (size_t i = 0; i != sub_block.rows(); ++i) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:231:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               for (uint32_t partition_idx = 0; partition_idx != p._partition_count; ++partition_idx) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:237:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (UNLIKELY(!partition_block)) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:245:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, end));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:245:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(partition_block->add_rows(&sub_block, begin, end));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:250:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (partition_block->rows() >= reserved_size || is_last_block) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:250:** +1
   ```cpp
                   if (partition_block->rows() >= reserved_size || is_last_block) {
                                                                ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:252:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:252:** +6, 
including nesting penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(spilling_stream->spill_block(state, block, false));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:256:** +1, 
nesting level increased to 4
   ```cpp
                   } else {
                     ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:265:** 
nesting level increased to 1
   ```cpp
       auto exception_catch_func = [spill_func]() mutable {
                                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:** 
nesting level increased to 2
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
                         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:79:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
       do {                                                                     
                    \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:84:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
           } catch (const doris::Exception& e) {                                
                    \
             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:266:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
           auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
                                 ^
   ```
   **be/src/common/exception.h:85:** expanded from macro 
'RETURN_IF_CATCH_EXCEPTION'
   ```cpp
               if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {            
                    \
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:277:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       DBUG_EXECUTE_IF(
       ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:277:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
       DBUG_EXECUTE_IF(
       ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -99,29 +116,83 @@
     return mem_size;
 }
 
-Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState* state) {
+void PartitionedHashJoinSinkLocalState::update_memory_usage() {
+    if (!_shared_state->need_to_spill) {
+        if (_shared_state->inner_shared_state) {
+            auto* inner_sink_state_ = _shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto* inner_sink_state =
+                        assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                COUNTER_SET(_memory_used_counter, inner_sink_state->_memory_used_counter->value());
+            }
+        }
+        return;
+    }
+
+    int64_t mem_size = 0;
+    auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+    for (auto& block : partitioned_blocks) {
+        if (block) {
+            mem_size += block->allocated_bytes();
+        }
+    }
+    COUNTER_SET(_memory_used_counter, mem_size);
+}
+
+size_t PartitionedHashJoinSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) {
+    size_t size_to_reserve = 0;
+    auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+    if (_shared_state->need_to_spill) {
+        size_to_reserve = p._partition_count * vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM;
+    } else {
+        if (_shared_state->inner_runtime_state) {
+            size_to_reserve = p._inner_sink_operator->get_reserve_mem_size(
+                    _shared_state->inner_runtime_state.get(), eos);
+        }
+    }
+
+    COUNTER_SET(_memory_usage_reserved, int64_t(size_to_reserve));
+    return size_to_reserve;
+}
+
+Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(

Review Comment:
   warning: function '_revoke_unpartitioned_block' exceeds recommended 
size/complexity thresholds [readability-function-size]
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:157:** 126 
lines including whitespace and comments (threshold 80)
   ```cpp
   Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                                             ^
   ```
   
   </details>
   



##########
be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:
##########
@@ -479,12 +521,9 @@
 Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,

Review Comment:
   warning: function 'sink' has cognitive complexity of 75 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
   Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
                                            ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:532:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (rows == 0) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:533:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:538:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               if (need_to_spill) {
               ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:540:** +1, 
nesting level increased to 3
   ```cpp
               } else {
                 ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:541:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
                   ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:542:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(_setup_internal_operator(state));
                       ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:542:** +6, 
including nesting penalty of 5, nesting level increased to 6
   ```cpp
                       RETURN_IF_ERROR(_setup_internal_operator(state));
                       ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:544:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
                   ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:544:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink_eos", {
                   ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:550:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                   RETURN_IF_ERROR(_inner_sink_operator->sink(
                   ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:550:** +5, 
including nesting penalty of 4, nesting level increased to 5
   ```cpp
                   RETURN_IF_ERROR(_inner_sink_operator->sink(
                   ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:561:** 
nesting level increased to 3
   ```cpp
                             [&](auto& block) {
                             ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:562:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
                                 if (block) {
                                 ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:572:** +1, 
including nesting penalty of 0, nesting level increased to 1
   ```cpp
       if (need_to_spill) {
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:573:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:573:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:574:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:576:** +1, 
nesting level increased to 2
   ```cpp
           } else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
                  ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:579:** +1, 
nesting level increased to 1
   ```cpp
       } else {
         ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:580:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:581:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(_setup_internal_operator(state));
               ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:581:** +4, 
including nesting penalty of 3, nesting level increased to 4
   ```cpp
               RETURN_IF_ERROR(_setup_internal_operator(state));
               ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:583:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
           ^
   ```
   **be/src/util/debug_points.h:36:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
       if (UNLIKELY(config::enable_debug_points)) {                             
 \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:583:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::sink", {
           ^
   ```
   **be/src/util/debug_points.h:38:** expanded from macro 'DBUG_EXECUTE_IF'
   ```cpp
           if (dp) {                                                            
 \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:588:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           RETURN_IF_ERROR(_inner_sink_operator->sink(
           ^
   ```
   **be/src/common/status.h:642:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:588:** +3, 
including nesting penalty of 2, nesting level increased to 3
   ```cpp
           RETURN_IF_ERROR(_inner_sink_operator->sink(
           ^
   ```
   **be/src/common/status.h:644:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp:591:** +2, 
including nesting penalty of 1, nesting level increased to 2
   ```cpp
           if (eos) {
           ^
   ```
   
   </details>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to