This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4d4550c2522abeef2e8b5ecc8bc3e5e0c6b1d83d
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Jun 13 16:34:33 2024 +0800

    [Feature](sink) support parallel result sink (#36053)

    ## Proposed changes

    support parallel result sink
---
 be/src/pipeline/exec/result_file_sink_operator.cpp | 12 +++----
 be/src/pipeline/exec/result_file_sink_operator.h   |  4 +--
 be/src/pipeline/exec/result_sink_operator.cpp      | 20 +++++++----
 be/src/pipeline/exec/result_sink_operator.h        |  4 +--
 be/src/runtime/buffer_control_block.cpp            | 23 +++++++-----
 be/src/runtime/buffer_control_block.h              |  3 +-
 be/src/runtime/result_buffer_mgr.cpp               | 26 +++++++-------
 be/src/runtime/runtime_state.h                     |  4 ---
 .../nereids/properties/RequestPropertyDeriver.java |  7 +++-
 .../org/apache/doris/planner/PlanFragment.java     |  4 ---
 .../main/java/org/apache/doris/qe/Coordinator.java | 41 ++++++++++++++++------
 .../java/org/apache/doris/qe/ResultReceiver.java   | 13 +++++--
 .../java/org/apache/doris/qe/SessionVariable.java  | 20 ++++++++---
 .../apache/doris/nereids/DistributeHintTest.java   |  1 +
 .../org/apache/doris/planner/ColocatePlanTest.java |  2 ++
 .../planner/PaimonPredicateConverterTest.java      |  1 +
 .../doris/utframe/DemoMultiBackendsTest.java       |  1 +
 gensrc/thrift/PaloInternalService.thrift           |  4 ++-
 .../suites/empty_relation/eliminate_empty.groovy   |  1 +
 .../nereids_broadcast_shuffle_join/load.groovy     |  1 +
 .../nereids_clickbench_shape_p0/query20.groovy     |  1 +
 .../suites/nereids_p0/hint/multi_leading.groovy    |  1 +
 .../eliminate_outer_join.groovy                    |  1 +
 .../push_down_alias_through_join.groovy            |  1 +
 .../infer_set_operator_distinct.groovy             |  1 +
 .../merge_aggregate/merge_aggregate.groovy         |  1 +
 .../simplify_window_expression.groovy              |  1 +
 .../nereids_ssb_shape_sf100_p0/shape/flat.groovy   |  1 +
 .../push_filter_through_ptopn.groovy               |  1 +
 .../push_filter_through_window.groovy              |  1 +
 .../shape/query9.groovy                            |  1 +
 .../noStatsRfPrune/query9.groovy                   |  1 +
 .../no_stats_shape/query9.groovy                   |  1 +
 .../rf_prune/query9.groovy                         |  2 +-
 .../shape/query9.groovy                            |  1 +
 .../tpch/push_filter_window_eqset.groovy           |  1 +
 36 files changed, 139 insertions(+), 70 deletions(-)
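At a high level, the patch removes the single-instance bottleneck at the top of a query plan: instead of forcing a GATHER exchange so that one fragment instance owns the result buffer (registered under its fragment instance id), every instance of the top fragment can sink rows into one buffer registered under the query id. A minimal sketch of that ownership model, using hypothetical names rather than the real Doris types (`ResultBufferMgr` and `BufferControlBlock` play these roles in the diff below):

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>
#include <vector>

// Toy registry illustrating the ownership change: when results are keyed
// by query id, every parallel sink of the query shares a single buffer.
// All names here are illustrative, not the actual Doris classes.
struct UniqueId {
    int64_t hi = 0;
    int64_t lo = 0;
    bool operator<(const UniqueId& other) const {
        return std::tie(hi, lo) < std::tie(other.hi, other.lo);
    }
};

struct ResultBuffer {
    std::mutex lock;
    std::vector<std::string> batches; // serialized row batches
};

class ResultBufferRegistry {
public:
    // Repeated calls with the same id hand back the same buffer, which
    // is what lets N sink instances of one query share it.
    std::shared_ptr<ResultBuffer> create_or_get(const UniqueId& id) {
        std::lock_guard<std::mutex> guard(_lock);
        auto& slot = _buffers[id];
        if (slot == nullptr) {
            slot = std::make_shared<ResultBuffer>();
        }
        return slot;
    }

private:
    std::mutex _lock;
    std::map<UniqueId, std::shared_ptr<ResultBuffer>> _buffers;
};
```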
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 381e234e78b..0cd14899f52 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -124,8 +124,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
         std::mt19937 g(rd());
         shuffle(_channels.begin(), _channels.end(), g);
-        for (int i = 0; i < _channels.size(); ++i) {
-            RETURN_IF_ERROR(_channels[i]->init_stub(state));
+        for (auto& _channel : _channels) {
+            RETURN_IF_ERROR(_channel->init_stub(state));
         }
     }
     _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get());
@@ -139,9 +139,9 @@ Status ResultFileSinkLocalState::open(RuntimeState* state) {
     auto& p = _parent->cast<ResultFileSinkOperatorX>();
     if (!p._is_top_sink) {
         int local_size = 0;
-        for (int i = 0; i < _channels.size(); ++i) {
-            RETURN_IF_ERROR(_channels[i]->open(state));
-            if (_channels[i]->is_local()) {
+        for (auto& _channel : _channels) {
+            RETURN_IF_ERROR(_channel->open(state));
+            if (_channel->is_local()) {
                 local_size++;
             }
         }
@@ -175,7 +175,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
         // close sender, this is normal path end
         if (_sender) {
             _sender->update_return_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
-            static_cast<void>(_sender->close(final_status));
+            RETURN_IF_ERROR(_sender->close(final_status));
         }
         state->exec_env()->result_mgr()->cancel_at_time(
                 time(nullptr) + config::result_buffer_cancelled_interval_time,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index 9699011d3be..4fa31f615ce 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include "operator.h"
 #include "vec/sink/writer/vfile_result_writer.h"
 
@@ -39,7 +37,7 @@ public:
     using Base = AsyncWriterSink<vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
     ENABLE_FACTORY_CREATOR(ResultFileSinkLocalState);
     ResultFileSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
-    ~ResultFileSinkLocalState();
+    ~ResultFileSinkLocalState() override;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index b8ae962ea28..24c5162c4f4 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -18,6 +18,7 @@
 #include "result_sink_operator.h"
 
 #include <memory>
+#include <utility>
 
 #include "common/object_pool.h"
 #include "exec/rowid_fetcher.h"
@@ -44,10 +45,13 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
     _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
     _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
 
-    // create sender
-    RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
-            state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
-            state->execution_timeout()));
+    if (state->query_options().enable_parallel_result_sink) {
+        _sender = _parent->cast<ResultSinkOperatorX>()._sender;
+    } else {
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), RESULT_SINK_BUFFER_SIZE, &_sender,
+                state->execution_timeout()));
+    }
     _sender->set_dependency(_dependency->shared_from_this());
     return Status::OK();
 }
@@ -104,9 +108,6 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id, const RowDescriptor& r
 }
 
 Status ResultSinkOperatorX::prepare(RuntimeState* state) {
-    auto fragment_instance_id = state->fragment_instance_id();
-    auto title = fmt::format("VDataBufferSender (dst_fragment_instance_id={:x}-{:x})",
-                             fragment_instance_id.hi, fragment_instance_id.lo);
     // prepare output_expr
     // From the thrift expressions create the real exprs.
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
@@ -118,6 +119,11 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
     }
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
+
+    if (state->query_options().enable_parallel_result_sink) {
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->query_id(), RESULT_SINK_BUFFER_SIZE, &_sender, state->execution_timeout()));
+    }
     return Status::OK();
 }
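The BE-side wiring above splits sender creation in two: with the feature on, `ResultSinkOperatorX::prepare()` creates one sender keyed by `query_id`, and every parallel local state aliases it in `init()`; with it off, each local state keeps creating a private sender keyed by `fragment_instance_id`. A condensed, hypothetical model of that branch (not the real operator classes):

```cpp
#include <cstdint>
#include <map>
#include <memory>

// Every name below is an illustrative stand-in for the classes in the
// diff above, sketched under the assumption that create_sender() hands
// back one block per id, created on first use.
struct BufferControlBlock {};

struct ResultMgr {
    std::shared_ptr<BufferControlBlock> create_sender(int64_t id) {
        auto& block = _blocks[id];
        if (block == nullptr) {
            block = std::make_shared<BufferControlBlock>();
        }
        return block;
    }
    std::map<int64_t, std::shared_ptr<BufferControlBlock>> _blocks;
};

class ResultSinkOperator {
public:
    // prepare() runs once per fragment: in parallel mode it creates the
    // query-wide sender that all instances will share.
    void prepare(ResultMgr& mgr, int64_t query_id, bool parallel_sink) {
        if (parallel_sink) {
            _shared_sender = mgr.create_sender(query_id);
        }
    }

    // init() runs once per instance: alias the shared sender, or create
    // a private per-instance one (the legacy path).
    std::shared_ptr<BufferControlBlock> init(ResultMgr& mgr, int64_t instance_id,
                                             bool parallel_sink) {
        return parallel_sink ? _shared_sender : mgr.create_sender(instance_id);
    }

private:
    std::shared_ptr<BufferControlBlock> _shared_sender;
};
```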
diff --git a/be/src/pipeline/exec/result_sink_operator.h b/be/src/pipeline/exec/result_sink_operator.h
index 12a6bd0b11f..0ccb7f4946b 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include "operator.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/result_writer.h"
@@ -159,6 +157,8 @@ private:
 
     // for fetch data by rowids
     TFetchOption _fetch_option;
+
+    std::shared_ptr<BufferControlBlock> _sender = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index c823b7d886a..8ef23265e3f 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -138,7 +138,7 @@ Status BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result)
         }
         _buffer_rows += num_rows;
     } else {
-        auto ctx = _waiting_rpc.front();
+        auto* ctx = _waiting_rpc.front();
         _waiting_rpc.pop_front();
         ctx->on_data(result, _packet_num);
         _packet_num++;
@@ -252,6 +252,11 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
 
 Status BufferControlBlock::close(Status exec_status) {
     std::unique_lock<std::mutex> l(_lock);
+    close_cnt++;
+    if (close_cnt < _result_sink_dependencys.size()) {
+        return Status::OK();
+    }
+
     _is_close = true;
     _status = exec_status;
 
@@ -286,16 +291,18 @@ void BufferControlBlock::cancel() {
 
 void BufferControlBlock::set_dependency(
         std::shared_ptr<pipeline::Dependency> result_sink_dependency) {
-    _result_sink_dependency = result_sink_dependency;
+    _result_sink_dependencys.push_back(result_sink_dependency);
 }
 
 void BufferControlBlock::_update_dependency() {
-    if (_result_sink_dependency &&
-        (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled)) {
-        _result_sink_dependency->set_ready();
-    } else if (_result_sink_dependency &&
-               (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled)) {
-        _result_sink_dependency->block();
+    if (_batch_queue_empty || _buffer_rows < _buffer_limit || _is_cancelled) {
+        for (auto dependency : _result_sink_dependencys) {
+            dependency->set_ready();
+        }
+    } else if (!_batch_queue_empty && _buffer_rows < _buffer_limit && !_is_cancelled) {
+        for (auto dependency : _result_sink_dependencys) {
+            dependency->block();
+        }
     }
 }
diff --git a/be/src/runtime/buffer_control_block.h b/be/src/runtime/buffer_control_block.h
index 98c84d4a72f..c8c240f928a 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -131,7 +131,8 @@ protected:
     // only used for FE using return rows to check limit
     std::unique_ptr<QueryStatistics> _query_statistics;
     std::atomic_bool _batch_queue_empty = false;
-    std::shared_ptr<pipeline::Dependency> _result_sink_dependency;
+    std::vector<std::shared_ptr<pipeline::Dependency>> _result_sink_dependencys;
+    size_t close_cnt = 0;
 };
 
 } // namespace doris
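Because several sinks now write into one `BufferControlBlock`, the block above counts `close()` calls and only really closes (publishing the final status and letting the reader observe end-of-stream) once every registered sink has closed; likewise the single ready/block signal fans out to one dependency per sink. A toy model of the close-counting part, under the same single-lock discipline the patch uses (hypothetical names):

```cpp
#include <cstddef>
#include <mutex>

// Toy version of the counted close: each parallel sink calls close()
// exactly once, and only the last call finalizes the buffer. Names are
// illustrative, not the real Doris API.
class SharedResultBuffer {
public:
    void register_sink() {
        std::lock_guard<std::mutex> guard(_lock);
        ++_sink_cnt;
    }

    // Returns true only for the call that actually closed the buffer.
    bool close() {
        std::lock_guard<std::mutex> guard(_lock);
        ++_close_cnt;
        if (_close_cnt < _sink_cnt) {
            return false; // other sinks are still producing rows
        }
        _is_closed = true; // now the reader may observe end-of-stream
        return true;
    }

private:
    std::mutex _lock;
    std::size_t _sink_cnt = 0;
    std::size_t _close_cnt = 0;
    bool _is_closed = false;
};
```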
diff --git a/be/src/runtime/result_buffer_mgr.cpp b/be/src/runtime/result_buffer_mgr.cpp
index bf6fb8a8f35..23f440d1909 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -96,13 +96,13 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size
 
 std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TUniqueId& query_id) {
     std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
-    BufferMap::iterator iter = _buffer_map.find(query_id);
+    auto iter = _buffer_map.find(query_id);
 
     if (_buffer_map.end() != iter) {
         return iter->second;
     }
 
-    return std::shared_ptr<BufferControlBlock>();
+    return {};
 }
 
 void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
@@ -128,8 +128,7 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c
     tid.__set_lo(finst_id.lo());
     std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
     if (cb == nullptr) {
-        LOG(WARNING) << "no result for this query, id=" << print_id(tid);
-        ctx->on_failure(Status::InternalError("no result for this query"));
+        ctx->on_failure(Status::InternalError("no result for this query, tid={}", print_id(tid)));
         return;
     }
     cb->get_batch(ctx);
@@ -139,8 +138,7 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
                                          std::shared_ptr<arrow::RecordBatch>* result) {
     std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
     if (cb == nullptr) {
-        LOG(WARNING) << "no result for this query, id=" << print_id(finst_id);
-        return Status::InternalError("no result for this query");
+        return Status::InternalError("no result for this query, finst_id={}", print_id(finst_id));
     }
     RETURN_IF_ERROR(cb->get_arrow_batch(result));
     return Status::OK();
@@ -149,7 +147,7 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
 void ResultBufferMgr::cancel(const TUniqueId& query_id) {
     {
         std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
-        BufferMap::iterator iter = _buffer_map.find(query_id);
+        auto iter = _buffer_map.find(query_id);
 
         if (_buffer_map.end() != iter) {
             iter->second->cancel();
@@ -169,7 +167,7 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id) {
 
 void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {
     std::lock_guard<std::mutex> l(_timeout_lock);
-    TimeoutMap::iterator iter = _timeout_map.find(cancel_time);
+    auto iter = _timeout_map.find(cancel_time);
 
     if (_timeout_map.end() == iter) {
         _timeout_map.insert(
@@ -189,11 +187,11 @@ void ResultBufferMgr::cancel_thread() {
         time_t now_time = time(nullptr);
         {
             std::lock_guard<std::mutex> l(_timeout_lock);
-            TimeoutMap::iterator end = _timeout_map.upper_bound(now_time + 1);
+            auto end = _timeout_map.upper_bound(now_time + 1);
 
-            for (TimeoutMap::iterator iter = _timeout_map.begin(); iter != end; ++iter) {
-                for (int i = 0; i < iter->second.size(); ++i) {
-                    query_to_cancel.push_back(iter->second[i]);
+            for (auto iter = _timeout_map.begin(); iter != end; ++iter) {
+                for (const auto& id : iter->second) {
+                    query_to_cancel.push_back(id);
                 }
             }
 
@@ -201,8 +199,8 @@
         }
 
         // cancel query
-        for (int i = 0; i < query_to_cancel.size(); ++i) {
-            cancel(query_to_cancel[i]);
+        for (const auto& id : query_to_cancel) {
+            cancel(id);
        }
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index af628ff9e89..d61bca2182e 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -367,10 +367,6 @@ public:
         return _query_options.return_object_data_as_binary;
     }
 
-    bool enable_exchange_node_parallel_merge() const {
-        return _query_options.enable_enable_exchange_node_parallel_merge;
-    }
-
     segment_v2::CompressionTypePB fragement_transmission_compression_type() const {
         if (_query_options.__isset.fragment_transmission_compression_codec) {
             if (_query_options.fragment_transmission_compression_codec == "lz4") {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 750707c52c4..0b4929e0a87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -154,7 +154,12 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
 
     @Override
     public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
             PlanContext context) {
-        addRequestPropertyToChildren(PhysicalProperties.GATHER);
+        if (context.getSessionVariable().enableParallelResultSink()
+                && !context.getStatementContext().isShortCircuitQuery()) {
+            addRequestPropertyToChildren(PhysicalProperties.ANY);
+        } else {
+            addRequestPropertyToChildren(PhysicalProperties.GATHER);
+        }
         return null;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index 2469d087cdd..7418e15bdc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -248,10 +248,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         return hasColocatePlanNode;
     }
 
-    public void setDataPartition(DataPartition dataPartition) {
-        this.dataPartition = dataPartition;
-    }
-
     /**
      * Finalize plan tree and create stream sink, if needed.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a191de9ba80..3f379232bb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -215,7 +215,7 @@ public class Coordinator implements CoordInterface {
     private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
     private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
-    private ResultReceiver receiver;
+    private List<ResultReceiver> receivers = Lists.newArrayList();
     protected final List<ScanNode> scanNodes;
     private int scanRangeNum = 0;
     // number of instances of this query, equals to
@@ -682,13 +682,27 @@ public class Coordinator implements CoordInterface {
         DataSink topDataSink = topParams.fragment.getSink();
         this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
         if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
+            Boolean enableParallelResultSink = queryOptions.isEnableParallelResultSink()
+                    && topDataSink instanceof ResultSink;
             TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
-            receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
-                    addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline,
-                    context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
+            Set<TNetworkAddress> addrs = new HashSet<>();
+            for (FInstanceExecParam param : topParams.instanceExecParams) {
+                if (addrs.contains(param.host)) {
+                    continue;
+                }
+                addrs.add(param.host);
+                receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
+                        toBrpcHost(param.host), this.timeoutDeadline,
+                        context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
+            }
 
             if (!context.isReturnResultFromLocal()) {
                 Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
+                if (enableParallelResultSink) {
+                    context.setFinstId(queryId);
+                } else {
+                    context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
+                }
                 context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
                 context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
                 context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
@@ -1084,13 +1098,13 @@ public class Coordinator implements CoordInterface {
 
     @Override
     public RowBatch getNext() throws Exception {
-        if (receiver == null) {
+        if (receivers.isEmpty()) {
             throw new UserException("There is no receiver.");
         }
 
         RowBatch resultBatch;
         Status status = new Status();
-        resultBatch = receiver.getNext(status);
+        resultBatch = receivers.get(receivers.size() - 1).getNext(status);
         if (!status.ok()) {
             LOG.warn("Query {} coordinator get next fail, {}, need cancel.",
                     DebugUtil.printId(queryId), status.getErrorMsg());
@@ -1129,7 +1143,12 @@ public class Coordinator implements CoordInterface {
         }
 
         if (resultBatch.isEos()) {
-            this.returnedAllResults = true;
+            receivers.remove(receivers.size() - 1);
+            if (receivers.isEmpty()) {
+                returnedAllResults = true;
+            } else {
+                resultBatch.setEos(false);
+            }
 
             // if this query is a block query do not cancel.
             Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
@@ -1250,7 +1269,7 @@ public class Coordinator implements CoordInterface {
     }
 
     private void cancelInternal(Status cancelReason) {
-        if (null != receiver) {
+        for (ResultReceiver receiver : receivers) {
             receiver.cancel(cancelReason);
         }
         if (null != pointExec) {
@@ -1814,9 +1833,9 @@ public class Coordinator implements CoordInterface {
                     leftMostNode.getNumInstances());
             boolean forceToLocalShuffle = context != null
                     && context.getSessionVariable().isForceToLocalShuffle();
-            boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream()
-                    .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
-                            addressToBackendID.size())) && useNereids);
+            boolean ignoreStorageDataDistribution = forceToLocalShuffle || (scanNodes.stream().allMatch(
+                    scanNode -> scanNode.ignoreStorageDataDistribution(context, addressToBackendID.size()))
+                    && useNereids);
             if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
                     || ignoreStorageDataDistribution)) {
                 expectedInstanceNum = Math.max(expectedInstanceNum, 1);
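On the FE side, the coordinator now tracks one `ResultReceiver` per distinct backend that hosts a top-fragment instance, and `getNext()` drains them one at a time: an EOS from the current receiver just pops it and is masked from the caller until the last receiver is exhausted. A small sketch of that drain loop, with hypothetical types standing in for the Java classes above:

```cpp
#include <memory>
#include <string>
#include <vector>

// Illustrative model of the coordinator-side drain; not the Doris API.
struct RowBatch {
    std::string rows;
    bool eos = false;
};

class Receiver {
public:
    virtual ~Receiver() = default;
    virtual RowBatch get_next() = 0;
};

// Pulls from the last receiver; on its EOS, drops it and keeps the
// client reading until every backend's stream is exhausted.
RowBatch coordinator_get_next(std::vector<std::unique_ptr<Receiver>>& receivers) {
    RowBatch batch = receivers.back()->get_next();
    if (batch.eos) {
        receivers.pop_back();
        if (!receivers.empty()) {
            batch.eos = false; // mask intermediate end-of-stream
        }
    }
    return batch;
}
```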
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 6637509d4f2..b591a5d3c6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -54,17 +54,26 @@ public class ResultReceiver {
     private Long backendId;
     private Thread currentThread;
     private Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = null;
+    private Boolean enableParallelResultSink = false;
     int maxMsgSizeOfResultReceiver;
 
     public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs,
-            int maxMsgSizeOfResultReceiver) {
+            int maxMsgSizeOfResultReceiver, Boolean enableParallelResultSink) {
         this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
         this.finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
         this.backendId = backendId;
         this.address = address;
         this.timeoutTs = timeoutTs;
         this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver;
+        this.enableParallelResultSink = enableParallelResultSink;
+    }
+
+    Types.PUniqueId getRealFinstId() {
+        if (enableParallelResultSink) {
+            return queryId;
+        }
+        return finstId;
     }
 
     public RowBatch getNext(Status status) throws TException {
@@ -75,7 +84,7 @@ public class ResultReceiver {
         try {
             while (!isDone && runStatus.ok()) {
                 InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
-                        .setFinstId(finstId)
+                        .setFinstId(getRealFinstId())
                        .setRespInAttachment(false)
                        .build();
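`getRealFinstId()` is the other half of the handshake with `ResultBufferMgr::fetch_data()` shown earlier: in parallel mode the receiver fills the RPC's `finst_id` field with the query id, which is the key under which the BE registered the shared block. Roughly, with hypothetical structs:

```cpp
#include <cstdint>

// Illustrative mirror of the id selection in getRealFinstId(); the real
// field travels in PFetchDataRequest.finst_id.
struct PUniqueId {
    int64_t hi = 0;
    int64_t lo = 0;
};

struct FetchTarget {
    PUniqueId query_id;
    PUniqueId finst_id;
    bool parallel_result_sink = false;

    // The BE looks up its buffer map with exactly this key.
    PUniqueId real_finst_id() const {
        return parallel_result_sink ? query_id : finst_id;
    }
};
```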
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 51a63a2ee78..23ab0fb2701 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -132,7 +132,6 @@ public class SessionVariable implements Serializable, Writable {
     public static final String INSERT_MAX_FILTER_RATIO = "insert_max_filter_ratio";
     public static final String ENABLE_SPILLING = "enable_spilling";
     public static final String ENABLE_SHORT_CIRCUIT_QUERY = "enable_short_circuit_point_query";
-    public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
     public static final String ENABLE_SERVER_SIDE_PREPARED_STATEMENT = "enable_server_side_prepared_statement";
     public static final String PREFER_JOIN_METHOD = "prefer_join_method";
 
@@ -194,6 +193,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size";
 
+    public static final String ENABLE_PARALLEL_RESULT_SINK = "enable_parallel_result_sink";
+
     public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
 
     // max ms to wait transaction publish finish when exec insert stmt.
@@ -655,9 +656,6 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_SHORT_CIRCUIT_QUERY)
     public boolean enableShortCircuitQuery = true;
 
-    @VariableMgr.VarAttr(name = ENABLE_EXCHANGE_NODE_PARALLEL_MERGE)
-    public boolean enableExchangeNodeParallelMerge = false;
-
     // By default, the number of Limit items after OrderBy is changed from 65535 items
     // before v1.2.0 (not included), to return all items by default
     @VariableMgr.VarAttr(name = DEFAULT_ORDER_BY_LIMIT)
@@ -1052,6 +1050,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true)
     private boolean enableSyncRuntimeFilterSize = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_PARALLEL_RESULT_SINK, needForward = true, fuzzy = true)
+    private boolean enableParallelResultSink = true;
+
     @VariableMgr.VarAttr(name = USE_RF_DEFAULT)
     public boolean useRuntimeFilterDefaultSize = false;
 
@@ -1946,6 +1947,7 @@ public class SessionVariable implements Serializable, Writable {
         this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 1048576;
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         // this.enableHashJoinEarlyStartProbe = random.nextBoolean();
+        this.enableParallelResultSink = random.nextBoolean();
         int randomInt = random.nextInt(4);
         if (randomInt % 2 == 0) {
             this.rewriteOrToInPredicateThreshold = 100000;
@@ -3341,7 +3343,6 @@ public class SessionVariable implements Serializable, Writable {
         }
 
         tResult.setEnableSpilling(enableSpilling);
-        tResult.setEnableEnableExchangeNodeParallelMerge(enableExchangeNodeParallelMerge);
 
         tResult.setRuntimeFilterWaitTimeMs(runtimeFilterWaitTimeMs);
         tResult.setRuntimeFilterMaxInNum(runtimeFilterMaxInNum);
@@ -3422,6 +3423,7 @@ public class SessionVariable implements Serializable, Writable {
 
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
+        tResult.setEnableParallelResultSink(enableParallelResultSink);
 
         return tResult;
     }
@@ -3765,6 +3767,14 @@ public class SessionVariable implements Serializable, Writable {
         return enablePipelineXEngine || enablePipelineEngine;
     }
 
+    public boolean enableParallelResultSink() {
+        return enableParallelResultSink;
+    }
+
+    public void setParallelResultSink(Boolean enableParallelResultSink) {
+        this.enableParallelResultSink = enableParallelResultSink;
+    }
+
     public boolean enableSyncRuntimeFilterSize() {
         return enableSyncRuntimeFilterSize;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java
index e97a9c75a87..4405075b039 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/DistributeHintTest.java
@@ -45,6 +45,7 @@ class DistributeHintTest extends TestWithFeService implements MemoPatternMatchSu
         createDatabase("test");
useDatabase("test"); connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + connectContext.getSessionVariable().setParallelResultSink(false); createTable("CREATE TABLE `t1` (\n" + " `a` int(11) NULL,\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java index c5275c20f0d..5110ea2fde1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java @@ -151,6 +151,7 @@ public class ColocatePlanTest extends TestWithFeService { // 2. scan node with two tablet one instance @Test public void sqlAggWithColocateTable() throws Exception { + connectContext.getSessionVariable().setParallelResultSink(false); String sql = "select k1, k2, count(*) from db1.test_multi_partition where k2 = 1 group by k1, k2"; StmtExecutor executor = getSqlStmtExecutor(sql); Planner planner = executor.planner(); @@ -186,6 +187,7 @@ public class ColocatePlanTest extends TestWithFeService { // Fix #8778 @Test public void rollupAndMoreThanOneInstanceWithoutColocate() throws Exception { + connectContext.getSessionVariable().setParallelResultSink(false); String createColocateTblStmtStr = "create table db1.test_colocate_one_backend(k1 int, k2 int, k3 int, k4 int) " + "distributed by hash(k1, k2, k3) buckets 10 properties('replication_num' = '1');"; createTable(createColocateTblStmtStr); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java index 474f16ca0fc..fde1b6f74c2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PaimonPredicateConverterTest.java @@ -55,6 +55,7 @@ public class PaimonPredicateConverterTest extends TestWithFeService { DataField paimonFieldV1 = new DataField(2, "v1", new IntType()); RowType rowType = new RowType(Lists.newArrayList(paimonFieldK1, paimonFieldK2, paimonFieldV1)); PaimonPredicateConverter converter = new PaimonPredicateConverter(rowType); + connectContext.getSessionVariable().setParallelResultSink(false); // k1=1 String sql1 = "SELECT * from db1.tbl1 where k1 = 1"; diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 4d9ed685fda..7481e9ffd19 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -125,6 +125,7 @@ public class DemoMultiBackendsTest { public void testCreateDbAndTable() throws Exception { // 1. create connect context ConnectContext ctx = UtFrameUtils.createDefaultCtx(); + ctx.getSessionVariable().setParallelResultSink(false); // 2. 
         String createDbStmtStr = "create database db1;";
         CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 3619bf9d97d..439c666349d 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -143,7 +143,7 @@ struct TQueryOptions {
   // whether enable spilling to disk
   31: optional bool enable_spilling = false;
   // whether enable parallel merge in exchange node
-  32: optional bool enable_enable_exchange_node_parallel_merge = false;
+  32: optional bool enable_enable_exchange_node_parallel_merge = false; // deprecated
 
   // Time in ms to wait until runtime filters are delivered.
   33: optional i32 runtime_filter_wait_time_ms = 1000
@@ -301,6 +301,8 @@ struct TQueryOptions {
   112: optional i32 max_column_reader_num = 0
 
   113: optional bool enable_local_merge_sort = false;
+
+  114: optional bool enable_parallel_result_sink = false;
 
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
diff --git a/regression-test/suites/empty_relation/eliminate_empty.groovy b/regression-test/suites/empty_relation/eliminate_empty.groovy
index 2399523ee3d..47246b633ca 100644
--- a/regression-test/suites/empty_relation/eliminate_empty.groovy
+++ b/regression-test/suites/empty_relation/eliminate_empty.groovy
@@ -25,6 +25,7 @@ suite("eliminate_empty") {
         SET enable_fallback_to_original_planner=false;
         set disable_nereids_rules='PRUNE_EMPTY_PARTITION';
         set forbid_unknown_col_stats=false;
+        set enable_parallel_result_sink=false;
     """
     qt_onerow_union """
        select * from (select 1, 2 union select 3, 4) T order by 1, 2
diff --git a/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy b/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy
index 3c0fc19b119..9e3c715cf8c 100644
--- a/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy
+++ b/regression-test/suites/nereids_broadcast_shuffle_join/load.groovy
@@ -17,6 +17,7 @@
 
 suite("broadcastJoin") {
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
     String database = context.config.getDbNameByFile(context.file)
     sql "drop database if exists ${database}"
     sql "create database ${database}"
diff --git a/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy b/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy
index cf50bf0cfb3..bf98e0bc821 100644
--- a/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy
+++ b/regression-test/suites/nereids_clickbench_shape_p0/query20.groovy
@@ -22,6 +22,7 @@ suite("query20") {
     sql 'set enable_nereids_planner=true'
     sql 'set enable_fallback_to_original_planner=false'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
     sql 'set topn_opt_limit_threshold = 1024'
     def ckBench = """SELECT UserID FROM hits WHERE UserID = 435090932899640449"""
diff --git a/regression-test/suites/nereids_p0/hint/multi_leading.groovy b/regression-test/suites/nereids_p0/hint/multi_leading.groovy
index bcefcfef141..9ea97cc024e 100644
--- a/regression-test/suites/nereids_p0/hint/multi_leading.groovy
+++ b/regression-test/suites/nereids_p0/hint/multi_leading.groovy
@@ -21,6 +21,7 @@ suite("multi_leading") {
     sql 'DROP DATABASE IF EXISTS test_multi_leading'
     sql 'CREATE DATABASE IF NOT EXISTS test_multi_leading'
     sql 'use test_multi_leading'
+    sql "set enable_parallel_result_sink=false;"
enable_parallel_result_sink=false;" // setting planner to nereids sql 'set exec_mem_limit=21G' diff --git a/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy b/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy index 81492dd7d48..d2cfae6eeb0 100644 --- a/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy +++ b/regression-test/suites/nereids_rules_p0/eliminate_outer_join/eliminate_outer_join.groovy @@ -21,6 +21,7 @@ suite("eliminate_outer_join") { sql "SET enable_fallback_to_original_planner=false" sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION" sql 'set be_number_for_test=3' + sql "set enable_parallel_result_sink=false;" sql """ DROP TABLE IF EXISTS t diff --git a/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy b/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy index 021d6056951..c54fd49f47f 100644 --- a/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy +++ b/regression-test/suites/nereids_rules_p0/filter_push_down/push_down_alias_through_join.groovy @@ -23,6 +23,7 @@ suite("push_down_alias_through_join") { sql "use regression_test_nereids_rules_p0" sql "set disable_join_reorder=true" sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION" + sql "set enable_parallel_result_sink=false;" // Push alias through inner join where condition not use alias qt_pushdown_inner_join""" diff --git a/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy b/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy index e653b5eccab..930c6578a81 100644 --- a/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy +++ b/regression-test/suites/nereids_rules_p0/infer_set_operator_distinct/infer_set_operator_distinct.groovy @@ -20,6 +20,7 @@ suite("test_infer_set_operator_distinct") { sql "set runtime_filter_mode=OFF" sql "SET enable_fallback_to_original_planner=false" sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION" + sql "set enable_parallel_result_sink=false;" sql """ diff --git a/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy b/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy index 4a20cf4d68b..a824475c570 100644 --- a/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy +++ b/regression-test/suites/nereids_rules_p0/merge_aggregate/merge_aggregate.groovy @@ -18,6 +18,7 @@ suite("merge_aggregate") { sql "SET enable_nereids_planner=true" sql "SET enable_fallback_to_original_planner=false" sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION" + sql "set enable_parallel_result_sink=false;" sql """ DROP TABLE IF EXISTS mal_test1 diff --git a/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy b/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy index 3e247b2a78f..10c76049e8e 100644 --- a/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy +++ b/regression-test/suites/nereids_rules_p0/simplify_window_expression/simplify_window_expression.groovy @@ -18,6 +18,7 @@ suite("simplify_window_expression") { sql "SET enable_nereids_planner=true" sql "SET enable_fallback_to_original_planner=false" + 
sql "set enable_parallel_result_sink=false;" sql """ DROP TABLE IF EXISTS mal_test_simplify_window """ diff --git a/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy b/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy index 3f685edbfb9..a74f05c54c6 100644 --- a/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy +++ b/regression-test/suites/nereids_ssb_shape_sf100_p0/shape/flat.groovy @@ -25,6 +25,7 @@ suite("q1.1") { sql 'set exec_mem_limit=21G' sql 'SET enable_pipeline_engine = true' sql 'set parallel_pipeline_task_num=8' + sql "set enable_parallel_result_sink=false;" sql 'set be_number_for_test=3' sql 'set enable_runtime_filter_prune=false' diff --git a/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy b/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy index 162e71074fd..739517182f8 100644 --- a/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy +++ b/regression-test/suites/nereids_syntax_p0/push_filter_through_ptopn.groovy @@ -18,6 +18,7 @@ suite("push_filter_through_ptopn") { sql """set enable_nereids_planner=true""" sql """set enable_partition_topn=true""" + sql "set enable_parallel_result_sink=false;" sql """ DROP TABLE IF EXISTS push_filter_through_ptopn_tbl """ diff --git a/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy b/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy index 8c10c045391..c43f094bdb8 100644 --- a/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy +++ b/regression-test/suites/nereids_syntax_p0/push_filter_through_window.groovy @@ -17,6 +17,7 @@ suite("push_filter_through_window") { sql """set enable_nereids_planner=true""" + sql "set enable_parallel_result_sink=false;" sql """ DROP TABLE IF EXISTS push_filter_through_window_tbl """ diff --git a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy index 220cb0682a9..f950a4f4776 100644 --- a/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy +++ b/regression-test/suites/nereids_tpcds_shape_sf1000_p0/shape/query9.groovy @@ -32,6 +32,7 @@ suite("query9") { sql 'set runtime_filter_type=8' sql 'set dump_nereids_memo=false' sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION" + sql "set enable_parallel_result_sink=false;" def ds = """select case when (select count(*) from store_sales diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy index e3fd19dd569..18b92e42c5f 100644 --- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy +++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query9.groovy @@ -33,6 +33,7 @@ sql 'set enable_runtime_filter_prune=true' sql 'set enable_nereids_timeout = false' sql 'SET enable_pipeline_engine = true' sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION" + sql "set enable_parallel_result_sink=false;" qt_ds_shape_9 ''' explain shape plan diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query9.groovy index ac3262a5f1a..cedb73f4d68 100644 --- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/no_stats_shape/query9.groovy +++ 
@@ -33,6 +33,7 @@
     sql 'set enable_runtime_filter_prune=false'
     sql 'set enable_nereids_timeout = false'
     sql 'SET enable_pipeline_engine = true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     qt_ds_shape_9 '''
     explain shape plan
diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy
index f2fc8a59402..ed0cf228f02 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/rf_prune/query9.groovy
@@ -31,7 +31,7 @@ suite("query9") {
     sql 'set runtime_filter_type=8'
     sql 'set enable_runtime_filter_prune=true'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
-
+    sql "set enable_parallel_result_sink=false;"
     def ds = """select case when (select count(*)
                                   from store_sales
diff --git a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy
index 60beccf62ce..c49925dda42 100644
--- a/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy
+++ b/regression-test/suites/nereids_tpcds_shape_sf100_p0/shape/query9.groovy
@@ -31,6 +31,7 @@ suite("query9") {
     sql 'set enable_runtime_filter_prune=false'
     sql 'set runtime_filter_type=8'
     sql "set disable_nereids_rules=PRUNE_EMPTY_PARTITION"
+    sql "set enable_parallel_result_sink=false;"
 
     def ds = """select case when (select count(*)
                                   from store_sales
diff --git a/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy b/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy
index f0dffcf7d4f..02e5e7b4ff4 100644
--- a/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy
+++ b/regression-test/suites/nereids_tpch_p0/tpch/push_filter_window_eqset.groovy
@@ -22,6 +22,7 @@ suite("push_filter_window_eqset") {
     sql "use ${db}"
     sql 'set enable_nereids_planner=true'
     sql 'set enable_fallback_to_original_planner=false';
+    sql "set enable_parallel_result_sink=false;"
 
     /**
      check the filter is pushed through window

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org