This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch opt_perf in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/opt_perf by this push: new c5ec7601d4 [Opt](Vectorized) Support push down no grouping agg (#12881) c5ec7601d4 is described below commit c5ec7601d4a45493051292c7234997c622a46f36 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Sep 22 19:46:21 2022 +0800 [Opt](Vectorized) Support push down no grouping agg (#12881) --- be/src/olap/iterators.h | 1 + be/src/olap/reader.h | 1 + be/src/olap/rowset/beta_rowset_reader.cpp | 1 + be/src/olap/rowset/rowset_reader_context.h | 1 + be/src/olap/rowset/segment_v2/column_reader.cpp | 42 ++++++ be/src/olap/rowset/segment_v2/column_reader.h | 12 ++ be/src/olap/rowset/segment_v2/segment.cpp | 8 +- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 1 - be/src/olap/rowset/segment_v2/segment_iterator.h | 4 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 9 +- be/src/vec/exec/volap_scanner.cpp | 8 +- be/src/vec/olap/block_reader.cpp | 1 + be/src/vec/olap/vgeneric_iterators.cpp | 79 ++++++++++ be/src/vec/olap/vgeneric_iterators.h | 6 + be/test/vec/exec/vgeneric_iterators_test.cpp | 1 - .../org/apache/doris/catalog/PrimitiveType.java | 4 + .../org/apache/doris/planner/OlapScanNode.java | 11 ++ .../apache/doris/planner/SingleNodePlanner.java | 160 +++++++++++++++++++++ .../java/org/apache/doris/qe/SessionVariable.java | 13 +- gensrc/thrift/PlanNodes.thrift | 8 ++ 20 files changed, 359 insertions(+), 12 deletions(-) diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 22f081d0eb..4f12118c2c 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -77,6 +77,7 @@ public: std::vector<ColumnPredicate*> column_predicates; std::unordered_map<int32_t, std::shared_ptr<AndBlockColumnPredicate>> col_id_to_predicates; std::unordered_map<int32_t, std::vector<const ColumnPredicate*>> col_id_to_del_predicates; + TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; // REQUIRED (null is not allowed) OlapReaderStatistics* stats = nullptr; diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 004e75c773..ae476e4fa2 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -91,6 +91,7 @@ public: // use only in vec exec engine std::vector<uint32_t>* origin_return_columns = nullptr; std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr; + TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; // used for comapction to record row ids bool record_rowids = false; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index df15b72f62..87893927d5 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -49,6 +49,7 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) { // convert RowsetReaderContext to StorageReadOptions StorageReadOptions read_options; read_options.stats = _stats; + read_options.push_down_agg_type_opt = _context->push_down_agg_type_opt; if (read_context->lower_bound_keys != nullptr) { for (int i = 0; i < read_context->lower_bound_keys->size(); ++i) { read_options.key_ranges.emplace_back(&read_context->lower_bound_keys->at(i), diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index de61117426..ce2fd4b721 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -41,6 +41,7 @@ struct RowsetReaderContext { std::vector<uint32_t>* read_orderby_key_columns = nullptr; // projection columns: the set of columns rowset reader should return const std::vector<uint32_t>* return_columns = nullptr; + TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; // column name -> column predicate // adding column_name for predicate to make use of column selectivity const std::vector<ColumnPredicate*>* predicates = nullptr; diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index d42358c5e7..451b8f3e91 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -171,6 +171,44 @@ Status ColumnReader::get_row_ranges_by_zone_map( return Status::OK(); } +Status ColumnReader::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) const { + // TODO: this work to get min/max value seems should only do once + FieldType type = _type_info->type(); + std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type, _meta.length())); + std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type, _meta.length())); + _parse_zone_map(_zone_map_index_meta->segment_zone_map(), min_value.get(), max_value.get()); + + dst->reserve(*n); + bool is_string = is_olap_string_type(type); + if (max_value->is_null()) { + assert_cast<vectorized::ColumnNullable&>(*dst).insert_default(); + } else { + if (is_string) { + auto sv = (StringValue*)max_value->cell_ptr(); + dst->insert_data(sv->ptr, sv->len); + } else { + dst->insert_many_fix_len_data(static_cast<const char*>(max_value->cell_ptr()), 1); + } + } + + auto size = *n - 1; + if (min_value->is_null()) { + assert_cast<vectorized::ColumnNullable&>(*dst).insert_null_elements(size); + } else { + if (is_string) { + auto sv = (StringValue*)min_value->cell_ptr(); + dst->insert_many_data(sv->ptr, sv->len, size); + } else { + // TODO: the work may cause performance problem, opt latter + for (int i = 0; i < size; ++i) { + dst->insert_many_fix_len_data(static_cast<const char*>(min_value->cell_ptr()), 1); + } + } + } + + return Status::OK(); +} + bool ColumnReader::match_condition(const AndBlockColumnPredicate* col_predicates) const { if (_zone_map_index_meta == nullptr) { return true; @@ -710,6 +748,10 @@ Status FileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool* has return Status::OK(); } +Status FileColumnIterator::next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) { + return _reader->next_batch_of_zone_map(n, dst); +} + Status FileColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) { size_t curr_size = dst->byte_size(); diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 33c7c418cd..d165dff067 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -125,6 +125,8 @@ public: // Return true if segment zone map is absent or `cond' could be satisfied, false otherwise. bool match_condition(const AndBlockColumnPredicate* col_predicates) const; + Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) const; + // get row ranges with zone map // - cond_column is user's query predicate // - delete_condition is a delete predicate of one version @@ -256,6 +258,10 @@ public: return Status::NotSupported("next_batch not implement"); } + virtual Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) { + return Status::NotSupported("next_batch_of_zone_map not implement"); + } + virtual Status read_by_rowids(const rowid_t* rowids, const size_t count, vectorized::MutableColumnPtr& dst) { return Status::NotSupported("read_by_rowids not implement"); @@ -299,6 +305,8 @@ public: Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override; + Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) override; + Status read_by_rowids(const rowid_t* rowids, const size_t count, vectorized::MutableColumnPtr& dst) override; @@ -447,6 +455,10 @@ public: Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override; + Status next_batch_of_zone_map(size_t* n, vectorized::MutableColumnPtr& dst) override { + return next_batch(n, dst); + } + Status read_by_rowids(const rowid_t* rowids, const size_t count, vectorized::MutableColumnPtr& dst) override; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 1f1cb59729..d5f585c3f3 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -35,6 +35,7 @@ #include "olap/tablet_schema.h" #include "util/crc32c.h" #include "util/slice.h" // Slice +#include "vec/olap/vgeneric_iterators.h" namespace doris { namespace segment_v2 { @@ -100,7 +101,12 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea } RETURN_IF_ERROR(load_index()); - iter->reset(new SegmentIterator(this->shared_from_this(), schema)); + if (read_options.col_id_to_del_predicates.empty() && + read_options.push_down_agg_type_opt != TPushAggOp::NONE) { + iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(), schema)); + } else { + iter->reset(new SegmentIterator(this->shared_from_this(), schema)); + } iter->get()->init(read_options); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 376545b067..98f2cfae27 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -22,7 +22,6 @@ #include <utility> #include "common/status.h" -#include "gutil/strings/substitute.h" #include "olap/column_predicate.h" #include "olap/olap_common.h" #include "olap/row_block2.h" diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h index 2454167a33..d2820ff74b 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.h +++ b/be/src/olap/rowset/segment_v2/segment_iterator.h @@ -145,8 +145,8 @@ private: std::shared_ptr<Segment> _segment; const Schema& _schema; - // _column_iterators.size() == _schema.num_columns() - // map<unique_id, ColumnIterator*> _column_iterators/_bitmap_index_iterators; + // _column_iterators_map.size() == _schema.num_columns() + // map<unique_id, ColumnIterator*> _column_iterators_map/_bitmap_index_iterators; // can use _schema get unique_id by cid std::map<int32_t, ColumnIterator*> _column_iterators; std::map<int32_t, BitmapIndexIterator*> _bitmap_index_iterators; diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index b85652abf0..5d88345cf5 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -147,12 +147,14 @@ Status NewOlapScanner::_init_tablet_reader_params( ->rowset() ->rowset_meta() ->is_segments_overlapping()); - + auto real_parent = reinterpret_cast<NewOlapScanNode*>(_parent); if (_state->skip_storage_engine_merge()) { _tablet_reader_params.direct_mode = true; _aggregation = true; } else { - _tablet_reader_params.direct_mode = _aggregation || single_version; + _tablet_reader_params.direct_mode = + _aggregation || single_version || + real_parent->_olap_scan_node.__isset.push_down_agg_type_opt; } RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode)); @@ -161,6 +163,9 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = READER_QUERY; _tablet_reader_params.aggregation = _aggregation; + if (real_parent->_olap_scan_node.__isset.push_down_agg_type_opt) + _tablet_reader_params.push_down_agg_type_opt = + real_parent->_olap_scan_node.push_down_agg_type_opt; _tablet_reader_params.version = Version(0, _version); // Condition diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index 4157bdb89d..9258d48611 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -175,15 +175,18 @@ Status VOlapScanner::_init_tablet_reader_params( _tablet_reader_params.direct_mode = true; _aggregation = true; } else { - _tablet_reader_params.direct_mode = _aggregation || single_version; + _tablet_reader_params.direct_mode = _aggregation || single_version || + _parent->_olap_scan_node.__isset.push_down_agg_type_opt; } - RETURN_IF_ERROR(_init_return_columns(!_tablet_reader_params.direct_mode)); _tablet_reader_params.tablet = _tablet; _tablet_reader_params.tablet_schema = _tablet_schema; _tablet_reader_params.reader_type = READER_QUERY; _tablet_reader_params.aggregation = _aggregation; + if (_parent->_olap_scan_node.__isset.push_down_agg_type_opt) + _tablet_reader_params.push_down_agg_type_opt = + _parent->_olap_scan_node.push_down_agg_type_opt; _tablet_reader_params.version = Version(0, _version); // Condition @@ -283,7 +286,6 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) { int32_t index = slot->col_unique_id() >= 0 ? _tablet_schema->field_index(slot->col_unique_id()) : _tablet_schema->field_index(slot->col_name()); - if (index < 0) { std::stringstream ss; ss << "field name is invalid. field=" << slot->col_name(); diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 570315b431..f794d97ab5 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -49,6 +49,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params, _reader_context.batch_size = _batch_size; _reader_context.is_vec = true; + _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt; for (auto& rs_reader : rs_readers) { RETURN_NOT_OK(rs_reader->init(&_reader_context)); Status res = _vcollect_iter.add_child(rs_reader); diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 280d7b054a..e64668a838 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -21,6 +21,8 @@ #include "common/status.h" #include "olap/iterators.h" +#include "olap/rowset/segment_v2/column_reader.h" +#include "olap/rowset/segment_v2/segment.h" #include "olap/schema.h" #include "vec/core/block.h" @@ -111,6 +113,79 @@ Status VAutoIncrementIterator::init(const StorageReadOptions& opts) { return Status::OK(); } +class VStatisticsIterator : public RowwiseIterator { +public: + // Will generate num_rows rows in total + VStatisticsIterator(std::shared_ptr<Segment> segment, const Schema& schema) + : _segment(std::move(segment)), _schema(schema) {} + + ~VStatisticsIterator() override { + for (auto& pair : _column_iterators_map) { + delete pair.second; + } + } + + Status init(const StorageReadOptions& opts) override { + if (!_init) { + _push_down_agg_type_opt = opts.push_down_agg_type_opt; + + for (size_t i = 0; i < _schema.num_column_ids(); i++) { + auto cid = _schema.column_id(i); + auto unique_id = _schema.column(cid)->unique_id(); + if (_column_iterators_map.count(unique_id) < 1) { + RETURN_IF_ERROR(_segment->new_column_iterator( + opts.tablet_schema->column(cid), &_column_iterators_map[unique_id])); + } + _column_iterators.push_back(_column_iterators_map[unique_id]); + } + + _target_rows = _push_down_agg_type_opt == TPushAggOp::MINMAX ? 2 : _segment->num_rows(); + _init = true; + } + + return Status::OK(); + } + + Status next_batch(Block* block) override { + DCHECK(block->columns() == _column_iterators.size()); + if (_output_rows < _target_rows) { + block->clear_column_data(); + auto columns = block->mutate_columns(); + + size_t size = _push_down_agg_type_opt == TPushAggOp::MINMAX + ? 2 + : std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); + if (_push_down_agg_type_opt == TPushAggOp::COUNT) { + size = std::min(_target_rows - _output_rows, MAX_ROW_SIZE_IN_COUNT); + for (int i = 0; i < block->columns(); ++i) { + columns[i]->resize(size); + } + } else { + for (int i = 0; i < block->columns(); ++i) { + _column_iterators[i]->next_batch_of_zone_map(&size, columns[i]); + } + } + _output_rows += size; + return Status::OK(); + } + return Status::EndOfFile("End of VStatisticsIterator"); + } + + const Schema& schema() const override { return _schema; } + +private: + std::shared_ptr<Segment> _segment; + const Schema& _schema; + size_t _target_rows = 0; + size_t _output_rows = 0; + bool _init = false; + TPushAggOp::type _push_down_agg_type_opt; + std::map<int32_t, ColumnIterator*> _column_iterators_map; + std::vector<ColumnIterator*> _column_iterators; + + static constexpr size_t MAX_ROW_SIZE_IN_COUNT = 65535; +}; + // Used to store merge state for a VMergeIterator input. // This class will iterate all data from internal iterator // through client call advance(). @@ -575,6 +650,10 @@ RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs) { return new VUnionIterator(inputs); } +RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, const Schema& schema) { + return new VStatisticsIterator(segment, schema); +} + RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t num_rows) { return new VAutoIncrementIterator(schema, num_rows); } diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index abab6d20fe..1342589355 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -19,6 +19,10 @@ namespace doris { +namespace segment_v2 { +class Segment; +} + namespace vectorized { // Create a merge iterator for input iterators. Merge iterator will merge @@ -43,6 +47,8 @@ RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs); // Client should delete returned iterator. RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t num_rows); +RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, const Schema& schema); + } // namespace vectorized } // namespace doris diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp b/be/test/vec/exec/vgeneric_iterators_test.cpp index 6d8b307116..760a7059e4 100644 --- a/be/test/vec/exec/vgeneric_iterators_test.cpp +++ b/be/test/vec/exec/vgeneric_iterators_test.cpp @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java index a063cfaba9..b2a3215247 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PrimitiveType.java @@ -1068,6 +1068,10 @@ public enum PrimitiveType { return this == ARRAY; } + public boolean isComplexType() { + return this == HLL || this == BITMAP; + } + public boolean isStringType() { return (this == VARCHAR || this == CHAR || this == HLL || this == STRING); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 3e859101e7..eaf701e077 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -65,6 +65,7 @@ import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TPrimitiveType; +import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; @@ -153,6 +154,8 @@ public class OlapScanNode extends ScanNode { // It's limit for scanner instead of scanNode so we add a new limit. private long sortLimit = -1; + private TPushAggOp pushDownAggNoGroupingOp = null; + // List of tablets will be scanned by current olap_scan_node private ArrayList<Long> scanTabletIds = Lists.newArrayList(); @@ -175,6 +178,10 @@ public class OlapScanNode extends ScanNode { this.reasonOfPreAggregation + " " + reason; } + public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) { + this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp; + } + public boolean isPreAggregation() { return isPreAggregation; } @@ -926,6 +933,10 @@ public class OlapScanNode extends ScanNode { msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift()); msg.olap_scan_node.setTableName(olapTable.getName()); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); + + if (pushDownAggNoGroupingOp != null) { + msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); + } } // export some tablets diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 0a68abfc64..3c5b702947 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -56,8 +56,10 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; +import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; @@ -66,7 +68,9 @@ import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.planner.external.ExternalFileScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TNullSide; +import org.apache.doris.thrift.TPushAggOp; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -362,6 +366,154 @@ public class SingleNodePlanner { return selectNode; } + private TPushAggOp freshTPushAggOpByName(String functionName, TPushAggOp originAggOp) { + TPushAggOp newPushAggOp = null; + if (functionName.equalsIgnoreCase("COUNT")) { + newPushAggOp = TPushAggOp.COUNT; + } else { + newPushAggOp = TPushAggOp.MINMAX; + } + + if (originAggOp == null || newPushAggOp == originAggOp) { + return newPushAggOp; + } + return TPushAggOp.MIX; + } + + private void pushDownAggNoGrouping(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) { + do { + // TODO: Support other scan node in the future + if (!(root instanceof OlapScanNode)) { + break; + } + + KeysType type = ((OlapScanNode) root).getOlapTable().getKeysType(); + if (type == KeysType.UNIQUE_KEYS || type == KeysType.PRIMARY_KEYS) { + break; + } + + // TODO: Support muti table in the future + if (selectStmt.getTableRefs().size() != 1) { + break; + } + + // No not support group by and where clause + if (null == aggInfo || !aggInfo.getGroupingExprs().isEmpty()) { + break; + } + List<Expr> allConjuncts = analyzer.getAllConjuncts(selectStmt.getTableRefs().get(0).getId()); + if (allConjuncts != null) { + break; + } + + List<FunctionCallExpr> aggExprs = aggInfo.getAggregateExprs(); + boolean aggExprValidate = true; + TPushAggOp aggOp = null; + for (FunctionCallExpr aggExpr : aggExprs) { + // Only support `min`, `max`, `count` and `count` only effective in dup table + String functionName = aggExpr.getFnName().getFunction(); + if (!functionName.equalsIgnoreCase("MAX") + && !functionName.equalsIgnoreCase("MIN") + && !functionName.equalsIgnoreCase("COUNT")) { + aggExprValidate = false; + break; + } + + if (functionName.equalsIgnoreCase("COUNT") + && type != KeysType.DUP_KEYS) { + aggExprValidate = false; + break; + } + + aggOp = freshTPushAggOpByName(functionName, aggOp); + + if (aggExpr.getChildren().size() > 1) { + aggExprValidate = false; + break; + } + + boolean returnColumnValidate = true; + if (aggExpr.getChildren().size() == 1) { + List<Column> returnColumns = Lists.newArrayList(); + if (!(aggExpr.getChild(0) instanceof SlotRef)) { + Expr child = aggExpr.getChild(0); + if ((child instanceof CastExpr) && (child.getChild(0) instanceof SlotRef)) { + if (child.getType().isNumericType() + && child.getChild(0).getType().isNumericType()) { + returnColumns.add(((SlotRef) child.getChild(0)).getDesc().getColumn()); + } else { + aggExprValidate = false; + break; + } + } else { + aggExprValidate = false; + break; + } + } else { + returnColumns.add(((SlotRef) aggExpr.getChild(0)).getDesc().getColumn()); + } + + + // check return columns + Column firstColumn = returnColumns.get(0); + for (Column col : returnColumns) { + // TODO(zc): Here column is null is too bad + // Only column of Inline-view will be null + if (col == null) { + continue; + } + + if (type == KeysType.AGG_KEYS) { + if (!col.isKey() && !col.getName().equals(firstColumn.getName())) { + returnColumnValidate = false; + break; + } + } + + // The zone map max length of CharFamily is 512, do not + // over the length: https://github.com/apache/doris/pull/6293 + if (aggOp == TPushAggOp.MINMAX || aggOp == TPushAggOp.MIX) { + PrimitiveType colType = col.getDataType(); + if (colType.isArrayType() || colType.isComplexType() + || colType == PrimitiveType.STRING) { + returnColumnValidate = false; + break; + } + + if (colType.isCharFamily() && aggOp != TPushAggOp.COUNT + && col.getType().getLength() > 512) { + returnColumnValidate = false; + break; + } + } + + if (aggOp == TPushAggOp.COUNT || aggOp == TPushAggOp.MIX) { + // NULL value behavior in `count` function is zero, so + // we should not use row_count to speed up query. the col + // must be not null + if (col.isAllowNull()) { + returnColumnValidate = false; + break; + } + } + } + } + + if (!returnColumnValidate) { + aggExprValidate = false; + break; + } + } + + if (!aggExprValidate) { + break; + } + + OlapScanNode olapNode = (OlapScanNode) root; + olapNode.setPushDownAggNoGrouping(aggOp); + } while (false); + } + private void turnOffPreAgg(AggregateInfo aggInfo, SelectStmt selectStmt, Analyzer analyzer, PlanNode root) { String turnOffReason = null; do { @@ -998,6 +1150,10 @@ public class SingleNodePlanner { materializeTableResultForCrossJoinOrCountStar(ref, analyzer); PlanNode plan = createTableRefNode(analyzer, ref, selectStmt); turnOffPreAgg(aggInfo, selectStmt, analyzer, plan); + if (VectorizedUtil.isVectorized() + && ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) { + pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, plan); + } if (plan instanceof OlapScanNode) { OlapScanNode olapNode = (OlapScanNode) plan; @@ -1026,6 +1182,10 @@ public class SingleNodePlanner { // selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap()); turnOffPreAgg(aggInfo, selectStmt, analyzer, root); + if (VectorizedUtil.isVectorized() + && ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg) { + pushDownAggNoGrouping(aggInfo, selectStmt, analyzer, root); + } if (root instanceof OlapScanNode) { OlapScanNode olapNode = (OlapScanNode) root; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index e6752df67f..95fb643817 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -221,6 +221,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD = "enable_new_shuffle_hash_method"; + public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG = "enable_push_down_no_group_agg"; + // session origin value public Map<Field, String> sessionOriginValue = new HashMap<Field, String>(); // check stmt is or not [select /*+ SET_VAR(...)*/ ...] @@ -563,7 +565,10 @@ public class SessionVariable implements Serializable, Writable { public boolean enableFallbackToOriginalPlanner = true; @VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD) - public boolean enableNewShffleHashMethod = true; + public boolean enableNewShuffleHashMethod = true; + + @VariableMgr.VarAttr(name = ENABLE_PUSH_DOWN_NO_GROUP_AGG) + public boolean enablePushDownNoGroupAgg = true; public String getBlockEncryptionMode() { return blockEncryptionMode; @@ -958,6 +963,10 @@ public class SessionVariable implements Serializable, Writable { this.enableVectorizedEngine = enableVectorizedEngine; } + public boolean enablePushDownNoGroupAgg() { + return enablePushDownNoGroupAgg; + } + public boolean getEnableFunctionPushdown() { return this.enableFunctionPushdown; } @@ -1170,7 +1179,7 @@ public class SessionVariable implements Serializable, Writable { tResult.setEnableFunctionPushdown(enableFunctionPushdown); tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec); tResult.setEnableLocalExchange(enableLocalExchange); - tResult.setEnableNewShuffleHashMethod(enableNewShffleHashMethod); + tResult.setEnableNewShuffleHashMethod(enableNewShuffleHashMethod); tResult.setSkipStorageEngineMerge(skipStorageEngineMerge); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 223a738df0..4135099c8b 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -463,6 +463,13 @@ struct TSortInfo { 4: optional list<Exprs.TExpr> sort_tuple_slot_exprs } +enum TPushAggOp { + NONE = 0, + MINMAX = 1, + COUNT = 2, + MIX = 3 +} + struct TOlapScanNode { 1: required Types.TTupleId tuple_id 2: required list<string> key_column_name @@ -477,6 +484,7 @@ struct TOlapScanNode { // It's limit for scanner instead of scanNode so we add a new limit. 10: optional i64 sort_limit 11: optional bool enable_unique_key_merge_on_write + 12: optional TPushAggOp push_down_agg_type_opt } struct TEqJoinCondition { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org