This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c00dca70e60 [pipelineX](local shuffle) Support parallel execution despite of tablet number (#28266)
c00dca70e60 is described below

commit c00dca70e605727d10134a7d10acb4e71ae0d176
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Thu Dec 14 12:53:54 2023 +0800

    [pipelineX](local shuffle) Support parallel execution despite of tablet number (#28266)
---
 be/src/pipeline/exec/aggregation_sink_operator.h   |   5 +-
 be/src/pipeline/exec/es_scan_operator.cpp          |   4 +-
 be/src/pipeline/exec/es_scan_operator.h            |   2 +-
 be/src/pipeline/exec/exchange_source_operator.h    |   4 +-
 be/src/pipeline/exec/file_scan_operator.h          |   4 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  14 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |   5 +-
 be/src/pipeline/exec/jdbc_scan_operator.cpp        |   4 +-
 be/src/pipeline/exec/jdbc_scan_operator.h          |   2 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |   4 +-
 be/src/pipeline/exec/olap_scan_operator.h          |   2 +-
 be/src/pipeline/exec/scan_operator.cpp             |   8 +-
 be/src/pipeline/exec/scan_operator.h               |   8 +-
 be/src/pipeline/pipeline.h                         |  13 ++
 be/src/pipeline/pipeline_x/dependency.h            |  23 ++-
 .../local_exchange/local_exchange_sink_operator.h  |   2 +
 .../local_exchange_source_operator.h               |   2 +
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 107 ++++++++--
 .../pipeline_x/local_exchange/local_exchanger.h    |  59 +++++-
 be/src/pipeline/pipeline_x/operator.h              |  21 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  73 +++++--
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |   8 +-
 be/src/runtime/runtime_filter_mgr.cpp              |   2 +-
 be/src/runtime/runtime_filter_mgr.h                |   3 +-
 be/src/vec/exec/runtime_filter_consumer.cpp        |   6 +-
 be/src/vec/exec/runtime_filter_consumer.h          |   4 +-
 .../vec/runtime/shared_hash_table_controller.cpp   |   3 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |   4 +-
 .../glue/translator/PhysicalPlanTranslator.java    |   8 +
 .../org/apache/doris/planner/OlapScanNode.java     |   5 +
 .../org/apache/doris/planner/PlanFragment.java     |  20 ++
 .../java/org/apache/doris/planner/ScanNode.java    |  16 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 228 ++++++++++-----------
 .../java/org/apache/doris/qe/SessionVariable.java  |  13 ++
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 35 files changed, 481 insertions(+), 206 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index cd85390dce0..639687ec74d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -369,7 +369,10 @@ public:
     std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
     ExchangeType get_local_exchange_type() const override {
         if (_probe_expr_ctxs.empty()) {
-            return _needs_finalize ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP;
+            return _needs_finalize || DataSinkOperatorX<LocalStateType>::_child_x
+                                              ->ignore_data_distribution()
+                           ? ExchangeType::PASSTHROUGH
+                           : ExchangeType::NOOP;
         }
         return _is_colocate ?
ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE; } diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index 8567db90948..c00ee6917ea 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -116,8 +116,8 @@ void EsScanLocalState::set_scan_ranges(RuntimeState* state, } EsScanOperatorX::EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs) - : ScanOperatorX<EsScanLocalState>(pool, tnode, operator_id, descs), + const DescriptorTbl& descs, int parallel_tasks) + : ScanOperatorX<EsScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), _tuple_id(tnode.es_scan_node.tuple_id), _tuple_desc(nullptr) { ScanOperatorX<EsScanLocalState>::_output_tuple_id = tnode.es_scan_node.tuple_id; diff --git a/be/src/pipeline/exec/es_scan_operator.h b/be/src/pipeline/exec/es_scan_operator.h index dbdbbe198bb..62d1a043c47 100644 --- a/be/src/pipeline/exec/es_scan_operator.h +++ b/be/src/pipeline/exec/es_scan_operator.h @@ -72,7 +72,7 @@ private: class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> { public: EsScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs); + const DescriptorTbl& descs, int parallel_tasks); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 24927fc947c..49211321886 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -116,7 +116,9 @@ public: return _sub_plan_query_statistics_recvr; } - bool need_to_local_shuffle() const override { return !_is_hash_partition; } + bool need_to_local_shuffle() const override { + return !_is_hash_partition || OperatorX<ExchangeLocalState>::ignore_data_distribution(); + } private: friend class ExchangeLocalState; diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 6ae3344ed71..4e64bd850ba 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -70,8 +70,8 @@ private: class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> { public: FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs) - : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, descs), + const DescriptorTbl& descs, int parallel_tasks) + : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), _table_name(tnode.file_scan_node.__isset.table_name ? 
tnode.file_scan_node.table_name : "") { _output_tuple_id = tnode.file_scan_node.tuple_id; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index efc75828484..b8ae2ca9666 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -62,9 +62,12 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo if (p._is_broadcast_join) { profile()->add_info_string("BroadcastJoin", "true"); if (state->enable_share_hash_table_for_broadcast_join()) { - profile()->add_info_string("ShareHashTableEnabled", "true"); - _should_build_hash_table = p._shared_hashtable_controller->should_build_hash_table( - state->fragment_instance_id(), p.node_id()); + _should_build_hash_table = info.task_idx == 0; + if (_should_build_hash_table) { + profile()->add_info_string("ShareHashTableEnabled", "true"); + CHECK(p._shared_hashtable_controller->should_build_hash_table( + state->fragment_instance_id(), p.node_id())); + } } else { profile()->add_info_string("ShareHashTableEnabled", "false"); } @@ -514,10 +517,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } else if (!local_state._should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); - auto wait_timer = ADD_TIMER(local_state.profile(), "WaitForSharedHashTableTime"); - SCOPED_TIMER(wait_timer); - RETURN_IF_ERROR( - _shared_hashtable_controller->wait_for_signal(state, _shared_hash_table_context)); + CHECK(_shared_hash_table_context->signaled); local_state.profile()->add_info_string( "SharedHashTableFrom", diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index c00d1b8e591..580c2bb8ff9 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -157,8 +157,11 @@ public: std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; } ExchangeType get_local_exchange_type() const override { - if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || _is_broadcast_join) { + if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { return ExchangeType::NOOP; + } else if (_is_broadcast_join) { + return _child_x->ignore_data_distribution() ? 
ExchangeType::BROADCAST + : ExchangeType::NOOP; } return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE || _join_distribution == TJoinDistributionType::COLOCATE diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp b/be/src/pipeline/exec/jdbc_scan_operator.cpp index 74890f647fc..f6c22db9283 100644 --- a/be/src/pipeline/exec/jdbc_scan_operator.cpp +++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp @@ -38,8 +38,8 @@ Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s } JDBCScanOperatorX::JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs) - : ScanOperatorX<JDBCScanLocalState>(pool, tnode, operator_id, descs), + const DescriptorTbl& descs, int parallel_tasks) + : ScanOperatorX<JDBCScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), _table_name(tnode.jdbc_scan_node.table_name), _tuple_id(tnode.jdbc_scan_node.tuple_id), _query_string(tnode.jdbc_scan_node.query_string), diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h b/be/src/pipeline/exec/jdbc_scan_operator.h index 2acf5b5ec9d..825e01acc2a 100644 --- a/be/src/pipeline/exec/jdbc_scan_operator.h +++ b/be/src/pipeline/exec/jdbc_scan_operator.h @@ -54,7 +54,7 @@ private: class JDBCScanOperatorX final : public ScanOperatorX<JDBCScanLocalState> { public: JDBCScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs); + const DescriptorTbl& descs, int parallel_tasks); private: friend class JDBCScanLocalState; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 1e9c4da1e82..c751c167d0c 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -491,8 +491,8 @@ void OlapScanLocalState::add_filter_info(int id, const PredicateFilterInfo& upda } OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs) - : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs), + const DescriptorTbl& descs, int parallel_tasks) + : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), _olap_scan_node(tnode.olap_scan_node) { _output_tuple_id = tnode.olap_scan_node.tuple_id; _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids; diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 868d3efe555..f1db77b2054 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -188,7 +188,7 @@ private: class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> { public: OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs); + const DescriptorTbl& descs, int parallel_tasks); private: friend class OlapScanLocalState; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 482d71c6265..d559cccdad9 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -117,13 +117,13 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); + auto& p = _parent->cast<typename Derived::Parent>(); RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); _scan_dependency = 
ScanDependency::create_shared(PipelineXLocalState<>::_parent->operator_id(), PipelineXLocalState<>::_parent->node_id(), state->get_query_ctx()); - auto& p = _parent->cast<typename Derived::Parent>(); set_scan_ranges(state, info.scan_ranges); _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) { @@ -1362,9 +1362,11 @@ void ScanLocalState<Derived>::get_cast_types_for_variants() { template <typename LocalStateType> ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, - int operator_id, const DescriptorTbl& descs) + int operator_id, const DescriptorTbl& descs, + int parallel_tasks) : OperatorX<LocalStateType>(pool, tnode, operator_id, descs), - _runtime_filter_descs(tnode.runtime_filters) { + _runtime_filter_descs(tnode.runtime_filters), + _parallel_tasks(parallel_tasks) { if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) { // Which means the request could be fullfilled in a single segment iterator request. if (tnode.limit > 0 && tnode.limit < 1024) { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 18b5cebd6c1..9eda1be1692 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -429,8 +429,9 @@ public: TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } bool need_to_local_shuffle() const override { - // If _col_distribute_ids is not empty, we prefer to not do local shuffle. - return _col_distribute_ids.empty(); + // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle. + // 2. `ignore_data_distribution()` returns true means we ignore the distribution. + return _col_distribute_ids.empty() || OperatorX<LocalStateType>::ignore_data_distribution(); } bool is_bucket_shuffle_scan() const override { return !_col_distribute_ids.empty(); } @@ -443,7 +444,7 @@ public: protected: using LocalState = LocalStateType; ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, - const DescriptorTbl& descs); + const DescriptorTbl& descs, int parallel_tasks = 0); virtual ~ScanOperatorX() = default; template <typename Derived> friend class ScanLocalState; @@ -479,6 +480,7 @@ protected: // Record the value of the aggregate function 'count' from doris's be int64_t _push_down_count = -1; + const int _parallel_tasks = 0; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 5d623f899aa..832a3f51e3c 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -144,6 +144,19 @@ public: void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; } int num_tasks() const { return _num_tasks; } + std::string debug_string() { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, + "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: {}, " + "_need_to_local_shuffle: {}]", + _pipeline_id, _num_tasks, _num_tasks_created, _need_to_local_shuffle); + for (size_t i = 0; i < operatorXs.size(); i++) { + fmt::format_to(debug_string_buffer, "\n{}", operatorXs[i]->debug_string(i)); + } + fmt::format_to(debug_string_buffer, "\n{}", _sink_x->debug_string(operatorXs.size())); + return fmt::to_string(debug_string_buffer); + } + private: void _init_profile(); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index ecf0c8188f4..70698a58d2f 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ 
b/be/src/pipeline/pipeline_x/dependency.h @@ -577,6 +577,7 @@ enum class ExchangeType : uint8_t { HASH_SHUFFLE = 1, PASSTHROUGH = 2, BUCKET_HASH_SHUFFLE = 3, + BROADCAST = 4, }; inline std::string get_exchange_type_name(ExchangeType idx) { @@ -589,6 +590,8 @@ inline std::string get_exchange_type_name(ExchangeType idx) { return "PASSTHROUGH"; case ExchangeType::BUCKET_HASH_SHUFFLE: return "BUCKET_HASH_SHUFFLE"; + case ExchangeType::BROADCAST: + return "BROADCAST"; } LOG(FATAL) << "__builtin_unreachable"; __builtin_unreachable(); @@ -623,16 +626,28 @@ public: dep->set_ready(); } - void add_mem_usage(int channel_id, size_t delta) { + void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) { mem_trackers[channel_id]->consume(delta); + if (update_total_mem_usage) { + add_total_mem_usage(delta); + } + } + + void sub_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) { + mem_trackers[channel_id]->release(delta); + if (update_total_mem_usage) { + sub_total_mem_usage(delta); + } + } + + void add_total_mem_usage(size_t delta) { if (mem_usage.fetch_add(delta) > config::local_exchange_buffer_mem_limit) { sink_dependency->block(); } } - void sub_mem_usage(int channel_id, size_t delta) { - mem_trackers[channel_id]->release(delta); - if (mem_usage.fetch_sub(delta) < config::local_exchange_buffer_mem_limit) { + void sub_total_mem_usage(size_t delta) { + if (mem_usage.fetch_sub(delta) <= config::local_exchange_buffer_mem_limit) { sink_dependency->set_ready(); } } diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index 2be71e5847c..01b5f9d0999 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -33,6 +33,7 @@ public: class Exchanger; class ShuffleExchanger; class PassthroughExchanger; +class BroadcastExchanger; class LocalExchangeSinkOperatorX; class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState<LocalExchangeSinkDependency> { @@ -53,6 +54,7 @@ private: friend class ShuffleExchanger; friend class BucketShuffleExchanger; friend class PassthroughExchanger; + friend class BroadcastExchanger; Exchanger* _exchanger = nullptr; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 1f87b86ebe2..0db2db29439 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -35,6 +35,7 @@ public: class Exchanger; class ShuffleExchanger; class PassthroughExchanger; +class BroadcastExchanger; class LocalExchangeSourceOperatorX; class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExchangeSourceDependency> { @@ -50,6 +51,7 @@ private: friend class LocalExchangeSourceOperatorX; friend class ShuffleExchanger; friend class PassthroughExchanger; + friend class BroadcastExchanger; Exchanger* _exchanger = nullptr; int _channel_id; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 86e1b3866fe..4e637b111b3 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -49,8 +49,12 @@ Status 
ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block do { const auto* offset_start = &(( *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]); - mutable_block->add_rows(partitioned_block.first.get(), offset_start, + auto block_wrapper = partitioned_block.first; + local_state._shared_state->sub_mem_usage( + local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false); + mutable_block->add_rows(&block_wrapper->data_block, offset_start, offset_start + std::get<2>(partitioned_block.second)); + block_wrapper->unref(local_state._shared_state); } while (mutable_block->rows() < state->batch_size() && _data_queue[local_state._channel_id].try_dequeue(partitioned_block)); *result_block = mutable_block->to_block(); @@ -58,8 +62,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = - vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); + mutable_block = vectorized::MutableBlock::create_unique( + partitioned_block.first->data_block.clone_empty()); get_data(block); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); @@ -67,8 +71,8 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block } } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { SCOPED_TIMER(local_state._copy_data_timer); - mutable_block = - vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty()); + mutable_block = vectorized::MutableBlock::create_unique( + partitioned_block.first->data_block.clone_empty()); get_data(block); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); @@ -98,15 +102,45 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest local_state._partition_rows_histogram[channel_ids[i]]--; } } - auto new_block = vectorized::Block::create_shared(block->clone_empty()); - new_block->swap(*block); + + vectorized::Block data_block; + std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper; + if (_free_blocks.try_enqueue(data_block)) { + new_block_wrapper = ShuffleBlockWrapper::create_shared(std::move(data_block)); + } else { + new_block_wrapper = ShuffleBlockWrapper::create_shared(block->clone_empty()); + } + + new_block_wrapper->data_block.swap(*block); + if (new_block_wrapper->data_block.empty()) { + return Status::OK(); + } + local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes()); + new_block_wrapper->ref(_num_partitions); if (get_type() == ExchangeType::HASH_SHUFFLE) { for (size_t i = 0; i < _num_partitions; i++) { size_t start = local_state._partition_rows_histogram[i]; size_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { - data_queue[i].enqueue({new_block, {row_idx, start, size}}); + local_state._shared_state->add_mem_usage( + i, new_block_wrapper->data_block.allocated_bytes(), false); + data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}}); + local_state._shared_state->set_ready_to_read(i); + } else { + new_block_wrapper->unref(local_state._shared_state); + } + } + } else if (_num_senders != _num_sources) { + for (size_t i = 0; i < _num_partitions; i++) { + size_t start = local_state._partition_rows_histogram[i]; + size_t size = local_state._partition_rows_histogram[i + 1] - start; + if (size > 0) { + 
local_state._shared_state->add_mem_usage( + i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); + data_queue[i % _num_sources].enqueue({new_block_wrapper, {row_idx, start, size}}); local_state._shared_state->set_ready_to_read(i); + } else { + new_block_wrapper->unref(local_state._shared_state); } } } else { @@ -116,8 +150,12 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest size_t start = local_state._partition_rows_histogram[i]; size_t size = local_state._partition_rows_histogram[i + 1] - start; if (size > 0) { - data_queue[map[i]].enqueue({new_block, {row_idx, start, size}}); + local_state._shared_state->add_mem_usage( + map[i], new_block_wrapper->data_block.allocated_bytes(), false); + data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}}); local_state._shared_state->set_ready_to_read(map[i]); + } else { + new_block_wrapper->unref(local_state._shared_state); } } } @@ -128,10 +166,13 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, LocalExchangeSinkLocalState& local_state) { - vectorized::Block new_block(in_block->clone_empty()); + vectorized::Block new_block; + if (!_free_blocks.try_dequeue(new_block)) { + new_block = {in_block->clone_empty()}; + } new_block.swap(*in_block); auto channel_id = (local_state._channel_id++) % _num_partitions; - local_state._shared_state->add_mem_usage(channel_id, new_block.bytes()); + local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes()); _data_queue[channel_id].enqueue(std::move(new_block)); local_state._shared_state->set_ready_to_read(channel_id); @@ -144,15 +185,53 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b vectorized::Block next_block; if (_running_sink_operators == 0) { if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { - *block = std::move(next_block); - local_state._shared_state->sub_mem_usage(local_state._channel_id, block->bytes()); + block->swap(next_block); + _free_blocks.enqueue(std::move(next_block)); + local_state._shared_state->sub_mem_usage(local_state._channel_id, + block->allocated_bytes()); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); source_state = SourceState::FINISHED; } } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + block->swap(next_block); + _free_blocks.enqueue(std::move(next_block)); + local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + local_state._dependency->block(); + } + return Status::OK(); +} + +Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block, + SourceState source_state, + LocalExchangeSinkLocalState& local_state) { + vectorized::Block new_block(in_block->clone_empty()); + new_block.swap(*in_block); + _data_queue[0].enqueue(std::move(new_block)); + local_state._shared_state->set_ready_to_read(0); + + return Status::OK(); +} + +Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, + SourceState& source_state, + LocalExchangeSourceLocalState& local_state) { + if (local_state._channel_id != 0) { + source_state = SourceState::FINISHED; + return Status::OK(); + } + vectorized::Block next_block; + if (_running_sink_operators == 0) { + if (_data_queue[0].try_dequeue(next_block)) { + *block = 
std::move(next_block); + } else { + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + source_state = SourceState::FINISHED; + } + } else if (_data_queue[0].try_dequeue(next_block)) { *block = std::move(next_block); - local_state._shared_state->sub_mem_usage(local_state._channel_id, block->bytes()); } else { COUNTER_UPDATE(local_state._get_block_failed_counter, 1); local_state._dependency->block(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h index 8c28469b50d..a5ce907a137 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h @@ -24,11 +24,20 @@ namespace doris::pipeline { class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; +struct ShuffleBlockWrapper; class Exchanger { public: Exchanger(int running_sink_operators, int num_partitions) - : _running_sink_operators(running_sink_operators), _num_partitions(num_partitions) {} + : _running_sink_operators(running_sink_operators), + _num_partitions(num_partitions), + _num_senders(running_sink_operators), + _num_sources(num_partitions) {} + Exchanger(int running_sink_operators, int num_sources, int num_partitions) + : _running_sink_operators(running_sink_operators), + _num_partitions(num_partitions), + _num_senders(running_sink_operators), + _num_sources(num_sources) {} virtual ~Exchanger() = default; virtual Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, @@ -40,16 +49,35 @@ public: protected: friend struct LocalExchangeSourceDependency; friend struct LocalExchangeSharedState; + friend struct ShuffleBlockWrapper; std::atomic<int> _running_sink_operators = 0; const int _num_partitions; + const int _num_senders; + const int _num_sources; + moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks; }; class LocalExchangeSourceLocalState; class LocalExchangeSinkLocalState; +struct ShuffleBlockWrapper { + ENABLE_FACTORY_CREATOR(ShuffleBlockWrapper); + ShuffleBlockWrapper(vectorized::Block&& data_block_) : data_block(std::move(data_block_)) {} + void ref(int delta) { ref_count += delta; } + void unref(LocalExchangeSharedState* shared_state) { + if (ref_count.fetch_sub(1) == 1) { + shared_state->sub_total_mem_usage(data_block.allocated_bytes()); + data_block.clear_column_data(); + shared_state->exchanger->_free_blocks.enqueue(std::move(data_block)); + } + } + std::atomic<int> ref_count = 0; + vectorized::Block data_block; +}; + class ShuffleExchanger : public Exchanger { using PartitionedBlock = - std::pair<std::shared_ptr<vectorized::Block>, + std::pair<std::shared_ptr<ShuffleBlockWrapper>, std::tuple<std::shared_ptr<std::vector<uint32_t>>, size_t, size_t>>; public: @@ -58,6 +86,10 @@ public: : Exchanger(running_sink_operators, num_partitions) { _data_queue.resize(num_partitions); } + ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions) + : Exchanger(running_sink_operators, num_sources, num_partitions) { + _data_queue.resize(num_partitions); + } ~ShuffleExchanger() override = default; Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, LocalExchangeSinkLocalState& local_state) override; @@ -76,8 +108,8 @@ protected: class BucketShuffleExchanger : public ShuffleExchanger { ENABLE_FACTORY_CREATOR(BucketShuffleExchanger); - BucketShuffleExchanger(int running_sink_operators, int num_buckets) - : ShuffleExchanger(running_sink_operators, num_buckets) 
{} + BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions) + : ShuffleExchanger(running_sink_operators, num_sources, num_partitions) {} ~BucketShuffleExchanger() override = default; ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; } }; @@ -101,4 +133,23 @@ private: std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue; }; +class BroadcastExchanger final : public Exchanger { +public: + ENABLE_FACTORY_CREATOR(BroadcastExchanger); + BroadcastExchanger(int running_sink_operators, int num_partitions) + : Exchanger(running_sink_operators, num_partitions) { + _data_queue.resize(num_partitions); + } + ~BroadcastExchanger() override = default; + Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state, + LocalExchangeSinkLocalState& local_state) override; + + Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state, + LocalExchangeSourceLocalState& local_state) override; + ExchangeType get_type() const override { return ExchangeType::BROADCAST; } + +private: + std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue; +}; + } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index c298ad692ae..783b15ac7eb 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -34,13 +34,14 @@ struct LocalStateInfo { std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<LocalExchangeSinkDependency>>> le_state_map; - int task_idx; + const int task_idx; DependencySPtr dependency; }; // This struct is used only for initializing local sink state. struct LocalSinkStateInfo { + const int task_idx; RuntimeProfile* parent_profile = nullptr; const int sender_id; std::vector<DependencySPtr>& dependencys; @@ -180,9 +181,15 @@ public: throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name); } [[nodiscard]] std::string get_name() const override { return _op_name; } - virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; - virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; } - virtual ExchangeType get_local_exchange_type() const { return ExchangeType::NOOP; } + [[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0; + [[nodiscard]] virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; } + [[nodiscard]] virtual ExchangeType get_local_exchange_type() const { + return _child_x && _child_x->ignore_data_distribution() && !is_source() + ? ExchangeType::PASSTHROUGH + : ExchangeType::NOOP; + } + [[nodiscard]] bool ignore_data_distribution() const { return _ignore_data_distribution; } + void set_ignore_data_distribution() { _ignore_data_distribution = true; } Status prepare(RuntimeState* state) override; @@ -296,6 +303,7 @@ protected: int64_t _limit; // -1: no limit std::string _op_name; + bool _ignore_data_distribution = false; }; template <typename LocalStateType> @@ -471,7 +479,10 @@ public: virtual void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) = 0; virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; } - virtual ExchangeType get_local_exchange_type() const { return ExchangeType::NOOP; } + virtual ExchangeType get_local_exchange_type() const { + return _child_x && _child_x->ignore_data_distribution() ? 
ExchangeType::PASSTHROUGH + : ExchangeType::NOOP; + } Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 12fe179eceb..c92d06c5e91 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -228,15 +228,16 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r RETURN_IF_ERROR(_sink->init(request.fragment.output_sink)); static_cast<void>(root_pipeline->set_sink(_sink)); + for (PipelinePtr& pipeline : _pipelines) { + DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size(); + static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back())); + } if (_enable_local_shuffle()) { RETURN_IF_ERROR( _plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx)); } - // 4. Initialize global states in pipelines. for (PipelinePtr& pipeline : _pipelines) { - DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size(); - static_cast<void>(pipeline->sink_x()->set_child(pipeline->operator_xs().back())); pipeline->children().clear(); RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); } @@ -266,8 +267,11 @@ Status PipelineXFragmentContext::_plan_local_exchange( } } - RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx], - bucket_seq_to_instance_idx)); + RETURN_IF_ERROR(_plan_local_exchange( + _pipelines[pip_idx]->operator_xs().front()->ignore_data_distribution() + ? _num_instances + : num_buckets, + pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx)); } return Status::OK(); } @@ -725,6 +729,12 @@ void PipelineXFragmentContext::_inherit_pipeline_properties(ExchangeType exchang pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle()); pipe_with_source->set_need_to_local_shuffle(true); break; + case ExchangeType::BROADCAST: + // If PASSTHROUGH local exchanger is planned, data will be split randomly. So we should make + // sure remaining operators should use local shuffle to make data distribution right. + pipe_with_sink->set_need_to_local_shuffle(pipe_with_source->need_to_local_shuffle()); + pipe_with_source->set_need_to_local_shuffle(true); + break; default: __builtin_unreachable(); } @@ -764,25 +774,29 @@ Status PipelineXFragmentContext::_add_local_exchange( // 3. Create and initialize LocalExchangeSharedState. 
auto shared_state = LocalExchangeSharedState::create_shared(); - shared_state->source_dependencies.resize(_num_instances, nullptr); - shared_state->mem_trackers.resize(_num_instances); switch (exchange_type) { case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique(new_pip->num_tasks(), _num_instances); break; case ExchangeType::BUCKET_HASH_SHUFFLE: - shared_state->exchanger = - BucketShuffleExchanger::create_unique(new_pip->num_tasks(), num_buckets); + shared_state->exchanger = BucketShuffleExchanger::create_unique( + new_pip->num_tasks(), _num_instances, num_buckets); break; case ExchangeType::PASSTHROUGH: shared_state->exchanger = PassthroughExchanger::create_unique(new_pip->num_tasks(), _num_instances); break; + case ExchangeType::BROADCAST: + shared_state->exchanger = + BroadcastExchanger::create_unique(new_pip->num_tasks(), _num_instances); + break; default: return Status::InternalError("Unsupported local exchange type : " + std::to_string((int)exchange_type)); } + shared_state->source_dependencies.resize(_num_instances, nullptr); + shared_state->mem_trackers.resize(_num_instances, nullptr); auto sink_dep = std::make_shared<LocalExchangeSinkDependency>(sink_id, local_exchange_id, _runtime_state->get_query_ctx()); sink_dep->set_shared_state(shared_state); @@ -838,6 +852,8 @@ Status PipelineXFragmentContext::_add_local_exchange( } cur_pipe->set_children(new_children); _dag[downstream_pipeline_id] = edges_with_source; + RETURN_IF_ERROR(new_pip->sink_x()->set_child(new_pip->operator_xs().back())); + RETURN_IF_ERROR(cur_pipe->sink_x()->set_child(cur_pipe->operator_xs().back())); CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + new_pip->operator_xs().size()) << "total_op_num: " << total_op_num @@ -855,35 +871,54 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN std::stringstream error_msg; switch (tnode.node_type) { case TPlanNodeType::OLAP_SCAN_NODE: { - op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs)); + op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); - const bool shared_scan = - find_with_default(request.per_node_shared_scans, op->node_id(), false); - if (shared_scan) { - cur_pipe->set_num_tasks(1); + if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) { + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + } + op->set_ignore_data_distribution(); } break; } case doris::TPlanNodeType::JDBC_SCAN_NODE: { if (config::enable_java_support) { - op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), descs)); + op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else { return Status::InternalError( "Jdbc scan node is disabled, you can change be config enable_java_support " "to true and restart be."); } + if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) { + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + } + op->set_ignore_data_distribution(); + } break; } case doris::TPlanNodeType::FILE_SCAN_NODE: { - op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs)); + op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (find_with_default(request.per_node_shared_scans, 
op->node_id(), false)) { + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + } + op->set_ignore_data_distribution(); + } break; } case TPlanNodeType::ES_SCAN_NODE: case TPlanNodeType::ES_HTTP_SCAN_NODE: { - op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs)); + op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, _num_instances)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (find_with_default(request.per_node_shared_scans, op->node_id(), false)) { + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + } + op->set_ignore_data_distribution(); + } break; } case TPlanNodeType::EXCHANGE_NODE: { @@ -891,6 +926,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DCHECK_GT(num_senders, 0); op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), descs, num_senders)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + op->set_ignore_data_distribution(); + } break; } case TPlanNodeType::AGGREGATION_NODE: { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index cda47527264..9a7c5d99050 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -79,8 +79,12 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams { // set sink local state - LocalSinkStateInfo info {_task_profile.get(), local_params.sender_id, - get_downstream_dependency(), _le_state_map, tsink}; + LocalSinkStateInfo info {_task_idx, + _task_profile.get(), + local_params.sender_id, + get_downstream_dependency(), + _le_state_map, + tsink}; RETURN_IF_ERROR(_sink->setup_local_state(state, info)); } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index a2120c92389..c1ebf2103f8 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -111,7 +111,7 @@ Status RuntimeFilterMgr::get_consume_filters(const int filter_id, Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, int node_id, - bool build_bf_exactly) { + bool build_bf_exactly, int merged_rf_num) { SCOPED_CONSUME_MEM_TRACKER(_tracker.get()); int32_t key = desc.filter_id; diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 5f9ee46d656..2541299b5d8 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -83,7 +83,8 @@ public: // register filter Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, - int node_id, bool build_bf_exactly = false); + int node_id, bool build_bf_exactly = false, + int merged_rf_num = 0); Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options, bool build_bf_exactly = false); diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp index 9eda2788f06..d97338da86d 100644 --- a/be/src/vec/exec/runtime_filter_consumer.cpp +++ b/be/src/vec/exec/runtime_filter_consumer.cpp @@ -30,9 +30,9 @@ RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id, _blocked_by_rf = std::make_shared<std::atomic_bool>(false); } -Status RuntimeFilterConsumer::init(RuntimeState* state) { +Status 
RuntimeFilterConsumer::init(RuntimeState* state, int parallel_tasks) { _state = state; - RETURN_IF_ERROR(_register_runtime_filter()); + RETURN_IF_ERROR(_register_runtime_filter(parallel_tasks)); return Status::OK(); } @@ -45,7 +45,7 @@ void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) { profile->add_info_string("RuntimeFilters: ", ss.str()); } -Status RuntimeFilterConsumer::_register_runtime_filter() { +Status RuntimeFilterConsumer::_register_runtime_filter(int parallel_tasks) { int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.reserve(filter_size); _runtime_filter_ready_flag.reserve(filter_size); diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h index 3fa822b4b73..00c10cfa9d8 100644 --- a/be/src/vec/exec/runtime_filter_consumer.h +++ b/be/src/vec/exec/runtime_filter_consumer.h @@ -30,7 +30,7 @@ public: const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts); ~RuntimeFilterConsumer() = default; - Status init(RuntimeState* state); + Status init(RuntimeState* state, int parallel_tasks = 0); // Try to append late arrived runtime filters. // Return num of filters which are applied already. @@ -42,7 +42,7 @@ public: protected: // Register and get all runtime filters at Init phase. - Status _register_runtime_filter(); + Status _register_runtime_filter(int parallel_tasks); // Get all arrived runtime filters at Open phase. Status _acquire_runtime_filter(); // Append late-arrival runtime filters to the vconjunct_ctx. diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index 0dc50f20859..53e24df183a 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -105,7 +105,8 @@ Status SharedHashTableController::wait_for_signal(RuntimeState* state, // maybe builder signaled before other instances waiting, // so here need to check value of `signaled` while (!context->signaled) { - _cv.wait_for(lock, std::chrono::milliseconds(400), [&]() { return context->signaled; }); + _cv.wait_for(lock, std::chrono::milliseconds(400), + [&]() { return context->signaled.load(); }); // return if the instances is cancelled(eg. 
query timeout) RETURN_IF_CANCELLED(state); } diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index e1c01709042..b8770e63856 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -60,8 +60,8 @@ struct SharedHashTableContext { std::shared_ptr<void> hash_table_variants; std::shared_ptr<Block> block; std::map<int, SharedRuntimeFilterContext> runtime_filters; - bool signaled {}; - bool short_circuit_for_null_in_probe_side {}; + std::atomic<bool> signaled = false; + bool short_circuit_for_null_in_probe_side = false; }; using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 7050323b1af..d85115750d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -863,6 +863,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla // Set colocate info in agg node. This is a hint for local shuffling to decide which type of // local exchanger will be used. aggregationNode.setColocate(true); + + if (aggregate.getAggMode().isFinalPhase) { + inputPlanFragment.setHasColocateFinalizeAggNode(true); + } } setPlanRoot(inputPlanFragment, aggregationNode, aggregate); if (aggregate.getStats() != null) { @@ -1134,6 +1138,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla null, null, null, hashJoin.isMarkJoin()); PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin); + + if (joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN) { + currentFragment.setHasNullAwareLeftAntiJoin(true); + } if (JoinUtils.shouldColocateJoin(physicalHashJoin)) { // TODO: add reason hashJoinNode.setColocate(true, ""); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 0683e54082a..15d83702ff3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1277,9 +1277,14 @@ public class OlapScanNode extends ScanNode { // In pipeline exec engine, the instance num equals be_num * parallel instance. // so here we need count distinct be_num to do the work. 
make sure get right instance if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine() + && !ConnectContext.get().getSessionVariable().getEnablePipelineXEngine() && ConnectContext.get().getSessionVariable().getEnableSharedScan()) { return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); } + if (ConnectContext.get().getSessionVariable().getEnablePipelineXEngine() + && ConnectContext.get().getSessionVariable().isIgnoreScanDistribution()) { + return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + } return scanRangeLocations.size(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index df4aa499feb..c128a2ecec7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -149,6 +149,10 @@ public class PlanFragment extends TreeNode<PlanFragment> { // has colocate plan node private boolean hasColocatePlanNode = false; + private boolean hasColocateFinalizeAggNode = false; + + private boolean hasNullAwareLeftAntiJoin = false; + private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; /** @@ -470,4 +474,20 @@ public class PlanFragment extends TreeNode<PlanFragment> { public void setBucketNum(int bucketNum) { this.bucketNum = bucketNum; } + + public boolean isHasColocateFinalizeAggNode() { + return hasColocateFinalizeAggNode; + } + + public void setHasColocateFinalizeAggNode(boolean hasColocateFinalizeAggNode) { + this.hasColocateFinalizeAggNode = hasColocateFinalizeAggNode; + } + + public boolean isHasNullAwareLeftAntiJoin() { + return hasNullAwareLeftAntiJoin; + } + + public void setHasNullAwareLeftAntiJoin(boolean hasNullAwareLeftAntiJoin) { + this.hasNullAwareLeftAntiJoin = hasNullAwareLeftAntiJoin; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 0559600f920..6eece9aa545 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -43,6 +43,7 @@ import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.external.FederationBackendPolicy; +import org.apache.doris.planner.external.FileScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; @@ -711,7 +712,20 @@ public abstract class ScanNode extends PlanNode { // 1. is key search // 2. 
session variable not enable_shared_scan public boolean shouldDisableSharedScan(ConnectContext context) { - return isKeySearch() || !context.getSessionVariable().getEnableSharedScan(); + return isKeySearch() || context == null + || !context.getSessionVariable().getEnableSharedScan() + || !context.getSessionVariable().getEnablePipelineEngine() + || context.getSessionVariable().getEnablePipelineXEngine() + || this instanceof FileScanNode + || getShouldColoScan(); + } + + public boolean ignoreScanDistribution(ConnectContext context) { + return !isKeySearch() && context != null + && context.getSessionVariable().isIgnoreScanDistribution() + && context.getSessionVariable().getEnablePipelineXEngine() + && !fragment.isHasColocateFinalizeAggNode() + && !fragment.isHasNullAwareLeftAntiJoin(); } public boolean haveLimitAndConjunts() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index ba1499fae11..3c8dd8c1dc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -34,6 +34,7 @@ import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; @@ -45,7 +46,6 @@ import org.apache.doris.planner.IntersectNode; import org.apache.doris.planner.MultiCastDataSink; import org.apache.doris.planner.MultiCastPlanFragment; import org.apache.doris.planner.OlapScanNode; -import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNode; @@ -60,7 +60,6 @@ import org.apache.doris.planner.SetOperationNode; import org.apache.doris.planner.UnionNode; import org.apache.doris.planner.external.ExternalScanNode; import org.apache.doris.planner.external.FileQueryScanNode; -import org.apache.doris.planner.external.FileScanNode; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.InternalService.PExecPlanFragmentResult; import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest; @@ -247,6 +246,7 @@ public class Coordinator implements CoordInterface { private boolean enablePipelineEngine = false; private boolean enablePipelineXEngine = false; + private boolean useNereids = false; // Runtime filter merge instance address and ID public TNetworkAddress runtimeFilterMergeAddr; @@ -307,7 +307,8 @@ public class Coordinator implements CoordInterface { && (fragments.size() > 0); initQueryOptions(context); - if (planner instanceof OriginalPlanner) { + useNereids = planner instanceof NereidsPlanner; + if (!useNereids) { // Enable local shuffle on pipelineX engine only if Nereids planner is applied. queryOptions.setEnableLocalShuffle(false); } @@ -1584,7 +1585,10 @@ public class Coordinator implements CoordInterface { dest.server = dummyServer; dest.setBrpcServer(dummyServer); - for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) { + int parallelTasksNum = destParams.ignoreDataDistribution + ? 
destParams.parallelTasksNum : destParams.instanceExecParams.size(); + for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { dest.fragment_instance_id = instanceExecParams.instanceId; dest.server = toRpcHost(instanceExecParams.host); @@ -1692,7 +1696,10 @@ public class Coordinator implements CoordInterface { dest.server = dummyServer; dest.setBrpcServer(dummyServer); - for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) { + int parallelTasksNum = destParams.ignoreDataDistribution + ? destParams.parallelTasksNum : destParams.instanceExecParams.size(); + for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) { + FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx); if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { dest.fragment_instance_id = instanceExecParams.instanceId; dest.server = toRpcHost(instanceExecParams.host); @@ -1989,7 +1996,6 @@ public class Coordinator implements CoordInterface { for (Integer planNodeId : value.keySet()) { List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId); List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList(); - List<Boolean> sharedScanOpts = Lists.newArrayList(); Optional<ScanNode> node = scanNodes.stream().filter(scanNode -> { return scanNode.getId().asInt() == planNodeId; @@ -2000,9 +2006,20 @@ public class Coordinator implements CoordInterface { // 2. This fragment has a colocated scan node // 3. This fragment has a FileScanNode // 4. Disable shared scan optimization by session variable - if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan()) - || (node.isPresent() && node.get() instanceof FileScanNode) - || (node.isPresent() && node.get().shouldDisableSharedScan(context))) { + boolean sharedScan = true; + if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) + || (node.get().ignoreScanDistribution(context) && useNereids))) { + int expectedInstanceNum = Math.min(parallelExecInstanceNum, + leftMostNode.getNumInstances()); + expectedInstanceNum = Math.max(expectedInstanceNum, 1); + // if have limit and conjunts, only need 1 instance to save cpu and + // mem resource + if (node.get().haveLimitAndConjunts()) { + expectedInstanceNum = 1; + } + + perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges); + } else { int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num @@ -2010,38 +2027,27 @@ public class Coordinator implements CoordInterface { } // if have limit and conjunts, only need 1 instance to save cpu and // mem resource - if (node.isPresent() && node.get().haveLimitAndConjunts()) { + if (node.get().haveLimitAndConjunts()) { expectedInstanceNum = 1; } perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum); - sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false); - } else { - int expectedInstanceNum = Math.min(parallelExecInstanceNum, - leftMostNode.getNumInstances()); - expectedInstanceNum = Math.max(expectedInstanceNum, 1); - // if have limit and conjunts, only need 1 instance to save cpu and - // mem resource - if (node.isPresent() && node.get().haveLimitAndConjunts()) { - expectedInstanceNum = 1; - } - - perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges); - sharedScanOpts = 
Collections.nCopies(perInstanceScanRanges.size(), true); + sharedScan = false; } LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size()); for (int j = 0; j < perInstanceScanRanges.size(); j++) { List<TScanRangeParams> scanRangeParams = perInstanceScanRanges.get(j); - boolean sharedScan = sharedScanOpts.get(j); FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params); instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams); instanceParam.perNodeSharedScans.put(planNodeId, sharedScan); params.instanceExecParams.add(instanceParam); } + params.ignoreDataDistribution = sharedScan; + params.parallelTasksNum = sharedScan ? 1 : params.instanceExecParams.size(); } } } @@ -2156,74 +2162,8 @@ public class Coordinator implements CoordInterface { private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { - Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId); - BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(fragmentId); - Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); - - // 1. count each node in one fragment should scan how many tablet, gather them in one list - Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges - = Maps.newHashMap(); - for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) { - TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); - Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue(); - - // We only care about the node scan ranges of scan nodes which belong to this fragment - Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap(); - for (Integer scanNodeId : nodeScanRanges.keySet()) { - if (scanNodeIds.contains(scanNodeId)) { - filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId)); - } - } - - // set bucket for scanRange, the pair is <bucket_num, map<scanNode_id, list<scanRange>>>> - // we should make sure - // 1. same bucket in some address be - // 2. different scanNode id scan different scanRange which belong to the scanNode id - // 3. 
split how many scanRange one instance should scan, same bucket do not split to different instance - Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges - = Pair.of(scanRanges.getKey(), filteredNodeScanRanges); - - if (!addressToScanRanges.containsKey(address)) { - addressToScanRanges.put(address, Lists.newArrayList()); - } - addressToScanRanges.get(address).add(filteredScanRanges); - } - FragmentScanRangeAssignment assignment = params.scanRangeAssignment; - for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange - : addressToScanRanges.entrySet()) { - List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue(); - Map<Integer, List<TScanRangeParams>> range - = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>()); - int expectedInstanceNum = 1; - if (parallelExecInstanceNum > 1) { - //the scan instance num should not larger than the tablets num - expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum); - } - - // 2.split how many scanRange one instance should scan - List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges - = ListUtil.splitBySize(scanRange, expectedInstanceNum); - - // 3.construct instanceExecParam add the scanRange should be scan by instance - for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange - : perInstanceScanRanges) { - FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params); - - for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) { - instanceParam.bucketSeqSet.add(nodeScanRangeMap.first); - for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange - : nodeScanRangeMap.second.entrySet()) { - if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { - range.put(nodeScanRange.getKey(), Lists.newArrayList()); - instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); - } - range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); - instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); - } - } - params.instanceExecParams.add(instanceParam); - } - } + assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap, + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); } private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() { @@ -2882,44 +2822,81 @@ public class Coordinator implements CoordInterface { private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { - Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId); - BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId); - Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); - - // 1. 
count each node in one fragment should scan how many tablet, gather them in one list - Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges - = Maps.newHashMap(); - for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges - : bucketSeqToScanRange.entrySet()) { - TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); - Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue(); - // We only care about the node scan ranges of scan nodes which belong to this fragment - Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap(); - for (Integer scanNodeId : nodeScanRanges.keySet()) { - if (scanNodeIds.contains(scanNodeId)) { - filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId)); - } - } - Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges - = Pair.of(scanRanges.getKey(), filteredNodeScanRanges); + assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap, + fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds); + } + } + + private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params, + Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap, + Map<PlanFragmentId, Map<Integer, TNetworkAddress>> curFragmentIdToSeqToAddressMap, + Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) { + Map<Integer, TNetworkAddress> bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId); + BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId); + Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId); + + boolean ignoreScanDistribution = scanNodes.stream().filter(scanNode -> { + return scanNodeIds.contains(scanNode.getId().asInt()); + }).allMatch(node -> node.ignoreScanDistribution(context)) && useNereids; - if (!addressToScanRanges.containsKey(address)) { - addressToScanRanges.put(address, Lists.newArrayList()); + // 1. 
count each node in one fragment should scan how many tablet, gather them in one list + Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges + = Maps.newHashMap(); + for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges + : bucketSeqToScanRange.entrySet()) { + TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey()); + Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue(); + // We only care about the node scan ranges of scan nodes which belong to this fragment + Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap(); + for (Integer scanNodeId : nodeScanRanges.keySet()) { + if (scanNodeIds.contains(scanNodeId)) { + filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId)); } - addressToScanRanges.get(address).add(filteredScanRanges); } - FragmentScanRangeAssignment assignment = params.scanRangeAssignment; - for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange - : addressToScanRanges.entrySet()) { - List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue(); - Map<Integer, List<TScanRangeParams>> range - = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>()); + Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges + = Pair.of(scanRanges.getKey(), filteredNodeScanRanges); + + if (!addressToScanRanges.containsKey(address)) { + addressToScanRanges.put(address, Lists.newArrayList()); + } + addressToScanRanges.get(address).add(filteredScanRanges); + } + FragmentScanRangeAssignment assignment = params.scanRangeAssignment; + for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange + : addressToScanRanges.entrySet()) { + List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue(); + Map<Integer, List<TScanRangeParams>> range + = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>()); + + if (ignoreScanDistribution) { + FInstanceExecParam instanceParam = new FInstanceExecParam( + null, addressScanRange.getKey(), 0, params); + + for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) { + instanceParam.addBucketSeq(nodeScanRangeMap.first); + for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange + : nodeScanRangeMap.second.entrySet()) { + if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) { + range.put(nodeScanRange.getKey(), Lists.newArrayList()); + instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList()); + instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true); + } + range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue()); + instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()) + .addAll(nodeScanRange.getValue()); + } + } + params.instanceExecParams.add(instanceParam); + for (int i = 1; i < parallelExecInstanceNum; i++) { + params.instanceExecParams.add(new FInstanceExecParam( + null, addressScanRange.getKey(), 0, params)); + } + } else { int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum); } - // 2. 
split how many scanRange one instance should scan List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges = ListUtil.splitBySize(scanRange, expectedInstanceNum); @@ -2947,6 +2924,8 @@ public class Coordinator implements CoordInterface { } } } + params.parallelTasksNum = ignoreScanDistribution ? 1 : params.instanceExecParams.size(); + params.ignoreDataDistribution = ignoreScanDistribution; } private final Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap(); @@ -3561,6 +3540,8 @@ public class Coordinator implements CoordInterface { // used to assemble TPlanFragmentExecParams protected class FragmentExecParams { public PlanFragment fragment; + public int parallelTasksNum = 0; + public boolean ignoreDataDistribution = false; public List<TPlanFragmentDestination> destinations = Lists.newArrayList(); public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap(); @@ -3705,6 +3686,9 @@ public class Coordinator implements CoordInterface { params.setFileScanParams(fileScanRangeParamsMap); params.setNumBuckets(fragment.getBucketNum()); params.setPerNodeSharedScans(perNodeSharedScans); + if (ignoreDataDistribution) { + params.setParallelInstances(parallelTasksNum); + } res.put(instanceExecParam.host, params); res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>()); instanceIdx.put(instanceExecParam.host, 0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index df1f093cfb6..af81911dbaf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -219,6 +219,7 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine"; public static final String ENABLE_SHARED_SCAN = "enable_shared_scan"; + public static final String IGNORE_SCAN_DISTRIBUTION = "ignore_scan_distribution"; public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; @@ -783,6 +784,10 @@ public class SessionVariable implements Serializable, Writable { needForward = true) private boolean enableSharedScan = false; + @VariableMgr.VarAttr(name = IGNORE_SCAN_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + needForward = true) + private boolean ignoreScanDistribution = false; + @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {"是否在pipelineX引擎上开启local shuffle优化", @@ -3143,4 +3148,12 @@ public class SessionVariable implements Serializable, Writable { public boolean isMaterializedViewRewriteEnableContainForeignTable() { return materializedViewRewriteEnableContainForeignTable; } + + public boolean isIgnoreScanDistribution() { + return ignoreScanDistribution && getEnablePipelineXEngine() && enableLocalShuffle; + } + + public void setIgnoreScanDistribution(boolean ignoreScanDistribution) { + this.ignoreScanDistribution = ignoreScanDistribution; + } } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 177bec22059..14e84c4bdc9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -692,6 +692,7 @@ struct TPipelineFragmentParams { 34: optional i32 num_buckets 35: optional map<i32, i32> bucket_seq_to_instance_idx 36: optional map<Types.TPlanNodeId, bool> 
per_node_shared_scans + 37: optional i32 parallel_instances } struct TPipelineFragmentParamsList {
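
For readers following the new session variable: the diff introduces ignore_scan_distribution next to enable_pipeline_x_engine and enable_local_shuffle, and isIgnoreScanDistribution() only reports true when all three are on. Below is a minimal sketch of that gate using hypothetical stand-in fields rather than the real org.apache.doris.qe.SessionVariable class; the field names simply mirror the session variables shown in the diff.

// Sketch only: a stand-in class, not the real SessionVariable.
class SessionGateSketch {
    boolean ignoreScanDistribution; // ignore_scan_distribution
    boolean enablePipelineXEngine;  // enable_pipeline_x_engine
    boolean enableLocalShuffle;     // enable_local_shuffle

    // Mirrors SessionVariable.isIgnoreScanDistribution() in the diff:
    // the flag only takes effect when pipelineX and local shuffle are enabled.
    boolean isIgnoreScanDistribution() {
        return ignoreScanDistribution && enablePipelineXEngine && enableLocalShuffle;
    }
}

On top of that, ScanNode.ignoreScanDistribution(context) in the diff additionally requires a non-key-search scan and a fragment without a colocate finalize agg node or a null-aware left anti join, so the optimization is opted into per scan node rather than globally.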
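
The coordinator-side effect is easiest to see in the new assignScanRanges path: when every scan node in a fragment ignores scan distribution (and the Nereids planner is in use), all scan ranges on a host are attached to a single shared-scan instance, and empty sibling instances are appended up to parallel_exec_instance_num, leaving the actual splitting to the backend's local exchange. The sketch below illustrates that shape only; Instance is a hypothetical stand-in for FInstanceExecParam, and a simple round-robin stands in for the ListUtil.splitBySize call used by the real code.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ScanAssignmentSketch {

    // Stand-in for FInstanceExecParam: one fragment instance on one backend host.
    static class Instance {
        final String host;
        final boolean sharedScan;
        final List<String> scanRanges = new ArrayList<>(); // tablets/splits to read

        Instance(String host, boolean sharedScan) {
            this.host = host;
            this.sharedScan = sharedScan;
        }
    }

    static List<Instance> assign(Map<String, List<String>> rangesPerHost,
                                 int parallelExecInstanceNum,
                                 boolean ignoreScanDistribution) {
        List<Instance> instances = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : rangesPerHost.entrySet()) {
            if (ignoreScanDistribution) {
                // One instance owns every range on this host (shared scan) ...
                Instance owner = new Instance(e.getKey(), true);
                owner.scanRanges.addAll(e.getValue());
                instances.add(owner);
                // ... and empty siblings keep the requested parallelism even
                // when the host has only one or two tablets.
                for (int i = 1; i < parallelExecInstanceNum; i++) {
                    instances.add(new Instance(e.getKey(), true));
                }
            } else {
                // Old behaviour: instance count is capped by the tablet count.
                int n = Math.max(1, Math.min(parallelExecInstanceNum, e.getValue().size()));
                int first = instances.size();
                for (int i = 0; i < n; i++) {
                    instances.add(new Instance(e.getKey(), false));
                }
                int idx = 0;
                for (String range : e.getValue()) {
                    instances.get(first + (idx++ % n)).scanRanges.add(range);
                }
            }
        }
        return instances;
    }

    public static void main(String[] args) {
        Map<String, List<String>> ranges = new HashMap<>();
        ranges.put("be1:9060", List.of("tablet-1", "tablet-2"));
        // With two tablets the old path stops at 2 instances; ignoring scan
        // distribution still yields 8, to be fed through a local exchange.
        System.out.println(assign(ranges, 8, true).size());
    }
}

FragmentExecParams then records ignoreDataDistribution and parallelTasksNum, which the coordinator forwards to the backend as the new parallel_instances field (field 37 in TPipelineFragmentParams above).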