This is an automated email from the ASF dual-hosted git repository. lihaopeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e41ffc7cb51 [Refactor](Exec) Support one rpc send muti blocks (#50113) e41ffc7cb51 is described below commit e41ffc7cb51e5336c730e6e9e4d6b9c80c353c5b Author: HappenLee <happen...@selectdb.com> AuthorDate: Tue Apr 29 10:35:17 2025 +0800 [Refactor](Exec) Support one rpc send muti blocks (#50113) 1. Remove useless channel ptr in trans struct to reduce memory consumption 2. Send multiple blocks at once in one rpc --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 256 +++++++++++++++------ be/src/pipeline/exec/exchange_sink_buffer.h | 15 +- be/src/vec/runtime/vdata_stream_mgr.cpp | 12 + be/src/vec/sink/vdata_stream_sender.cpp | 4 +- be/test/vec/exec/exchange_sink_test.h | 5 +- .../java/org/apache/doris/qe/SessionVariable.java | 13 ++ gensrc/proto/internal_service.proto | 1 + gensrc/thrift/PaloInternalService.thrift | 1 + 8 files changed, 220 insertions(+), 87 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 95a5a00f68a..fc28f8b115c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -100,7 +100,13 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_ _node_id(node_id), _state(state), _context(state->get_query_ctx()), - _exchange_sink_num(sender_ins_ids.size()) {} + _exchange_sink_num(sender_ins_ids.size()), + _send_multi_blocks(state->query_options().__isset.exchange_multi_blocks_byte_size && + state->query_options().exchange_multi_blocks_byte_size > 0) { + if (_send_multi_blocks) { + _send_multi_blocks_byte_size = state->query_options().exchange_multi_blocks_byte_size; + } +} void ExchangeSinkBuffer::close() { // Could not clear the queue here, because there maybe a running rpc want to @@ -124,9 +130,12 @@ void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { auto instance_data = std::make_unique<RpcInstance>(low_id); 
instance_data->mutex = std::make_unique<std::mutex>(); instance_data->seq = 0; - instance_data->package_queue = std::queue<TransmitInfo, std::list<TransmitInfo>>(); - instance_data->broadcast_package_queue = - std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>(); + instance_data->package_queue = + std::unordered_map<vectorized::Channel*, + std::queue<TransmitInfo, std::list<TransmitInfo>>>(); + instance_data->broadcast_package_queue = std::unordered_map< + vectorized::Channel*, + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>(); _queue_capacity = config::exchg_buffer_queue_capacity_factor * _rpc_instances.size(); PUniqueId finst_id; @@ -146,14 +155,14 @@ void ExchangeSinkBuffer::construct_request(TUniqueId fragment_instance_id) { _rpc_instances[low_id] = std::move(instance_data); } -Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { +Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel, TransmitInfo&& request) { if (_is_failed) { return Status::OK(); } - auto ins_id = request.channel->dest_ins_id(); + auto ins_id = channel->dest_ins_id(); if (!_rpc_instances.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", - print_id(request.channel->_fragment_instance_id)); + print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; if (instance_data.rpc_channel_is_turn_off) { @@ -170,10 +179,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { if (request.block) { RETURN_IF_ERROR( BeExecVersionManager::check_be_exec_version(request.block->be_exec_version())); - COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), - request.block->ByteSizeLong()); + COUNTER_UPDATE(channel->_parent->memory_used_counter(), request.block->ByteSizeLong()); } - instance_data.package_queue.emplace(std::move(request)); + instance_data.package_queue[channel].emplace(std::move(request)); _total_queue_size++; if (_total_queue_size > 
_queue_capacity) { for (auto& dep : _queue_deps) { @@ -188,14 +196,15 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { return Status::OK(); } -Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { +Status ExchangeSinkBuffer::add_block(vectorized::Channel* channel, + BroadcastTransmitInfo&& request) { if (_is_failed) { return Status::OK(); } - auto ins_id = request.channel->dest_ins_id(); + auto ins_id = channel->dest_ins_id(); if (!_rpc_instances.contains(ins_id)) { return Status::InternalError("fragment_instance_id {} not do register_sink", - print_id(request.channel->_fragment_instance_id)); + print_id(channel->_fragment_instance_id)); } auto& instance_data = *_rpc_instances[ins_id]; if (instance_data.rpc_channel_is_turn_off) { @@ -213,7 +222,7 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version( request.block_holder->get_block()->be_exec_version())); } - instance_data.broadcast_package_queue.emplace(request); + instance_data.broadcast_package_queue[channel].emplace(request); } if (send_now) { RETURN_IF_ERROR(_send_rpc(instance_data)); @@ -225,9 +234,31 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { std::unique_lock<std::mutex> lock(*(instance_data.mutex)); - std::queue<TransmitInfo, std::list<TransmitInfo>>& q = instance_data.package_queue; - std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = - instance_data.broadcast_package_queue; + auto& q_map = instance_data.package_queue; + auto& broadcast_q_map = instance_data.broadcast_package_queue; + + auto find_max_size_queue = [](vectorized::Channel*& channel, auto& ptr, auto& map) { + for (auto& [chan, lists] : map) { + if (!ptr) { + if (!lists.empty()) { + channel = chan; + ptr = &lists; + } + } else { + if (ptr->size() < lists.size()) { + channel = chan; + ptr = &lists; + } 
+ } + } + }; + + vectorized::Channel* channel = nullptr; + + std::queue<TransmitInfo, std::list<TransmitInfo>>* q_ptr = nullptr; + find_max_size_queue(channel, q_ptr, q_map); + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>* broadcast_q_ptr = nullptr; + find_max_size_queue(channel, broadcast_q_ptr, broadcast_q_map); if (_is_failed) { _turn_off_channel(instance_data, lock); @@ -237,20 +268,49 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { return Status::OK(); } - if (!q.empty()) { + auto mem_byte = 0; + if (q_ptr && !q_ptr->empty()) { + auto& q = *q_ptr; + + std::vector<TransmitInfo> requests(_send_multi_blocks ? q.size() : 1); + for (int i = 0; i < requests.size(); i++) { + requests[i] = std::move(q.front()); + q.pop(); + + if (requests[i].block) { + // make sure rpc byte size under the _send_multi_blocks_bytes_size + mem_byte += requests[i].block->ByteSizeLong(); + if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) { + requests.resize(i + 1); + break; + } + } + } + // If we have data to shuffle which is not broadcasted - auto& request = q.front(); + auto& request = requests[0]; auto& brpc_request = instance_data.request; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(instance_data.seq++); - brpc_request->set_sender_id(request.channel->_parent->sender_id()); - brpc_request->set_be_number(request.channel->_parent->be_number()); - if (request.block && !request.block->column_metas().empty()) { - brpc_request->set_allocated_block(request.block.get()); + brpc_request->set_sender_id(channel->_parent->sender_id()); + brpc_request->set_be_number(channel->_parent->be_number()); + + if (_send_multi_blocks) { + for (auto& req : requests) { + if (req.block && !req.block->column_metas().empty()) { + auto add_block = brpc_request->add_blocks(); + add_block->Swap(req.block.get()); + } + } + } else { + if (request.block && !request.block->column_metas().empty()) { + 
brpc_request->set_allocated_block(request.block.get()); + } } - auto send_callback = request.channel->get_send_callback(&instance_data, request.eos); - send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); + instance_data.seq += requests.size(); + brpc_request->set_packet_seq(instance_data.seq); + brpc_request->set_eos(requests.back().eos); + auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos); + send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } @@ -308,38 +368,79 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (enable_http_send_block(*brpc_request)) { RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), std::move(send_remote_block_closure), - request.channel->_brpc_dest_addr)); + channel->_brpc_dest_addr)); } else { - transmit_blockv2(*request.channel->_brpc_stub, - std::move(send_remote_block_closure)); + transmit_blockv2(*channel->_brpc_stub, std::move(send_remote_block_closure)); } } - if (request.block) { - COUNTER_UPDATE(request.channel->_parent->memory_used_counter(), - -request.block->ByteSizeLong()); + + if (!_send_multi_blocks && request.block) { static_cast<void>(brpc_request->release_block()); + } else { + brpc_request->clear_blocks(); } - q.pop(); - _total_queue_size--; + if (mem_byte) { + COUNTER_UPDATE(channel->_parent->memory_used_counter(), -mem_byte); + } + DCHECK_GE(_total_queue_size, requests.size()); + _total_queue_size -= (int)requests.size(); if (_total_queue_size <= _queue_capacity) { for (auto& dep : _queue_deps) { dep->set_ready(); } } - } else if (!broadcast_q.empty()) { + } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) { + auto& broadcast_q = *broadcast_q_ptr; // If we have data to shuffle which is broadcasted - auto& request = broadcast_q.front(); + std::vector<BroadcastTransmitInfo> requests(_send_multi_blocks ? 
broadcast_q.size() : 1); + for (int i = 0; i < requests.size(); i++) { + requests[i] = broadcast_q.front(); + broadcast_q.pop(); + + if (requests[i].block_holder->get_block()) { + // make sure rpc byte size under the _send_multi_blocks_bytes_size + mem_byte += requests[i].block_holder->get_block()->ByteSizeLong(); + if (_send_multi_blocks && mem_byte > _send_multi_blocks_byte_size) { + requests.resize(i + 1); + break; + } + } + } + + auto& request = requests[0]; auto& brpc_request = instance_data.request; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(instance_data.seq++); - brpc_request->set_sender_id(request.channel->_parent->sender_id()); - brpc_request->set_be_number(request.channel->_parent->be_number()); - if (request.block_holder->get_block() && - !request.block_holder->get_block()->column_metas().empty()) { - brpc_request->set_allocated_block(request.block_holder->get_block()); + brpc_request->set_sender_id(channel->_parent->sender_id()); + brpc_request->set_be_number(channel->_parent->be_number()); + + if (_send_multi_blocks) { + for (int i = 0; i < requests.size(); i++) { + auto& req = requests[i]; + if (auto block = req.block_holder->get_block(); + block && !block->column_metas().empty()) { + auto add_block = brpc_request->add_blocks(); + for (int j = 0; j < block->column_metas_size(); ++j) { + add_block->add_column_metas()->CopyFrom(block->column_metas(j)); + } + add_block->set_be_exec_version(block->be_exec_version()); + add_block->set_compressed(block->compressed()); + add_block->set_compression_type(block->compression_type()); + add_block->set_uncompressed_size(block->uncompressed_size()); + add_block->set_allocated_column_values( + const_cast<std::string*>(&block->column_values())); + } + } + } else { + if (request.block_holder->get_block() && + !request.block_holder->get_block()->column_metas().empty()) { + brpc_request->set_allocated_block(request.block_holder->get_block()); + } } - auto send_callback = 
request.channel->get_send_callback(&instance_data, request.eos); - send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); + instance_data.seq += requests.size(); + brpc_request->set_packet_seq(instance_data.seq); + brpc_request->set_eos(requests.back().eos); + auto send_callback = channel->get_send_callback(&instance_data, requests.back().eos); + + send_callback->cntl_->set_timeout_ms(channel->_brpc_timeout_ms); if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } @@ -397,16 +498,19 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { if (enable_http_send_block(*brpc_request)) { RETURN_IF_ERROR(transmit_block_httpv2(_context->exec_env(), std::move(send_remote_block_closure), - request.channel->_brpc_dest_addr)); + channel->_brpc_dest_addr)); } else { - transmit_blockv2(*request.channel->_brpc_stub, - std::move(send_remote_block_closure)); + transmit_blockv2(*channel->_brpc_stub, std::move(send_remote_block_closure)); } } - if (request.block_holder->get_block()) { + if (!_send_multi_blocks && request.block_holder->get_block()) { static_cast<void>(brpc_request->release_block()); + } else { + for (int i = 0; i < brpc_request->mutable_blocks()->size(); ++i) { + static_cast<void>(brpc_request->mutable_blocks(i)->release_column_values()); + } + brpc_request->clear_blocks(); } - broadcast_q.pop(); } else { instance_data.rpc_channel_is_idle = true; } @@ -436,27 +540,27 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { // and the rpc_channel should be turned off immediately. 
Defer turn_off([&]() { _turn_off_channel(ins, lock); }); - std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q = - ins.broadcast_package_queue; - for (; !broadcast_q.empty(); broadcast_q.pop()) { - if (broadcast_q.front().block_holder->get_block()) { - COUNTER_UPDATE(broadcast_q.front().channel->_parent->memory_used_counter(), - -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + auto& broadcast_q_map = ins.broadcast_package_queue; + for (auto& [channel, broadcast_q] : broadcast_q_map) { + for (; !broadcast_q.empty(); broadcast_q.pop()) { + if (broadcast_q.front().block_holder->get_block()) { + COUNTER_UPDATE(channel->_parent->memory_used_counter(), + -broadcast_q.front().block_holder->get_block()->ByteSizeLong()); + } } } - { - std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty; - swap(empty, broadcast_q); - } - - std::queue<TransmitInfo, std::list<TransmitInfo>>& q = ins.package_queue; - for (; !q.empty(); q.pop()) { - // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, - // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked - _total_queue_size--; - if (q.front().block) { - COUNTER_UPDATE(q.front().channel->_parent->memory_used_counter(), - -q.front().block->ByteSizeLong()); + broadcast_q_map.clear(); + + auto& q_map = ins.package_queue; + for (auto& [channel, q] : q_map) { + for (; !q.empty(); q.pop()) { + // Must update _total_queue_size here, otherwise if _total_queue_size > _queue_capacity at EOF, + // ExchangeSinkQueueDependency will be blocked and pipeline will be deadlocked + _total_queue_size--; + if (q.front().block) { + COUNTER_UPDATE(channel->_parent->memory_used_counter(), + -q.front().block->ByteSizeLong()); + } } } @@ -467,10 +571,7 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { } } - { - std::queue<TransmitInfo, std::list<TransmitInfo>> empty; - swap(empty, q); - } + q_map.clear(); } // The 
unused parameter `with_lock` is to ensure that the function is called when the lock is held. @@ -582,8 +683,11 @@ std::string ExchangeSinkBuffer::debug_each_instance_queue_size() { fmt::memory_buffer debug_string_buffer; for (auto& [id, instance_data] : _rpc_instances) { std::unique_lock<std::mutex> lock(*instance_data->mutex); - fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, - instance_data->package_queue.size()); + auto queue_size = 0; + for (auto& [_, list] : instance_data->package_queue) { + queue_size += list.size(); + } + fmt::format_to(debug_string_buffer, "Instance: {}, queue size: {}\n", id, queue_size); } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 9a00ef072d5..44416ef68e1 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -111,13 +111,11 @@ private: namespace pipeline { struct TransmitInfo { - vectorized::Channel* channel = nullptr; std::unique_ptr<PBlock> block; bool eos; }; struct BroadcastTransmitInfo { - vectorized::Channel* channel = nullptr; std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr; bool eos; }; @@ -144,10 +142,13 @@ struct RpcInstance { int64_t seq = 0; // Queue for regular data transmission requests - std::queue<TransmitInfo, std::list<TransmitInfo>> package_queue; + std::unordered_map<vectorized::Channel*, std::queue<TransmitInfo, std::list<TransmitInfo>>> + package_queue; // Queue for broadcast data transmission requests - std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> broadcast_package_queue; + std::unordered_map<vectorized::Channel*, + std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>> + broadcast_package_queue; // RPC request parameters for data transmission std::shared_ptr<PTransmitDataParams> request; @@ -275,8 +276,8 @@ public: void construct_request(TUniqueId); - Status 
add_block(TransmitInfo&& request); - Status add_block(BroadcastTransmitInfo&& request); + Status add_block(vectorized::Channel* channel, TransmitInfo&& request); + Status add_block(vectorized::Channel* channel, BroadcastTransmitInfo&& request); void close(); void update_rpc_time(RpcInstance& ins, int64_t start_rpc_time, int64_t receive_rpc_time); void update_profile(RuntimeProfile* profile); @@ -341,6 +342,8 @@ private: // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel. std::vector<ExchangeSinkLocalState*> _parents; const int64_t _exchange_sink_num; + bool _send_multi_blocks = false; + int _send_multi_blocks_byte_size = 256 * 1024; }; } // namespace pipeline diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 535f59c49e8..2a4f4e22861 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -146,6 +146,18 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, } bool eos = request->eos(); + if (!request->blocks().empty()) { + for (int i = 0; i < request->blocks_size(); i++) { + std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>(); + pblock_ptr->Swap(const_cast<PBlock*>(&request->blocks(i))); + RETURN_IF_ERROR(recvr->add_block( + std::move(pblock_ptr), request->sender_id(), request->be_number(), + request->packet_seq() - request->blocks_size() + i, eos ? 
nullptr : done, + wait_for_worker, cpu_time_stop_watch.elapsed_time())); + } + } + + // old logic, for compatibility if (request->has_block()) { std::unique_ptr<PBlock> pblock_ptr { const_cast<PTransmitDataParams*>(request)->release_block()}; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 3f09d02f20f..0e9c5371a2b 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -174,7 +174,7 @@ Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) { } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos})); + RETURN_IF_ERROR(_buffer->add_block(this, {std::move(block), eos})); } return Status::OK(); } @@ -188,7 +188,7 @@ Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& blo _eos_send = true; } if (eos || block->get_block()->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); + RETURN_IF_ERROR(_buffer->add_block(this, {block, eos})); } return Status::OK(); } diff --git a/be/test/vec/exec/exchange_sink_test.h b/be/test/vec/exec/exchange_sink_test.h index 59004a53a9e..5cf0fdbed6a 100644 --- a/be/test/vec/exec/exchange_sink_test.h +++ b/be/test/vec/exec/exchange_sink_test.h @@ -138,9 +138,8 @@ struct SinkWithChannel { std::map<int64_t, std::shared_ptr<Channel>> channels; Status add_block(int64_t id, bool eos) { auto channel = channels[id]; - TransmitInfo transmitInfo { - .channel = channel.get(), .block = std::make_unique<PBlock>(), .eos = eos}; - return buffer->add_block(std::move(transmitInfo)); + TransmitInfo transmitInfo {.block = std::make_unique<PBlock>(), .eos = eos}; + return buffer->add_block(channel.get(), std::move(transmitInfo)); } }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index c7d26af924f..67e52b66133 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -674,6 +674,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_ES_PARALLEL_SCROLL = "enable_es_parallel_scroll"; + public static final String EXCHANGE_MULTI_BLOCKS_BYTE_SIZE = "exchange_multi_blocks_byte_size"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -2243,6 +2245,10 @@ public class SessionVariable implements Serializable, Writable { "When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"}) public boolean keepCarriageReturn = false; + @VariableMgr.VarAttr(name = EXCHANGE_MULTI_BLOCKS_BYTE_SIZE, + description = {"Enable exchange to send multiple blocks in one RPC. Default is 256KB. A negative" + + " value disables multi-block exchange."}) + public int exchangeMultiBlocksByteSize = 256 * 1024; @VariableMgr.VarAttr(name = FORCE_JNI_SCANNER, description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"}) @@ -2591,6 +2597,12 @@ public class SessionVariable implements Serializable, Writable { this.disableStreamPreaggregations = random.nextBoolean(); this.enableShareHashTableForBroadcastJoin = random.nextBoolean(); this.enableParallelResultSink = random.nextBoolean(); + + // 4KB = 4 * 1024 bytes + int minBytes = 4 * 1024; + // 10MB = 10 * 1024 * 1024 bytes + int maxBytes = 10 * 1024 * 1024; + this.exchangeMultiBlocksByteSize = minBytes + (int) (random.nextDouble() * (maxBytes - minBytes)); int randomInt = random.nextInt(4); if (randomInt % 2 == 0) { this.rewriteOrToInPredicateThreshold = 100000; @@ -4165,6 +4177,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableRuntimeFilterPartitionPrune(enableRuntimeFilterPartitionPrune); tResult.setMinimumOperatorMemoryRequiredKb(minimumOperatorMemoryRequiredKB); + 
tResult.setExchangeMultiBlocksByteSize(exchangeMultiBlocksByteSize); return tResult; } diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 402f1044568..e36907e9ea1 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -51,6 +51,7 @@ message PTransmitDataParams { optional bool transfer_by_attachment = 10 [default = false]; optional PUniqueId query_id = 11; optional PStatus exec_status = 12; + repeated PBlock blocks = 13; }; message PTransmitDataResult { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 60ca569949f..81e4d1f877c 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -393,6 +393,7 @@ struct TQueryOptions { 162: optional bool dump_heap_profile_when_mem_limit_exceeded = false 163: optional bool inverted_index_compatible_read = false 164: optional bool check_orc_init_sargs_success = false + 165: optional i32 exchange_multi_blocks_byte_size = 262144 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org