Gabriel39 commented on code in PR #44850:
URL: https://github.com/apache/doris/pull/44850#discussion_r1868661486
##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -724,4 +744,33 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
     return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
 }
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer() {
+    PUniqueId id;
+    id.set_hi(_state->query_id().hi);
+    id.set_lo(_state->query_id().lo);
+    auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state());
+    for (const auto& _dest : _dests) {
+        const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
+        // There is no need to check for duplicate dest_fragment_instance_id here.
+        // The construct_request function already handles this check internally.
+        sink_buffer->construct_request(dest_fragment_instance_id);
+    }
+    return sink_buffer;
+}
+
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
+    if (_child) {

Review Comment:
   Is `_child` always true?

##########
be/src/pipeline/exec/exchange_sink_operator.cpp:
##########
@@ -724,4 +744,33 @@ DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
     return DataSinkOperatorX<ExchangeSinkLocalState>::required_data_distribution();
 }
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::_create_buffer() {
+    PUniqueId id;
+    id.set_hi(_state->query_id().hi);
+    id.set_lo(_state->query_id().lo);
+    auto sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id, state());
+    for (const auto& _dest : _dests) {
+        const auto& dest_fragment_instance_id = _dest.fragment_instance_id;
+        // There is no need to check for duplicate dest_fragment_instance_id here.
+        // The construct_request function already handles this check internally.
+        sink_buffer->construct_request(dest_fragment_instance_id);
+    }
+    return sink_buffer;
+}
+
+std::shared_ptr<ExchangeSinkBuffer> ExchangeSinkOperatorX::get_sink_buffer() {
+    if (_child) {
+        if (std::dynamic_pointer_cast<SortSourceOperatorX>(_child)) {
+            return _create_buffer();
+        }
+        if (std::dynamic_pointer_cast<LocalExchangeSourceOperatorX>(_child)) {

Review Comment:
   Make abstraction

##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -1133,6 +1135,9 @@ public enum IgnoreSplitType {
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
     private boolean enableLocalMergeSort = true;
+    @VariableMgr.VarAttr(name = ENABLE_SHARED_EXCHANGE_SINK_BUFFER)
+    private boolean enableSharedExchangeSinkBuffer = true;

Review Comment:
   Add it to fuzzy variables.
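As a reference for the "Make abstraction" comment above: a minimal, self-contained sketch of one way the type checks in `get_sink_buffer()` could be abstracted, replacing the `dynamic_pointer_cast` chain with a virtual hook on the source operator. The names here (`SourceOperator`, `require_exclusive_buffer`, `SinkOperator`) are hypothetical stand-ins rather than Doris classes, and the buffer-selection policy is illustrative only.

```cpp
// Sketch only: not Doris code. The idea is that the sink asks its child a
// question instead of probing for concrete child types.
#include <iostream>
#include <memory>

struct SinkBuffer {};

struct SourceOperator {
    virtual ~SourceOperator() = default;
    // Hypothetical predicate: "does this source need its own, non-shared buffer?"
    virtual bool require_exclusive_buffer() const { return false; }
};

struct SortSource final : SourceOperator {
    bool require_exclusive_buffer() const override { return true; }
};

struct LocalExchangeSource final : SourceOperator {
    bool require_exclusive_buffer() const override { return true; }
};

struct SinkOperator {
    std::shared_ptr<SourceOperator> child;
    std::shared_ptr<SinkBuffer> shared_buffer = std::make_shared<SinkBuffer>();

    std::shared_ptr<SinkBuffer> get_sink_buffer() const {
        // One virtual call replaces the chain of dynamic_pointer_cast checks,
        // so adding a new source type no longer touches this function.
        if (child && child->require_exclusive_buffer()) {
            return std::make_shared<SinkBuffer>();
        }
        return shared_buffer;
    }
};

int main() {
    SinkOperator sink;
    sink.child = std::make_shared<SortSource>();
    // A sort source gets its own buffer in this sketch, so the pointers differ.
    std::cout << std::boolalpha << (sink.get_sink_buffer() != sink.shared_buffer) << '\n';
}
```

The design point is that new operator types opt in by overriding the predicate instead of extending a cast chain inside the sink.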
##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -166,10 +166,9 @@ class Channel {
         return Status::OK();
     }
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
+    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }
+
+    InstanceLoId ins_id() const { return _fragment_instance_id.lo; }

Review Comment:
   ```suggestion
       InstanceLoId ins_lo_id() const { return _fragment_instance_id.lo; }
   ```

##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -158,12 +156,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
     if (request.block) {
         RETURN_IF_ERROR(
                 BeExecVersionManager::check_be_exec_version(request.block->be_exec_version()));
-        COUNTER_UPDATE(_parent->memory_used_counter(), request.block->ByteSizeLong());
+        auto* parent = request.channel->_parent;
+        COUNTER_UPDATE(parent->memory_used_counter(), request.block->ByteSizeLong());

Review Comment:
   ```suggestion
           COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), request.block->ByteSizeLong());
   ```

##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -209,23 +207,27 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
 Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-    DCHECK(_rpc_channel_is_idle[id] == false);
-
     std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
     std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
             _instance_to_broadcast_package_queue[id];
-    if (_is_finishing) {
+    if (_is_failed) {
         _turn_off_channel(id, lock);
         return Status::OK();
     }
+    if (_instance_to_receiver_eof[id]) {
+        DCHECK(_rpc_channel_is_turn_off[id]);
+        return Status::OK();
+    }
     if (!q.empty()) {
         // If we have data to shuffle which is not broadcasted
         auto& request = q.front();
         auto& brpc_request = _instance_to_request[id];
         brpc_request->set_eos(request.eos);
         brpc_request->set_packet_seq(_instance_to_seq[id]++);
+        brpc_request->set_sender_id(request.channel->_parent->sender_id());
+        brpc_request->set_be_number(request.channel->_parent->be_number());

Review Comment:
   Add some comments to explain what `be_number` is used for

##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -411,48 +413,25 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
         __builtin_unreachable();
     } else {
         std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
-        _turn_off_channel(id, lock);
+        _running_sink_count[id]--;
+        if (_running_sink_count[id] == 0) {
+            _turn_off_channel(id, lock);
+        }
     }
 }
 void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
-    _is_finishing = true;
+    _is_failed = true;
     _context->cancel(Status::Cancelled(err));
 }
 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
+    // When the receiving side reaches eof, it means the receiver has finished early.
+    // The remaining data in the current rpc_channel does not need to be sent,
+    // and the rpc_channel should be turned off immediately.
     _turn_off_channel(id, lock);
-    std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =

Review Comment:
   Keep this code block

##########
be/src/vec/sink/vdata_stream_sender.h:
##########
@@ -166,10 +166,9 @@ class Channel {
         return Status::OK();
     }
-    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
-        _buffer = buffer;
-        _buffer->register_sink(_fragment_instance_id);
-    }
+    void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) { _buffer = buffer; }

Review Comment:
   change `register_exchange_buffer` to `set_exchange_buffer`

##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -129,24 +125,26 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_hi(fragment_instance_id.hi);
     finst_id.set_lo(fragment_instance_id.lo);
     _rpc_channel_is_idle[low_id] = true;
+    _rpc_channel_is_turn_off[low_id] = false;
     _instance_to_receiver_eof[low_id] = false;
     _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
     _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
-    _construct_request(low_id, finst_id);
+    _instance_to_request[low_id] = std::make_shared<PTransmitDataParams>();
+    _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id);
+    _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id);
+
+    _instance_to_request[low_id]->set_node_id(_dest_node_id);
 }
 Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
-    if (_is_finishing) {
+    if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->_fragment_instance_id.lo;
+    auto ins_id = request.channel->ins_id();
     if (!_instance_to_package_queue_mutex.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do register_sink",
                                      print_id(request.channel->_fragment_instance_id));
     }
-    if (_is_receiver_eof(ins_id)) {

Review Comment:
   Should not delete this judgement.

##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -184,13 +223,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     void update_profile(RuntimeProfile* profile);
     void set_dependency(std::shared_ptr<Dependency> queue_dependency,
-                        std::shared_ptr<Dependency> finish_dependency) {
-        _queue_dependency = queue_dependency;
-        _finish_dependency = finish_dependency;
-    }
-
-    void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
-        _broadcast_dependency = broadcast_dependency;
+                        ExchangeSinkLocalState* local_state) {
+        std::lock_guard lc(_init_lock);

Review Comment:
   Better to reduce locking scope

##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -169,13 +169,52 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
     bool _eos;
 };
-// Each ExchangeSinkOperator have one ExchangeSinkBuffer
+// ExchangeSinkBuffer can either be shared among multiple ExchangeSinkLocalState instances
+// or be individually owned by each ExchangeSinkLocalState.
+// The following describes the scenario where ExchangeSinkBuffer is shared among multiple ExchangeSinkLocalState instances.
+// Of course, individual ownership can be seen as a special case where only one ExchangeSinkLocalState shares the buffer.
+
+// A sink buffer contains multiple rpc_channels.
+// Each rpc_channel corresponds to a target instance on the receiving side.
+// Data is sent using a ping-pong mode within each rpc_channel,
+// meaning that at most one RPC can exist in a single rpc_channel at a time.
+// The next RPC can only be sent after the previous one has completed.
+//
+// Each exchange sink sends data to all target instances on the receiving side.
+// If the concurrency is 3, a single rpc_channel will be used simultaneously by three exchange sinks.
+
+/*
+                        +-----------+         +-----------+         +-----------+
+                        |dest ins id|         |dest ins id|         |dest ins id|
+                        |           |         |           |         |           |
+                        +----+------+         +-----+-----+         +------+----+
+                             |                      |                      |
+                             |                      |                      |
+                     +----------------+     +----------------+     +----------------+
+                     |                |     |                |     |                |
+ sink buffer ------- |  rpc_channel   |     |  rpc_channel   |     |  rpc_channel   |
+                     |                |     |                |     |                |
+                     +-------+--------+     +----------------+     +----------------+
+                             |                      |                      |
+                             |----------------------+----------------------+
+                             |                      |                      |
+                             |                      |                      |
+                    +-----------------+     +-------+---------+    +-------+---------+
+                    |                 |     |                 |    |                 |
+                    |  exchange sink  |     |  exchange sink  |    |  exchange sink  |
+                    |                 |     |                 |    |                 |
+                    +-----------------+     +-----------------+    +-----------------+
+*/
+
 class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
 public:
-    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number,
-                       RuntimeState* state, ExchangeSinkLocalState* parent);
+    ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, RuntimeState* state);
     ~ExchangeSinkBuffer() override = default;
-    void register_sink(TUniqueId);
+    void register_sink(InstanceLoId id) {
+        std::lock_guard lc(_init_lock);

Review Comment:
   Delete this lock.

##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -174,17 +175,14 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
 }
 Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
-    if (_is_finishing) {
+    if (_is_failed) {
         return Status::OK();
     }
-    auto ins_id = request.channel->_fragment_instance_id.lo;
+    auto ins_id = request.channel->ins_id();
    if (!_instance_to_package_queue_mutex.contains(ins_id)) {
         return Status::InternalError("fragment_instance_id {} not do register_sink",
                                      print_id(request.channel->_fragment_instance_id));
     }
-    if (_is_receiver_eof(ins_id)) {

Review Comment:
   Should not delete this judgement.

##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -214,6 +250,10 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
     // One channel is corresponding to a downstream instance.
     phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
+    // There could be multiple situations that cause an rpc_channel to be turned off,
+    // such as receiving the eof, manual cancellation by the user, or all sinks reaching eos.
+    // Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times.
+    phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off;

Review Comment:
   Could we unify these 2 maps?
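As a reference for the "Could we unify these 2 maps?" question above: a minimal, self-contained sketch (not Doris code) of how a single per-channel state enum could stand in for the `_rpc_channel_is_idle` / `_rpc_channel_is_turn_off` pair. `std::unordered_map` is used here in place of `phmap::flat_hash_map`, and all names are hypothetical.

```cpp
// Sketch only: one per-channel enum instead of two parallel bool maps.
#include <cstdint>
#include <iostream>
#include <unordered_map>

using InstanceLoId = int64_t;

enum class RpcChannelState : uint8_t {
    kIdle,      // no RPC in flight             (was: is_idle == true)
    kBusy,      // a ping-pong RPC is in flight (was: is_idle == false)
    kTurnedOff  // eof / cancel / all sinks eos (was: is_turn_off == true)
};

struct ChannelStates {
    std::unordered_map<InstanceLoId, RpcChannelState> state;

    void register_channel(InstanceLoId id) { state[id] = RpcChannelState::kIdle; }

    // Sending is only allowed from the idle state.
    bool try_mark_busy(InstanceLoId id) {
        auto it = state.find(id);
        if (it == state.end() || it->second != RpcChannelState::kIdle) {
            return false;
        }
        it->second = RpcChannelState::kBusy;
        return true;
    }

    // The RPC callback returns the channel to idle unless it was turned off meanwhile.
    void mark_idle(InstanceLoId id) {
        auto it = state.find(id);
        if (it != state.end() && it->second == RpcChannelState::kBusy) {
            it->second = RpcChannelState::kIdle;
        }
    }

    // Turn-off is terminal, so calling it repeatedly is a harmless no-op
    // and no second "already turned off" flag is needed.
    void turn_off(InstanceLoId id) { state[id] = RpcChannelState::kTurnedOff; }
};

int main() {
    ChannelStates channels;
    channels.register_channel(1001);
    std::cout << std::boolalpha << channels.try_mark_busy(1001) << '\n'; // true
    channels.mark_idle(1001);
    channels.turn_off(1001);
    channels.turn_off(1001);                                             // idempotent
    std::cout << channels.try_mark_busy(1001) << '\n';                   // false
}
```

Whether this trade-off is worth it in `ExchangeSinkBuffer` itself depends on how the two flags are read under the per-instance mutex, which the sketch does not model.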
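For the "Better to reduce locking scope" remark, the general pattern looks like the sketch below: prepare everything that does not touch shared state before taking the mutex, so the critical section covers only the shared mutation. The `Registry` type is a hypothetical stand-in, not `ExchangeSinkBuffer::set_dependency`.

```cpp
// Sketch only: narrowing a critical section.
#include <mutex>
#include <string>
#include <vector>

struct Registry {
    std::mutex mu;
    std::vector<std::string> entries;

    // Wider scope: the string construction runs while the lock is held,
    // even though it touches no shared state.
    void add_wide(int id) {
        std::lock_guard<std::mutex> lock(mu);
        std::string entry = "dependency-" + std::to_string(id);
        entries.push_back(std::move(entry));
    }

    // Narrower scope: do the independent work first, lock only around the
    // mutation of the shared container.
    void add_narrow(int id) {
        std::string entry = "dependency-" + std::to_string(id);
        std::lock_guard<std::mutex> lock(mu);
        entries.push_back(std::move(entry));
    }
};

int main() {
    Registry r;
    r.add_wide(1);
    r.add_narrow(2);
    return r.entries.size() == 2 ? 0 : 1;
}
```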