This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9135b676a9d [improvement](iceberg/paimon)support estimate row count (#31204) 9135b676a9d is described below commit 9135b676a9d0686841e2f5a116967951d9479ec4 Author: wuwenchi <wuwenchi...@hotmail.com> AuthorDate: Mon Feb 26 11:05:09 2024 +0800 [improvement](iceberg/paimon)support estimate row count (#31204) Get the number of rows evaluated for iceberg and paimon. --- .../doris/datasource/hive/HMSExternalTable.java | 2 +- .../datasource/iceberg/IcebergExternalTable.java | 6 ++++ .../doris/datasource/iceberg/IcebergUtils.java | 34 ++++++++++++++++++++++ .../datasource/iceberg/source/IcebergScanNode.java | 8 ++--- .../datasource/paimon/PaimonExternalTable.java | 17 +++++++++++ .../doris/statistics/util/StatisticsUtil.java | 26 ----------------- 6 files changed, 61 insertions(+), 32 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 0e11267829c..d095a959e90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -321,7 +321,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI rowCount = StatisticsUtil.getHiveRowCount(this); break; case ICEBERG: - rowCount = StatisticsUtil.getIcebergRowCount(this); + rowCount = IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); break; default: if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index 21f7c1d3d21..dfc78f44944 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -87,4 +87,10 @@ public class IcebergExternalTable extends ExternalTable { makeSureInitialized(); return new ExternalAnalysisTask(info); } + + @Override + public long fetchRowCount() { + makeSureInitialized(); + return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index f66babfe03e..1102527fa3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -45,6 +45,8 @@ import org.apache.doris.thrift.TExprOpcode; import com.google.common.collect.Lists; import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.types.Types; @@ -54,6 +56,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.Map; /** * Iceberg utils @@ -65,6 +68,10 @@ public class IcebergUtils { // All time and timestamp values are stored with microsecond precision private static final int ICEBERG_DATETIME_SCALE_MS = 6; + public static final String TOTAL_RECORDS = "total-records"; + public static final String TOTAL_POSITION_DELETES = "total-position-deletes"; + public static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes"; + public static Expression convertToIcebergExpr(Expr expr, Schema schema) { if (expr == null) { return null; @@ -314,4 +321,31 @@ public class IcebergUtils { return tmpSchema; }); } + + + /** + * Estimate iceberg table row count. + * Get the row count by adding all task file recordCount. 
+ * + * @return estimated row count + */ + public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, String tbName) { + try { + Table icebergTable = Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergMetadataCache() + .getIcebergTable(catalog, dbName, tbName); + Snapshot snapshot = icebergTable.currentSnapshot(); + if (snapshot == null) { + // empty table + return 0; + } + Map<String, String> summary = snapshot.summary(); + return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); + } catch (Exception e) { + LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e); + } + return -1; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index f8b72208ea4..e2564eae527 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -87,9 +87,6 @@ import java.util.stream.Collectors; public class IcebergScanNode extends FileQueryScanNode { public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; - private static final String TOTAL_RECORDS = "total-records"; - private static final String TOTAL_POSITION_DELETES = "total-position-deletes"; - private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes"; private IcebergSource source; private Table icebergTable; @@ -424,8 +421,9 @@ public class IcebergScanNode extends FileQueryScanNode { } Map<String, String> summary = snapshot.summary(); - if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) { - return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); + if (summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) { + return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)) + - 
Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES)); } else { return -1; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index f921fcd681a..41440c3f4cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.AbstractFileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.Split; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DecimalType; @@ -163,4 +164,20 @@ public class PaimonExternalTable extends ExternalTable { makeSureInitialized(); return new ExternalAnalysisTask(info); } + + @Override + public long fetchRowCount() { + makeSureInitialized(); + try { + long rowCount = 0; + List<Split> splits = originTable.newReadBuilder().newScan().plan().splits(); + for (Split split : splits) { + rowCount += split.rowCount(); + } + return rowCount; + } catch (Exception e) { + LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e); + } + return -1; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 8688447dcb9..8ee08d57e69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -84,7 +84,6 @@ import org.apache.commons.text.StringSubstitutor; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Table; import 
org.apache.iceberg.TableScan; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; @@ -596,31 +595,6 @@ public class StatisticsUtil { return parameters.containsKey(TOTAL_SIZE) ? Long.parseLong(parameters.get(TOTAL_SIZE)) : 0; } - /** - * Estimate iceberg table row count. - * Get the row count by adding all task file recordCount. - * - * @param table Iceberg HMSExternalTable to estimate row count. - * @return estimated row count - */ - public static long getIcebergRowCount(HMSExternalTable table) { - long rowCount = 0; - try { - Table icebergTable = Env.getCurrentEnv() - .getExtMetaCacheMgr() - .getIcebergMetadataCache() - .getIcebergTable(table.getCatalog(), table.getDbName(), table.getName()); - TableScan tableScan = icebergTable.newScan().includeColumnStats(); - for (FileScanTask task : tableScan.planFiles()) { - rowCount += task.file().recordCount(); - } - return rowCount; - } catch (Exception e) { - LOG.warn("Fail to collect row count for db {} table {}", table.getDbName(), table.getName(), e); - } - return -1; - } - /** * Estimate hive table row count : totalFileSize/estimatedRowSize * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org