This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c76e4d803eaa8008fca66f17117da4e215c8a80c
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Jul 5 16:07:12 2023 +0800

    [improvement](statistics, multi catalog)Estimate hive table row count based on file size. (#21207)
    
    Support estimating table row count based on file size.
    
    With sample size=3000 (total partition number is 87491), load cache time is 45s.
    With sample size=100000 (more than total partition number 87505), load cache time is 388s.
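    
    As a rough, minimal sketch of the estimation formula this change applies
    (the class, method, and sample numbers below are illustrative only, not
    code from this patch): estimated rows = sampled file bytes, scaled up by
    totalPartitions/sampledPartitions when only a sample of partitions was
    listed, divided by the per-row size (the sum of the columns' slot sizes).
    
        // Hypothetical sketch (not Doris code) of the row count estimation formula.
        public class RowCountEstimateSketch {
    
            static long estimateRowCount(long sampledFileBytes, int totalPartitions,
                                         int sampledPartitions, long perRowBytes) {
                if (perRowBytes <= 0) {
                    return 1; // degenerate schema; the patch also falls back to 1
                }
                long totalBytes = sampledFileBytes;
                if (sampledPartitions < totalPartitions) {
                    // Scale the sampled size up to the whole table.
                    totalBytes = sampledFileBytes * totalPartitions / sampledPartitions;
                }
                return totalBytes / perRowBytes;
            }
    
            public static void main(String[] args) {
                // Made-up numbers: 3000 sampled partitions out of 87491,
                // 12 GB of files seen in the sample, ~64 bytes per row.
                System.out.println("estimated rows = "
                        + estimateRowCount(12L << 30, 87491, 3000, 64));
            }
        }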
---
 .../main/java/org/apache/doris/common/Config.java  |   5 +
 .../doris/catalog/external/HMSExternalTable.java   |  30 +++--
 .../doris/statistics/StatisticConstants.java       |   2 +
 .../apache/doris/statistics/StatisticsCache.java   |   7 +-
 .../statistics/TableStatisticsCacheLoader.java     |   2 +-
 .../doris/statistics/util/StatisticsUtil.java      | 133 +++++++++++++++++++++
 6 files changed, 168 insertions(+), 11 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 8038b86b93..ab49c8e2ac 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2023,4 +2023,9 @@ public class Config extends ConfigBase {
             "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
             "Whether to disable creating catalog with WITH RESOURCE 
statement."})
     public static boolean disallow_create_catalog_with_resource = true;
+
+    @ConfField(mutable = true, masterOnly = false, description = {
+        "Hive行数估算分区采样数",
+        "Sample size for hive row count estimation."})
+    public static int hive_stats_partition_sample_size = 3000;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index c9f03fd80c..1aee1803ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -79,9 +79,7 @@ public class HMSExternalTable extends ExternalTable {
 
     private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
     private static final String TBL_PROP_INSERT_ONLY = "insert_only";
-
-    public static final String NUM_ROWS = "numRows";
-    public static final String NUM_FILES = "numFiles";
+    private static final String NUM_ROWS = "numRows";
 
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
@@ -269,7 +267,24 @@ public class HMSExternalTable extends ExternalTable {
 
     @Override
     public long getRowCount() {
-        return 0;
+        makeSureInitialized();
+        long rowCount;
+        switch (dlaType) {
+            case HIVE:
+                rowCount = StatisticsUtil.getHiveRowCount(this);
+                break;
+            case ICEBERG:
+                rowCount = StatisticsUtil.getIcebergRowCount(this);
+                break;
+            default:
+                LOG.warn("getRowCount for dlaType {} is not supported.", dlaType);
+                rowCount = -1;
+        }
+        if (rowCount == -1) {
+            LOG.debug("Will estimate row count from file list.");
+            rowCount = StatisticsUtil.getRowCountFromFileList(this);
+        }
+        return rowCount;
     }
 
     @Override
@@ -416,10 +431,12 @@ public class HMSExternalTable extends ExternalTable {
             Optional<TableStatistic> tableStatistics = Env.getCurrentEnv().getStatisticsCache().getTableStatistics(
                     catalog.getId(), catalog.getDbOrAnalysisException(dbName).getId(), id);
             if (tableStatistics.isPresent()) {
-                return tableStatistics.get().rowCount;
+                long rowCount = tableStatistics.get().rowCount;
+                LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
+                return rowCount;
             }
         } catch (Exception e) {
-            LOG.warn(String.format("Fail to get row count for table %s", name), e);
+            LOG.warn("Fail to get row count for table {}", name, e);
         }
         return 1;
     }
@@ -576,6 +593,5 @@ public class HMSExternalTable extends ExternalTable {
             builder.setMaxValue(Double.MAX_VALUE);
         }
     }
-
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index 89167d64f2..eb9572bb74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -44,6 +44,8 @@ public class StatisticConstants {
 
     public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2;
 
+    public static final int ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS = 12;
+
     /**
      * Bucket count for column_statistics and analysis_job table.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 2963cbea00..f46e19f529 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -72,7 +72,7 @@ public class StatisticsCache {
     private final AsyncLoadingCache<StatisticsCacheKey, Optional<TableStatistic>> tableStatisticsCache =
             Caffeine.newBuilder()
                     .maximumSize(Config.stats_cache_size)
-                    .expireAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .expireAfterWrite(Duration.ofHours(StatisticConstants.ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS))
                     .executor(threadPool)
                     .buildAsync(tableStatisticsCacheLoader);
 
@@ -143,8 +143,9 @@ public class StatisticsCache {
         StatisticsCacheKey k = new StatisticsCacheKey(catalogId, dbId, tableId);
         try {
             CompletableFuture<Optional<TableStatistic>> f = tableStatisticsCache.get(k);
-            // Synchronous return the cache value for table row count.
-            return f.get();
+            if (f.isDone()) {
+                return f.get();
+            }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning Histogram", e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java
index 6847dd6b97..817e74540f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java
@@ -49,7 +49,7 @@ public class TableStatisticsCacheLoader extends StatisticsCacheLoader<Optional<T
             long rowCount = table.getRowCount();
             long lastAnalyzeTimeInMs = System.currentTimeMillis();
             String updateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(lastAnalyzeTimeInMs));
-            Optional.of(new TableStatistic(rowCount, lastAnalyzeTimeInMs, updateTime));
+            return Optional.of(new TableStatistic(rowCount, lastAnalyzeTimeInMs, updateTime));
         } catch (Exception e) {
             LOG.warn(String.format("Fail to get row count for table %d", 
key.tableId), e);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 2ee244fbe6..0bdf736aab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -32,17 +32,25 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
 import org.apache.doris.qe.AutoCloseConnectContext;
@@ -59,9 +67,15 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.StringSubstitutor;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
 import java.text.SimpleDateFormat;
@@ -80,10 +94,14 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class StatisticsUtil {
+    private static final Logger LOG = LogManager.getLogger(StatisticsUtil.class);
 
     private static final String ID_DELIMITER = "-";
     private static final String VALUES_DELIMITER = ",";
 
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String NUM_ROWS = "numRows";
+
     private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
 
     public static List<ResultRow> executeQuery(String template, Map<String, String> params) {
@@ -461,4 +479,119 @@ public class StatisticsUtil {
             return (int) (healthCoefficient * 100.0);
         }
     }
+
+    /**
+     * Estimate hive table row count.
+     * First get it from remote table parameters. If not found, estimate it : totalSize/estimatedRowSize
+     * @param table Hive HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getHiveRowCount(HMSExternalTable table) {
+        Map<String, String> parameters = table.getRemoteTable().getParameters();
+        if (parameters == null) {
+            return -1;
+        }
+        // Table parameters contains row count, simply get and return it.
+        if (parameters.containsKey(NUM_ROWS)) {
+            return Long.parseLong(parameters.get(NUM_ROWS));
+        }
+        if (!parameters.containsKey(TOTAL_SIZE)) {
+            return -1;
+        }
+        // Table parameters don't contain row count but contain total size. Estimate row count : totalSize/rowSize
+        long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE));
+        long estimatedRowSize = 0;
+        for (Column column : table.getFullSchema()) {
+            estimatedRowSize += column.getDataType().getSlotSize();
+        }
+        if (estimatedRowSize == 0) {
+            return 1;
+        }
+        return totalSize / estimatedRowSize;
+    }
+
+    /**
+     * Estimate iceberg table row count.
+     * Get the row count by adding all task file recordCount.
+     * @param table Iceberg HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getIcebergRowCount(HMSExternalTable table) {
+        long rowCount = 0;
+        try {
+            Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table);
+            TableScan tableScan = icebergTable.newScan().includeColumnStats();
+            for (FileScanTask task : tableScan.planFiles()) {
+                rowCount += task.file().recordCount();
+            }
+            return rowCount;
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", table.getDbName(), table.getName(), e);
+        }
+        return -1;
+    }
+
+    /**
+     * Estimate hive table row count : totalFileSize/estimatedRowSize
+     * @param table Hive HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getRowCountFromFileList(HMSExternalTable table) {
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
+        List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
+        HiveMetaStoreCache.HivePartitionValues partitionValues = null;
+        List<HivePartition> hivePartitions = Lists.newArrayList();
+        int samplePartitionSize = Config.hive_stats_partition_sample_size;
+        int totalPartitionSize = 1;
+        // Get table partitions from cache.
+        if (!partitionColumnTypes.isEmpty()) {
+            partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
+        }
+        if (partitionValues != null) {
+            Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
+            totalPartitionSize = idToPartitionItem.size();
+            Collection<PartitionItem> partitionItems;
+            List<List<String>> partitionValuesList;
+            // If partition number is too large, randomly choose part of them to estimate the whole table.
+            if (samplePartitionSize < totalPartitionSize) {
+                List<PartitionItem> items = new ArrayList<>(idToPartitionItem.values());
+                Collections.shuffle(items);
+                partitionItems = items.subList(0, samplePartitionSize);
+                partitionValuesList = Lists.newArrayListWithCapacity(samplePartitionSize);
+            } else {
+                partitionItems = idToPartitionItem.values();
+                partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize);
+            }
+            for (PartitionItem item : partitionItems) {
+                partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
+            }
+            hivePartitions = cache.getAllPartitions(table.getDbName(), table.getName(), partitionValuesList);
+        } else {
+            hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
+                    table.getRemoteTable().getSd().getInputFormat(),
+                    table.getRemoteTable().getSd().getLocation(), null));
+        }
+        // Get files for all partitions.
+        List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitions(hivePartitions, true);
+        long totalSize = 0;
+        // Calculate the total file size.
+        for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) {
+            for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) {
+                totalSize += file.getLength();
+            }
+        }
+        // Estimate row count: totalSize/estimatedRowSize
+        long estimatedRowSize = 0;
+        for (Column column : table.getFullSchema()) {
+            estimatedRowSize += column.getDataType().getSlotSize();
+        }
+        if (estimatedRowSize == 0) {
+            return 1;
+        }
+        if (samplePartitionSize < totalPartitionSize) {
+            totalSize = totalSize * totalPartitionSize / samplePartitionSize;
+        }
+        return totalSize / estimatedRowSize;
+    }
 }

