zhangstar333 commented on code in PR #46262:
URL: https://github.com/apache/doris/pull/46262#discussion_r1900696080


##########
be/src/pipeline/exec/partition_sort_source_operator.cpp:
##########
@@ -41,44 +41,45 @@ Status 
PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     output_block->clear_column_data();
+
+    auto get_data_from_blocks_buffer = false;
     {
         std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
-        if (local_state._shared_state->blocks_buffer.empty() == false) {
+        get_data_from_blocks_buffer = 
!local_state._shared_state->blocks_buffer.empty();
+        if (get_data_from_blocks_buffer) {
             
local_state._shared_state->blocks_buffer.front().swap(*output_block);
             local_state._shared_state->blocks_buffer.pop();
-            //if buffer have no data and sink not eos, block reading and wait 
for signal again
-            RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
-                    local_state._conjuncts, output_block, 
output_block->columns()));
+
             if (local_state._shared_state->blocks_buffer.empty() &&
-                local_state._shared_state->sink_eos == false) {
+                !local_state._shared_state->sink_eos) {
                 // add this mutex to check, as in some case maybe is doing 
block(), and the sink is doing set eos.
                 // so have to hold mutex to set block(), avoid to sink have 
set eos and set ready, but here set block() by mistake
                 std::unique_lock<std::mutex> 
lc(local_state._shared_state->sink_eos_lock);
-                if (local_state._shared_state->sink_eos == false) {
+                if (!local_state._shared_state->sink_eos) {
                     local_state._dependency->block();
                 }
             }
-            if (!output_block->empty()) {
-                local_state._num_rows_returned += output_block->rows();
-            }
-            return Status::OK();
         }
     }
 
-    // is_ready_for_read: this is set by sink node using: 
local_state._dependency->set_ready_for_read()
-    // notice: must output block from _blocks_buffer firstly, and then 
get_sorted_block.
-    // as when the child is eos, then set _can_read = true, and 
_partition_sorts have push_back sorter.
-    // if we move the _blocks_buffer output at last(behind 286 line),
-    // it's maybe eos but not output all data: when _blocks_buffer.empty() and 
_can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
-    RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
-    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,
-                                                           
output_block->columns()));
-    {
-        std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
+    if (!get_data_from_blocks_buffer) {
+        // is_ready_for_read: this is set by sink node using: 
local_state._dependency->set_ready_for_read()
+        // notice: must output block from _blocks_buffer firstly, and then 
get_sorted_block.
+        // as when the child is eos, then set _can_read = true, and 
_partition_sorts have push_back sorter.
+        // if we move the _blocks_buffer output at last(behind 286 line),
+        // it's maybe eos but not output all data: when _blocks_buffer.empty() 
and _can_read = false (this: _sort_idx && _partition_sorts.size() are 0)
+        RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
+        {
+            std::lock_guard<std::mutex> 
lock(local_state._shared_state->buffer_mutex);
 
-        *eos = local_state._shared_state->blocks_buffer.empty() &&
-               local_state._sort_idx >= 
local_state._shared_state->partition_sorts.size();
+            *eos = local_state._shared_state->blocks_buffer.empty() &&
+                   local_state._sort_idx >= 
local_state._shared_state->partition_sorts.size();
+        }
     }
+
+    //if buffer have no data and sink not eos, block reading and wait for 
signal again
+    
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
output_block,

Review Comment:
   This call could be moved to line 84.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to