HappenLee commented on code in PR #16463: URL: https://github.com/apache/doris/pull/16463#discussion_r1100956402
########## be/src/pipeline/exec/exchange_sink_buffer.cpp: ########## @@ -159,54 +187,110 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]); std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id]; - if (q.empty() || _is_finishing) { + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = + _instance_to_broadcast_package_queue[id]; + + if (_is_finishing) { _instance_to_sending_by_pipeline[id] = true; return Status::OK(); } - TransmitInfo& request = q.front(); + if (!q.empty()) { + // If we have data to shuffle which is not broadcasted + TransmitInfo& request = q.front(); - if (!_instance_to_request[id]) { - _construct_request(id); - } + if (!_instance_to_request[id]) { + _construct_request(id); + } - auto brpc_request = _instance_to_request[id]; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); - if (request.block) { - brpc_request->set_allocated_block(request.block.get()); - } + auto brpc_request = _instance_to_request[id]; + brpc_request->set_eos(request.eos); + brpc_request->set_packet_seq(_instance_to_seq[id]++); + if (request.block) { + brpc_request->set_allocated_block(request.block); + } + + auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos); + _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); + _closure->addFailedHandler( + [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); + _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, + const PTransmitDataResult& result) { + Status s = Status(result.status()); + if (!s.ok()) { + _failed(id, + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); + } else if (eos) { + _ended(id); + } else { + _send_rpc(id); + } + }); - auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos); - 
_closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); - _closure->addFailedHandler( - [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result) { - Status s = Status(result.status()); - if (!s.ok()) { - _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); - } else if (eos) { - _ended(id); - } else { - _send_rpc(id); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); + if (enable_http_send_block(*brpc_request)) { + RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, + *brpc_request, + request.channel->_brpc_dest_addr)); + } else { + transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); + } } - }); - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, - *brpc_request, request.channel->_brpc_dest_addr)); - } else { - transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); + if (request.block) { + brpc_request->release_block(); + } + q.pop(); + } else if (!broadcast_q.empty()) { + // If we have data to shuffle which is broadcasted + BroadcastTransmitInfo& request = broadcast_q.front(); Review Comment: reuse the code of `broadcast_q` and `shuffle_q` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org