yiguolei commented on code in PR #45375:
URL: https://github.com/apache/doris/pull/45375#discussion_r1946332074


##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -355,138 +397,61 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
         return Status::EndOfFile("all data stream channels EOF");
     }
 
-    if (_part_type == TPartitionType::UNPARTITIONED || 
local_state.channels.size() == 1) {
-        // 1. serialize depends on it is not local exchange
-        // 2. send block
-        // 3. rollover block
-        if (local_state.only_local_exchange) {
-            if (!block->empty()) {
-                Status status;
-                size_t idx = 0;
-                for (auto& channel : local_state.channels) {
-                    if (!channel->is_receiver_eof()) {
-                        // If this channel is the last, we can move this block 
to downstream pipeline.
-                        // Otherwise, this block also need to be broadcasted 
to other channels so should be copied.
-                        DCHECK_GE(local_state._last_local_channel_idx, 0);
-                        status = channel->send_local_block(
-                                block, eos, idx == 
local_state._last_local_channel_idx);
-                        HANDLE_CHANNEL_STATUS(state, channel, status);
-                    }
-                    idx++;
-                }
-            }
-        } else {
-            auto block_holder = 
vectorized::BroadcastPBlockHolder::create_shared();
-            {
-                bool serialized = false;
-                RETURN_IF_ERROR(local_state._serializer.next_serialized_block(
-                        block, block_holder->get_block(), 
local_state._rpc_channels_num,
-                        &serialized, eos));
-                if (serialized) {
-                    auto cur_block = 
local_state._serializer.get_block()->to_block();
-                    if (!cur_block.empty()) {
-                        DCHECK(eos || local_state._serializer.is_local()) << 
debug_string(state, 0);
-                        
RETURN_IF_ERROR(local_state._serializer.serialize_block(
-                                &cur_block, block_holder->get_block(),
-                                local_state._rpc_channels_num));
-                    } else {
-                        block_holder->reset_block();
-                    }
-
-                    
local_state._broadcast_pb_mem_limiter->acquire(*block_holder);
-
-                    size_t idx = 0;
-                    bool moved = false;
-                    for (auto& channel : local_state.channels) {
-                        if (!channel->is_receiver_eof()) {
-                            Status status;
-                            if (channel->is_local()) {
-                                // If this channel is the last, we can move 
this block to downstream pipeline.
-                                // Otherwise, this block also need to be 
broadcasted to other channels so should be copied.
-                                DCHECK_GE(local_state._last_local_channel_idx, 
0);
-                                status = channel->send_local_block(
-                                        &cur_block, eos,
-                                        idx == 
local_state._last_local_channel_idx);
-                                moved = idx == 
local_state._last_local_channel_idx;
-                            } else {
-                                status = 
channel->send_broadcast_block(block_holder, eos);
-                            }
-                            HANDLE_CHANNEL_STATUS(state, channel, status);
-                        }
-                        idx++;
-                    }
-                    if (moved) {
-                        local_state._serializer.reset_block();
-                    } else {
-                        cur_block.clear_column_data();
-                        
local_state._serializer.get_block()->set_mutable_columns(
-                                cur_block.mutate_columns());
-                    }
-                }
+    auto block_holder = vectorized::BroadcastPBlockHolder::create_shared();
+    bool should_output = !block->empty() || eos;
+    size_t data_size =
+            _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED ? 
block->bytes() : 0;
+    if (is_broadcast()) {
+        // For broadcast shuffling, we do accumulate a full data block in 
`local_state._serializer`.
+        RETURN_IF_ERROR(local_state._serializer.next_serialized_block(block, 
&should_output, eos));
+        if (should_output) {
+            local_state._serializer.swap_data(block);
+            if (!local_state.only_local_exchange) {
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(
+                        block, block_holder->get_block(), 
local_state._rpc_channels_num));
             }
         }
-    } else if (_part_type == TPartitionType::RANDOM) {
-        // 1. select channel
-        auto& current_channel = 
local_state.channels[local_state.current_channel_idx];
-        if (!current_channel->is_receiver_eof()) {
-            // 2. serialize, send and rollover block
-            if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, eos, 
true);
-                HANDLE_CHANNEL_STATUS(state, current_channel, status);
-            } else {
-                auto pblock = std::make_unique<PBlock>();
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, 
pblock.get()));
-                auto status = 
current_channel->send_remote_block(std::move(pblock), eos);
-                HANDLE_CHANNEL_STATUS(state, current_channel, status);
-            }
+        local_state._broadcast_pb_mem_limiter->acquire(*block_holder);
+    }
+    if (!should_output) {
+        return Status::OK();
+    }
+    local_state._channel_selector->process_next_block(data_size);
+    RETURN_IF_ERROR(_writer->write(&local_state, state, block, eos));
+
+    {
+        SCOPED_TIMER(local_state._test_timer1);
+        bool moved = local_state._could_be_moved;
+        auto& channel_ids = local_state._channel_selector->next_channel_ids();
+        for (auto channel_id : channel_ids) {
+            moved = moved && local_state.channels[channel_id]->is_local();
         }
-        local_state.current_channel_idx =
-                (local_state.current_channel_idx + 1) % 
local_state.channels.size();
-    } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
-               _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
-               _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
-               _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
-        RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, 
eos));
-    } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
-        // Control the number of channels according to the flow, thereby 
controlling the number of table sink writers.
-        // 1. select channel
-        auto& current_channel = 
local_state.channels[local_state.current_channel_idx];
-        if (!current_channel->is_receiver_eof()) {
-            // 2. serialize, send and rollover block
-            if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, eos, 
true);
-                HANDLE_CHANNEL_STATUS(state, current_channel, status);
-            } else {
-                auto pblock = std::make_unique<PBlock>();
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, 
pblock.get()));
-                auto status = 
current_channel->send_remote_block(std::move(pblock), eos);
-                HANDLE_CHANNEL_STATUS(state, current_channel, status);
-            }
-            _data_processed += block->bytes();
+        if (moved) {
+            *block = block->clone_empty();
         }
-
-        if (_writer_count < local_state.channels.size()) {
-            if (_data_processed >=
-                _writer_count *
-                        
config::table_sink_non_partition_write_scaling_data_processed_threshold) {
-                _writer_count++;
-            }
+        for (auto channel_id : channel_ids) {
+            RETURN_IF_ERROR(_writer->send_to_channels(
+                    &local_state, state, 
local_state.channels[channel_id].get(), channel_id, eos,
+                    is_broadcast() ? block_holder : nullptr, 
local_state._test_timer2,

Review Comment:
   为啥非broadcast,就是null了呢? (Why is `block_holder` passed as `nullptr` whenever the partitioning is not broadcast?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to