This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5ca6cadd6 [Improvement] Optimize count operation for iceberg (#22923)
a5ca6cadd6 is described below

commit a5ca6cadd61d1c7e294a396fd3bc1d970875e4d2
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Fri Aug 18 09:57:51 2023 +0800

    [Improvement] Optimize count operation for iceberg (#22923)
    
    Iceberg has its own metadata information, which includes count statistics 
for table data. If the table does not contain equli'ty delete, we can get the 
count data of the current table directly from the count statistics.
---
 be/src/vec/exec/format/generic_reader.h            |  1 +
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 33 +++++++++++-
 be/src/vec/exec/format/table/iceberg_reader.h      |  6 ++-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  6 +--
 be/src/vec/exec/scan/vscan_node.cpp                |  5 ++
 be/src/vec/exec/scan/vscan_node.h                  |  6 +++
 .../java/org/apache/doris/planner/PlanNode.java    |  4 ++
 .../planner/external/iceberg/IcebergScanNode.java  | 63 +++++++++++++++++++++-
 .../planner/external/iceberg/IcebergSplit.java     |  4 +-
 gensrc/thrift/PlanNodes.thrift                     |  2 +
 10 files changed, 121 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index 7b6f3c7b9c..7842f2edb9 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -36,6 +36,7 @@ public:
     void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
         _push_down_agg_type = push_down_agg_type;
     }
+
     virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) 
= 0;
 
     virtual std::unordered_map<std::string, TypeDescriptor> get_name_to_type() 
{
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 6d2f572586..1b0f05cc95 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -90,14 +90,15 @@ 
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
                                        RuntimeProfile* profile, RuntimeState* 
state,
                                        const TFileScanRangeParams& params,
                                        const TFileRangeDesc& range, 
ShardedKVCache* kv_cache,
-                                       io::IOContext* io_ctx)
+                                       io::IOContext* io_ctx, int64_t 
push_down_count)
         : TableFormatReader(std::move(file_format_reader)),
           _profile(profile),
           _state(state),
           _params(params),
           _range(range),
           _kv_cache(kv_cache),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _remaining_push_down_count(push_down_count) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -132,10 +133,27 @@ Status IcebergTableReader::init_reader(
             _all_required_col_names, _not_in_file_col_names, 
&_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
             not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+
     return status;
 }
 
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
+    // already get rows from be
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_push_down_count > 0) {
+        auto rows =
+                std::min(_remaining_push_down_count, 
(int64_t)_state->query_options().batch_size);
+        _remaining_push_down_count -= rows;
+        for (auto& col : block->mutate_columns()) {
+            col->resize(rows);
+        }
+        *read_rows = rows;
+        if (_remaining_push_down_count == 0) {
+            *eof = true;
+        }
+
+        return Status::OK();
+    }
+
     // To support iceberg schema evolution. We change the column name in block 
to
     // make it match with the column name in parquet file before reading data. 
and
     // Set the name back to table column name before return this block.
@@ -149,6 +167,7 @@ Status IcebergTableReader::get_next_block(Block* block, 
size_t* read_rows, bool*
         }
         block->initialize_index_by_name();
     }
+
     auto res = _file_format_reader->get_next_block(block, read_rows, eof);
     // Set the name back to table column name before return this block.
     if (_has_schema_change) {
@@ -182,6 +201,11 @@ Status IcebergTableReader::get_columns(
 }
 
 Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
+    // We get the count value by doris's be, so we don't need to read the 
delete file
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && 
_remaining_push_down_count > 0) {
+        return Status::OK();
+    }
+
     auto& table_desc = range.table_format_params.iceberg_params;
     auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
@@ -192,10 +216,15 @@ Status IcebergTableReader::init_row_filters(const 
TFileRangeDesc& range) {
     if (files.empty()) {
         return Status::OK();
     }
+
     if (delete_file_type == POSITION_DELETE) {
         RETURN_IF_ERROR(_position_delete(files));
     }
+
     // todo: equality delete
+    //       If it is a count operation and it has equality delete file kind,
+    //       the push down operation of the count for this split needs to be 
canceled.
+
     COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h 
b/be/src/vec/exec/format/table/iceberg_reader.h
index 451c51445e..8def49e68c 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -69,8 +69,8 @@ public:
 
     IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params,
-                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                       io::IOContext* io_ctx);
+                       const TFileRangeDesc& range, ShardedKVCache* kv_cache, 
io::IOContext* io_ctx,
+                       int64_t push_down_count);
     ~IcebergTableReader() override = default;
 
     Status init_row_filters(const TFileRangeDesc& range) override;
@@ -154,6 +154,8 @@ private:
     io::IOContext* _io_ctx;
     bool _has_schema_change = false;
     bool _has_iceberg_schema = false;
+
+    int64_t _remaining_push_down_count;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6365f60ed6..bc439b73d2 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -699,9 +699,9 @@ Status VFileScanner::_get_next_reader() {
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == "iceberg") {
                 std::unique_ptr<IcebergTableReader> iceberg_reader =
-                        
IcebergTableReader::create_unique(std::move(parquet_reader), _profile,
-                                                          _state, *_params, 
range, _kv_cache,
-                                                          _io_ctx.get());
+                        IcebergTableReader::create_unique(
+                                std::move(parquet_reader), _profile, _state, 
*_params, range,
+                                _kv_cache, _io_ctx.get(), 
_parent->get_push_down_count());
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _col_id_name_map, 
_colname_to_value_range,
                         _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index c45524667c..e65bdab093 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -126,6 +126,11 @@ Status VScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     } else {
         _push_down_agg_type = TPushAggOp::type::NONE;
     }
+
+    if (tnode.__isset.push_down_count) {
+        _push_down_count = tnode.push_down_count;
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index 6b1922b0b8..c023bbabe2 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -136,6 +136,9 @@ public:
     }
 
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
+
+    int64_t get_push_down_count() { return _push_down_count; }
+
     // Get next block.
     // If eos is true, no more data will be read and block should be empty.
     Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
@@ -349,6 +352,9 @@ protected:
 
     TPushAggOp::type _push_down_agg_type;
 
+    // Record the value of the aggregate function 'count' from doris's be
+    int64_t _push_down_count = -1;
+
 private:
     Status _normalize_conjuncts();
     Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, 
VExprContext* context,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 579b80586b..8dcc8043b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1191,6 +1191,10 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> implements PlanStats {
         this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
     }
 
+    public TPushAggOp getPushDownAggNoGroupingOp() {
+        return pushDownAggNoGroupingOp;
+    }
+
     public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
         return false;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 5fa64fa2b6..7293982ebb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -18,8 +18,10 @@
 package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.external.ExternalTable;
@@ -43,6 +45,8 @@ import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import avro.shaded.com.google.common.base.Preconditions;
@@ -55,6 +59,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -68,6 +73,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +85,9 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
     public static final String DEFAULT_DATA_PATH = "/data/";
+    private static final String TOTAL_RECORDS = "total-records";
+    private static final String TOTAL_POSITION_DELETES = 
"total-position-deletes";
+    private static final String TOTAL_EQUALITY_DELETES = 
"total-equality-deletes";
 
     private IcebergSource source;
     private Table icebergTable;
@@ -210,8 +219,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                         splitTask.length(),
                         splitTask.file().fileSizeInBytes(),
                         new String[0],
+                        formatVersion,
                         source.getCatalog().getProperties());
-                split.setFormatVersion(formatVersion);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
                 }
@@ -222,6 +231,12 @@ public class IcebergScanNode extends FileQueryScanNode {
             throw new UserException(e.getMessage(), e.getCause());
         }
 
+        TPushAggOp aggOp = getPushDownAggNoGroupingOp();
+        if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
+            // we can create a special empty split and skip the plan process
+            return Collections.singletonList(splits.get(0));
+        }
+
         readPartitionNum = partitionPathSet.size();
 
         return splits;
@@ -334,4 +349,50 @@ public class IcebergScanNode extends FileQueryScanNode {
     public Map<String, String> getLocationProperties() throws UserException {
         return source.getCatalog().getProperties();
     }
+
+    @Override
+    public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) {
+        String aggFunctionName = 
aggExpr.getFnName().getFunction().toUpperCase();
+        return "COUNT".equals(aggFunctionName);
+    }
+
+    @Override
+    public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, 
Column col) {
+        return !col.isAllowNull();
+    }
+
+    private long getCountFromSnapshot() {
+        Long specifiedSnapshot;
+        try {
+            specifiedSnapshot = getSpecifiedSnapshot();
+        } catch (UserException e) {
+            return -1;
+        }
+
+        Snapshot snapshot = specifiedSnapshot == null
+                ? icebergTable.currentSnapshot() : 
icebergTable.snapshot(specifiedSnapshot);
+
+        // empty table
+        if (snapshot == null) {
+            return -1;
+        }
+
+        Map<String, String> summary = snapshot.summary();
+        if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) {
+            return Long.parseLong(summary.get(TOTAL_RECORDS)) - 
Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    protected void toThrift(TPlanNode planNode) {
+        super.toThrift(planNode);
+        if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
+            long countFromSnapshot = getCountFromSnapshot();
+            if (countFromSnapshot > 0) {
+                planNode.setPushDownCount(countFromSnapshot);
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index de3f2ec6aa..29deb293b3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -27,10 +27,12 @@ import java.util.Map;
 
 @Data
 public class IcebergSplit extends FileSplit {
+
     // File path will be changed if the file is modified, so there's no need 
to get modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength, 
String[] hosts,
-                        Map<String, String> config) {
+                        Integer formatVersion, Map<String, String> config) {
         super(file, start, length, fileLength, hosts, null);
+        this.formatVersion = formatVersion;
         this.config = config;
     }
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 84492bcc16..0717fc498d 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1147,6 +1147,8 @@ struct TPlanNode {
   47: optional TTestExternalScanNode test_external_scan_node
 
   48: optional TPushAggOp push_down_agg_type_opt
+
+  49: optional i64 push_down_count
   
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to