This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 55dd58d8994 [Refactor](Exchange) do exchange sink buffer refactor (#49335) 55dd58d8994 is described below commit 55dd58d8994bdd28ee560ad6c2b6788ce63aafdc Author: HappenLee <happen...@selectdb.com> AuthorDate: Thu Apr 10 22:30:52 2025 +0800 [Refactor](Exchange) do exchange sink buffer refactor (#49335) Do exchange refactor * remove unless check * change muti map to one struct map, only search one time * rename some variables to match code style --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 240 +++++++++++------------- be/src/pipeline/exec/exchange_sink_buffer.h | 130 +++++++------ be/src/pipeline/exec/exchange_sink_operator.cpp | 32 ++-- be/src/pipeline/exec/exchange_sink_operator.h | 8 +- be/src/vec/sink/vdata_stream_sender.cpp | 2 +- be/src/vec/sink/vdata_stream_sender.h | 4 +- be/test/pipeline/pipeline_test.cpp | 2 +- be/test/vec/exec/exchange_sink_test.cpp | 58 +++--- be/test/vec/exec/exchange_sink_test.h | 6 +- 9 files changed, 241 insertions(+), 241 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 90d54a35aa0..95a5a00f68a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -100,12 +100,7 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_ _node_id(node_id), _state(state), _context(state->get_query_ctx()), - _exchange_sink_num(sender_ins_ids.size()) { - for (auto sender_ins_id : sender_ins_ids) { - _queue_deps.emplace(sender_ins_id, nullptr); - _parents.emplace(sender_ins_id, nullptr); - } -} + _exchange_sink_num(sender_ins_ids.size()) {} void ExchangeSinkBuffer::close() { // Could not clear the queue here, because there maybe a running rpc want to @@ -121,29 +116,34 @@ void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { return; } auto low_id = fragment_instance_id.lo; - if (_instance_to_package_queue_mutex.count(low_id)) { + if (_rpc_instances.contains(low_id)) { return; } - _instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>(); - _instance_to_seq[low_id] = 0; - _instance_to_package_queue[low_id] = std::queue<TransmitInfo, std::list<TransmitInfo>>(); - _instance_to_broadcast_package_queue[low_id] = + + // Initialize the instance data + auto instance_data = std::make_unique<RpcInstance>(low_id); + instance_data->mutex = std::make_unique<std::mutex>(); + instance_data->seq = 0; + instance_data->package_queue = std::queue<TransmitInfo, std::list<TransmitInfo>>(); + instance_data->broadcast_package_queue = std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>(); - _queue_capacity = - config::exchg_buffer_queue_capacity_factor * _instance_to_package_queue.size(); + _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size(); + PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); - _rpc_channel_is_idle[low_id] = true; - _rpc_channel_is_turn_off[low_id] = false; - _instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id)); - _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get(); - _instance_to_request[low_id] = std::make_shared<PTransmitDataParams>(); - _instance_to_request[low_id]->mutable_finst_id()->CopyFrom(finst_id); - _instance_to_request[low_id]->mutable_query_id()->CopyFrom(_query_id); - - _instance_to_request[low_id]->set_node_id(_dest_node_id); - _running_sink_count[low_id] = _exchange_sink_num; + + instance_data->rpc_channel_is_idle = true; + instance_data->rpc_channel_is_turn_off = false; + + // Initialize request + instance_data->request = std::make_shared<PTransmitDataParams>(); + instance_data->request->mutable_finst_id()->CopyFrom(finst_id); + instance_data->request->mutable_query_id()->CopyFrom(_query_id); + instance_data->request->set_node_id(_dest_node_id); + instance_data->running_sink_count = _exchange_sink_num; + + _rpc_instances[low_id] = std::move(instance_data); } Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { @@ -151,20 +151,21 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { return Status::OK(); } auto ins_id = request.channel->dest_ins_id(); - if (!_instance_to_package_queue_mutex.contains(ins_id)) { + if (!_rpc_instances.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", print_id(request.channel->_fragment_instance_id)); } - if (_rpc_channel_is_turn_off[ins_id]) { + auto& instance_data = *_rpc_instances[ins_id]; + if (instance_data.rpc_channel_is_turn_off) { return Status::EndOfFile("receiver eof"); } bool send_now = false; { - std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]); + std::unique_lock<std::mutex> lock(*instance_data.mutex); // Do not have in process rpc, directly send - if (_rpc_channel_is_idle[ins_id]) { + if (instance_data.rpc_channel_is_idle) { send_now = true; - _rpc_channel_is_idle[ins_id] = false; + instance_data.rpc_channel_is_idle = false; } if (request.block) { RETURN_IF_ERROR( @@ -172,16 +173,16 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), request.block->ByteSizeLong()); } - _instance_to_package_queue[ins_id].emplace(std::move(request)); + instance_data.package_queue.emplace(std::move(request)); _total_queue_size++; if (_total_queue_size > _queue_capacity) { - for (auto& [_, dep] : _queue_deps) { + for (auto& dep : _queue_deps) { dep->block(); } } } if (send_now) { - RETURN_IF_ERROR(_send_rpc(ins_id)); + RETURN_IF_ERROR(_send_rpc(instance_data)); } return Status::OK(); @@ -192,71 +193,69 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { return Status::OK(); } auto ins_id = request.channel->dest_ins_id(); - if (!_instance_to_package_queue_mutex.contains(ins_id)) { + if (!_rpc_instances.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", print_id(request.channel->_fragment_instance_id)); } - if (_rpc_channel_is_turn_off[ins_id]) { + auto& instance_data = *_rpc_instances[ins_id]; + if (instance_data.rpc_channel_is_turn_off) { return Status::EndOfFile("receiver eof"); } bool send_now = false; { - std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]); + std::unique_lock<std::mutex> lock(*instance_data.mutex); // Do not have in process rpc, directly send - if (_rpc_channel_is_idle[ins_id]) { + if (instance_data.rpc_channel_is_idle) { send_now = true; - _rpc_channel_is_idle[ins_id] = false; + instance_data.rpc_channel_is_idle = false; } if (request.block_holder->get_block()) { RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version( request.block_holder->get_block()->be_exec_version())); } - _instance_to_broadcast_package_queue[ins_id].emplace(request); + instance_data.broadcast_package_queue.emplace(request); } if (send_now) { - RETURN_IF_ERROR(_send_rpc(ins_id)); + RETURN_IF_ERROR(_send_rpc(instance_data)); } return Status::OK(); } -Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { - std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]); +Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { + std::unique_lock<std::mutex> lock(*(instance_data.mutex)); - std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; + std::queue<TransmitInfo, std::list<TransmitInfo>>& q = instance_data.package_queue; std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = - _instance_to_broadcast_package_queue[id]; + instance_data.broadcast_package_queue; if (_is_failed) { - _turn_off_channel(id, lock); + _turn_off_channel(instance_data, lock); return Status::OK(); } - if (_rpc_channel_is_turn_off[id]) { + if (instance_data.rpc_channel_is_turn_off) { return Status::OK(); } if (!q.empty()) { // If we have data to shuffle which is not broadcasted auto& request = q.front(); - auto& brpc_request = _instance_to_request[id]; + auto& brpc_request = instance_data.request; brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_packet_seq(instance_data.seq++); brpc_request->set_sender_id(request.channel->_parent->sender_id()); brpc_request->set_be_number(request.channel->_parent->be_number()); if (request.block && !request.block->column_metas().empty()) { brpc_request->set_allocated_block(request.block.get()); } - if (!request.exec_status.ok()) { - request.exec_status.to_protobuf(brpc_request->mutable_exec_status()); - } - auto send_callback = request.channel->get_send_callback(id, request.eos); + auto send_callback = request.channel->get_send_callback(&instance_data, request.eos); send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( - const InstanceLoId& id, const std::string& err) { + RpcInstance* ins, const std::string& err) { auto task_lock = weak_task_ctx.lock(); if (task_lock == nullptr) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. @@ -264,11 +263,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - _failed(id, err); + _failed(ins->id, err); }); send_callback->start_rpc_time = GetCurrentTimeNanos(); send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( - const InstanceLoId& id, const bool& eos, + RpcInstance* ins_ptr, const bool& eos, const PTransmitDataResult& result, const int64_t& start_rpc_time) { auto task_lock = weak_task_ctx.lock(); @@ -279,24 +278,25 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); + auto& ins = *ins_ptr; auto end_rpc_time = GetCurrentTimeNanos(); - update_rpc_time(id, start_rpc_time, end_rpc_time); + update_rpc_time(ins, start_rpc_time, end_rpc_time); Status s(Status::create(result.status())); if (s.is<ErrorCode::END_OF_FILE>()) { - _set_receiver_eof(id); + _set_receiver_eof(ins); } else if (!s.ok()) { - _failed(id, + _failed(ins.id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); return; } else if (eos) { - _ended(id); + _ended(ins); } // The eos here only indicates that the current exchange sink has reached eos. // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent. - s = _send_rpc(id); + s = _send_rpc(ins); if (!s) { - _failed(id, + _failed(ins.id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); } }); @@ -322,29 +322,29 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { q.pop(); _total_queue_size--; if (_total_queue_size <= _queue_capacity) { - for (auto& [_, dep] : _queue_deps) { + for (auto& dep : _queue_deps) { dep->set_ready(); } } } else if (!broadcast_q.empty()) { // If we have data to shuffle which is broadcasted auto& request = broadcast_q.front(); - auto& brpc_request = _instance_to_request[id]; + auto& brpc_request = instance_data.request; brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); + brpc_request->set_packet_seq(instance_data.seq++); brpc_request->set_sender_id(request.channel->_parent->sender_id()); brpc_request->set_be_number(request.channel->_parent->be_number()); if (request.block_holder->get_block() && !request.block_holder->get_block()->column_metas().empty()) { brpc_request->set_allocated_block(request.block_holder->get_block()); } - auto send_callback = request.channel->get_send_callback(id, request.eos); + auto send_callback = request.channel->get_send_callback(&instance_data, request.eos); send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( - const InstanceLoId& id, const std::string& err) { + RpcInstance* ins, const std::string& err) { auto task_lock = weak_task_ctx.lock(); if (task_lock == nullptr) { // This means ExchangeSinkBuffer Ojbect already destroyed, not need run failed any more. @@ -352,11 +352,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - _failed(id, err); + _failed(ins->id, err); }); send_callback->start_rpc_time = GetCurrentTimeNanos(); send_callback->addSuccessHandler([&, weak_task_ctx = weak_task_exec_ctx()]( - const InstanceLoId& id, const bool& eos, + RpcInstance* ins_ptr, const bool& eos, const PTransmitDataResult& result, const int64_t& start_rpc_time) { auto task_lock = weak_task_ctx.lock(); @@ -366,26 +366,26 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } // attach task for memory tracker and query id when core SCOPED_ATTACH_TASK(_state); - + auto& ins = *ins_ptr; auto end_rpc_time = GetCurrentTimeNanos(); - update_rpc_time(id, start_rpc_time, end_rpc_time); + update_rpc_time(ins, start_rpc_time, end_rpc_time); Status s(Status::create(result.status())); if (s.is<ErrorCode::END_OF_FILE>()) { - _set_receiver_eof(id); + _set_receiver_eof(ins); } else if (!s.ok()) { - _failed(id, + _failed(ins.id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); return; } else if (eos) { - _ended(id); + _ended(ins); } // The eos here only indicates that the current exchange sink has reached eos. // However, the queue still contains data from other exchange sinks, so RPCs need to continue being sent. - s = _send_rpc(id); + s = _send_rpc(ins); if (!s) { - _failed(id, + _failed(ins.id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); } }); @@ -408,29 +408,17 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } broadcast_q.pop(); } else { - _rpc_channel_is_idle[id] = true; + instance_data.rpc_channel_is_idle = true; } return Status::OK(); } -void ExchangeSinkBuffer::_ended(InstanceLoId id) { - if (!_instance_to_package_queue_mutex.contains(id)) { - std::stringstream ss; - ss << "failed find the instance id:" << id - << " now mutex map size:" << _instance_to_package_queue_mutex.size(); - for (const auto& p : _instance_to_package_queue_mutex) { - ss << " key:" << p.first << " value:" << p.second << "\n"; - } - LOG(INFO) << ss.str(); - - throw Exception(Status::FatalError("not find the instance id")); - } else { - std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]); - _running_sink_count[id]--; - if (_running_sink_count[id] == 0) { - _turn_off_channel(id, lock); - } +void ExchangeSinkBuffer::_ended(RpcInstance& ins) { + std::unique_lock<std::mutex> lock(*ins.mutex); + ins.running_sink_count--; + if (ins.running_sink_count == 0) { + _turn_off_channel(ins, lock); } } @@ -441,15 +429,15 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { _context->cancel(Status::Cancelled(err)); } -void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { - std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]); +void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { + std::unique_lock<std::mutex> lock(*ins.mutex); // When the receiving side reaches eof, it means the receiver has finished early. // The remaining data in the current rpc_channel does not need to be sent, // and the rpc_channel should be turned off immediately. - Defer turn_off([&]() { _turn_off_channel(id, lock); }); + Defer turn_off([&]() { _turn_off_channel(ins, lock); }); std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = - _instance_to_broadcast_package_queue[id]; + ins.broadcast_package_queue; for (; !broadcast_q.empty(); broadcast_q.pop()) { if (broadcast_q.front().block_holder->get_block()) { COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(), @@ -461,7 +449,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { swap(empty, broadcast_q); } - std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; + std::queue<TransmitInfo, std::list<TransmitInfo>>& q = ins.package_queue; for (; !q.empty(); q.pop()) { // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked @@ -474,7 +462,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { // Try to wake up pipeline after clearing the queue if (_total_queue_size <= _queue_capacity) { - for (auto& [_, dep] : _queue_deps) { + for (auto& dep : _queue_deps) { dep->set_ready(); } } @@ -486,20 +474,20 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { } // The unused parameter `with_lock` is to ensure that the function is called when the lock is held. -void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, +void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& /*with_lock*/) { - if (!_rpc_channel_is_idle[id]) { - _rpc_channel_is_idle[id] = true; + if (!ins.rpc_channel_is_idle) { + ins.rpc_channel_is_idle = true; } // Ensure that each RPC is turned off only once. - if (_rpc_channel_is_turn_off[id]) { + if (ins.rpc_channel_is_turn_off) { return; } - _rpc_channel_is_turn_off[id] = true; + ins.rpc_channel_is_turn_off = true; auto weak_task_ctx = weak_task_exec_ctx(); if (auto pip_ctx = weak_task_ctx.lock()) { - for (auto& [_, parent] : _parents) { - parent->on_channel_finished(id); + for (auto& parent : _parents) { + parent->on_channel_finished(ins.id); } } } @@ -507,10 +495,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) { int64_t local_max_time = 0; int64_t local_min_time = INT64_MAX; - for (auto& [id, stats] : _instance_to_rpc_stats) { - if (stats->sum_time != 0) { - local_max_time = std::max(local_max_time, stats->sum_time); - local_min_time = std::min(local_min_time, stats->sum_time); + for (auto& [_, ins] : _rpc_instances) { + if (ins->stats.sum_time != 0) { + local_max_time = std::max(local_max_time, ins->stats.sum_time); + local_min_time = std::min(local_min_time, ins->stats.sum_time); } } *max_time = local_max_time; @@ -519,24 +507,22 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_ti int64_t ExchangeSinkBuffer::get_sum_rpc_time() { int64_t sum_time = 0; - for (auto& [id, stats] : _instance_to_rpc_stats) { - sum_time += stats->sum_time; + for (auto& [_, ins] : _rpc_instances) { + sum_time += ins->stats.sum_time; } return sum_time; } -void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t start_rpc_time, +void ExchangeSinkBuffer::update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time) { _rpc_count++; int64_t rpc_spend_time = receive_rpc_time - start_rpc_time; - DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end()); if (rpc_spend_time > 0) { - ++_instance_to_rpc_stats[id]->rpc_count; - _instance_to_rpc_stats[id]->sum_time += rpc_spend_time; - _instance_to_rpc_stats[id]->max_time = - std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time); - _instance_to_rpc_stats[id]->min_time = - std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time); + auto& stats = ins.stats; + ++stats.rpc_count; + stats.sum_time += rpc_spend_time; + stats.max_time = std::max(stats.max_time, rpc_spend_time); + stats.min_time = std::min(stats.min_time, rpc_spend_time); } } @@ -561,21 +547,21 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { // This counter will lead to performance degradation. // So only collect this information when the profile level is greater than 3. if (_state->profile_level() > 3 && max_count > 0) { - std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec; - for (const auto& stats : _instance_to_rpc_stats_vec) { - tmp_rpc_stats_vec.emplace_back(*stats); + std::vector<std::pair<InstanceLoId, RpcInstanceStatistics>> tmp_rpc_stats_vec; + for (const auto& [id, ins] : _rpc_instances) { + tmp_rpc_stats_vec.emplace_back(id, ins->stats); } pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(), - [](const auto& a, const auto& b) { return a.max_time > b.max_time; }); + [](const auto& a, const auto& b) { return a.second.max_time > b.second.max_time; }); auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size()); int i = 0; auto* detail_profile = profile->create_child("RpcInstanceDetails", true, true); - for (const auto& stats : tmp_rpc_stats_vec) { + for (const auto& [id, stats] : tmp_rpc_stats_vec) { if (0 == stats.rpc_count) { continue; } std::stringstream out; - out << "Instance " << std::hex << stats.inst_lo_id; + out << "Instance " << std::hex << id; auto stats_str = fmt::format( "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {}, SumTime: {}", stats.rpc_count, PrettyPrinter::print(stats.max_time, TUnit::TIME_NS), @@ -594,10 +580,10 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) { std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { fmt::memory_buffer debug_string_buffer; - for (auto& [id, m] : _instance_to_package_queue_mutex) { - std::unique_lock<std::mutex> lock(*m); + for (auto& [id, instance_data] : _rpc_instances) { + std::unique_lock<std::mutex> lock(*instance_data->mutex); fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, - _instance_to_package_queue[id].size()); + instance_data->package_queue.size()); } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 90be213c9d6..9a00ef072d5 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -114,7 +114,6 @@ struct TransmitInfo { vectorized::Channel* channel = nullptr; std::unique_ptr<PBlock> block; bool eos; - Status exec_status; }; struct BroadcastTransmitInfo { @@ -123,6 +122,49 @@ struct BroadcastTransmitInfo { bool eos; }; +struct RpcInstanceStatistics { + int64_t rpc_count = 0; + int64_t max_time = 0; + int64_t min_time = INT64_MAX; + int64_t sum_time = 0; +}; + +// Consolidated structure for RPC instance data +struct RpcInstance { + // Constructor initializes the instance with the given ID + RpcInstance(InstanceLoId id) : id(id) {} + + // Unique identifier for this RPC instance + InstanceLoId id; + + // Mutex for thread-safe access to this instance's data + std::unique_ptr<std::mutex> mutex; + + // Sequence number for RPC packets, incremented for each packet sent + int64_t seq = 0; + + // Queue for regular data transmission requests + std::queue<TransmitInfo, std::list<TransmitInfo>> package_queue; + + // Queue for broadcast data transmission requests + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> broadcast_package_queue; + + // RPC request parameters for data transmission + std::shared_ptr<PTransmitDataParams> request; + + // Flag indicating if the RPC channel is currently idle (no active RPC) + bool rpc_channel_is_idle = true; + + // Flag indicating if the RPC channel has been turned off (no more RPCs will be sent) + bool rpc_channel_is_turn_off = false; + + // Statistics for monitoring RPC performance (latency, counts, etc.) + RpcInstanceStatistics stats; + + // Count of active exchange sinks using this RPC instance + int64_t running_sink_count = 0; +}; + template <typename Response> class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> { ENABLE_FACTORY_CREATOR(ExchangeSendCallback); @@ -130,20 +172,19 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> { public: ExchangeSendCallback() = default; - void init(InstanceLoId id, bool eos) { - _id = id; + void init(pipeline::RpcInstance* ins, bool eos) { + _ins = ins; _eos = eos; } ~ExchangeSendCallback() override = default; ExchangeSendCallback(const ExchangeSendCallback& other) = delete; ExchangeSendCallback& operator=(const ExchangeSendCallback& other) = delete; - void addFailedHandler( - const std::function<void(const InstanceLoId&, const std::string&)>& fail_fn) { + void addFailedHandler(const std::function<void(RpcInstance*, const std::string&)>& fail_fn) { _fail_fn = fail_fn; } - void addSuccessHandler(const std::function<void(const InstanceLoId&, const bool&, - const Response&, const int64_t&)>& suc_fn) { + void addSuccessHandler(const std::function<void(RpcInstance*, const bool&, const Response&, + const int64_t&)>& suc_fn) { _suc_fn = suc_fn; } @@ -157,9 +198,9 @@ public: ::doris::DummyBrpcCallback<Response>::cntl_->ErrorText(), BackendOptions::get_localhost(), ::doris::DummyBrpcCallback<Response>::cntl_->latency_us()); - _fail_fn(_id, err); + _fail_fn(_ins, err); } else { - _suc_fn(_id, _eos, *(::doris::DummyBrpcCallback<Response>::response_), + _suc_fn(_ins, _eos, *(::doris::DummyBrpcCallback<Response>::response_), start_rpc_time); } } catch (const std::exception& exp) { @@ -172,9 +213,9 @@ public: int64_t start_rpc_time; private: - std::function<void(const InstanceLoId&, const std::string&)> _fail_fn; - std::function<void(const InstanceLoId&, const bool&, const Response&, const int64_t&)> _suc_fn; - InstanceLoId _id; + std::function<void(RpcInstance*, const std::string&)> _fail_fn; + std::function<void(RpcInstance*, const bool&, const Response&, const int64_t&)> _suc_fn; + RpcInstance* _ins; bool _eos; }; @@ -237,15 +278,14 @@ public: Status add_block(TransmitInfo&& request); Status add_block(BroadcastTransmitInfo&& request); void close(); - void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time); + void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency, ExchangeSinkLocalState* local_state) { - DCHECK(_queue_deps.contains(sender_ins_id)); - DCHECK(_parents.contains(sender_ins_id)); - _queue_deps[sender_ins_id] = queue_dependency; - _parents[sender_ins_id] = local_state; + std::lock_guard l(_m); + _queue_deps.push_back(queue_dependency); + _parents.push_back(local_state); } void set_low_memory_mode() { _queue_capacity = 8; } @@ -257,38 +297,9 @@ private: #endif friend class ExchangeSinkLocalState; - phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>> - _instance_to_package_queue_mutex; - // store data in non-broadcast shuffle - phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, std::list<TransmitInfo>>> - _instance_to_package_queue; + // Single map to store all RPC instance data + phmap::flat_hash_map<InstanceLoId, std::unique_ptr<RpcInstance>> _rpc_instances; std::atomic<size_t> _queue_capacity; - // store data in broadcast shuffle - phmap::flat_hash_map<InstanceLoId, - std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>> - _instance_to_broadcast_package_queue; - using PackageSeq = int64_t; - // must init zero - // TODO: make all flat_hash_map to a STRUT - phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq; - phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>> _instance_to_request; - // One channel is corresponding to a downstream instance. - phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle; - - // There could be multiple situations that cause an rpc_channel to be turned off, - // such as receiving the eof, manual cancellation by the user, or all sinks reaching eos. - // Therefore, it is necessary to prevent an rpc_channel from being turned off multiple times. - phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_turn_off; - struct RpcInstanceStatistics { - RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {} - InstanceLoId inst_lo_id; - int64_t rpc_count = 0; - int64_t max_time = 0; - int64_t min_time = INT64_MAX; - int64_t sum_time = 0; - }; - std::vector<std::shared_ptr<RpcInstanceStatistics>> _instance_to_rpc_stats_vec; - phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*> _instance_to_rpc_stats; // It is set to true only when an RPC fails. Currently, we do not have an error retry mechanism. // If an RPC error occurs, the query will be canceled. @@ -301,19 +312,19 @@ private: RuntimeState* _state = nullptr; QueryContext* _context = nullptr; - Status _send_rpc(InstanceLoId); + Status _send_rpc(RpcInstance& ins); #ifndef BE_TEST - inline void _ended(InstanceLoId id); + inline void _ended(RpcInstance& ins); inline void _failed(InstanceLoId id, const std::string& err); - inline void _set_receiver_eof(InstanceLoId id); - inline void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock); + inline void _set_receiver_eof(RpcInstance& ins); + inline void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock); #else - virtual void _ended(InstanceLoId id); + virtual void _ended(RpcInstance& ins); virtual void _failed(InstanceLoId id, const std::string& err); - virtual void _set_receiver_eof(InstanceLoId id); - virtual void _turn_off_channel(InstanceLoId id, std::unique_lock<std::mutex>& with_lock); + virtual void _set_receiver_eof(RpcInstance& ins); + virtual void _turn_off_channel(RpcInstance& ins, std::unique_lock<std::mutex>& with_lock); #endif void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); @@ -323,13 +334,12 @@ private: // Any modification to instance_to_package_queue requires a corresponding modification to _total_queue_size. std::atomic<int> _total_queue_size = 0; - // _running_sink_count is used to track how many sinks have not finished yet. - // It is only decremented when eos is reached. - phmap::flat_hash_map<InstanceLoId, int64_t> _running_sink_count; + // protected the `_queue_deps` and `_parents` + std::mutex _m; // _queue_deps is used for memory control. - phmap::flat_hash_map<InstanceLoId, std::shared_ptr<Dependency>> _queue_deps; + std::vector<std::shared_ptr<Dependency>> _queue_deps; // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel. - phmap::flat_hash_map<InstanceLoId, ExchangeSinkLocalState*> _parents; + std::vector<ExchangeSinkLocalState*> _parents; const int64_t _exchange_sink_num; }; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index b105985e11a..e944cb78f67 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -102,10 +102,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _last_local_channel_idx = i; } } - only_local_exchange = local_size == channels.size(); + _only_local_exchange = local_size == channels.size(); _rpc_channels_num = channels.size() - local_size; - if (!only_local_exchange) { + if (!_only_local_exchange) { _sink_buffer = p.get_sink_buffer(state->fragment_instance_id().lo); register_channels(_sink_buffer.get()); _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), @@ -220,12 +220,9 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); - if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && - !only_local_exchange) { - _broadcast_dependency = Dependency::create_shared( - _parent->operator_id(), _parent->node_id(), "BroadcastDependency", true); + if ((_part_type == TPartitionType::UNPARTITIONED) && !_only_local_exchange) { _broadcast_pb_mem_limiter = - vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency); + vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_queue_dependency); } else if (_last_local_channel_idx > -1) { size_t dep_id = 0; for (auto& channel : channels) { @@ -298,6 +295,20 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX( if (sink.__isset.output_tuple_id) { _output_tuple_id = sink.output_tuple_id; } + + // Bucket shuffle may contain some same bucket so no need change the BUCKET_SHFFULE_HASH_PARTITIONED + if (_part_type != TPartitionType::UNPARTITIONED && + _part_type != TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + // if the destinations only one dest, we need to use broadcast + std::unordered_set<UniqueId> dest_fragment_ids_set; + for (auto& dest : _dests) { + dest_fragment_ids_set.insert(dest.fragment_instance_id); + if (dest_fragment_ids_set.size() > 1) { + break; + } + } + _part_type = dest_fragment_ids_set.size() == 1 ? TPartitionType::UNPARTITIONED : _part_type; + } } Status ExchangeSinkOperatorX::init(const TDataSink& tsink) { @@ -368,11 +379,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block set_low_memory_mode(state); } - if (_part_type == TPartitionType::UNPARTITIONED || local_state.channels.size() == 1) { + if (_part_type == TPartitionType::UNPARTITIONED) { // 1. serialize depends on it is not local exchange // 2. send block // 3. rollover block - if (local_state.only_local_exchange) { + if (local_state._only_local_exchange) { if (!block->empty()) { Status status; size_t idx = 0; @@ -549,9 +560,6 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { } COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); - if (_broadcast_dependency) { - COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time()); - } for (size_t i = 0; i < _local_channels_dependency.size(); i++) { COUNTER_UPDATE(_wait_channel_timer[i], _local_channels_dependency[i]->watcher_elapse_time()); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index f9cf4f90748..cdef5e5e119 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -62,9 +62,6 @@ public: if (_queue_dependency) { dep_vec.push_back(_queue_dependency.get()); } - if (_broadcast_dependency) { - dep_vec.push_back(_broadcast_dependency.get()); - } std::for_each(_local_channels_dependency.begin(), _local_channels_dependency.end(), [&](std::shared_ptr<Dependency> dep) { dep_vec.push_back(dep.get()); }); return dep_vec; @@ -104,7 +101,7 @@ public: } std::vector<std::shared_ptr<vectorized::Channel>> channels; int current_channel_idx {0}; // index of current channel to send to if _random == true - bool only_local_exchange {false}; + bool _only_local_exchange {false}; void on_channel_finished(InstanceLoId channel_id); vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); } @@ -145,7 +142,6 @@ private: vectorized::BlockSerializer _serializer; std::shared_ptr<Dependency> _queue_dependency = nullptr; - std::shared_ptr<Dependency> _broadcast_dependency = nullptr; /** * We use this to control the execution for local exchange. @@ -202,7 +198,7 @@ public: bool is_serial_operator() const override { return true; } void set_low_memory_mode(RuntimeState* state) override { auto& local_state = get_local_state(state); - // When `local_state.only_local_exchange` the `sink_buffer` is nullptr. + // When `local_state._only_local_exchange` the `sink_buffer` is nullptr. if (local_state._sink_buffer) { local_state._sink_buffer->set_low_memory_mode(); } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index b4d6331c6f6..3f09d02f20f 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) { } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, Status::OK()})); + RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos})); } return Status::OK(); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index c7b96c8638c..f62391d66b1 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -166,13 +166,13 @@ public: InstanceLoId dest_ins_id() const { return _fragment_instance_id.lo; } std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback( - InstanceLoId id, bool eos) { + pipeline::RpcInstance* ins, bool eos) { if (!_send_callback) { _send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared(); } else { _send_callback->cntl_->Reset(); } - _send_callback->init(id, eos); + _send_callback->init(ins, eos); return _send_callback; } diff --git a/be/test/pipeline/pipeline_test.cpp b/be/test/pipeline/pipeline_test.cpp index 6c2c99931a4..305e895b7b3 100644 --- a/be/test/pipeline/pipeline_test.cpp +++ b/be/test/pipeline/pipeline_test.cpp @@ -321,7 +321,7 @@ TEST_F(PipelineTest, HAPPY_PATH) { auto& sink_local_state = _runtime_states.back().front()->get_sink_local_state()->cast<ExchangeSinkLocalState>(); EXPECT_EQ(sink_local_state.channels.size(), 1); - EXPECT_EQ(sink_local_state.only_local_exchange, true); + EXPECT_EQ(sink_local_state._only_local_exchange, true); EXPECT_EQ(local_state.stream_recvr->sender_queues().size(), 1); diff --git a/be/test/vec/exec/exchange_sink_test.cpp b/be/test/vec/exec/exchange_sink_test.cpp index 7dbd352bd3a..0643d3c67b8 100644 --- a/be/test/vec/exec/exchange_sink_test.cpp +++ b/be/test/vec/exec/exchange_sink_test.cpp @@ -47,12 +47,12 @@ TEST_F(ExchangeSInkTest, test_normal_end) { EXPECT_EQ(sink3.add_block(dest_ins_id_2, true), Status::OK()); EXPECT_EQ(sink3.add_block(dest_ins_id_3, true), Status::OK()); - for (auto [id, count] : buffer->_running_sink_count) { - EXPECT_EQ(count, 3) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id; } - for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) { - EXPECT_EQ(is_turn_off, false) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << id; } pop_block(dest_ins_id_1, PopState::accept); @@ -67,12 +67,12 @@ TEST_F(ExchangeSInkTest, test_normal_end) { pop_block(dest_ins_id_3, PopState::accept); pop_block(dest_ins_id_3, PopState::accept); - for (auto [id, count] : buffer->_running_sink_count) { - EXPECT_EQ(count, 0) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->running_sink_count, 0) << "id : " << id; } - for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) { - EXPECT_EQ(is_turn_off, true) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->rpc_channel_is_turn_off, true) << "id : " << id; } clear_all_done(); } @@ -99,17 +99,17 @@ TEST_F(ExchangeSInkTest, test_eof_end) { EXPECT_EQ(sink3.add_block(dest_ins_id_2, true), Status::OK()); EXPECT_EQ(sink3.add_block(dest_ins_id_3, false), Status::OK()); - for (auto [id, count] : buffer->_running_sink_count) { - EXPECT_EQ(count, 3) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id; } - for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) { - EXPECT_EQ(is_turn_off, false) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << id; } pop_block(dest_ins_id_1, PopState::eof); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true); - EXPECT_TRUE(buffer->_instance_to_package_queue[dest_ins_id_1].empty()); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true); + EXPECT_TRUE(buffer->_rpc_instances[dest_ins_id_1]->package_queue.empty()); pop_block(dest_ins_id_2, PopState::accept); pop_block(dest_ins_id_2, PopState::accept); @@ -119,9 +119,11 @@ TEST_F(ExchangeSInkTest, test_eof_end) { pop_block(dest_ins_id_3, PopState::accept); pop_block(dest_ins_id_3, PopState::accept); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], false) << "not all eos"; - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false) << " not all eos"; + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, false) + << "not all eos"; + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, false) + << " not all eos"; EXPECT_TRUE(sink1.add_block(dest_ins_id_1, true).is<ErrorCode::END_OF_FILE>()); EXPECT_EQ(sink1.add_block(dest_ins_id_2, true), Status::OK()); @@ -129,10 +131,10 @@ TEST_F(ExchangeSInkTest, test_eof_end) { pop_block(dest_ins_id_2, PopState::accept); pop_block(dest_ins_id_3, PopState::accept); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], true); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false); - EXPECT_EQ(buffer->_running_sink_count[dest_ins_id_3], 1); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, true); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, true); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, false); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->running_sink_count, 1); clear_all_done(); } @@ -159,12 +161,12 @@ TEST_F(ExchangeSInkTest, test_error_end) { EXPECT_EQ(sink3.add_block(dest_ins_id_2, false), Status::OK()); EXPECT_EQ(sink3.add_block(dest_ins_id_3, false), Status::OK()); - for (auto [id, count] : buffer->_running_sink_count) { - EXPECT_EQ(count, 3) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->running_sink_count, 3) << "id : " << id; } - for (auto [id, is_turn_off] : buffer->_rpc_channel_is_turn_off) { - EXPECT_EQ(is_turn_off, false) << "id : " << id; + for (const auto& [id, instance] : buffer->_rpc_instances) { + EXPECT_EQ(instance->rpc_channel_is_turn_off, false) << "id : " << id; } pop_block(dest_ins_id_2, PopState::error); @@ -226,9 +228,9 @@ TEST_F(ExchangeSInkTest, test_queue_size) { std::cout << "each queue size : \n" << buffer->debug_each_instance_queue_size() << "\n"; - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_1], false); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_2], true); - EXPECT_EQ(buffer->_rpc_channel_is_turn_off[dest_ins_id_3], false); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_1]->rpc_channel_is_turn_off, false); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_2]->rpc_channel_is_turn_off, true); + EXPECT_EQ(buffer->_rpc_instances[dest_ins_id_3]->rpc_channel_is_turn_off, false); clear_all_done(); } } diff --git a/be/test/vec/exec/exchange_sink_test.h b/be/test/vec/exec/exchange_sink_test.h index 253d7b267f9..59004a53a9e 100644 --- a/be/test/vec/exec/exchange_sink_test.h +++ b/be/test/vec/exec/exchange_sink_test.h @@ -138,10 +138,8 @@ struct SinkWithChannel { std::map<int64_t, std::shared_ptr<Channel>> channels; Status add_block(int64_t id, bool eos) { auto channel = channels[id]; - TransmitInfo transmitInfo {.channel = channel.get(), - .block = std::make_unique<PBlock>(), - .eos = eos, - .exec_status = Status::OK()}; + TransmitInfo transmitInfo { + .channel = channel.get(), .block = std::make_unique<PBlock>(), .eos = eos}; return buffer->add_block(std::move(transmitInfo)); } }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org