This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch opt_perf
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/opt_perf by this push:
     new c5ec7601d4 [Opt](Vectorized) Support push down no grouping agg (#12881)
c5ec7601d4 is described below

commit c5ec7601d4a45493051292c7234997c622a46f36
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Thu Sep 22 19:46:21 2022 +0800

    [Opt](Vectorized) Support push down no grouping agg (#12881)
---
 be/src/olap/iterators.h                            |   1 +
 be/src/olap/reader.h                               |   1 +
 be/src/olap/rowset/beta_rowset_reader.cpp          |   1 +
 be/src/olap/rowset/rowset_reader_context.h         |   1 +
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  42 ++++++
 be/src/olap/rowset/segment_v2/column_reader.h      |  12 ++
 be/src/olap/rowset/segment_v2/segment.cpp          |   8 +-
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |   1 -
 be/src/olap/rowset/segment_v2/segment_iterator.h   |   4 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |   9 +-
 be/src/vec/exec/volap_scanner.cpp                  |   8 +-
 be/src/vec/olap/block_reader.cpp                   |   1 +
 be/src/vec/olap/vgeneric_iterators.cpp             |  79 ++++++++++
 be/src/vec/olap/vgeneric_iterators.h               |   6 +
 be/test/vec/exec/vgeneric_iterators_test.cpp       |   1 -
 .../org/apache/doris/catalog/PrimitiveType.java    |   4 +
 .../org/apache/doris/planner/OlapScanNode.java     |  11 ++
 .../apache/doris/planner/SingleNodePlanner.java    | 160 +++++++++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  13 +-
 gensrc/thrift/PlanNodes.thrift                     |   8 ++
 20 files changed, 359 insertions(+), 12 deletions(-)

diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index 22f081d0eb..4f12118c2c 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -77,6 +77,7 @@ public:
     std::vector<ColumnPredicate*> column_predicates;
     std::unordered_map<int32_t, std::shared_ptr<AndBlockColumnPredicate>> 
col_id_to_predicates;
     std::unordered_map<int32_t, std::vector<const ColumnPredicate*>> 
col_id_to_del_predicates;
+    TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
 
     // REQUIRED (null is not allowed)
     OlapReaderStatistics* stats = nullptr;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 004e75c773..ae476e4fa2 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -91,6 +91,7 @@ public:
         // use only in vec exec engine
         std::vector<uint32_t>* origin_return_columns = nullptr;
         std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = 
nullptr;
+        TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
 
         // used for comapction to record row ids
         bool record_rowids = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index df15b72f62..87893927d5 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -49,6 +49,7 @@ Status BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
     // convert RowsetReaderContext to StorageReadOptions
     StorageReadOptions read_options;
     read_options.stats = _stats;
+    read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt;
     if (read_context->lower_bound_keys != nullptr) {
         for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) {
             
read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i),
diff --git a/be/src/olap/rowset/rowset_reader_context.h 
b/be/src/olap/rowset/rowset_reader_context.h
index de61117426..ce2fd4b721 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -41,6 +41,7 @@ struct RowsetReaderContext {
     std::vector<uint32_t>* read_orderby_key_columns = nullptr;
     // projection columns: the set of columns rowset reader should return
     const std::vector<uint32_t>* return_columns = nullptr;
+    TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
     // column name -> column predicate
     // adding column_name for predicate to make use of column selectivity
     const std::vector<ColumnPredicate*>* predicates = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index d42358c5e7..451b8f3e91 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -171,6 +171,44 @@ Status ColumnReader::get_row_ranges_by_zone_map(
     return Status::OK();
 }
 
+// Materializes `*n` rows for `dst` from the segment-level zone map, so MIN/MAX can be
+// evaluated without decoding data pages: the first row holds the segment max and the
+// remaining `*n - 1` rows hold the segment min (NULL bounds become NULL rows).
+// Returns NotSupported when the column has no zone map index.
+Status ColumnReader::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) const {
+    if (_zone_map_index_meta == nullptr) {
+        // Same absence condition that match_condition() tolerates; never dereference it.
+        return Status::NotSupported("next_batch_of_zone_map: segment zone map is absent");
+    }
+    if (*n == 0) {
+        // Nothing requested; also keeps `*n - 1` below from wrapping around.
+        return Status::OK();
+    }
+
+    // TODO: this work to get min/max value seems should only do once
+    FieldType type = _type_info->type();
+    std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta.length()));
+    std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta.length()));
+    _parse_zone_map(_zone_map_index_meta->segment_zone_map(), min_value.get(), max_value.get());
+
+    dst->reserve(*n);
+    const bool is_string = is_olap_string_type(type);
+
+    // Row 0: segment max.
+    if (max_value->is_null()) {
+        assert_cast<vectorized::ColumnNullable&>(*dst).insert_default();
+    } else if (is_string) {
+        const auto* sv = reinterpret_cast<const StringValue*>(max_value->cell_ptr());
+        dst->insert_data(sv->ptr, sv->len);
+    } else {
+        dst->insert_many_fix_len_data(static_cast<const char*>(max_value->cell_ptr()), 1);
+    }
+
+    // Rows 1 .. *n-1: segment min.
+    const size_t num_min_rows = *n - 1;
+    if (min_value->is_null()) {
+        assert_cast<vectorized::ColumnNullable&>(*dst).insert_null_elements(num_min_rows);
+    } else if (is_string) {
+        const auto* sv = reinterpret_cast<const StringValue*>(min_value->cell_ptr());
+        dst->insert_many_data(sv->ptr, sv->len, num_min_rows);
+    } else {
+        // TODO: inserting one value at a time may be slow; optimize later.
+        for (size_t i = 0; i < num_min_rows; ++i) {
+            dst->insert_many_fix_len_data(static_cast<const char*>(min_value->cell_ptr()), 1);
+        }
+    }
+
+    return Status::OK();
+}
+
 bool ColumnReader::match_condition(const AndBlockColumnPredicate* 
col_predicates) const {
     if (_zone_map_index_meta == nullptr) {
         return true;
@@ -710,6 +748,10 @@ Status FileColumnIterator::next_batch(size_t* n, 
ColumnBlockView* dst, bool* has
     return Status::OK();
 }
 
+// Zone-map rows are produced by the owning ColumnReader; this iterator only forwards the request.
+Status FileColumnIterator::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) {
+    const auto& reader = _reader;
+    return reader->next_batch_of_zone_map(n, dst);
+}
+
 Status FileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& 
dst,
                                       bool* has_null) {
     size_t curr_size = dst->byte_size();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index 33c7c418cd..d165dff067 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -125,6 +125,8 @@ public:
     // Return true if segment zone map is absent or `cond' could be satisfied, 
false otherwise.
     bool match_condition(const AndBlockColumnPredicate* col_predicates) const;
 
+    Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& 
dst) const;
+
     // get row ranges with zone map
     // - cond_column is user's query predicate
     // - delete_condition is a delete predicate of one version
@@ -256,6 +258,10 @@ public:
         return Status::NotSupported("next_batch not implement");
     }
 
+    // Appends `*n` rows derived from column statistics (not data pages) to `dst`.
+    // Iterators without such support keep this default and report NotSupported.
+    virtual Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) {
+        return Status::NotSupported("next_batch_of_zone_map not implement");
+    }
+
     virtual Status read_by_rowids(const rowid_t* rowids, const size_t count,
                                   vectorized::MutableColumnPtr& dst) {
         return Status::NotSupported("read_by_rowids not implement");
@@ -299,6 +305,8 @@ public:
 
     Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* 
has_null) override;
 
+    Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& 
dst) override;
+
     Status read_by_rowids(const rowid_t* rowids, const size_t count,
                           vectorized::MutableColumnPtr& dst) override;
 
@@ -447,6 +455,10 @@ public:
 
     Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* 
has_null) override;
 
+    // Delegates to the regular batch read. Presumably every row this iterator yields is the
+    // same default value, so its normal output is already a valid min/max source.
+    // NOTE(review): confirm.
+    Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) override {
+        return next_batch(n, dst);
+    }
+
     Status read_by_rowids(const rowid_t* rowids, const size_t count,
                           vectorized::MutableColumnPtr& dst) override;
 
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index 1f1cb59729..d5f585c3f3 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -35,6 +35,7 @@
 #include "olap/tablet_schema.h"
 #include "util/crc32c.h"
 #include "util/slice.h" // Slice
+#include "vec/olap/vgeneric_iterators.h"
 
 namespace doris {
 namespace segment_v2 {
@@ -100,7 +101,12 @@ Status Segment::new_iterator(const Schema& schema, const 
StorageReadOptions& rea
     }
 
     RETURN_IF_ERROR(load_index());
-    iter->reset(new SegmentIterator(this->shared_from_this(), schema));
+    if (read_options.col_id_to_del_predicates.empty() &&
+        read_options.push_down_agg_type_opt != TPushAggOp::NONE) {
+        
iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(), 
schema));
+    } else {
+        iter->reset(new SegmentIterator(this->shared_from_this(), schema));
+    }
     iter->get()->init(read_options);
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 376545b067..98f2cfae27 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -22,7 +22,6 @@
 #include <utility>
 
 #include "common/status.h"
-#include "gutil/strings/substitute.h"
 #include "olap/column_predicate.h"
 #include "olap/olap_common.h"
 #include "olap/row_block2.h"
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h 
b/be/src/olap/rowset/segment_v2/segment_iterator.h
index 2454167a33..d2820ff74b 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -145,8 +145,8 @@ private:
 
     std::shared_ptr<Segment> _segment;
     const Schema& _schema;
-    // _column_iterators.size() == _schema.num_columns()
-    // map<unique_id, ColumnIterator*> 
_column_iterators/_bitmap_index_iterators;
+    // _column_iterators_map.size() == _schema.num_columns()
+    // map<unique_id, ColumnIterator*> 
_column_iterators_map/_bitmap_index_iterators;
     // can use _schema get unique_id by cid
     std::map<int32_t, ColumnIterator*> _column_iterators;
     std::map<int32_t, BitmapIndexIterator*> _bitmap_index_iterators;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index b85652abf0..5d88345cf5 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -147,12 +147,14 @@ Status NewOlapScanner::_init_tablet_reader_params(
                       ->rowset()
                       ->rowset_meta()
                       ->is_segments_overlapping());
-
+    auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent);
     if (_state->skip_storage_engine_merge()) {
         _tablet_reader_params.direct_mode = true;
         _aggregation = true;
     } else {
-        _tablet_reader_params.direct_mode = _aggregation || single_version;
+        _tablet_reader_params.direct_mode =
+                _aggregation || single_version ||
+                real_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
     }
 
     RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));
@@ -161,6 +163,9 @@ Status NewOlapScanner::_init_tablet_reader_params(
     _tablet_reader_params.tablet_schema = _tablet_schema;
     _tablet_reader_params.reader_type = READER_QUERY;
     _tablet_reader_params.aggregation = _aggregation;
+    if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt)
+        _tablet_reader_params.push_down_agg_type_opt =
+                real_parent->_olap_scan_node.push_down_agg_type_opt;
     _tablet_reader_params.version = Version(0, _version);
 
     // Condition
diff --git a/be/src/vec/exec/volap_scanner.cpp 
b/be/src/vec/exec/volap_scanner.cpp
index 4157bdb89d..9258d48611 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -175,15 +175,18 @@ Status VOlapScanner::_init_tablet_reader_params(
         _tablet_reader_params.direct_mode = true;
         _aggregation = true;
     } else {
-        _tablet_reader_params.direct_mode = _aggregation || single_version;
+        _tablet_reader_params.direct_mode = _aggregation || single_version ||
+                                            
_parent->_olap_scan_node.__isset.push_down_agg_type_opt;
     }
-
     RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode));
 
     _tablet_reader_params.tablet = _tablet;
     _tablet_reader_params.tablet_schema = _tablet_schema;
     _tablet_reader_params.reader_type = READER_QUERY;
     _tablet_reader_params.aggregation = _aggregation;
+    if (_parent->_olap_scan_node.__isset.push_down_agg_type_opt)
+        _tablet_reader_params.push_down_agg_type_opt =
+                _parent->_olap_scan_node.push_down_agg_type_opt;
     _tablet_reader_params.version = Version(0, _version);
 
     // Condition
@@ -283,7 +286,6 @@ Status VOlapScanner::_init_return_columns(bool 
need_seq_col) {
         int32_t index = slot->col_unique_id() >= 0
                                 ? 
_tablet_schema->field_index(slot->col_unique_id())
                                 : 
_tablet_schema->field_index(slot->col_name());
-
         if (index < 0) {
             std::stringstream ss;
             ss << "field name is invalid. field=" << slot->col_name();
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 570315b431..f794d97ab5 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -49,6 +49,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& 
read_params,
 
     _reader_context.batch_size = _batch_size;
     _reader_context.is_vec = true;
+    _reader_context.push_down_agg_type_opt = 
read_params.push_down_agg_type_opt;
     for (auto& rs_reader : rs_readers) {
         RETURN_NOT_OK(rs_reader->init(&_reader_context));
         Status res = _vcollect_iter.add_child(rs_reader);
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index 280d7b054a..e64668a838 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -21,6 +21,8 @@
 
 #include "common/status.h"
 #include "olap/iterators.h"
+#include "olap/rowset/segment_v2/column_reader.h"
+#include "olap/rowset/segment_v2/segment.h"
 #include "olap/schema.h"
 #include "vec/core/block.h"
 
@@ -111,6 +113,79 @@ Status VAutoIncrementIterator::init(const 
StorageReadOptions& opts) {
     return Status::OK();
 }
 
+// Row iterator that answers pushed-down no-grouping aggregates from segment statistics:
+// - COUNT:  emits `segment->num_rows()` rows (contents irrelevant) in batches of at most
+//           MAX_ROW_SIZE_IN_COUNT rows;
+// - MINMAX: emits 2 rows per non-empty segment, filled from the segment zone maps;
+// - MIX:    emits `segment->num_rows()` rows, each column filled from its zone map.
+class VStatisticsIterator : public RowwiseIterator {
+public:
+    VStatisticsIterator(std::shared_ptr<Segment> segment, const Schema& schema)
+            : _segment(std::move(segment)), _schema(schema) {}
+
+    ~VStatisticsIterator() override {
+        for (auto& pair : _column_iterators_map) {
+            delete pair.second;
+        }
+    }
+
+    // Creates one column iterator per schema column and fixes the total number of rows to emit.
+    // Repeated calls after a successful init are no-ops.
+    Status init(const StorageReadOptions& opts) override {
+        if (_init) {
+            return Status::OK();
+        }
+        _push_down_agg_type_opt = opts.push_down_agg_type_opt;
+
+        for (size_t i = 0; i < _schema.num_column_ids(); i++) {
+            auto cid = _schema.column_id(i);
+            auto unique_id = _schema.column(cid)->unique_id();
+            if (_column_iterators_map.count(unique_id) == 0) {
+                RETURN_IF_ERROR(_segment->new_column_iterator(opts.tablet_schema->column(cid),
+                                                              &_column_iterators_map[unique_id]));
+            }
+            _column_iterators.push_back(_column_iterators_map[unique_id]);
+        }
+
+        const size_t num_rows = _segment->num_rows();
+        // A segment without rows contributes nothing. Skipping it for MINMAX also avoids
+        // materializing its (presumably null) zone-map bounds into possibly non-nullable columns.
+        if (_push_down_agg_type_opt == TPushAggOp::MINMAX) {
+            _target_rows = num_rows == 0 ? 0 : 2;
+        } else {
+            _target_rows = num_rows;
+        }
+        _init = true;
+        return Status::OK();
+    }
+
+    // Fills `block` with the next batch of statistic rows, or returns EndOfFile when done.
+    // Errors from the column iterators are propagated to the caller.
+    Status next_batch(Block* block) override {
+        DCHECK(block->columns() == _column_iterators.size());
+        if (_output_rows >= _target_rows) {
+            return Status::EndOfFile("End of VStatisticsIterator");
+        }
+
+        block->clear_column_data();
+        auto columns = block->mutate_columns();
+
+        size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX
+                              ? 2
+                              : std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT);
+        if (_push_down_agg_type_opt == TPushAggOp::COUNT) {
+            // COUNT only needs the number of rows, not their values.
+            for (auto& column : columns) {
+                column->resize(size);
+            }
+        } else {
+            for (size_t i = 0; i < columns.size(); ++i) {
+                RETURN_IF_ERROR(_column_iterators[i]->next_batch_of_zone_map(&size, columns[i]));
+            }
+        }
+        _output_rows += size;
+        return Status::OK();
+    }
+
+    const Schema& schema() const override { return _schema; }
+
+private:
+    std::shared_ptr<Segment> _segment;
+    const Schema& _schema;
+    size_t _target_rows = 0;
+    size_t _output_rows = 0;
+    bool _init = false;
+    TPushAggOp::type _push_down_agg_type_opt = TPushAggOp::NONE;
+    // Owns the column iterators (deleted in the destructor), keyed by column unique id.
+    std::map<int32_t, ColumnIterator*> _column_iterators_map;
+    // Non-owning, one entry per block column in schema order.
+    std::vector<ColumnIterator*> _column_iterators;
+
+    static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535;
+};
+
 // Used to store merge state for a VMergeIterator input.
 // This class will iterate all data from internal iterator
 // through client call advance().
@@ -575,6 +650,10 @@ RowwiseIterator* 
new_union_iterator(std::vector<RowwiseIterator*>& inputs) {
     return new VUnionIterator(inputs);
 }
 
+// Create an iterator that serves pushed-down aggregates from `segment` statistics.
+// Client should delete returned iterator.
+RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, const Schema& schema) {
+    // `segment` is already our own copy; hand it over instead of bumping the refcount again.
+    return new VStatisticsIterator(std::move(segment), schema);
+}
+
 RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t 
num_rows) {
     return new VAutoIncrementIterator(schema, num_rows);
 }
diff --git a/be/src/vec/olap/vgeneric_iterators.h 
b/be/src/vec/olap/vgeneric_iterators.h
index abab6d20fe..1342589355 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -19,6 +19,10 @@
 
 namespace doris {
 
+namespace segment_v2 {
+class Segment;
+}
+
 namespace vectorized {
 
 // Create a merge iterator for input iterators. Merge iterator will merge
@@ -43,6 +47,8 @@ RowwiseIterator* 
new_union_iterator(std::vector<RowwiseIterator*>& inputs);
 // Client should delete returned iterator.
 RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t 
num_rows);
 
+RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, 
const Schema& schema);
+
 } // namespace vectorized
 
 } // namespace doris
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp 
b/be/test/vec/exec/vgeneric_iterators_test.cpp
index 6d8b307116..760a7059e4 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -1,4 +1,3 @@
-
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
index a063cfaba9..b2a3215247 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java
@@ -1068,6 +1068,10 @@ public enum PrimitiveType {
         return this == ARRAY;
     }
 
+    public boolean isComplexType() {
+        return this == HLL || this == BITMAP;
+    }
+
     public boolean isStringType() {
         return (this == VARCHAR || this == CHAR || this == HLL || this == 
STRING);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 3e859101e7..eaf701e077 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -65,6 +65,7 @@ import org.apache.doris.thrift.TPaloScanRange;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 import org.apache.doris.thrift.TPrimitiveType;
+import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -153,6 +154,8 @@ public class OlapScanNode extends ScanNode {
     // It's limit for scanner instead of scanNode so we add a new limit.
     private long sortLimit = -1;
 
+    private TPushAggOp pushDownAggNoGroupingOp = null;
+
     // List of tablets will be scanned by current olap_scan_node
     private ArrayList<Long> scanTabletIds = Lists.newArrayList();
 
@@ -175,6 +178,10 @@ public class OlapScanNode extends ScanNode {
                                       this.reasonOfPreAggregation + " " + 
reason;
     }
 
+    /**
+     * Records the no-grouping aggregate op this scan node may answer from storage statistics.
+     * A null op is not sent to the backend, leaving the push down disabled.
+     */
+    public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) {
+        pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
+    }
+
     public boolean isPreAggregation() {
         return isPreAggregation;
     }
@@ -926,6 +933,10 @@ public class OlapScanNode extends ScanNode {
         msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift());
         msg.olap_scan_node.setTableName(olapTable.getName());
         
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
+
+        if (pushDownAggNoGroupingOp != null) {
+            msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp);
+        }
     }
 
     // export some tablets
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 0a68abfc64..3c5b702947 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -56,8 +56,10 @@ import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.FunctionSet;
 import org.apache.doris.catalog.JdbcTable;
+import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MysqlTable;
 import org.apache.doris.catalog.OdbcTable;
+import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
@@ -66,7 +68,9 @@ import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TNullSide;
+import org.apache.doris.thrift.TPushAggOp;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -362,6 +366,154 @@ public class SingleNodePlanner {
         return selectNode;
     }
 
+    /**
+     * Folds one more aggregate function into the push-down op collected so far.
+     * COUNT maps to TPushAggOp.COUNT and any other function to TPushAggOp.MINMAX.
+     * If the result differs from a non-null originAggOp, the combination is TPushAggOp.MIX.
+     */
+    private TPushAggOp freshTPushAggOpByName(String functionName, TPushAggOp originAggOp) {
+        TPushAggOp newPushAggOp = functionName.equalsIgnoreCase("COUNT") ? TPushAggOp.COUNT : TPushAggOp.MINMAX;
+        boolean conflicts = originAggOp != null && originAggOp != newPushAggOp;
+        return conflicts ? TPushAggOp.MIX : newPushAggOp;
+    }
+
+    /**
+     * Lets the OLAP scan node answer a no-grouping aggregation from storage statistics
+     * (segment zone maps / row counts) instead of scanning data. The backend then reads
+     * segments in direct mode, i.e. without merging rows.
+     *
+     * Applied only when all of the following hold:
+     * - root is an OlapScanNode over a DUP_KEYS or AGG_KEYS table;
+     * - the statement has exactly one table ref, no GROUP BY and no conjuncts on that table;
+     * - every aggregate is MIN, MAX or COUNT (COUNT only on DUP_KEYS) with at most one argument,
+     *   which is a slot or a numeric-to-numeric cast of a slot;
+     * - on AGG_KEYS, each argument is a key column, or a value column whose aggregation type is
+     *   the same MIN/MAX as the function (unmerged rows give the right answer only then);
+     * - if the combined op reads zone maps (MINMAX/MIX), no argument is ARRAY/HLL/BITMAP/STRING
+     *   or a CHAR/VARCHAR longer than 512;
+     * - if the combined op counts rows (COUNT/MIX), every argument is NOT NULL.
+     * On success the combined TPushAggOp is recorded on the scan node; otherwise nothing changes.
+     */
+    private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) {
+        // TODO: Support other scan node in the future
+        if (!(root instanceof OlapScanNode)) {
+            return;
+        }
+        OlapScanNode olapNode = (OlapScanNode) root;
+        KeysType type = olapNode.getOlapTable().getKeysType();
+        if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) {
+            return;
+        }
+
+        // TODO: Support multi table in the future
+        if (selectStmt.getTableRefs().size() != 1) {
+            return;
+        }
+
+        // Neither GROUP BY nor conjuncts on the scanned table are supported.
+        if (null == aggInfo || !aggInfo.getGroupingExprs().isEmpty()) {
+            return;
+        }
+        List<Expr> allConjuncts = analyzer.getAllConjuncts(selectStmt.getTableRefs().get(0).getId());
+        if (allConjuncts != null) {
+            return;
+        }
+
+        List<FunctionCallExpr> aggExprs = aggInfo.getAggregateExprs();
+
+        // Pass 1: validate the function shapes and derive the combined op. The column checks
+        // below depend on the combined op, so it must be known before they run; otherwise the
+        // outcome would depend on the order of the aggregate functions.
+        TPushAggOp aggOp = null;
+        for (FunctionCallExpr aggExpr : aggExprs) {
+            // Only `min`, `max` and `count` are supported, and `count` only on DUP_KEYS tables.
+            String functionName = aggExpr.getFnName().getFunction();
+            boolean isCount = functionName.equalsIgnoreCase("COUNT");
+            if (!isCount && !functionName.equalsIgnoreCase("MAX") && !functionName.equalsIgnoreCase("MIN")) {
+                return;
+            }
+            if (isCount && type != KeysType.DUP_KEYS) {
+                return;
+            }
+            if (aggExpr.getChildren().size() > 1) {
+                return;
+            }
+            aggOp = freshTPushAggOpByName(functionName, aggOp);
+        }
+        if (aggOp == null) {
+            return;
+        }
+
+        // Pass 2: validate the argument column of every aggregate against the combined op.
+        for (FunctionCallExpr aggExpr : aggExprs) {
+            if (aggExpr.getChildren().isEmpty()) {
+                // e.g. count(*): no column to check.
+                continue;
+            }
+
+            Expr child = aggExpr.getChild(0);
+            SlotRef slotRef;
+            if (child instanceof SlotRef) {
+                slotRef = (SlotRef) child;
+            } else if (child instanceof CastExpr && child.getChild(0) instanceof SlotRef
+                    && child.getType().isNumericType() && child.getChild(0).getType().isNumericType()) {
+                // Only numeric-to-numeric casts over a slot are accepted.
+                slotRef = (SlotRef) child.getChild(0);
+            } else {
+                return;
+            }
+
+            Column col = slotRef.getDesc().getColumn();
+            // TODO(zc): Here column is null is too bad
+            // Only column of Inline-view will be null
+            if (col == null) {
+                continue;
+            }
+
+            if (type == KeysType.AGG_KEYS && !col.isKey()) {
+                // Segments are read without merging, so a value column's raw statistics equal
+                // the aggregated result only when the column is itself MAX (for max) or MIN
+                // (for min) aggregated.
+                String functionName = aggExpr.getFnName().getFunction();
+                AggregateType colAggType = col.getAggregationType();
+                boolean sameAgg = (functionName.equalsIgnoreCase("MAX") && colAggType == AggregateType.MAX)
+                        || (functionName.equalsIgnoreCase("MIN") && colAggType == AggregateType.MIN);
+                if (!sameAgg) {
+                    return;
+                }
+            }
+
+            if (aggOp == TPushAggOp.MINMAX || aggOp == TPushAggOp.MIX) {
+                // The zone map max length of CharFamily is 512, do not
+                // exceed the length: https://github.com/apache/doris/pull/6293
+                PrimitiveType colType = col.getDataType();
+                if (colType.isArrayType() || colType.isComplexType() || colType == PrimitiveType.STRING) {
+                    return;
+                }
+                if (colType.isCharFamily() && col.getType().getLength() > 512) {
+                    return;
+                }
+            }
+
+            if (aggOp == TPushAggOp.COUNT || aggOp == TPushAggOp.MIX) {
+                // `count` skips NULL values while the statistics path reports every row,
+                // so the column must be NOT NULL.
+                if (col.isAllowNull()) {
+                    return;
+                }
+            }
+        }
+
+        olapNode.setPushDownAggNoGrouping(aggOp);
+    }
+
     private void turnOffPreAgg(AggregateInfo aggInfo, SelectStmt selectStmt, 
Analyzer analyzer, PlanNode root) {
         String turnOffReason = null;
         do {
@@ -998,6 +1150,10 @@ public class SingleNodePlanner {
                 materializeTableResultForCrossJoinOrCountStar(ref, analyzer);
                 PlanNode plan = createTableRefNode(analyzer, ref, selectStmt);
                 turnOffPreAgg(aggInfo, selectStmt, analyzer, plan);
+                if (VectorizedUtil.isVectorized()
+                        && 
ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) {
+                    pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, plan);
+                }
 
                 if (plan instanceof OlapScanNode) {
                     OlapScanNode olapNode = (OlapScanNode) plan;
@@ -1026,6 +1182,10 @@ public class SingleNodePlanner {
             // 
selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap());
 
             turnOffPreAgg(aggInfo, selectStmt, analyzer, root);
+            if (VectorizedUtil.isVectorized()
+                    && 
ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) {
+                pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, root);
+            }
 
             if (root instanceof OlapScanNode) {
                 OlapScanNode olapNode = (OlapScanNode) root;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e6752df67f..95fb643817 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -221,6 +221,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD = 
"enable_new_shuffle_hash_method";
 
+    public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG = 
"enable_push_down_no_group_agg";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, 
String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -563,7 +565,10 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean enableFallbackToOriginalPlanner = true;
 
     @VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD)
-    public boolean enableNewShffleHashMethod = true;
+    public boolean enableNewShuffleHashMethod = true;
+
+    @VariableMgr.VarAttr(name = ENABLE_PUSH_DOWN_NO_GROUP_AGG)
+    public boolean enablePushDownNoGroupAgg = true;
 
     public String getBlockEncryptionMode() {
         return blockEncryptionMode;
@@ -958,6 +963,10 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableVectorizedEngine = enableVectorizedEngine;
     }
 
+    /**
+     * Whether the planner may push no-grouping MIN/MAX/COUNT aggregates down to the OLAP scan
+     * node (session variable `enable_push_down_no_group_agg`).
+     */
+    public boolean enablePushDownNoGroupAgg() {
+        return this.enablePushDownNoGroupAgg;
+    }
+
     public boolean getEnableFunctionPushdown() {
         return this.enableFunctionPushdown;
     }
@@ -1170,7 +1179,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setEnableFunctionPushdown(enableFunctionPushdown);
         
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
         tResult.setEnableLocalExchange(enableLocalExchange);
-        tResult.setEnableNewShuffleHashMethod(enableNewShffleHashMethod);
+        tResult.setEnableNewShuffleHashMethod(enableNewShuffleHashMethod);
 
         tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 223a738df0..4135099c8b 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -463,6 +463,13 @@ struct TSortInfo {
   4: optional list<Exprs.TExpr> sort_tuple_slot_exprs
 }
 
+// Aggregation an OLAP scan node may answer from segment statistics instead of data pages.
+enum TPushAggOp {
+  NONE = 0,    // no push down
+  MINMAX = 1,  // only MIN/MAX aggregates: served from segment zone maps
+  COUNT = 2,   // only COUNT aggregates: served from segment row counts
+  MIX = 3      // both MIN/MAX and COUNT aggregates
+}
+
 struct TOlapScanNode {
   1: required Types.TTupleId tuple_id
   2: required list<string> key_column_name
@@ -477,6 +484,7 @@ struct TOlapScanNode {
   // It's limit for scanner instead of scanNode so we add a new limit.
   10: optional i64 sort_limit
   11: optional bool enable_unique_key_merge_on_write
+  12: optional TPushAggOp push_down_agg_type_opt
 }
 
 struct TEqJoinCondition {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to