This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new e6545a36a32 [improvement](iceberg)Parallelize splits for count(*) for 2.1 (#41169) (#41880)
e6545a36a32 is described below

commit e6545a36a327856b32585fc5792c3069ed12ee5b
Author: wuwenchi <wuwenchi...@hotmail.com>
AuthorDate: Wed Oct 16 10:52:06 2024 +0800

    [improvement](iceberg)Parallelize splits for count(*) for 2.1 (#41169) (#41880)
    
    bp: #41169
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 10 ++++--
 be/src/vec/exec/format/table/iceberg_reader.h      | 13 ++++---
 be/src/vec/exec/scan/vfile_scanner.cpp             |  8 ++---
 .../datasource/iceberg/source/IcebergScanNode.java | 40 +++++++++++++++++-----
 .../datasource/iceberg/source/IcebergSplit.java    |  9 +++++
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 6 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 424ef5d7bf4..295a3a40544 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -76,15 +76,14 @@ 
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
                                        RuntimeProfile* profile, RuntimeState* 
state,
                                        const TFileScanRangeParams& params,
                                        const TFileRangeDesc& range, 
ShardedKVCache* kv_cache,
-                                       io::IOContext* io_ctx, int64_t 
push_down_count)
+                                       io::IOContext* io_ctx)
         : TableFormatReader(std::move(file_format_reader)),
           _profile(profile),
           _state(state),
           _params(params),
           _range(range),
           _kv_cache(kv_cache),
-          _io_ctx(io_ctx),
-          _remaining_push_down_count(push_down_count) {
+          _io_ctx(io_ctx) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -95,6 +94,11 @@ 
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
             ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
     _iceberg_profile.delete_rows_sort_time =
             ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
+    if (range.table_format_params.iceberg_params.__isset.row_count) {
+        _remaining_push_down_count = 
range.table_format_params.iceberg_params.row_count;
+    } else {
+        _remaining_push_down_count = -1;
+    }
 }
 
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index a4ecbe9e360..04f64aad518 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -76,8 +76,8 @@ public:
 
     IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                        RuntimeState* state, const TFileScanRangeParams& params,
-                       const TFileRangeDesc& range, ShardedKVCache* kv_cache, 
io::IOContext* io_ctx,
-                       int64_t push_down_count);
+                       const TFileRangeDesc& range, ShardedKVCache* kv_cache,
+                       io::IOContext* io_ctx);
     ~IcebergTableReader() override = default;
 
     Status init_row_filters(const TFileRangeDesc& range, io::IOContext* 
io_ctx) final;
@@ -197,9 +197,9 @@ public:
     IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                          RuntimeState* state, const TFileScanRangeParams& 
params,
                          const TFileRangeDesc& range, ShardedKVCache* kv_cache,
-                         io::IOContext* io_ctx, int64_t push_down_count)
+                         io::IOContext* io_ctx)
             : IcebergTableReader(std::move(file_format_reader), profile, 
state, params, range,
-                                 kv_cache, io_ctx, push_down_count) {}
+                                 kv_cache, io_ctx) {}
     Status init_reader(
             const std::vector<std::string>& file_col_names,
             const std::unordered_map<int, std::string>& col_id_name_map,
@@ -237,10 +237,9 @@ public:
 
     IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, 
RuntimeProfile* profile,
                      RuntimeState* state, const TFileScanRangeParams& params,
-                     const TFileRangeDesc& range, ShardedKVCache* kv_cache, 
io::IOContext* io_ctx,
-                     int64_t push_down_count)
+                     const TFileRangeDesc& range, ShardedKVCache* kv_cache, 
io::IOContext* io_ctx)
             : IcebergTableReader(std::move(file_format_reader), profile, 
state, params, range,
-                                 kv_cache, io_ctx, push_down_count) {}
+                                 kv_cache, io_ctx) {}
 
     void set_delete_rows() override {
         auto* orc_reader = (OrcReader*)_file_format_reader.get();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a61c6d1c094..d7ce386f85c 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -844,7 +844,7 @@ Status VFileScanner::_get_next_reader() {
                 std::unique_ptr<IcebergParquetReader> iceberg_reader =
                         
IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
                                                             _state, *_params, 
range, _kv_cache,
-                                                            _io_ctx.get(), 
_get_push_down_count());
+                                                            _io_ctx.get());
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _col_id_name_map, 
_colname_to_value_range,
                         _push_down_conjuncts, _real_tuple_desc, 
_default_val_row_desc.get(),
@@ -914,9 +914,9 @@ Status VFileScanner::_get_next_reader() {
                 _cur_reader = std::move(tran_orc_reader);
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == 
"iceberg") {
-                std::unique_ptr<IcebergOrcReader> iceberg_reader = 
IcebergOrcReader::create_unique(
-                        std::move(orc_reader), _profile, _state, *_params, 
range, _kv_cache,
-                        _io_ctx.get(), _get_push_down_count());
+                std::unique_ptr<IcebergOrcReader> iceberg_reader =
+                        IcebergOrcReader::create_unique(std::move(orc_reader), 
_profile, _state,
+                                                        *_params, range, 
_kv_cache, _io_ctx.get());
 
                 init_status = iceberg_reader->init_reader(
                         _file_col_names, _col_id_name_map, 
_colname_to_value_range,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 2ca51298fe6..fe6c54cf53b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -36,6 +36,7 @@ import 
org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -86,6 +87,8 @@ public class IcebergScanNode extends FileQueryScanNode {
     private IcebergSource source;
     private Table icebergTable;
     private List<String> pushdownIcebergPredicates = Lists.newArrayList();
+    private boolean pushDownCount = false;
+    private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
 
     /**
      * External file scan node for Query iceberg table
@@ -137,6 +140,9 @@ public class IcebergScanNode extends FileQueryScanNode {
         int formatVersion = icebergSplit.getFormatVersion();
         fileDesc.setFormatVersion(formatVersion);
         fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
+        if (pushDownCount) {
+            fileDesc.setRowCount(icebergSplit.getRowCount());
+        }
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
@@ -255,9 +261,24 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
 
         TPushAggOp aggOp = getPushDownAggNoGroupingOp();
-        if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() >= 0) {
+        if (aggOp.equals(TPushAggOp.COUNT)) {
             // we can create a special empty split and skip the plan process
-            return splits.isEmpty() ? splits : 
Collections.singletonList(splits.get(0));
+            if (splits.isEmpty()) {
+                return splits;
+            }
+            long countFromSnapshot = getCountFromSnapshot();
+            if (countFromSnapshot >= 0) {
+                pushDownCount = true;
+                List<Split> pushDownCountSplits;
+                if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
+                    int parallelNum = 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+                    pushDownCountSplits = splits.subList(0, 
Math.min(splits.size(), parallelNum));
+                } else {
+                    pushDownCountSplits = 
Collections.singletonList(splits.get(0));
+                }
+                assignCountToSplits(pushDownCountSplits, countFromSnapshot);
+                return pushDownCountSplits;
+            }
         }
 
         selectedPartitionNum = partitionPathSet.size();
@@ -374,12 +395,6 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     protected void toThrift(TPlanNode planNode) {
         super.toThrift(planNode);
-        if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
-            long countFromSnapshot = getCountFromSnapshot();
-            if (countFromSnapshot >= 0) {
-                planNode.setPushDownCount(countFromSnapshot);
-            }
-        }
     }
 
     @Override
@@ -399,4 +414,13 @@ public class IcebergScanNode extends FileQueryScanNode {
         return super.getNodeExplainString(prefix, detailLevel)
                 + String.format("%sicebergPredicatePushdown=\n%s\n", prefix, 
sb);
     }
+
+    private void assignCountToSplits(List<Split> splits, long totalCount) {
+        int size = splits.size();
+        long countPerSplit = totalCount / size;
+        for (int i = 0; i < size - 1; i++) {
+            ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit);
+        }
+        ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + 
totalCount % size);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 8549e96bc2e..46e8f96ba35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -37,6 +37,7 @@ public class IcebergSplit extends FileSplit {
     private Integer formatVersion;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
     private Map<String, String> config;
+    private long rowCount = -1;
 
     // File path will be changed if the file is modified, so there's no need 
to get modification time.
     public IcebergSplit(LocationPath file, long start, long length, long 
fileLength, String[] hosts,
@@ -47,4 +48,12 @@ public class IcebergSplit extends FileSplit {
         this.config = config;
         this.originalPath = originalPath;
     }
+
+    public long getRowCount() {
+        return rowCount;
+    }
+
+    public void setRowCount(long rowCount) {
+        this.rowCount = rowCount;
+    }
 }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index daf2e28a991..72d0951062c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -307,6 +307,7 @@ struct TIcebergFileDesc {
     // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
     6: optional string original_file_path;
+    7: optional i64 row_count;
 }
 
 struct TPaimonDeletionFileDesc {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to