yiguolei commented on code in PR #45375: URL: https://github.com/apache/doris/pull/45375#discussion_r1946332074
########## be/src/pipeline/exec/exchange_sink_operator.cpp: ########## @@ -355,138 +397,61 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block return Status::EndOfFile("all data stream channels EOF"); } - if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { - // 1. serialize depends on it is not local exchange - // 2. send block - // 3. rollover block - if (local_state.only_local_exchange) { - if (!block->empty()) { - Status status; - size_t idx = 0; - for (auto& channel : local_state.channels) { - if (!channel->is_receiver_eof()) { - // If this channel is the last, we can move this block to downstream pipeline. - // Otherwise, this block also need to be broadcasted to other channels so should be copied. - DCHECK_GE(local_state._last_local_channel_idx, 0); - status = channel->send_local_block( - block, eos, idx == local_state._last_local_channel_idx); - HANDLE_CHANNEL_STATUS(state, channel, status); - } - idx++; - } - } - } else { - auto block_holder = vectorized::BroadcastPBlockHolder::create_shared(); - { - bool serialized = false; - RETURN_IF_ERROR(local_state._serializer.next_serialized_block( - block, block_holder->get_block(), local_state._rpc_channels_num, - &serialized, eos)); - if (serialized) { - auto cur_block = local_state._serializer.get_block()->to_block(); - if (!cur_block.empty()) { - DCHECK(eos || local_state._serializer.is_local()) << debug_string(state, 0); - RETURN_IF_ERROR(local_state._serializer.serialize_block( - &cur_block, block_holder->get_block(), - local_state._rpc_channels_num)); - } else { - block_holder->reset_block(); - } - - local_state._broadcast_pb_mem_limiter->acquire(*block_holder); - - size_t idx = 0; - bool moved = false; - for (auto& channel : local_state.channels) { - if (!channel->is_receiver_eof()) { - Status status; - if (channel->is_local()) { - // If this channel is the last, we can move this block to downstream pipeline. 
- // Otherwise, this block also need to be broadcasted to other channels so should be copied. - DCHECK_GE(local_state._last_local_channel_idx, 0); - status = channel->send_local_block( - &cur_block, eos, - idx == local_state._last_local_channel_idx); - moved = idx == local_state._last_local_channel_idx; - } else { - status = channel->send_broadcast_block(block_holder, eos); - } - HANDLE_CHANNEL_STATUS(state, channel, status); - } - idx++; - } - if (moved) { - local_state._serializer.reset_block(); - } else { - cur_block.clear_column_data(); - local_state._serializer.get_block()->set_mutable_columns( - cur_block.mutate_columns()); - } - } + auto block_holder = vectorized::BroadcastPBlockHolder::create_shared(); + bool should_output = !block->empty() || eos; + size_t data_size = + _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED ? block->bytes() : 0; + if (is_broadcast()) { + // For broadcast shuffling, we do accumulate a full data block in `local_state._serializer`. + RETURN_IF_ERROR(local_state._serializer.next_serialized_block(block, &should_output, eos)); + if (should_output) { + local_state._serializer.swap_data(block); + if (!local_state.only_local_exchange) { + RETURN_IF_ERROR(local_state._serializer.serialize_block( + block, block_holder->get_block(), local_state._rpc_channels_num)); } } - } else if (_part_type == TPartitionType::RANDOM) { - // 1. select channel - auto& current_channel = local_state.channels[local_state.current_channel_idx]; - if (!current_channel->is_receiver_eof()) { - // 2. 
serialize, send and rollover block - if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block, eos, true); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } else { - auto pblock = std::make_unique<PBlock>(); - RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get())); - auto status = current_channel->send_remote_block(std::move(pblock), eos); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } + local_state._broadcast_pb_mem_limiter->acquire(*block_holder); + } + if (!should_output) { + return Status::OK(); + } + local_state._channel_selector->process_next_block(data_size); + RETURN_IF_ERROR(_writer->write(&local_state, state, block, eos)); + + { + SCOPED_TIMER(local_state._test_timer1); + bool moved = local_state._could_be_moved; + auto& channel_ids = local_state._channel_selector->next_channel_ids(); + for (auto channel_id : channel_ids) { + moved = moved && local_state.channels[channel_id]->is_local(); } - local_state.current_channel_idx = - (local_state.current_channel_idx + 1) % local_state.channels.size(); - } else if (_part_type == TPartitionType::HASH_PARTITIONED || - _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || - _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || - _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { - RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, eos)); - } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) { - // Control the number of channels according to the flow, thereby controlling the number of table sink writers. - // 1. select channel - auto& current_channel = local_state.channels[local_state.current_channel_idx]; - if (!current_channel->is_receiver_eof()) { - // 2. 
serialize, send and rollover block - if (current_channel->is_local()) { - auto status = current_channel->send_local_block(block, eos, true); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } else { - auto pblock = std::make_unique<PBlock>(); - RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get())); - auto status = current_channel->send_remote_block(std::move(pblock), eos); - HANDLE_CHANNEL_STATUS(state, current_channel, status); - } - _data_processed += block->bytes(); + if (moved) { + *block = block->clone_empty(); } - - if (_writer_count < local_state.channels.size()) { - if (_data_processed >= - _writer_count * - config::table_sink_non_partition_write_scaling_data_processed_threshold) { - _writer_count++; - } + for (auto channel_id : channel_ids) { + RETURN_IF_ERROR(_writer->send_to_channels( + &local_state, state, local_state.channels[channel_id].get(), channel_id, eos, + is_broadcast() ? block_holder : nullptr, local_state._test_timer2, Review Comment: 为啥非broadcast,就是null了呢? (English: Why is this argument null when the exchange is not broadcast?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org