This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 82f2878bd4e  [fix](scan) Avoid memory allocated by buffered_reader from being traced (#42558)

82f2878bd4e is described below

commit 82f2878bd4e632ec4564b5e61f51b84fcce7056c
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Oct 28 15:07:06 2024 +0800

    [fix](scan) Avoid memory allocated by buffered_reader from being traced (#42558)

    ## Proposed changes

    Issue Number: close #xxx

    <!--Describe your changes.-->

    ---------

    Co-authored-by: Pxl <pxl...@qq.com>
    Co-authored-by: Gabriel <gabrielleeb...@gmail.com>
---
 be/src/io/fs/buffered_reader.cpp                   |  19 +-
 be/src/io/fs/buffered_reader.h                     |  11 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  49 +---
 be/src/pipeline/exec/exchange_sink_buffer.h        |  28 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 135 +++++-----
 be/src/pipeline/exec/exchange_sink_operator.h      |  29 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |  12 +-
 be/src/pipeline/exec/result_file_sink_operator.h   |   9 -
 be/src/util/slice.h                                |   6 +
 be/src/vec/runtime/partitioner.cpp                 |  41 +--
 be/src/vec/runtime/partitioner.h                   |  49 +---
 be/src/vec/sink/vdata_stream_sender.cpp            | 291 +++++----------
 be/src/vec/sink/vdata_stream_sender.h              | 232 +++++-----------
 13 files changed, 290 insertions(+), 621 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 43445ed42ef..ce4767dd040 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -23,6 +23,7 @@
 
 #include <algorithm>
 #include <chrono>
+#include <memory>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
@@ -31,6 +32,7 @@
 #include "runtime/thread_context.h"
 #include "runtime/workload_management/io_throttle.h"
 #include "util/runtime_profile.h"
+#include "util/slice.h"
 #include "util/threadpool.h"
 
 namespace doris {
@@ -270,7 +272,7 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
             }
             if (copy_out != nullptr) {
                 memcpy(copy_out + to_handle - remaining,
-                       _boxes[box_index] + cached_data.box_start_offset[i], box_to_handle);
+                       _boxes[box_index].data() + cached_data.box_start_offset[i], box_to_handle);
             }
             remaining -= box_to_handle;
             cached_data.box_start_offset[i] += box_to_handle;
@@ -307,14 +309,15 @@ void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t off
 
 Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
                                        size_t* bytes_read, const IOContext* io_ctx) {
-    if (_read_slice == nullptr) {
-        _read_slice = new char[READ_SLICE_SIZE];
+    if (!_read_slice) {
+        _read_slice = std::make_unique<OwnedSlice>(READ_SLICE_SIZE);
     }
+
     *bytes_read = 0;
     {
         SCOPED_RAW_TIMER(&_statistics.read_time);
-        RETURN_IF_ERROR(
-                _reader->read_at(start_offset, Slice(_read_slice, to_read), bytes_read, io_ctx));
+        RETURN_IF_ERROR(_reader->read_at(start_offset, Slice(_read_slice->data(), to_read),
+                                         bytes_read, io_ctx));
         _statistics.merged_io++;
         _statistics.merged_bytes += *bytes_read;
     }
@@ -328,8 +331,8 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
 
     auto fill_box = [&](int16 fill_box_ref, uint32 box_usage, size_t box_copy_end) {
         size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE - box_usage);
-        memcpy(_boxes[fill_box_ref] + box_usage, _read_slice + copy_start - start_offset,
-               copy_size);
+        memcpy(_boxes[fill_box_ref].data() + box_usage,
+               _read_slice->data() + copy_start - start_offset, copy_size);
         filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start, copy_start + copy_size);
         copy_start += copy_size;
         _last_box_ref = fill_box_ref;
@@ -367,7 +370,7 @@ Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, siz
     }
     // apply for new box to copy data
     while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
-        _boxes.emplace_back(new char[BOX_SIZE]);
+        _boxes.emplace_back(BOX_SIZE);
         _box_ref.emplace_back(0);
         fill_box(_boxes.size() - 1, 0, range_copy_end);
     }
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 70c8445db23..907ea11b216 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -168,12 +168,7 @@ public:
         }
     }
 
-    ~MergeRangeFileReader() override {
-        delete[] _read_slice;
-        for (char* box : _boxes) {
-            delete[] box;
-        }
-    }
+    ~MergeRangeFileReader() override = default;
 
     Status close() override {
         if (!_closed) {
@@ -244,8 +239,8 @@ private:
     bool _closed = false;
     size_t _remaining;
-    char* _read_slice = nullptr;
-    std::vector<char*> _boxes;
+    std::unique_ptr<OwnedSlice> _read_slice;
+    std::vector<OwnedSlice> _boxes;
     int16 _last_box_ref = -1;
     uint32 _last_box_usage = 0;
     std::vector<int16> _box_ref;
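The buffered_reader hunks above replace manually managed `new char[]` buffers with `OwnedSlice` values (the `OwnedSlice` additions appear in the `util/slice.h` hunk later in this email), so the destructor can be defaulted and, per the commit title, the allocation goes through `Allocator` rather than the traced `new` path. A minimal standalone sketch of the same RAII pattern, with simplified stand-in types that are not the actual Doris classes:

```cpp
#include <cstdlib>
#include <cstring>
#include <memory>
#include <vector>

// Stand-in for doris::OwnedSlice: owns a heap buffer, frees it on destruction.
class OwnedBuffer {
public:
    explicit OwnedBuffer(size_t len) : _data(static_cast<char*>(std::malloc(len))), _size(len) {}
    OwnedBuffer(OwnedBuffer&& o) noexcept : _data(o._data), _size(o._size) { o._data = nullptr; }
    OwnedBuffer(const OwnedBuffer&) = delete;
    ~OwnedBuffer() { std::free(_data); }
    char* data() const { return _data; }

private:
    char* _data;
    size_t _size;
};

struct Reader {
    // Before: `char* _read_slice` and `std::vector<char*> _boxes` plus a
    // hand-written destructor; after: ownership lives in the members themselves.
    std::unique_ptr<OwnedBuffer> _read_slice;
    std::vector<OwnedBuffer> _boxes;

    void fill(const char* src, size_t n) { // caller guarantees n <= 1024
        if (!_read_slice) _read_slice = std::make_unique<OwnedBuffer>(1024);
        std::memcpy(_read_slice->data(), src, n);
        _boxes.emplace_back(n); // constructs an owned box in place
        std::memcpy(_boxes.back().data(), src, n);
    }
};
```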
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index f0f66774092..31973997059 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -86,14 +86,13 @@ void BroadcastPBlockHolderMemLimiter::release(const BroadcastPBlockHolder& holde
 } // namespace vectorized
 
 namespace pipeline {
-
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
                                        PlanNodeId node_id, int be_number, RuntimeState* state,
                                        ExchangeSinkLocalState* parent)
         : HasTaskExecutionCtx(state),
           _queue_capacity(0),
           _is_finishing(false),
-          _query_id(query_id),
+          _query_id(std::move(query_id)),
           _dest_node_id(dest_node_id),
           _sender_id(send_id),
           _node_id(node_id),
@@ -111,12 +110,6 @@ void ExchangeSinkBuffer::close() {
     //_instance_to_request.clear();
 }
 
-void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
-    if (_finish_dependency && _should_stop && all_done) {
-        _finish_dependency->set_ready();
-    }
-}
-
 void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     if (_is_finishing) {
         return;
@@ -136,7 +129,6 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
-    _instance_to_rpc_ctx[low_id] = {};
     _instance_to_receiver_eof[low_id] = false;
     _instance_to_rpc_time[low_id] = 0;
     _construct_request(low_id, finst_id);
@@ -161,7 +153,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
         if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
             _rpc_channel_is_idle[ins_id] = false;
-            _busy_channels++;
         }
         if (request.block) {
             RETURN_IF_ERROR(
@@ -202,7 +193,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
         if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
             _rpc_channel_is_idle[ins_id] = false;
-            _busy_channels++;
         }
         if (request.block_holder->get_block()) {
             RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
@@ -227,7 +217,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             _instance_to_broadcast_package_queue[id];
 
     if (_is_finishing) {
-        _turn_off_channel(id);
+        _turn_off_channel(id, lock);
         return Status::OK();
     }
 
@@ -245,9 +235,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         auto send_callback = request.channel->get_send_callback(id, request.eos);
-
-        _instance_to_rpc_ctx[id]._send_callback = send_callback;
-        _instance_to_rpc_ctx[id].is_cancelled = false;
-
         send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
@@ -326,12 +313,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             brpc_request->set_allocated_block(request.block_holder->get_block());
         }
         auto send_callback = request.channel->get_send_callback(id, request.eos);
-
-        ExchangeRpcContext rpc_ctx;
-        rpc_ctx._send_callback = send_callback;
-        rpc_ctx.is_cancelled = false;
-        _instance_to_rpc_ctx[id] = rpc_ctx;
-
         send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
         if (config::exchange_sink_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
@@ -395,7 +376,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
             broadcast_q.pop();
         } else {
-            _turn_off_channel(id);
+            _rpc_channel_is_idle[id] = true;
         }
 
     return Status::OK();
@@ -425,7 +406,7 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
         std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-        _turn_off_channel(id);
+        _turn_off_channel(id, lock);
     }
 }
 
@@ -434,14 +415,12 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
              << ",_sender_id: " << _sender_id << ", node id: " << _node_id << ", err: " << err;
     _is_finishing = true;
     _context->cancel(Status::Cancelled(err));
-    std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    _turn_off_channel(id, true);
 }
 
 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
-    _turn_off_channel(id, true);
+    _turn_off_channel(id, lock);
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
     for (; !broadcast_q.empty(); broadcast_q.pop()) {
@@ -473,17 +452,17 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
     return _instance_to_receiver_eof[id];
 }
 
-void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) {
+// The unused parameter `with_lock` is to ensure that the function is called when the lock is held.
+void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
+                                           std::unique_lock<std::mutex>& /*with_lock*/) {
     if (!_rpc_channel_is_idle[id]) {
         _rpc_channel_is_idle[id] = true;
-        auto all_done = _busy_channels.fetch_sub(1) == 1;
-        _set_ready_to_finish(all_done);
-        if (cleanup && all_done) {
-            auto weak_task_ctx = weak_task_exec_ctx();
-            if (auto pip_ctx = weak_task_ctx.lock()) {
-                _parent->set_reach_limit();
-            }
-        }
+    }
+    _instance_to_receiver_eof[id] = true;
+
+    auto weak_task_ctx = weak_task_exec_ctx();
+    if (auto pip_ctx = weak_task_ctx.lock()) {
+        _parent->on_channel_finished(id);
     }
 }
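Note the new `_turn_off_channel(InstanceLoId, std::unique_lock<std::mutex>&)` signature above: the lock parameter is deliberately unused. A condensed sketch of this lock-witness idiom, with hypothetical names that are not the patch's code:

```cpp
#include <mutex>

class Counter {
public:
    void increment() {
        std::unique_lock<std::mutex> lock(_mutex);
        _increment_locked(lock); // caller must name a held lock to compile
    }

private:
    // The unused reference documents, and enforces at every call site, that
    // the caller already holds _mutex when this function runs.
    void _increment_locked(std::unique_lock<std::mutex>& /*with_lock*/) { ++_value; }

    std::mutex _mutex;
    int _value = 0;
};
```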
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 60193b1d7e6..972e366c027 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -24,6 +24,7 @@
 #include <parallel_hashmap/phmap.h>
 
 #include <atomic>
+#include <cstdint>
 #include <list>
 #include <memory>
 #include <mutex>
@@ -50,7 +51,7 @@ class ExchangeSinkLocalState;
 } // namespace pipeline
 
 namespace vectorized {
-class PipChannel;
+class Channel;
 
 // We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
 // will be shared between different channel, so we have to use a ref count to mark if this
@@ -110,14 +111,14 @@ private:
 namespace pipeline {
 
 struct TransmitInfo {
-    vectorized::PipChannel* channel = nullptr;
+    vectorized::Channel* channel = nullptr;
     std::unique_ptr<PBlock> block;
     bool eos;
     Status exec_status;
 };
 
 struct BroadcastTransmitInfo {
-    vectorized::PipChannel* channel = nullptr;
+    vectorized::Channel* channel = nullptr;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
     bool eos;
 };
@@ -177,11 +178,6 @@ private:
     bool _eos;
 };
 
-struct ExchangeRpcContext {
-    std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback;
-    bool is_cancelled = false;
-};
-
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
@@ -202,12 +198,10 @@ public:
         _finish_dependency = finish_dependency;
     }
 
-    void set_should_stop() {
-        _should_stop = true;
-        _set_ready_to_finish(_busy_channels == 0);
-    }
-
     void set_low_memory_mode() { _queue_capacity = 8; }
+    void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
+        _broadcast_dependency = broadcast_dependency;
+    }
 
 private:
     friend class ExchangeSinkLocalState;
@@ -230,11 +224,9 @@ private:
     phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> _instance_to_request;
     // One channel is corresponding to a downstream instance.
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
-    // Number of busy channels;
-    std::atomic<int> _busy_channels = 0;
+
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
     phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
-    phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext> _instance_to_rpc_ctx;
 
     std::atomic<bool> _is_finishing;
     PUniqueId _query_id;
@@ -254,14 +246,14 @@ private:
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);
     inline bool _is_receiver_eof(InstanceLoId id);
-    inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
+    inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock);
     void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     int64_t get_sum_rpc_time();
 
     std::atomic<int> _total_queue_size = 0;
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _finish_dependency = nullptr;
-    std::atomic<bool> _should_stop = false;
+    std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
     ExchangeSinkLocalState* _parent = nullptr;
 };
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 3cfc5e45cfe..14290145664 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -23,6 +23,7 @@
 #include <gen_cpp/types.pb.h>
 
 #include <memory>
+#include <mutex>
 #include <random>
 
 #include "common/status.h"
@@ -31,6 +32,8 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/sort_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "util/runtime_profile.h"
+#include "util/uid_util.h"
 #include "vec/columns/column_const.h"
 #include "vec/exprs/vexpr.h"
 
@@ -68,8 +71,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
     _overall_throughput = _profile->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
-            std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
-                               _profile->total_time_counter()),
+            [this]() {
+                return RuntimeProfile::units_per_second(_bytes_sent_counter,
+                                                        _profile->total_time_counter());
+            },
             "");
     _merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
     _local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent", TUnit::BYTES);
@@ -84,15 +89,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
         const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
-            channel_shared_ptrs.emplace_back(
-                    new vectorized::PipChannel(this, p._row_desc, p._dests[i].brpc_server,
-                                               fragment_instance_id, p._dest_node_id));
-            fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
-                                                 channel_shared_ptrs.size() - 1);
-            channels.push_back(channel_shared_ptrs.back().get());
+            channels.push_back(std::make_shared<vectorized::Channel>(
+                    this, p._dests[i].brpc_server, fragment_instance_id, p._dest_node_id));
+            fragment_id_to_channel_index.emplace(fragment_instance_id.lo, channels.size() - 1);
+
+            if (fragment_instance_id.hi != -1 && fragment_instance_id.lo != -1) {
+                _working_channels_count++;
+            }
         } else {
-            channel_shared_ptrs.emplace_back(
-                    channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
+            channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]);
         }
     }
 
@@ -106,6 +111,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     return Status::OK();
 }
 
+void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
+    std::lock_guard<std::mutex> lock(_finished_channels_mutex);
+
+    if (_finished_channels.contains(channel_id)) {
+        LOG(WARNING) << "query: " << print_id(_state->query_id())
+                     << ", on_channel_finished on already finished channel: " << channel_id;
+        return;
+    } else {
+        _finished_channels.emplace(channel_id);
+        if (_working_channels_count.fetch_sub(1) == 1) {
+            set_reach_limit();
+            if (_finish_dependency) {
+                _finish_dependency->set_ready();
+            }
+        }
+    }
+}
+
 Status ExchangeSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
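`on_channel_finished` above is the replacement for the buffer's old atomic `_busy_channels` counter: the set rejects duplicate reports, and `fetch_sub(1) == 1` (which returns the pre-decrement value) identifies the last working channel. A standalone sketch of that last-one-out detection, hypothetical names only:

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <set>

class ChannelTracker {
public:
    explicit ChannelTracker(int working) : _working_channels_count(working) {}

    void on_channel_finished(int64_t channel_id) {
        std::lock_guard<std::mutex> lock(_finished_channels_mutex);
        if (!_finished_channels.insert(channel_id).second) {
            return; // channel already reported; ignore the duplicate
        }
        // fetch_sub returns the previous value, so 1 means we just went 1 -> 0.
        if (_working_channels_count.fetch_sub(1) == 1) {
            std::cout << "all channels finished\n"; // e.g. mark a finish dependency ready
        }
    }

private:
    std::atomic<int> _working_channels_count;
    std::set<int64_t> _finished_channels;
    std::mutex _finished_channels_mutex;
};
```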
@@ -140,7 +163,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
         _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                                       "ExchangeSinkQueueDependency", true);
         _sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
-        _finish_dependency->block();
     }
 
     if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
@@ -151,7 +173,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                 vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
     } else if (local_size > 0) {
         size_t dep_id = 0;
-        for (auto* channel : channels) {
+        for (auto& channel : channels) {
             if (channel->is_local()) {
                 if (auto dep = channel->get_local_channel_dependency()) {
                     _local_channels_dependency.push_back(dep);
@@ -166,16 +188,18 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     }
     if (_part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                channels.size()));
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
                                   fmt::format("Crc32HashPartitioner({})", _partition_count));
     } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
-        _partition_count = channel_shared_ptrs.size();
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                channel_shared_ptrs.size()));
+        _partition_count = channels.size();
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        channels.size());
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",
@@ -221,12 +245,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
         _partition_count =
                 channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
-        _partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
-                _partition_count));
-        _partition_function.reset(new HashPartitionFunction(_partitioner.get()));
+        _partitioner =
+                std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+                        _partition_count);
+        _partition_function = std::make_unique<HashPartitionFunction>(_partitioner.get());
 
-        scale_writer_partitioning_exchanger.reset(new vectorized::ScaleWriterPartitioningExchanger<
-                                                  HashPartitionFunction>(
+        scale_writer_partitioning_exchanger = std::make_unique<
+                vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
                 channels.size(), *_partition_function, _partition_count, channels.size(), 1,
                 config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
                                         state->task_num() ==
@@ -239,7 +264,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
                                 0
                         ? config::table_sink_partition_write_min_data_processed_rebalance_threshold
                         : config::table_sink_partition_write_min_data_processed_rebalance_threshold /
-                                  state->task_num()));
+                                  state->task_num());
 
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
@@ -358,7 +383,7 @@ void ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT
                                                 Status st) {
     channel->set_receiver_eof(st);
     // Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
-    static_cast<void>(channel->close(state, Status::OK()));
+    static_cast<void>(channel->close(state));
 }
 
 Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) {
@@ -367,7 +392,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.exec_time_counter());
     bool all_receiver_eof = true;
-    for (auto* channel : local_state.channels) {
+    for (auto& channel : local_state.channels) {
         if (!channel->is_receiver_eof()) {
             all_receiver_eof = false;
             break;
@@ -394,13 +419,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
         if (!block->empty()) {
             Status status;
             size_t idx = 0;
-            for (auto* channel : local_state.channels) {
+            for (auto& channel : local_state.channels) {
                 if (!channel->is_receiver_eof()) {
                     // If this channel is the last, we can move this block to downstream pipeline.
                     // Otherwise, this block also need to be broadcasted to other channels so should be copied.
                     DCHECK_GE(local_state._last_local_channel_idx, 0);
                     status = channel->send_local_block(
-                            block, idx == local_state._last_local_channel_idx);
+                            block, eos, idx == local_state._last_local_channel_idx);
                     HANDLE_CHANNEL_STATUS(state, channel, status);
                 }
                 idx++;
@@ -430,7 +455,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
 
             size_t idx = 0;
             bool moved = false;
-            for (auto* channel : local_state.channels) {
+            for (auto& channel : local_state.channels) {
                 if (!channel->is_receiver_eof()) {
                     Status status;
                     if (channel->is_local()) {
@@ -438,7 +463,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                         // Otherwise, this block also need to be broadcasted to other channels so should be copied.
                         DCHECK_GE(local_state._last_local_channel_idx, 0);
                         status = channel->send_local_block(
-                                &cur_block, idx == local_state._last_local_channel_idx);
+                                &cur_block, eos,
+                                idx == local_state._last_local_channel_idx);
                         moved = idx == local_state._last_local_channel_idx;
                     } else {
                         status = channel->send_broadcast_block(block_holder, eos);
@@ -459,20 +485,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             }
         }
     } else if (_part_type == TPartitionType::RANDOM) {
         // 1. select channel
-        vectorized::PipChannel* current_channel =
-                local_state.channels[local_state.current_channel_idx];
+        auto& current_channel = local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, true);
+                auto status = current_channel->send_local_block(block, eos, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(
-                        block, current_channel->ch_cur_pb_block()));
-                auto status =
-                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                auto pblock = std::make_unique<PBlock>();
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                auto status = current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
-                current_channel->ch_roll_pb_block();
             }
         }
         local_state.current_channel_idx =
@@ -494,7 +517,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                     local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         } else {
             RETURN_IF_ERROR(channel_add_rows(
-                    state, local_state.channel_shared_ptrs, local_state._partition_count,
+                    state, local_state.channels, local_state._partition_count,
                    local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
         }
         int64_t new_channel_mem_usage = 0;
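Both local-send hunks above thread an `idx == _last_local_channel_idx` flag through `send_local_block` as `can_be_moved`: only the last local channel may steal the block's buffers, while every earlier channel must take a copy because the data is still needed. A minimal illustration of the idea with stand-in types:

```cpp
#include <string>
#include <utility>
#include <vector>

using Block = std::string; // stand-in for a data block

// Deliver `block` to every receiver; only the last receiver may move from it.
void broadcast(Block* block, std::vector<std::vector<Block>*>& receivers) {
    for (size_t idx = 0; idx < receivers.size(); ++idx) {
        const bool can_be_moved = (idx + 1 == receivers.size());
        if (can_be_moved) {
            receivers[idx]->push_back(std::move(*block)); // last consumer: no copy
        } else {
            receivers[idx]->push_back(*block); // others still need the data
        }
    }
}
```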
@@ -578,20 +601,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
         // Control the number of channels according to the flow, thereby controlling the number of table sink writers.
         // 1. select channel
-        vectorized::PipChannel* current_channel =
-                local_state.channels[local_state.current_channel_idx];
+        auto& current_channel = local_state.channels[local_state.current_channel_idx];
         if (!current_channel->is_receiver_eof()) {
             // 2. serialize, send and rollover block
             if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block, true);
+                auto status = current_channel->send_local_block(block, eos, true);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
             } else {
-                RETURN_IF_ERROR(local_state._serializer.serialize_block(
-                        block, current_channel->ch_cur_pb_block()));
-                auto status =
-                        current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+                auto pblock = std::make_unique<PBlock>();
+                RETURN_IF_ERROR(local_state._serializer.serialize_block(block, pblock.get()));
+                auto status = current_channel->send_remote_block(std::move(pblock), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
-                current_channel->ch_roll_pb_block();
             }
             _data_processed += block->bytes();
         }
@@ -613,15 +633,12 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     Status final_st = Status::OK();
     if (eos) {
         local_state._serializer.reset_block();
-        for (int i = 0; i < local_state.channels.size(); ++i) {
-            Status st = local_state.channels[i]->close(state, Status::OK());
+        for (auto& channel : local_state.channels) {
+            Status st = channel->close(state);
             if (!st.ok() && final_st.ok()) {
                 final_st = st;
             }
         }
-        if (local_state._sink_buffer) {
-            local_state._sink_buffer->set_should_stop();
-        }
     }
     return final_st;
 }
@@ -645,8 +662,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
 }
 
 void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer* buffer) {
-    for (auto channel : channels) {
-        ((vectorized::PipChannel*)channel)->register_exchange_buffer(buffer);
+    for (auto& channel : channels) {
+        channel->register_exchange_buffer(buffer);
     }
 }
 
@@ -693,12 +710,12 @@ std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
     if (_sink_buffer) {
-        fmt::format_to(
-                debug_string_buffer,
-                ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
-                "_reach_limit: {}",
-                _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
-                _sink_buffer->_is_finishing.load(), _reach_limit.load());
+        fmt::format_to(debug_string_buffer,
+                       ", Sink Buffer: (_is_finishing = {}, blocks in queue: {}, queue capacity: "
+                       "{}, queue dep: {}), _reach_limit: {}, working channels: {}",
+                       _sink_buffer->_is_finishing.load(), _sink_buffer->_total_queue_size,
+                       _sink_buffer->_queue_capacity, (void*)_sink_buffer->_queue_dependency.get(),
+                       _reach_limit.load(), _working_channels_count.load());
     }
     return fmt::to_string(debug_string_buffer);
 }
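On `eos` the sink now closes channels with the single-argument `close(state)` and no longer pokes the buffer via `set_should_stop()`; completion is reported per channel through `on_channel_finished`. The close loop keeps the first failure while still closing every remaining channel — a small sketch of that aggregate-first-error pattern with a stand-in `Status`:

```cpp
#include <string>
#include <vector>

struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
};

struct Channel {
    Status close() { return Status::OK(); } // may fail in practice
};

// Close every channel even if one fails; report the first error encountered.
Status close_all(std::vector<Channel>& channels) {
    Status final_st = Status::OK();
    for (auto& channel : channels) {
        Status st = channel.close();
        if (!st.ok() && final_st.ok()) {
            final_st = st; // remember only the first failure
        }
    }
    return final_st;
}
```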
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index a34237145a7..8af944728a2 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -19,7 +19,9 @@
 
 #include <stdint.h>
 
+#include <atomic>
 #include <memory>
+#include <mutex>
 
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
@@ -53,13 +55,10 @@ private:
 
 public:
     ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
-            : Base(parent, state),
-              current_channel_idx(0),
-              only_local_exchange(false),
-              _serializer(this) {
+            : Base(parent, state), _serializer(this) {
         _finish_dependency =
                 std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
-                                             parent->get_name() + "_FINISH_DEPENDENCY", true);
+                                             parent->get_name() + "_FINISH_DEPENDENCY", false);
     }
 
     std::vector<Dependency*> dependencies() const override {
@@ -112,10 +111,11 @@ public:
         return Status::OK();
     }
     Status _send_new_partition_batch();
-    std::vector<vectorized::PipChannel*> channels;
-    std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
-    int current_channel_idx; // index of current channel to send to if _random == true
-    bool only_local_exchange;
+    std::vector<std::shared_ptr<vectorized::Channel>> channels;
+    int current_channel_idx {0}; // index of current channel to send to if _random == true
+    bool only_local_exchange {false};
+
+    void on_channel_finished(InstanceLoId channel_id);
 
     // for external table sink hash partition
     std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
@@ -123,9 +123,8 @@ public:
 
 private:
     friend class ExchangeSinkOperatorX;
-    friend class vectorized::Channel<ExchangeSinkLocalState>;
-    friend class vectorized::PipChannel;
-    friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
+    friend class vectorized::Channel;
+    friend class vectorized::BlockSerializer;
 
     std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
     RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
@@ -154,7 +153,7 @@ private:
     int _sender_id;
     std::shared_ptr<vectorized::BroadcastPBlockHolderMemLimiter> _broadcast_pb_mem_limiter;
 
-    vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
+    vectorized::BlockSerializer _serializer;
 
     std::shared_ptr<Dependency> _queue_dependency = nullptr;
     std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
@@ -203,6 +202,10 @@ private:
     std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
     std::atomic<bool> _reach_limit = false;
     int _last_local_channel_idx = -1;
+
+    std::atomic_int _working_channels_count = 0;
+    std::set<InstanceLoId> _finished_channels;
+    std::mutex _finished_channels_mutex;
 };
 
 class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index a11c4df6625..93026427b86 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -31,9 +31,7 @@ namespace doris::pipeline {
 
 ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase* parent,
                                                    RuntimeState* state)
-        : AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state),
-          _serializer(
-                  std::make_unique<vectorized::BlockSerializer<ResultFileSinkLocalState>>(this)) {}
+        : AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>(parent, state) {}
 
 ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
                                                  const std::vector<TExpr>& t_output_expr)
@@ -145,14 +143,6 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
     return Base::close(state, exec_status);
 }
 
-template <typename ChannelPtrType>
-void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
-                                                   Status st) {
-    channel->set_receiver_eof(st);
-    // Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
-    static_cast<void>(channel->close(state, Status::OK()));
-}
-
 Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index e99eb709a9f..7268efe4de4 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -21,10 +21,6 @@
 #include "vec/sink/writer/vfile_result_writer.h"
 
 namespace doris::vectorized {
-template <typename Parent>
-class BlockSerializer;
-template <typename Parent>
-class Channel;
 class BroadcastPBlockHolder;
 } // namespace doris::vectorized
 
@@ -55,13 +51,8 @@ public:
 private:
     friend class ResultFileSinkOperatorX;
 
-    template <typename ChannelPtrType>
-    void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
-
     std::shared_ptr<BufferControlBlock> _sender;
 
-    std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
-    std::unique_ptr<vectorized::BlockSerializer<ResultFileSinkLocalState>> _serializer;
     std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
     RuntimeProfile::Counter* _brpc_wait_timer = nullptr;
     RuntimeProfile::Counter* _local_send_timer = nullptr;
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index b38b1147894..fd6bcf0adfb 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -344,6 +344,10 @@ class OwnedSlice : private Allocator<false, false, false, DefaultMemoryAllocator
 public:
     OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}
 
+    OwnedSlice(size_t length)
+            : _slice(reinterpret_cast<char*>(Allocator::alloc(length)), length),
+              _capacity(length) {}
+
     OwnedSlice(OwnedSlice&& src) : _slice(src._slice), _capacity(src._capacity) {
         src._slice.data = nullptr;
         src._slice.size = 0;
@@ -369,6 +373,8 @@ public:
         }
     }
 
+    char* data() const { return _slice.data; }
+
     const Slice& slice() const { return _slice; }
 
 private:
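The two `OwnedSlice` additions above (a sizing constructor that allocates through `Allocator::alloc`, plus a raw `data()` accessor) are what the buffered_reader hunks earlier in this email build on. Assuming the members behave exactly as the hunk shows, usage is simply:

```cpp
// Sketch of the new OwnedSlice API as used by this patch; this only compiles
// inside the Doris tree (be/src/util/slice.h).
#include "util/slice.h"

#include <cstring>

void example() {
    doris::OwnedSlice buf(4096);      // allocates 4096 bytes via Allocator::alloc
    std::memset(buf.data(), 0, 4096); // raw access through the new data() accessor
} // the buffer is freed automatically when buf goes out of scope
```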
diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp
index 89656a74508..660ffe51a83 100644
--- a/be/src/vec/runtime/partitioner.cpp
+++ b/be/src/vec/runtime/partitioner.cpp
@@ -24,9 +24,8 @@
 
 namespace doris::vectorized {
 
-template <typename HashValueType, typename ChannelIds>
-Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* state,
-                                                               Block* block) const {
+template <typename ChannelIds>
+Status Crc32HashPartitioner<ChannelIds>::do_partitioning(RuntimeState* state, Block* block) const {
     int rows = block->rows();
 
     if (rows > 0) {
@@ -55,47 +54,23 @@ Status Partitioner<HashValueType, ChannelIds>::do_partitioning(RuntimeState* sta
 template <typename ChannelIds>
 void Crc32HashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column,
                                                 uint32_t* __restrict result, int idx) const {
-    column->update_crcs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type,
+    column->update_crcs_with_value(result, _partition_expr_ctxs[idx]->root()->type().type,
                                    column->size());
 }
 
-template <typename ChannelIds>
-void XXHashPartitioner<ChannelIds>::_do_hash(const ColumnPtr& column, uint64_t* __restrict result,
-                                             int /*idx*/) const {
-    column->update_hashes_with_value(result);
-}
-
-template <typename ChannelIds>
-Status XXHashPartitioner<ChannelIds>::clone(RuntimeState* state,
-                                            std::unique_ptr<PartitionerBase>& partitioner) {
-    auto* new_partitioner = new XXHashPartitioner(Base::_partition_count);
-    partitioner.reset(new_partitioner);
-    new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
-    for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
-                state, new_partitioner->_partition_expr_ctxs[i]));
-    }
-    return Status::OK();
-}
-
 template <typename ChannelIds>
 Status Crc32HashPartitioner<ChannelIds>::clone(RuntimeState* state,
                                                std::unique_ptr<PartitionerBase>& partitioner) {
-    auto* new_partitioner = new Crc32HashPartitioner(Base::_partition_count);
+    auto* new_partitioner = new Crc32HashPartitioner<ChannelIds>(_partition_count);
     partitioner.reset(new_partitioner);
-    new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size());
-    for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone(
-                state, new_partitioner->_partition_expr_ctxs[i]));
+    new_partitioner->_partition_expr_ctxs.resize(_partition_expr_ctxs.size());
+    for (size_t i = 0; i < _partition_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(
+                _partition_expr_ctxs[i]->clone(state, new_partitioner->_partition_expr_ctxs[i]));
     }
     return Status::OK();
 }
 
-template class Partitioner<size_t, pipeline::LocalExchangeChannelIds>;
-template class XXHashPartitioner<pipeline::LocalExchangeChannelIds>;
-template class Partitioner<size_t, ShuffleChannelIds>;
-template class XXHashPartitioner<ShuffleChannelIds>;
-template class Partitioner<uint32_t, ShuffleChannelIds>;
 template class Crc32HashPartitioner<ShuffleChannelIds>;
 template class Crc32HashPartitioner<SpillPartitionChannelIds>;
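`clone` above follows the usual prototype pattern: build a fresh partitioner for the target `RuntimeState` and clone each partition expression context into it. A condensed, self-contained sketch of the shape, with simplified types rather than the Doris signatures:

```cpp
#include <memory>
#include <vector>

struct ExprContext {
    int id;
    // Each RuntimeState needs its own context; deep-copy per clone.
    std::unique_ptr<ExprContext> clone() const { return std::make_unique<ExprContext>(*this); }
};

struct Partitioner {
    explicit Partitioner(size_t n) : partition_count(n) {}

    std::unique_ptr<Partitioner> clone() const {
        auto p = std::make_unique<Partitioner>(partition_count);
        p->expr_ctxs.reserve(expr_ctxs.size());
        for (const auto& ctx : expr_ctxs) {
            p->expr_ctxs.push_back(ctx->clone()); // clone contexts one by one
        }
        return p;
    }

    size_t partition_count;
    std::vector<std::unique_ptr<ExprContext>> expr_ctxs;
};
```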
diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h
index 5607a83327b..e8feb74335a 100644
--- a/be/src/vec/runtime/partitioner.h
+++ b/be/src/vec/runtime/partitioner.h
@@ -58,11 +58,11 @@ protected:
     const size_t _partition_count;
 };
 
-template <typename HashValueType, typename ChannelIds>
-class Partitioner : public PartitionerBase {
+template <typename ChannelIds>
+class Crc32HashPartitioner : public PartitionerBase {
 public:
-    Partitioner(int partition_count) : PartitionerBase(partition_count) {}
-    ~Partitioner() override = default;
+    Crc32HashPartitioner(int partition_count) : PartitionerBase(partition_count) {}
+    ~Crc32HashPartitioner() override = default;
 
     Status init(const std::vector<TExpr>& texprs) override {
         return VExpr::create_expr_trees(texprs, _partition_expr_ctxs);
@@ -76,9 +76,9 @@ public:
 
     Status do_partitioning(RuntimeState* state, Block* block) const override;
 
-    ChannelField get_channel_ids() const override {
-        return {_hash_vals.data(), sizeof(HashValueType)};
-    }
+    ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(uint32_t)}; }
+
+    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
 
 protected:
     Status _get_partition_column_result(Block* block, std::vector<int>& result) const {
@@ -89,38 +89,17 @@ protected:
         return Status::OK();
     }
 
-    virtual void _do_hash(const ColumnPtr& column, HashValueType* __restrict result,
-                          int idx) const = 0;
+    void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const;
 
     VExprContextSPtrs _partition_expr_ctxs;
-    mutable std::vector<HashValueType> _hash_vals;
+    mutable std::vector<uint32_t> _hash_vals;
 };
 
-template <typename ChannelIds>
-class XXHashPartitioner final : public Partitioner<uint64_t, ChannelIds> {
-public:
-    using Base = Partitioner<uint64_t, ChannelIds>;
-    XXHashPartitioner(int partition_count) : Partitioner<uint64_t, ChannelIds>(partition_count) {}
-    ~XXHashPartitioner() override = default;
-
-    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
-
-private:
-    void _do_hash(const ColumnPtr& column, uint64_t* __restrict result, int idx) const override;
-};
-
-template <typename ChannelIds>
-class Crc32HashPartitioner final : public Partitioner<uint32_t, ChannelIds> {
-public:
-    using Base = Partitioner<uint32_t, ChannelIds>;
-    Crc32HashPartitioner(int partition_count)
-            : Partitioner<uint32_t, ChannelIds>(partition_count) {}
-    ~Crc32HashPartitioner() override = default;
-
-    Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) override;
-
-private:
-    void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override;
+struct ShuffleChannelIds {
+    template <typename HashValueType>
+    HashValueType operator()(HashValueType l, size_t r) {
+        return l % r;
+    }
 };
 
 struct SpillPartitionChannelIds {
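`ShuffleChannelIds`, now defined here next to `SpillPartitionChannelIds`, maps a row's hash to a channel index with a plain modulo. A tiny runnable demonstration, with hypothetical input values:

```cpp
#include <cstdint>
#include <cstdio>

struct ShuffleChannelIds {
    template <typename HashValueType>
    HashValueType operator()(HashValueType l, size_t r) {
        return l % r; // hash value modulo channel count
    }
};

int main() {
    ShuffleChannelIds ids;
    const uint32_t hashes[] = {17u, 42u, 4294967295u};
    for (uint32_t h : hashes) {
        // With 4 channels: 17 -> 1, 42 -> 2, 4294967295 -> 3.
        std::printf("hash %u -> channel %u\n", h, ids(h, 4));
    }
    return 0;
}
```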
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 9c9a11d60aa..9e437afa8e7 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -54,8 +54,7 @@
 
 namespace doris::vectorized {
 
-template <typename Parent>
-Status Channel<Parent>::init(RuntimeState* state) {
+Status Channel::init(RuntimeState* state) {
     if (_brpc_dest_addr.hostname.empty()) {
         LOG(WARNING) << "there is no brpc destination address's hostname"
                         ", maybe version is not compatible.";
@@ -64,9 +63,11 @@ Status Channel<Parent>::init(RuntimeState* state) {
     if (state->query_options().__isset.enable_local_exchange) {
         _is_local &= state->query_options().enable_local_exchange;
     }
+
     if (_is_local) {
         return Status::OK();
     }
+
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = state->exec_env()->brpc_internal_client_cache()->get_client(
                 "127.0.0.1", _brpc_dest_addr.port);
@@ -83,8 +84,7 @@ Status Channel<Parent>::init(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::open(RuntimeState* state) {
+Status Channel::open(RuntimeState* state) {
     if (_is_local) {
         auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
                 _fragment_instance_id, _dest_node_id, &_local_recvr);
@@ -94,19 +94,6 @@ Status Channel<Parent>::open(RuntimeState* state) {
         }
     }
     _be_number = state->be_number();
-    _brpc_request = std::make_shared<PTransmitDataParams>();
-    // initialize brpc request
-    _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
-    _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
-    _finst_id = _brpc_request->finst_id();
-
-    _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
-    _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
-    _query_id = _brpc_request->query_id();
-
-    _brpc_request->set_node_id(_dest_node_id);
-    _brpc_request->set_sender_id(_parent->sender_id());
-    _brpc_request->set_be_number(_be_number);
 
     const auto& query_options = state->query_options();
     if (query_options.__isset.query_timeout) {
@@ -121,28 +108,26 @@ Status Channel<Parent>::open(RuntimeState* state) {
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
+    _state = state;
     return Status::OK();
 }
 
-std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency() {
-    if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
+std::shared_ptr<pipeline::Dependency> Channel::get_local_channel_dependency() {
+    if (!_local_recvr) {
         return nullptr;
     }
-    return Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
-            Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
+    return _local_recvr->get_local_channel_dependency(_parent->sender_id());
 }
 
-int64_t PipChannel::mem_usage() const {
-    auto* mutable_block = Channel<pipeline::ExchangeSinkLocalState>::_serializer.get_block();
+int64_t Channel::mem_usage() const {
+    auto* mutable_block = _serializer.get_block();
     int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
     return mem_usage;
 }
 
-Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) {
-    COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
-    std::unique_ptr<PBlock> pblock_ptr;
-    pblock_ptr.reset(block);
+Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
+    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
 
     if (eos) {
         if (_eos_send) {
@@ -152,13 +137,13 @@ Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status
         }
     }
     if (eos || block->column_metas_size()) {
-        RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, exec_status}));
+        RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, Status::OK()}));
     }
     return Status::OK();
 }
 
-Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
-    COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
+Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
+    COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
     if (eos) {
         if (_eos_send) {
             return Status::OK();
@@ -171,210 +156,88 @@ Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>&
     return Status::OK();
 }
 
-Status PipChannel::send_current_block(bool eos, Status exec_status) {
-    if (Channel<pipeline::ExchangeSinkLocalState>::is_local()) {
-        return Channel<pipeline::ExchangeSinkLocalState>::send_local_block(exec_status, eos);
+Status Channel::_send_current_block(bool eos) {
+    if (is_local()) {
+        return _send_local_block(eos);
     }
-    RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
-    return Status::OK();
+    return send_remote_block(std::move(_pblock), eos);
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
-    // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario
-    // so this feature is turned off here by default. We need to re-examine this logic
-    if (is_local()) {
-        return send_local_block(exec_status, eos);
+Status Channel::_send_local_block(bool eos) {
+    Block block;
+    if (_serializer.get_block() != nullptr) {
+        block = _serializer.get_block()->to_block();
+        _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
     }
-    if (eos) {
-        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
+
+    if (!block.empty() || eos) {
+        RETURN_IF_ERROR(send_local_block(&block, eos, true));
     }
-    RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
-    ch_roll_pb_block();
     return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
+Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
     SCOPED_TIMER(_parent->local_send_timer());
-    Block block = _serializer.get_block()->to_block();
-    _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
-    if (_recvr_is_valid()) {
-        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
-            COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
-            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-        }
-        _local_recvr->add_block(&block, _parent->sender_id(), true);
-        if (eos) {
-            _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
+
+    if (eos) {
+        if (_eos_send) {
+            return Status::OK();
+        } else {
+            _eos_send = true;
         }
-        return Status::OK();
-    } else {
-        _serializer.reset_block();
-        return _receiver_status;
     }
-}
 
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
-    SCOPED_TIMER(_parent->local_send_timer());
-    if (_recvr_is_valid()) {
-        if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-            COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
-            COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
-            COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-        }
-        _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
-        return Status::OK();
-    } else {
+    if (is_receiver_eof()) {
         return _receiver_status;
     }
-}
 
-template <typename Parent>
-Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
+    auto receiver_status = _recvr_status();
+    if (receiver_status.ok()) {
+        COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
+        COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
         COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
-    }
-    SCOPED_TIMER(_parent->brpc_send_timer());
-
-    if (_send_remote_block_callback == nullptr) {
-        _send_remote_block_callback = DummyBrpcCallback<PTransmitDataResult>::create_shared();
-    } else {
-        RETURN_IF_ERROR(_wait_last_brpc());
-        _send_remote_block_callback->cntl_->Reset();
-    }
-    VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id)
-             << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname
-             << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string();
-
-    _brpc_request->set_eos(eos);
-    if (!exec_status.ok()) {
-        exec_status.to_protobuf(_brpc_request->mutable_exec_status());
-    }
-    if (block != nullptr && !block->column_metas().empty()) {
-        _brpc_request->set_allocated_block(block);
-    }
-    _brpc_request->set_packet_seq(_packet_seq++);
-
-    _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
-    if (config::exchange_sink_ignore_eovercrowded) {
-        _send_remote_block_callback->cntl_->ignore_eovercrowded();
-    }
-
-    {
-        auto send_remote_block_closure =
-                AutoReleaseClosure<PTransmitDataParams, DummyBrpcCallback<PTransmitDataResult>>::
-                        create_unique(_brpc_request, _send_remote_block_callback);
-        if (enable_http_send_block(*_brpc_request)) {
-            RETURN_IF_ERROR(transmit_block_httpv2(
-                    _state->exec_env(), std::move(send_remote_block_closure), _brpc_dest_addr));
-        } else {
-            transmit_blockv2(*_brpc_stub, std::move(send_remote_block_closure));
+        const auto sender_id = _parent->sender_id();
+        if (!block->empty()) [[likely]] {
+            _local_recvr->add_block(block, sender_id, can_be_moved);
         }
-    }
-
-    if (block != nullptr) {
-        static_cast<void>(_brpc_request->release_block());
-    }
-    return Status::OK();
-}
 
-template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) {
-    if (_fragment_instance_id.lo == -1) {
+        if (eos) [[unlikely]] {
+            _local_recvr->remove_sender(sender_id, _be_number, Status::OK());
+            _parent->on_channel_finished(_fragment_instance_id.lo);
+        }
         return Status::OK();
+    } else {
+        _receiver_status = std::move(receiver_status);
+        _parent->on_channel_finished(_fragment_instance_id.lo);
+        return _receiver_status;
     }
-
-    bool serialized = false;
-    RETURN_IF_ERROR(
-            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, eos, &rows));
-    if (serialized) {
-        RETURN_IF_ERROR(send_current_block(false, Status::OK()));
-    }
-
-    return Status::OK();
 }
 
-template <typename Parent>
-Status Channel<Parent>::close_wait(RuntimeState* state) {
-    if (_need_close) {
-        Status st = _wait_last_brpc();
-        if (st.is<ErrorCode::END_OF_FILE>()) {
-            st = Status::OK();
-        } else if (!st.ok()) {
-            state->log_error(st.to_string());
-        }
-        _need_close = false;
-        return st;
+Status Channel::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
     }
-    _serializer.reset_block();
-    return Status::OK();
-}
+    _closed = true;
 
-template <typename Parent>
-Status Channel<Parent>::close_internal(Status exec_status) {
     if (!_need_close) {
         return Status::OK();
     }
-    VLOG_RPC << "Channel::close_internal() instance_id=" << print_id(_fragment_instance_id)
-             << " dest_node=" << _dest_node_id << " #rows= "
-             << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
-             << " receiver status: " << _receiver_status << ", exec_status: " << exec_status;
+
     if (is_receiver_eof()) {
         _serializer.reset_block();
         return Status::OK();
-    }
-    Status status;
-    if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
-        status = send_current_block(true, exec_status);
-    } else {
-        if (is_local()) {
-            if (_recvr_is_valid()) {
-                _local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
-            }
-        } else {
-            // Non pipeline engine will send an empty eos block
-            status = send_remote_block((PBlock*)nullptr, true, exec_status);
-        }
-    }
-    // Don't wait for the last packet to finish, left it to close_wait.
-    if (status.is<ErrorCode::END_OF_FILE>()) {
-        return Status::OK();
     } else {
-        return status;
+        return _send_current_block(true);
     }
 }
 
-template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
-        return Status::OK();
-    }
-    _closed = true;
-
-    Status st = close_internal(exec_status);
-    if (!st.ok()) {
-        state->log_error(st.to_string());
-    }
-    return st;
-}
-
-template <typename Parent>
-void Channel<Parent>::ch_roll_pb_block() {
-    _ch_cur_pb_block = (_ch_cur_pb_block == &_ch_pb_block1 ? &_ch_pb_block2 : &_ch_pb_block1);
-}
-
-template <typename Parent>
-BlockSerializer<Parent>::BlockSerializer(Parent* parent, bool is_local)
+BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local)
         : _parent(parent), _is_local(is_local), _batch_size(parent->state()->batch_size()) {}
 
-template <typename Parent>
-Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
-                                                      bool* serialized, bool eos,
-                                                      const std::vector<uint32_t>* rows) {
+Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
+                                              bool* serialized, bool eos,
+                                              const std::vector<uint32_t>* rows) {
     if (_mutable_block == nullptr) {
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
     }
@@ -403,8 +266,7 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
     return Status::OK();
 }
 
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
     if (_mutable_block && _mutable_block->rows() > 0) {
         auto block = _mutable_block->to_block();
         RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
@@ -415,29 +277,20 @@ Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int num_receivers)
     return Status::OK();
 }
 
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-        SCOPED_TIMER(_parent->_serialize_batch_timer);
-        dest->Clear();
-        size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(
-                _parent->_state->be_exec_version(), dest, &uncompressed_bytes, &compressed_bytes,
-                _parent->compression_type(), _parent->transfer_large_data_by_brpc()));
-        COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
-        COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
-        COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
-        _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
-                                                                    num_receivers);
-        _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
-    }
+Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
+    SCOPED_TIMER(_parent->_serialize_batch_timer);
+    dest->Clear();
+    size_t uncompressed_bytes = 0, compressed_bytes = 0;
+    RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes,
+                                   &compressed_bytes, _parent->compression_type(),
+                                   _parent->transfer_large_data_by_brpc()));
+    COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
+    COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
+    COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+    _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * num_receivers);
+    _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
     return Status::OK();
 }
 
-template class Channel<pipeline::ExchangeSinkLocalState>;
-template class Channel<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
-
 } // namespace doris::vectorized
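With `PipChannel` folded into `Channel`, `send_remote_block` now takes `std::unique_ptr<PBlock>&&`, making the ownership handoff explicit; the old `ch_cur_pb_block()`/`ch_roll_pb_block()` double buffer is gone and each send serializes into a fresh block. A minimal sketch of the handoff with stand-in types:

```cpp
#include <memory>
#include <queue>
#include <utility>

struct PBlock {}; // stand-in for the serialized block protobuf

struct SinkBuffer {
    std::queue<std::unique_ptr<PBlock>> pending;
    void add_block(std::unique_ptr<PBlock> block) { pending.push(std::move(block)); }
};

struct Channel {
    SinkBuffer* buffer = nullptr;
    // Taking the unique_ptr by rvalue reference makes every call site spell out
    // std::move: after the call the sender no longer owns the block.
    void send_remote_block(std::unique_ptr<PBlock>&& block, bool /*eos*/) {
        buffer->add_block(std::move(block));
    }
};

void example(Channel& ch) {
    auto pblock = std::make_unique<PBlock>(); // fresh block per send, no rolling buffer
    ch.send_remote_block(std::move(pblock), /*eos=*/false);
    // pblock is now null; ownership lives in the sink buffer
}
```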
- // if batch is nullptr, send the eof packet - virtual Status send_remote_block(PBlock* block, bool eos = false, - Status exec_status = Status::OK()); - - virtual Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, - bool eos = false) { - return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); - } - - virtual Status add_rows(Block* block, const std::vector<uint32_t>& row, bool eos); - - virtual Status send_current_block(bool eos, Status exec_status); - - Status send_local_block(Status exec_status, bool eos = false); - - Status send_local_block(Block* block, bool can_be_moved); + Status send_local_block(Block* block, bool eos, bool can_be_moved); // Flush buffered rows and close channel. This function don't wait the response // of close operation, client should call close_wait() to finish channel's close. // We split one close operation into two phases in order to make multiple channels // can run parallel. - Status close(RuntimeState* state, Status exec_status); - - // Get close wait's response, to finish channel close operation. - Status close_wait(RuntimeState* state); - - int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; } - - PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; } + Status close(RuntimeState* state); std::string get_fragment_instance_id_str() { UniqueId uid(_fragment_instance_id); @@ -177,151 +133,40 @@ public: bool is_local() const { return _is_local; } - virtual void ch_roll_pb_block(); - bool is_receiver_eof() const { return _receiver_status.is<ErrorCode::END_OF_FILE>(); } void set_receiver_eof(Status st) { _receiver_status = st; } -protected: - bool _recvr_is_valid() { - if (_local_recvr && !_local_recvr->is_closed()) { - return true; - } - _receiver_status = Status::EndOfFile( - "local data stream receiver closed"); // local data stream receiver closed - return false; - } - - Status _wait_last_brpc() { - SCOPED_TIMER(_parent->brpc_wait_timer()); - if (_send_remote_block_callback == nullptr) { - return Status::OK(); - } - _send_remote_block_callback->join(); - if (_send_remote_block_callback->cntl_->Failed()) { - std::string err = fmt::format( - "failed to send brpc batch, error={}, error_text={}, client: {}, " - "latency = {}", - berror(_send_remote_block_callback->cntl_->ErrorCode()), - _send_remote_block_callback->cntl_->ErrorText(), - BackendOptions::get_localhost(), - _send_remote_block_callback->cntl_->latency_us()); - LOG(WARNING) << err; - return Status::RpcError(err); - } - _receiver_status = Status::create(_send_remote_block_callback->response_->status()); - return _receiver_status; - } - - Status close_internal(Status exec_status); - - Parent* _parent = nullptr; - - const RowDescriptor& _row_desc; - const TUniqueId _fragment_instance_id; - PlanNodeId _dest_node_id; - - // the number of RowBatch.data bytes sent successfully - int64_t _num_data_bytes_sent {}; - int64_t _packet_seq {}; - - bool _need_close; - bool _closed; - int _be_number; - - TNetworkAddress _brpc_dest_addr; - - PUniqueId _finst_id; - PUniqueId _query_id; - PBlock _pb_block; - std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr; - std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; - std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>> _send_remote_block_callback; - Status _receiver_status; - int32_t _brpc_timeout_ms = 500; - RuntimeState* _state = nullptr; - - bool _is_local; - std::shared_ptr<VDataStreamRecvr> _local_recvr; - // serialized blocks for broadcasting; we need two so we can write - // one while 
the other one is still being sent. - // Which is for same reason as `_cur_pb_block`, `_pb_block1` and `_pb_block2` - // in VDataStreamSender. - PBlock* _ch_cur_pb_block = nullptr; - PBlock _ch_pb_block1; - PBlock _ch_pb_block2; - - BlockSerializer<Parent> _serializer; -}; - -#define HANDLE_CHANNEL_STATUS(state, channel, status) \ - do { \ - if (status.is<ErrorCode::END_OF_FILE>()) { \ - _handle_eof_channel(state, channel, status); \ - } else { \ - RETURN_IF_ERROR(status); \ - } \ - } while (0) - -class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> { -public: - PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor& row_desc, - const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id, - PlanNodeId dest_node_id) - : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc, brpc_dest, - fragment_instance_id, dest_node_id) { - ch_roll_pb_block(); - } - - ~PipChannel() override { delete Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; } - int64_t mem_usage() const; - void ch_roll_pb_block() override { - // We have two choices here. - // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can - // reuse the memory, but we have to use a lock to synchronize. - // 2. Create a new PBlock every time. In this way we don't need a lock but have to allocate - // new memory. - // Now we use the second way. - Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new PBlock(); - } - // Asynchronously sends a block // Returns the status of the most recently finished transmit_data // rpc (or OK if there wasn't one that hasn't been reported yet). // if batch is nullptr, send the eof packet - Status send_remote_block(PBlock* block, bool eos = false, - Status exec_status = Status::OK()) override; + Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false); + Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false); - Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, - bool eos = false) override; - - Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) override { - if (Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) { + Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos) { + if (_fragment_instance_id.lo == -1) { return Status::OK(); } bool serialized = false; - _pblock = std::make_unique<PBlock>(); - RETURN_IF_ERROR( - Channel<pipeline::ExchangeSinkLocalState>::_serializer.next_serialized_block( - block, _pblock.get(), 1, &serialized, eos, &rows)); + if (_pblock == nullptr) { + _pblock = std::make_unique<PBlock>(); + } + RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos, + &rows)); if (serialized) { - Status exec_status = Status::OK(); - RETURN_IF_ERROR(send_current_block(eos, exec_status)); + RETURN_IF_ERROR(_send_current_block(eos)); } return Status::OK(); } - // send _mutable_block - Status send_current_block(bool eos, Status exec_status) override; - void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; - _buffer->register_sink(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id); + _buffer->register_sink(_fragment_instance_id); } std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback( @@ -337,12 +182,53 @@ public: std::shared_ptr<pipeline::Dependency> get_local_channel_dependency(); -private: +protected: + Status _send_local_block(bool 
eos); + Status _send_current_block(bool eos); + + Status _recvr_status() const { + if (_local_recvr && !_local_recvr->is_closed()) { + return Status::OK(); + } + return Status::EndOfFile( + "local data stream receiver closed"); // local data stream receiver closed + } + + pipeline::ExchangeSinkLocalState* _parent = nullptr; + + const TUniqueId _fragment_instance_id; + PlanNodeId _dest_node_id; + bool _closed {false}; + bool _need_close {false}; + int _be_number; + + TNetworkAddress _brpc_dest_addr; + + PBlock _pb_block; + std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr; + Status _receiver_status; + int32_t _brpc_timeout_ms = 500; + RuntimeState* _state = nullptr; + + bool _is_local; + std::shared_ptr<VDataStreamRecvr> _local_recvr; + + BlockSerializer _serializer; + pipeline::ExchangeSinkBuffer* _buffer = nullptr; bool _eos_send = false; std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> _send_callback; std::unique_ptr<PBlock> _pblock; }; +#define HANDLE_CHANNEL_STATUS(state, channel, status) \ + do { \ + if (status.is<ErrorCode::END_OF_FILE>()) { \ + _handle_eof_channel(state, channel, status); \ + } else { \ + RETURN_IF_ERROR(status); \ + } \ + } while (0) + } // namespace vectorized } // namespace doris
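Two notes on the Channel changes above, for readers skimming the diff. First, add_rows now constructs its PBlock only when none is held (the new `if (_pblock == nullptr)` guard) instead of unconditionally allocating a fresh one on every call. The standalone sketch below is not Doris code; SerializedBlock and Sender are illustrative names, and it assumes the buffer is cleared before being refilled. It shows the same lazy-allocate-and-reuse shape:

// Minimal illustrative sketch (not Doris code): lazy allocation of a
// serialization buffer, mirroring the new null check in Channel::add_rows.
#include <memory>
#include <string>
#include <vector>

struct SerializedBlock {             // hypothetical stand-in for PBlock
    std::string payload;
    void clear() { payload.clear(); }
};

class Sender {
public:
    void send_batch(const std::vector<int>& rows) {
        if (_block == nullptr) {                 // allocate on first use only
            _block = std::make_unique<SerializedBlock>();
        }
        _block->clear();                         // reuse existing capacity
        for (int r : rows) {
            _block->payload += std::to_string(r) + ",";
        }
        // ... hand _block off to the transport here ...
    }

private:
    std::unique_ptr<SerializedBlock> _block;     // created once, reused
};

One plausible motivation is that steady-state sends stop paying a heap allocation, and the bookkeeping that comes with it, per batch.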
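Second, the HANDLE_CHANNEL_STATUS macro (moved unchanged to the bottom of the header) encodes the sender's per-channel error policy: END_OF_FILE from a receiver retires only that channel, while any other error aborts the whole send. Below is a self-contained sketch of that control flow; the Status and Channel types are simplified stand-ins, not Doris's:

// Illustrative only: the EOF-vs-error split that HANDLE_CHANNEL_STATUS encodes.
#include <iostream>
#include <string>
#include <vector>

enum class Code { OK, END_OF_FILE, RPC_ERROR };

struct Status {                      // simplified stand-in for doris::Status
    Code code = Code::OK;
    std::string msg;
    bool ok() const { return code == Code::OK; }
    bool is_eof() const { return code == Code::END_OF_FILE; }
};

struct Channel {                     // hypothetical channel, RPC elided
    bool receiver_eof = false;
    Status send(int /*payload*/) {
        if (receiver_eof) return {Code::END_OF_FILE, "receiver closed"};
        return {};
    }
};

// Same shape as HANDLE_CHANNEL_STATUS: EOF retires one channel,
// any other error propagates and aborts the whole send.
Status send_to_all(std::vector<Channel>& channels, int payload) {
    for (auto& ch : channels) {
        Status st = ch.send(payload);
        if (st.is_eof()) {
            ch.receiver_eof = true;  // remember EOF, keep sending to the rest
        } else if (!st.ok()) {
            return st;               // hard error: stop immediately
        }
    }
    return {};
}

int main() {
    std::vector<Channel> channels(3);
    channels[1].receiver_eof = true; // simulate one closed receiver
    Status st = send_to_all(channels, 7);
    std::cout << (st.ok() ? "ok: closed receiver skipped" : st.msg) << '\n';
}

The design choice here is that a receiver finishing early (for example, after a LIMIT is satisfied) is normal operation for the remaining channels, so it must not be treated as a query-level failure.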