This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 378d9e73369350da90f83a7918169565de24443e Author: HappenLee <happen...@hotmail.com> AuthorDate: Tue Jan 30 20:59:39 2024 +0800 [Colo][Scan] delete the colo scan code (#30584) --- be/src/pipeline/exec/olap_scan_operator.cpp | 1 - be/src/pipeline/exec/scan_operator.h | 6 +- be/src/vec/exec/scan/new_olap_scan_node.cpp | 1 - be/src/vec/exec/scan/pip_scanner_context.h | 146 +++------------------ be/src/vec/exec/scan/scanner_context.cpp | 1 - be/src/vec/exec/scan/scanner_context.h | 1 - be/src/vec/exec/scan/vscan_node.cpp | 2 +- be/src/vec/exec/scan/vscan_node.h | 1 - .../apache/doris/planner/DistributedPlanner.java | 8 -- .../org/apache/doris/planner/OlapScanNode.java | 39 ------ .../java/org/apache/doris/planner/PlanNode.java | 5 - .../java/org/apache/doris/qe/SessionVariable.java | 8 -- 12 files changed, 19 insertions(+), 200 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index d1575075b4c..b89ac7333c3 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -568,7 +568,6 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), _olap_scan_node(tnode.olap_scan_node) { _output_tuple_id = tnode.olap_scan_node.tuple_id; - _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids; if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) { _limit_per_scanner = _olap_scan_node.sort_limit; } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index ec2d68e7649..8fac0b946ea 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -432,9 +432,8 @@ public: TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } DataDistribution required_data_distribution() const override { - if (_col_distribute_ids.empty() || OperatorX<LocalStateType>::ignore_data_distribution()) { - // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle. - // 2. `ignore_data_distribution()` returns true means we ignore the distribution. + if (OperatorX<LocalStateType>::ignore_data_distribution()) { + // `ignore_data_distribution()` returns true means we ignore the distribution. return {ExchangeType::NOOP}; } return {ExchangeType::BUCKET_HASH_SHUFFLE}; @@ -477,7 +476,6 @@ protected: // If sort info is set, push limit to each scanner; int64_t _limit_per_scanner = -1; - std::vector<int> _col_distribute_ids; std::vector<TRuntimeFilterDesc> _runtime_filter_descs; TPushAggOp::type _push_down_agg_type; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index e1f39f2948b..558169c775f 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -74,7 +74,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) { _output_tuple_id = tnode.olap_scan_node.tuple_id; - _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids; if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) { _limit_per_scanner = _olap_scan_node.sort_limit; } diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 484fe5b4003..62f6f9edb21 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -32,12 +32,10 @@ public: const RowDescriptor* output_row_descriptor, const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, - const std::vector<int>& col_distribute_ids, const int num_parallel_instances) + const int num_parallel_instances) : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor, scanners, limit_, max_bytes_in_blocks_queue, - num_parallel_instances), - _col_distribute_ids(col_distribute_ids), - _need_colocate_distribute(!_col_distribute_ids.empty()) {} + num_parallel_instances) {} Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, int id) override { @@ -113,63 +111,23 @@ public: } int64_t local_bytes = 0; - if (_need_colocate_distribute) { - std::vector<uint32_t> hash_vals; - for (const auto& block : blocks) { - auto st = validate_block_schema(block.get()); - if (!st.ok()) { - set_status_on_error(st, false); - } - // vectorized calculate hash - int rows = block->rows(); - const auto element_size = _num_parallel_instances; - hash_vals.resize(rows); - std::fill(hash_vals.begin(), hash_vals.end(), 0); - auto* __restrict hashes = hash_vals.data(); - - for (int j = 0; j < _col_distribute_ids.size(); ++j) { - block->get_by_position(_col_distribute_ids[j]) - .column->update_crcs_with_value( - hash_vals.data(), - _output_tuple_desc->slots()[_col_distribute_ids[j]] - ->type() - .type, - rows); - } - for (int i = 0; i < rows; i++) { - hashes[i] = hashes[i] % element_size; - } - - std::vector<uint32_t> channel2rows[element_size]; - for (uint32_t i = 0; i < rows; i++) { - channel2rows[hashes[i]].emplace_back(i); - } - - for (int i = 0; i < element_size; ++i) { - if (!channel2rows[i].empty()) { - _add_rows_colocate_blocks(block.get(), i, channel2rows[i]); - } - } - } - } else { - for (const auto& block : blocks) { - auto st = validate_block_schema(block.get()); - if (!st.ok()) { - set_status_on_error(st, false); - } - local_bytes += block->allocated_bytes(); + for (const auto& block : blocks) { + auto st = validate_block_schema(block.get()); + if (!st.ok()) { + set_status_on_error(st, false); } + local_bytes += block->allocated_bytes(); + } - for (int i = 0; i < queue_size && i < block_size; ++i) { - int queue = _next_queue_to_feed; - { - std::lock_guard<std::mutex> l(*_queue_mutexs[queue]); - for (int j = i; j < block_size; j += queue_size) { - _blocks_queues[queue].emplace_back(std::move(blocks[j])); - } + for (int i = 0; i < queue_size && i < block_size; ++i) { + int queue = _next_queue_to_feed; + { + std::lock_guard<std::mutex> l(*_queue_mutexs[queue]); + for (int j = i; j < block_size; j += queue_size) { + _blocks_queues[queue].emplace_back(std::move(blocks[j])); } - _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; } + _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0; } _current_used_bytes += local_bytes; } @@ -184,40 +142,7 @@ public: _queue_mutexs.emplace_back(std::make_unique<std::mutex>()); _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>()); } - RETURN_IF_ERROR(ScannerContext::init()); - if (_need_colocate_distribute) { - _init_colocate_block(); - } - return Status::OK(); - } - - void _init_colocate_block() { - int real_block_size = - limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit); - int64_t free_blocks_memory_usage = 0; - for (int i = 0; i < _num_parallel_instances; ++i) { - auto block = vectorized::Block::create_unique( - _output_tuple_desc->slots(), real_block_size, true /*ignore invalid slots*/); - free_blocks_memory_usage += block->allocated_bytes(); - _colocate_mutable_blocks.emplace_back( - vectorized::MutableBlock::create_unique(block.get())); - _colocate_blocks.emplace_back(std::move(block)); - _colocate_block_mutexs.emplace_back(new std::mutex); - } - _free_blocks_memory_usage->add(free_blocks_memory_usage); - } - - void _dispose_coloate_blocks_not_in_queue() override { - if (_need_colocate_distribute) { - for (int i = 0; i < _num_parallel_instances; ++i) { - std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]); - if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) { - _current_used_bytes += _colocate_blocks[i]->allocated_bytes(); - _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i])); - _colocate_mutable_blocks[i]->clear(); - } - } - } + return ScannerContext::init(); } std::string debug_string() override { @@ -234,45 +159,6 @@ protected: std::vector<std::unique_ptr<std::mutex>> _queue_mutexs; std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues; std::atomic_int64_t _current_used_bytes = 0; - - const std::vector<int> _col_distribute_ids; - const bool _need_colocate_distribute; - std::vector<vectorized::BlockUPtr> _colocate_blocks; - std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks; - std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs; - - void _add_rows_colocate_blocks(vectorized::Block* block, int loc, - const std::vector<uint32_t>& rows) { - int row_wait_add = rows.size(); - const int batch_size = _batch_size; - const uint32_t* begin = rows.data(); - std::lock_guard<std::mutex> l(*_colocate_block_mutexs[loc]); - - while (row_wait_add > 0) { - int row_add = 0; - int max_add = batch_size - _colocate_mutable_blocks[loc]->rows(); - if (row_wait_add >= max_add) { - row_add = max_add; - } else { - row_add = row_wait_add; - } - - _colocate_mutable_blocks[loc]->add_rows(block, begin, begin + row_add); - row_wait_add -= row_add; - begin += row_add; - - if (row_add == max_add) { - _current_used_bytes += _colocate_blocks[loc]->allocated_bytes(); - { - std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]); - _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc])); - } - _colocate_blocks[loc] = get_free_block(); - _colocate_mutable_blocks[loc]->set_mutable_columns( - _colocate_blocks[loc]->mutate_columns()); - } - } - } }; class PipXScannerContext final : public vectorized::ScannerContext { diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 033709f950e..be143b9f729 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -497,7 +497,6 @@ void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDel if (scanner->_scanner->need_to_close()) { --_num_unfinished_scanners; if (_num_unfinished_scanners == 0) { - _dispose_coloate_blocks_not_in_queue(); _is_finished = true; _set_scanner_done(); _blocks_queue_added_cv.notify_one(); diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 59e4c45a52a..4d936d72a13 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -178,7 +178,6 @@ protected: int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state, std::shared_ptr<pipeline::ScanDependency> dependency); - virtual void _dispose_coloate_blocks_not_in_queue() {} void _set_scanner_done(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 1305062c854..e7bb370375c 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -326,7 +326,7 @@ void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate> int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1; _scanner_ctx = pipeline::PipScannerContext::create_shared( _state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, limit(), - _state->scan_queue_mem_limit(), _col_distribute_ids, max_queue_size); + _state->scan_queue_mem_limit(), max_queue_size); } else { _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 1d54df4f4b4..0c39d15b57f 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -391,7 +391,6 @@ protected: RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr; std::unordered_map<std::string, int> _colname_to_slot_id; - std::vector<int> _col_distribute_ids; TPushAggOp::type _push_down_agg_type; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 6f54bd898fb..2586e1147cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -39,7 +39,6 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Preconditions; @@ -932,13 +931,6 @@ public class DistributedPlanner { childFragment.addPlanRoot(node); childFragment.setHasColocatePlanNode(true); return childFragment; - } else if (SessionVariable.enablePipelineEngine() - && childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo()) - && childFragment.getPlanRoot() instanceof OlapScanNode) { - childFragment.getPlanRoot().setShouldColoScan(); - childFragment.addPlanRoot(node); - childFragment.setHasColocatePlanNode(false); - return childFragment; } else { return createMergeAggregationFragment(node, childFragment); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index ab7553e82d7..5e779324d5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -17,7 +17,6 @@ package org.apache.doris.planner; -import org.apache.doris.analysis.AggregateInfo; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BaseTableRef; import org.apache.doris.analysis.BinaryPredicate; @@ -1334,44 +1333,6 @@ public class OlapScanNode extends ScanNode { return scanRangeLocations.size(); } - @Override - public boolean shouldColoAgg(AggregateInfo aggregateInfo) { - distributionColumnIds.clear(); - if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine() - && ConnectContext.get().getSessionVariable().enableColocateScan()) { - List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs(); - List<SlotDescriptor> slots = desc.getSlots(); - for (Expr aggExpr : aggPartitionExprs) { - if (aggExpr instanceof SlotRef) { - SlotDescriptor slotDesc = ((SlotRef) aggExpr).getDesc(); - int columnId = 0; - for (SlotDescriptor slotDescriptor : slots) { - if (slotDescriptor.equals(slotDesc)) { - if (slotDescriptor.getType().isFixedLengthType() - || slotDescriptor.getType().isStringType()) { - distributionColumnIds.add(columnId); - } else { - return false; - } - } - columnId++; - } - } - } - - for (int i = 0; i < slots.size(); i++) { - if (!distributionColumnIds.contains(i) && (!slots.get(i).getType().isFixedLengthType() - || slots.get(i).getType().isStringType())) { - return false; - } - } - - return !distributionColumnIds.isEmpty(); - } else { - return false; - } - } - @Override public void setShouldColoScan() { shouldColoScan = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 24fc40be2e1..f450758834f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -20,7 +20,6 @@ package org.apache.doris.planner; -import org.apache.doris.analysis.AggregateInfo; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BitmapFilterPredicate; import org.apache.doris.analysis.CompoundPredicate; @@ -881,10 +880,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats { return this.children.get(0).getNumInstances(); } - public boolean shouldColoAgg(AggregateInfo aggregateInfo) { - return false; - } - public void setShouldColoScan() {} public boolean getShouldColoScan() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index cef6babc805..e4e23b411cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -114,7 +114,6 @@ public class SessionVariable implements Serializable, Writable { public static final String BATCH_SIZE = "batch_size"; public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations"; public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan"; - public static final String ENABLE_COLOCATE_SCAN = "enable_colocate_scan"; public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join"; public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num"; public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num"; @@ -707,9 +706,6 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN) public boolean disableColocatePlan = false; - @VariableMgr.VarAttr(name = ENABLE_COLOCATE_SCAN) - public boolean enableColocateScan = false; - @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE) public boolean enableBucketShuffleJoin = true; @@ -2043,10 +2039,6 @@ public class SessionVariable implements Serializable, Writable { return disableColocatePlan; } - public boolean enableColocateScan() { - return enableColocateScan; - } - public boolean isEnableBucketShuffleJoin() { return enableBucketShuffleJoin; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org