This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.1 by this push: new e6545a36a32 [improvement](iceberg)Parallelize splits for count(*) for 2.1 (#41169) (#41880) e6545a36a32 is described below commit e6545a36a327856b32585fc5792c3069ed12ee5b Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Wed Oct 16 10:52:06 2024 +0800 [improvement](iceberg)Parallelize splits for count(*) for 2.1 (#41169) (#41880) bp: #41169 --- be/src/vec/exec/format/table/iceberg_reader.cpp | 10 ++++-- be/src/vec/exec/format/table/iceberg_reader.h | 13 ++++--- be/src/vec/exec/scan/vfile_scanner.cpp | 8 ++--- .../datasource/iceberg/source/IcebergScanNode.java | 40 +++++++++++++++++----- .../datasource/iceberg/source/IcebergSplit.java | 9 +++++ gensrc/thrift/PlanNodes.thrift | 1 + 6 files changed, 59 insertions(+), 22 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 424ef5d7bf4..295a3a40544 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -76,15 +76,14 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, ShardedKVCache* kv_cache, - io::IOContext* io_ctx, int64_t push_down_count) + io::IOContext* io_ctx) : TableFormatReader(std::move(file_format_reader)), _profile(profile), _state(state), _params(params), _range(range), _kv_cache(kv_cache), - _io_ctx(io_ctx), - _remaining_push_down_count(push_down_count) { + _io_ctx(io_ctx) { static const char* iceberg_profile = "IcebergProfile"; ADD_TIMER(_profile, iceberg_profile); _iceberg_profile.num_delete_files = @@ -95,6 +94,11 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile); _iceberg_profile.delete_rows_sort_time = ADD_CHILD_TIMER(_profile, 
"DeleteRowsSortTime", iceberg_profile); + if (range.table_format_params.iceberg_params.__isset.row_count) { + _remaining_push_down_count = range.table_format_params.iceberg_params.row_count; + } else { + _remaining_push_down_count = -1; + } } Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index a4ecbe9e360..04f64aad518 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -76,8 +76,8 @@ public: IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, - const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, - int64_t push_down_count); + const TFileRangeDesc& range, ShardedKVCache* kv_cache, + io::IOContext* io_ctx); ~IcebergTableReader() override = default; Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final; @@ -197,9 +197,9 @@ public: IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, ShardedKVCache* kv_cache, - io::IOContext* io_ctx, int64_t push_down_count) + io::IOContext* io_ctx) : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, - kv_cache, io_ctx, push_down_count) {} + kv_cache, io_ctx) {} Status init_reader( const std::vector<std::string>& file_col_names, const std::unordered_map<int, std::string>& col_id_name_map, @@ -237,10 +237,9 @@ public: IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, - const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, - int64_t push_down_count) + const TFileRangeDesc& range, ShardedKVCache* 
kv_cache, io::IOContext* io_ctx) : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, - kv_cache, io_ctx, push_down_count) {} + kv_cache, io_ctx) {} void set_delete_rows() override { auto* orc_reader = (OrcReader*)_file_format_reader.get(); diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index a61c6d1c094..d7ce386f85c 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -844,7 +844,7 @@ Status VFileScanner::_get_next_reader() { std::unique_ptr<IcebergParquetReader> iceberg_reader = IcebergParquetReader::create_unique(std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache, - _io_ctx.get(), _get_push_down_count()); + _io_ctx.get()); init_status = iceberg_reader->init_reader( _file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), @@ -914,9 +914,9 @@ Status VFileScanner::_get_next_reader() { _cur_reader = std::move(tran_orc_reader); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { - std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique( - std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, - _io_ctx.get(), _get_push_down_count()); + std::unique_ptr<IcebergOrcReader> iceberg_reader = + IcebergOrcReader::create_unique(std::move(orc_reader), _profile, _state, + *_params, range, _kv_cache, _io_ctx.get()); init_status = iceberg_reader->init_reader( _file_col_names, _col_id_name_map, _colname_to_value_range, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 2ca51298fe6..fe6c54cf53b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -36,6 +36,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TExplainLevel; @@ -86,6 +87,8 @@ public class IcebergScanNode extends FileQueryScanNode { private IcebergSource source; private Table icebergTable; private List<String> pushdownIcebergPredicates = Lists.newArrayList(); + private boolean pushDownCount = false; + private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; /** * External file scan node for Query iceberg table @@ -137,6 +140,9 @@ public class IcebergScanNode extends FileQueryScanNode { int formatVersion = icebergSplit.getFormatVersion(); fileDesc.setFormatVersion(formatVersion); fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); + if (pushDownCount) { + fileDesc.setRowCount(icebergSplit.getRowCount()); + } if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); } else { @@ -255,9 +261,24 @@ public class IcebergScanNode extends FileQueryScanNode { } TPushAggOp aggOp = getPushDownAggNoGroupingOp(); - if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() >= 0) { + if (aggOp.equals(TPushAggOp.COUNT)) { // we can create a special empty split and skip the plan process - return splits.isEmpty() ? 
splits : Collections.singletonList(splits.get(0)); + if (splits.isEmpty()) { + return splits; + } + long countFromSnapshot = getCountFromSnapshot(); + if (countFromSnapshot >= 0) { + pushDownCount = true; + List<Split> pushDownCountSplits; + if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) { + int parallelNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + pushDownCountSplits = splits.subList(0, Math.min(splits.size(), parallelNum)); + } else { + pushDownCountSplits = Collections.singletonList(splits.get(0)); + } + assignCountToSplits(pushDownCountSplits, countFromSnapshot); + return pushDownCountSplits; + } } selectedPartitionNum = partitionPathSet.size(); @@ -374,12 +395,6 @@ public class IcebergScanNode extends FileQueryScanNode { @Override protected void toThrift(TPlanNode planNode) { super.toThrift(planNode); - if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) { - long countFromSnapshot = getCountFromSnapshot(); - if (countFromSnapshot >= 0) { - planNode.setPushDownCount(countFromSnapshot); - } - } } @Override @@ -399,4 +414,13 @@ public class IcebergScanNode extends FileQueryScanNode { return super.getNodeExplainString(prefix, detailLevel) + String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb); } + + private void assignCountToSplits(List<Split> splits, long totalCount) { + int size = splits.size(); + long countPerSplit = totalCount / size; + for (int i = 0; i < size - 1; i++) { + ((IcebergSplit) splits.get(i)).setRowCount(countPerSplit); + } + ((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java index 8549e96bc2e..46e8f96ba35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java @@ -37,6 +37,7 @@ public class IcebergSplit extends FileSplit { private Integer formatVersion; private List<IcebergDeleteFileFilter> deleteFileFilters; private Map<String, String> config; + private long rowCount = -1; // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, @@ -47,4 +48,12 @@ public class IcebergSplit extends FileSplit { this.config = config; this.originalPath = originalPath; } + + public long getRowCount() { + return rowCount; + } + + public void setRowCount(long rowCount) { + this.rowCount = rowCount; + } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index daf2e28a991..72d0951062c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -307,6 +307,7 @@ struct TIcebergFileDesc { // Deprecated 5: optional Exprs.TExpr file_select_conjunct; 6: optional string original_file_path; + 7: optional i64 row_count; } struct TPaimonDeletionFileDesc { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org