This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 378d9e73369350da90f83a7918169565de24443e
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Tue Jan 30 20:59:39 2024 +0800

    [Colo][Scan] delete the colo scan code (#30584)
---
 be/src/pipeline/exec/olap_scan_operator.cpp        |   1 -
 be/src/pipeline/exec/scan_operator.h               |   6 +-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |   1 -
 be/src/vec/exec/scan/pip_scanner_context.h         | 146 +++------------------
 be/src/vec/exec/scan/scanner_context.cpp           |   1 -
 be/src/vec/exec/scan/scanner_context.h             |   1 -
 be/src/vec/exec/scan/vscan_node.cpp                |   2 +-
 be/src/vec/exec/scan/vscan_node.h                  |   1 -
 .../apache/doris/planner/DistributedPlanner.java   |   8 --
 .../org/apache/doris/planner/OlapScanNode.java     |  39 ------
 .../java/org/apache/doris/planner/PlanNode.java    |   5 -
 .../java/org/apache/doris/qe/SessionVariable.java  |   8 --
 12 files changed, 19 insertions(+), 200 deletions(-)
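
For context: the bulk of the deletion is the colocate row-distribution path in
pip_scanner_context.h. It hashed each scanned block's rows by the plan's
distribution columns and routed them to per-instance blocks and queues, so the
planner (in the DistributedPlanner branch removed below) could keep the
aggregation in the scan fragment instead of creating a merge aggregation
fragment. The following standalone sketch illustrates that routing scheme only;
it is not Doris code — std::hash stands in for the block's CRC hashing
(update_crcs_with_value) and plain row-index buckets stand in for
Block/MutableBlock.

    // Simplified illustration of the removed colocate bucketing scheme.
    // Assumptions: rows are plain int64_t values and std::hash replaces the
    // CRC32 hashing (update_crcs_with_value) used by the real code.
    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <vector>

    // Bucket row indices by hashing the values of the distribute columns,
    // modulo the number of parallel scan instances, mirroring the deleted
    // _need_colocate_distribute branch.
    std::vector<std::vector<uint32_t>> bucket_rows_by_distribute_columns(
            const std::vector<std::vector<int64_t>>& columns,  // column-major block
            const std::vector<int>& col_distribute_ids, int num_parallel_instances) {
        const size_t rows = columns.empty() ? 0 : columns[0].size();
        std::vector<uint32_t> hashes(rows, 0);
        for (int col_id : col_distribute_ids) {
            for (size_t i = 0; i < rows; ++i) {
                // Combine per-column hashes; the real code updates CRCs in place.
                hashes[i] ^= static_cast<uint32_t>(std::hash<int64_t>{}(columns[col_id][i]));
            }
        }
        std::vector<std::vector<uint32_t>> channel2rows(num_parallel_instances);
        for (size_t i = 0; i < rows; ++i) {
            channel2rows[hashes[i] % num_parallel_instances].push_back(static_cast<uint32_t>(i));
        }
        return channel2rows;
    }

    int main() {
        // Two columns, four rows; distribute on column 0 across 2 instances.
        std::vector<std::vector<int64_t>> block = {{1, 2, 1, 2}, {10, 20, 30, 40}};
        auto buckets = bucket_rows_by_distribute_columns(block, {0}, 2);
        for (size_t ch = 0; ch < buckets.size(); ++ch) {
            std::cout << "instance " << ch << ": " << buckets[ch].size() << " rows\n";
        }
        return 0;
    }

With that branch gone, every block takes the round-robin queue path that
previously handled only the non-colocate case.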

diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index d1575075b4c..b89ac7333c3 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -568,7 +568,6 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i
         : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks),
           _olap_scan_node(tnode.olap_scan_node) {
     _output_tuple_id = tnode.olap_scan_node.tuple_id;
-    _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids;
     if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) {
         _limit_per_scanner = _olap_scan_node.sort_limit;
     }
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index ec2d68e7649..8fac0b946ea 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -432,9 +432,8 @@ public:
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
 
     DataDistribution required_data_distribution() const override {
-        if (_col_distribute_ids.empty() || OperatorX<LocalStateType>::ignore_data_distribution()) {
-            // 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle.
-            // 2. `ignore_data_distribution()` returns true means we ignore the distribution.
+        if (OperatorX<LocalStateType>::ignore_data_distribution()) {
+            // `ignore_data_distribution()` returns true means we ignore the distribution.
             return {ExchangeType::NOOP};
         }
         return {ExchangeType::BUCKET_HASH_SHUFFLE};
@@ -477,7 +476,6 @@ protected:
     // If sort info is set, push limit to each scanner;
     int64_t _limit_per_scanner = -1;
 
-    std::vector<int> _col_distribute_ids;
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
 
     TPushAggOp::type _push_down_agg_type;
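
The required_data_distribution() hunk above is the scan-operator side of this
change: with _col_distribute_ids gone, only ignore_data_distribution() decides
between a local pass-through and a bucket hash shuffle. A minimal sketch of the
decision before and after, using a stand-in ExchangeType enum rather than the
real pipeline types, for illustration only:

    #include <iostream>
    #include <vector>

    enum class ExchangeType { NOOP, BUCKET_HASH_SHUFFLE };

    // Before: an empty _col_distribute_ids short-circuited to NOOP, i.e. the
    // scan preferred a local shuffle when storage distribution was not effective.
    ExchangeType required_distribution_before(const std::vector<int>& col_distribute_ids,
                                              bool ignore_data_distribution) {
        if (col_distribute_ids.empty() || ignore_data_distribution) {
            return ExchangeType::NOOP;
        }
        return ExchangeType::BUCKET_HASH_SHUFFLE;
    }

    // After: only ignore_data_distribution() gates the decision.
    ExchangeType required_distribution_after(bool ignore_data_distribution) {
        return ignore_data_distribution ? ExchangeType::NOOP : ExchangeType::BUCKET_HASH_SHUFFLE;
    }

    int main() {
        // A scan with no distribution columns used to request NOOP; it now
        // requests BUCKET_HASH_SHUFFLE unless the distribution is ignored.
        std::cout << (required_distribution_before({}, false) == ExchangeType::NOOP) << "\n";
        std::cout << (required_distribution_after(false) == ExchangeType::BUCKET_HASH_SHUFFLE) << "\n";
        return 0;
    }
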
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index e1f39f2948b..558169c775f 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -74,7 +74,6 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
         : VScanNode(pool, tnode, descs), _olap_scan_node(tnode.olap_scan_node) {
     _output_tuple_id = tnode.olap_scan_node.tuple_id;
-    _col_distribute_ids = tnode.olap_scan_node.distribute_column_ids;
     if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) {
         _limit_per_scanner = _olap_scan_node.sort_limit;
     }
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 484fe5b4003..62f6f9edb21 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -32,12 +32,10 @@ public:
                       const RowDescriptor* output_row_descriptor,
                      const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
-                      const std::vector<int>& col_distribute_ids, const int num_parallel_instances)
+                      const int num_parallel_instances)
             : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor,
                                          scanners, limit_, max_bytes_in_blocks_queue,
-                                         num_parallel_instances),
-              _col_distribute_ids(col_distribute_ids),
-              _need_colocate_distribute(!_col_distribute_ids.empty()) {}
+                                         num_parallel_instances) {}

     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
                                 int id) override {
@@ -113,63 +111,23 @@ public:
         }
         int64_t local_bytes = 0;
 
-        if (_need_colocate_distribute) {
-            std::vector<uint32_t> hash_vals;
-            for (const auto& block : blocks) {
-                auto st = validate_block_schema(block.get());
-                if (!st.ok()) {
-                    set_status_on_error(st, false);
-                }
-                // vectorized calculate hash
-                int rows = block->rows();
-                const auto element_size = _num_parallel_instances;
-                hash_vals.resize(rows);
-                std::fill(hash_vals.begin(), hash_vals.end(), 0);
-                auto* __restrict hashes = hash_vals.data();
-
-                for (int j = 0; j < _col_distribute_ids.size(); ++j) {
-                    block->get_by_position(_col_distribute_ids[j])
-                            .column->update_crcs_with_value(
-                                    hash_vals.data(),
-                                    _output_tuple_desc->slots()[_col_distribute_ids[j]]
-                                            ->type()
-                                            .type,
-                                    rows);
-                }
-                for (int i = 0; i < rows; i++) {
-                    hashes[i] = hashes[i] % element_size;
-                }
-
-                std::vector<uint32_t> channel2rows[element_size];
-                for (uint32_t i = 0; i < rows; i++) {
-                    channel2rows[hashes[i]].emplace_back(i);
-                }
-
-                for (int i = 0; i < element_size; ++i) {
-                    if (!channel2rows[i].empty()) {
-                        _add_rows_colocate_blocks(block.get(), i, channel2rows[i]);
-                    }
-                }
-            }
-        } else {
-            for (const auto& block : blocks) {
-                auto st = validate_block_schema(block.get());
-                if (!st.ok()) {
-                    set_status_on_error(st, false);
-                }
-                local_bytes += block->allocated_bytes();
+        for (const auto& block : blocks) {
+            auto st = validate_block_schema(block.get());
+            if (!st.ok()) {
+                set_status_on_error(st, false);
             }
+            local_bytes += block->allocated_bytes();
+        }
 
-            for (int i = 0; i < queue_size && i < block_size; ++i) {
-                int queue = _next_queue_to_feed;
-                {
-                    std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
-                    for (int j = i; j < block_size; j += queue_size) {
-                        _blocks_queues[queue].emplace_back(std::move(blocks[j]));
-                    }
+        for (int i = 0; i < queue_size && i < block_size; ++i) {
+            int queue = _next_queue_to_feed;
+            {
+                std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
+                for (int j = i; j < block_size; j += queue_size) {
+                    _blocks_queues[queue].emplace_back(std::move(blocks[j]));
                 }
-                _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
             }
+            _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
         }
         _current_used_bytes += local_bytes;
     }
@@ -184,40 +142,7 @@ public:
             _queue_mutexs.emplace_back(std::make_unique<std::mutex>());
             _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
         }
-        RETURN_IF_ERROR(ScannerContext::init());
-        if (_need_colocate_distribute) {
-            _init_colocate_block();
-        }
-        return Status::OK();
-    }
-
-    void _init_colocate_block() {
-        int real_block_size =
-                limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
-        int64_t free_blocks_memory_usage = 0;
-        for (int i = 0; i < _num_parallel_instances; ++i) {
-            auto block = vectorized::Block::create_unique(
-                    _output_tuple_desc->slots(), real_block_size, true /*ignore invalid slots*/);
-            free_blocks_memory_usage += block->allocated_bytes();
-            _colocate_mutable_blocks.emplace_back(
-                    vectorized::MutableBlock::create_unique(block.get()));
-            _colocate_blocks.emplace_back(std::move(block));
-            _colocate_block_mutexs.emplace_back(new std::mutex);
-        }
-        _free_blocks_memory_usage->add(free_blocks_memory_usage);
-    }
-
-    void _dispose_coloate_blocks_not_in_queue() override {
-        if (_need_colocate_distribute) {
-            for (int i = 0; i < _num_parallel_instances; ++i) {
-                std::scoped_lock s(*_colocate_block_mutexs[i], *_queue_mutexs[i]);
-                if (_colocate_blocks[i] && !_colocate_blocks[i]->empty()) {
-                    _current_used_bytes += _colocate_blocks[i]->allocated_bytes();
-                    _blocks_queues[i].emplace_back(std::move(_colocate_blocks[i]));
-                    _colocate_mutable_blocks[i]->clear();
-                }
-            }
-        }
+        return ScannerContext::init();
     }
 
     std::string debug_string() override {
@@ -234,45 +159,6 @@ protected:
     std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
     std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
     std::atomic_int64_t _current_used_bytes = 0;
-
-    const std::vector<int> _col_distribute_ids;
-    const bool _need_colocate_distribute;
-    std::vector<vectorized::BlockUPtr> _colocate_blocks;
-    std::vector<std::unique_ptr<vectorized::MutableBlock>> _colocate_mutable_blocks;
-    std::vector<std::unique_ptr<std::mutex>> _colocate_block_mutexs;
-
-    void _add_rows_colocate_blocks(vectorized::Block* block, int loc,
-                                   const std::vector<uint32_t>& rows) {
-        int row_wait_add = rows.size();
-        const int batch_size = _batch_size;
-        const uint32_t* begin = rows.data();
-        std::lock_guard<std::mutex> l(*_colocate_block_mutexs[loc]);
-
-        while (row_wait_add > 0) {
-            int row_add = 0;
-            int max_add = batch_size - _colocate_mutable_blocks[loc]->rows();
-            if (row_wait_add >= max_add) {
-                row_add = max_add;
-            } else {
-                row_add = row_wait_add;
-            }
-
-            _colocate_mutable_blocks[loc]->add_rows(block, begin, begin + row_add);
-            row_wait_add -= row_add;
-            begin += row_add;
-
-            if (row_add == max_add) {
-                _current_used_bytes += _colocate_blocks[loc]->allocated_bytes();
-                {
-                    std::lock_guard<std::mutex> queue_l(*_queue_mutexs[loc]);
-                    _blocks_queues[loc].emplace_back(std::move(_colocate_blocks[loc]));
-                }
-                _colocate_blocks[loc] = get_free_block();
-                _colocate_mutable_blocks[loc]->set_mutable_columns(
-                        _colocate_blocks[loc]->mutate_columns());
-            }
-        }
-    }
 };
 
 class PipXScannerContext final : public vectorized::ScannerContext {
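
Besides the hash routing, the other removed piece of PipScannerContext is the
per-instance batching in _add_rows_colocate_blocks(): rows routed to an
instance were appended to that instance's open block and flushed to its queue
once batch_size rows accumulated. A simplified standalone sketch of that
batching loop, with plain vectors standing in for Block/MutableBlock and the
locking omitted:

    #include <algorithm>
    #include <cstdint>
    #include <deque>
    #include <iostream>
    #include <vector>

    // One destination instance's state, mirroring _colocate_mutable_blocks[loc]
    // (the open batch) and _blocks_queues[loc] (the flushed batches).
    struct ColocateQueue {
        size_t batch_size;
        std::vector<int64_t> open_batch;
        std::deque<std::vector<int64_t>> queue;

        void add_rows(const std::vector<int64_t>& rows) {
            size_t begin = 0;
            size_t row_wait_add = rows.size();
            while (row_wait_add > 0) {
                // Fill the open batch up to batch_size, then flush it.
                size_t max_add = batch_size - open_batch.size();
                size_t row_add = std::min(row_wait_add, max_add);
                open_batch.insert(open_batch.end(), rows.begin() + begin,
                                  rows.begin() + begin + row_add);
                begin += row_add;
                row_wait_add -= row_add;
                if (row_add == max_add) {
                    queue.push_back(std::move(open_batch));
                    open_batch.clear();
                }
            }
        }
    };

    int main() {
        ColocateQueue q{/*batch_size=*/4};
        q.add_rows({1, 2, 3, 4, 5, 6});  // flushes one full batch, keeps 2 rows pending
        std::cout << "queued batches: " << q.queue.size()
                  << ", pending rows: " << q.open_batch.size() << "\n";
        return 0;
    }
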
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 033709f950e..be143b9f729 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -497,7 +497,6 @@ void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDel
         if (scanner->_scanner->need_to_close()) {
             --_num_unfinished_scanners;
             if (_num_unfinished_scanners == 0) {
-                _dispose_coloate_blocks_not_in_queue();
                 _is_finished = true;
                 _set_scanner_done();
                 _blocks_queue_added_cv.notify_one();
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 59e4c45a52a..4d936d72a13 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -178,7 +178,6 @@ protected:
                    int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances,
                    pipeline::ScanLocalStateBase* local_state,
                    std::shared_ptr<pipeline::ScanDependency> dependency);
-    virtual void _dispose_coloate_blocks_not_in_queue() {}
 
     void _set_scanner_done();
 
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 1305062c854..e7bb370375c 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -326,7 +326,7 @@ void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>
         int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1;
         _scanner_ctx = pipeline::PipScannerContext::create_shared(
                 _state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, limit(),
-                _state->scan_queue_mem_limit(), _col_distribute_ids, max_queue_size);
+                _state->scan_queue_mem_limit(), max_queue_size);
     } else {
         _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc,
                                                      _output_row_descriptor.get(), scanners,
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 1d54df4f4b4..0c39d15b57f 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -391,7 +391,6 @@ protected:
     RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
 
     std::unordered_map<std::string, int> _colname_to_slot_id;
-    std::vector<int> _col_distribute_ids;
 
     TPushAggOp::type _push_down_agg_type;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 6f54bd898fb..2586e1147cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -39,7 +39,6 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TPartitionType;
 
 import com.google.common.base.Preconditions;
@@ -932,13 +931,6 @@ public class DistributedPlanner {
                 childFragment.addPlanRoot(node);
                 childFragment.setHasColocatePlanNode(true);
                 return childFragment;
-            } else if (SessionVariable.enablePipelineEngine()
-                    && childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())
-                    && childFragment.getPlanRoot() instanceof OlapScanNode) {
-                childFragment.getPlanRoot().setShouldColoScan();
-                childFragment.addPlanRoot(node);
-                childFragment.setHasColocatePlanNode(false);
-                return childFragment;
             } else {
                 return createMergeAggregationFragment(node, childFragment);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index ab7553e82d7..5e779324d5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.AggregateInfo;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BaseTableRef;
 import org.apache.doris.analysis.BinaryPredicate;
@@ -1334,44 +1333,6 @@ public class OlapScanNode extends ScanNode {
         return scanRangeLocations.size();
     }
 
-    @Override
-    public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
-        distributionColumnIds.clear();
-        if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
-                && ConnectContext.get().getSessionVariable().enableColocateScan()) {
-            List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
-            List<SlotDescriptor> slots = desc.getSlots();
-            for (Expr aggExpr : aggPartitionExprs) {
-                if (aggExpr instanceof SlotRef) {
-                    SlotDescriptor slotDesc = ((SlotRef) aggExpr).getDesc();
-                    int columnId = 0;
-                    for (SlotDescriptor slotDescriptor : slots) {
-                        if (slotDescriptor.equals(slotDesc)) {
-                            if (slotDescriptor.getType().isFixedLengthType()
-                                    || slotDescriptor.getType().isStringType()) {
-                                distributionColumnIds.add(columnId);
-                            } else {
-                                return false;
-                            }
-                        }
-                        columnId++;
-                    }
-                }
-            }
-
-            for (int i = 0; i < slots.size(); i++) {
-                if (!distributionColumnIds.contains(i) && (!slots.get(i).getType().isFixedLengthType()
-                        || slots.get(i).getType().isStringType())) {
-                    return false;
-                }
-            }
-
-            return !distributionColumnIds.isEmpty();
-        } else {
-            return false;
-        }
-    }
-
     @Override
     public void setShouldColoScan() {
         shouldColoScan = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 24fc40be2e1..f450758834f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -20,7 +20,6 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.AggregateInfo;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BitmapFilterPredicate;
 import org.apache.doris.analysis.CompoundPredicate;
@@ -881,10 +880,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         return this.children.get(0).getNumInstances();
     }
 
-    public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
-        return false;
-    }
-
     public void setShouldColoScan() {}
 
     public boolean getShouldColoScan() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index cef6babc805..e4e23b411cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -114,7 +114,6 @@ public class SessionVariable implements Serializable, Writable {
     public static final String BATCH_SIZE = "batch_size";
     public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
     public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
-    public static final String ENABLE_COLOCATE_SCAN = "enable_colocate_scan";
     public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
     public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
@@ -707,9 +706,6 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN)
     public boolean disableColocatePlan = false;
 
-    @VariableMgr.VarAttr(name = ENABLE_COLOCATE_SCAN)
-    public boolean enableColocateScan = false;
-
     @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
     public boolean enableBucketShuffleJoin = true;
 
@@ -2043,10 +2039,6 @@ public class SessionVariable implements Serializable, Writable {
         return disableColocatePlan;
     }
 
-    public boolean enableColocateScan() {
-        return enableColocateScan;
-    }
-
     public boolean isEnableBucketShuffleJoin() {
         return enableBucketShuffleJoin;
     }

