Gabriel39 commented on code in PR #34557:
URL: https://github.com/apache/doris/pull/34557#discussion_r1594888265


##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -46,36 +46,31 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
 
     auto get_data = [&](vectorized::Block* result_block) {
         do {
-            const auto* offset_start = &((
-                    
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+            const auto* offset_start = 
partitioned_block.second.row_idxs->data() +
+                                       partitioned_block.second.offset_start;
             auto block_wrapper = partitioned_block.first;
             local_state._shared_state->sub_mem_usage(
                     local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
             mutable_block->add_rows(&block_wrapper->data_block, offset_start,
-                                    offset_start + 
std::get<2>(partitioned_block.second));
+                                    offset_start + 
partitioned_block.second.length);
             block_wrapper->unref(local_state._shared_state);
         } while (mutable_block->rows() < state->batch_size() &&
                  
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
         *result_block = mutable_block->to_block();
     };
-    if (_running_sink_operators == 0) {
-        if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
-            SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = vectorized::MutableBlock::create_unique(
-                    partitioned_block.first->data_block.clone_empty());
-            get_data(block);
-        } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-            *eos = true;
-        }
-    } else if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+
+    if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {

Review Comment:
   Assume the following events occur in order:
   1. In the source operator, 
`_data_queue[local_state._channel_id].try_dequeue(partitioned_block)` returns 
false.
   2. The last sink operator pushes the last block into `_data_queue`.
   3. The last sink operator decrements `_running_sink_operators` to zero.
   4. The source operator then sees that `_running_sink_operators == 0` is true.
   
   Because of event 4, eos is set to true, so the last block is ignored and 
the results will be incorrect.



##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -186,23 +180,17 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            _free_blocks.enqueue(std::move(next_block));
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-        } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {

Review Comment:
   ditto



##########
be/src/pipeline/local_exchange/local_exchanger.cpp:
##########
@@ -361,23 +343,17 @@ Status 
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
                                                bool* eos,
                                                LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            _free_blocks.enqueue(std::move(next_block));
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-        } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-            *eos = true;
-        }
-    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+    if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to