This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit c76e4d803eaa8008fca66f17117da4e215c8a80c
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Jul 5 16:07:12 2023 +0800

    [improvement](statistics, multi catalog)Estimate hive table row count based on file size. (#21207)

    Support estimating table row count based on file size.
    With sample size = 3000 (total partition number is 87491), the cache load time is 45s.
    With sample size = 100000 (more than the total partition number of 87505), the cache load time is 388s.
---
 .../main/java/org/apache/doris/common/Config.java |   5 +
 .../doris/catalog/external/HMSExternalTable.java  |  30 +++--
 .../doris/statistics/StatisticConstants.java      |   2 +
 .../apache/doris/statistics/StatisticsCache.java  |   7 +-
 .../statistics/TableStatisticsCacheLoader.java    |   2 +-
 .../doris/statistics/util/StatisticsUtil.java     | 133 +++++++++++++++++++++
 6 files changed, 168 insertions(+), 11 deletions(-)
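
Before the patch itself, the approach in a nutshell: when exact statistics are not available, the row count is estimated as total file size divided by an estimated per-row size (the sum of the columns' slot sizes), and when only a sample of the partitions is listed, the sampled size is first scaled up to the full partition count. A minimal, self-contained sketch of that arithmetic follows; the class, method and numbers are illustrative, not the patch's actual API.

    // Illustrative sketch of the size-based estimate; names and numbers are invented for the example.
    public final class RowCountEstimator {

        /**
         * @param sampledFileBytes   total bytes of the files in the sampled partitions
         * @param sampledPartitions  number of partitions that were actually listed
         * @param totalPartitions    total number of partitions in the table
         * @param slotSizesPerColumn estimated slot size in bytes for each column of the schema
         */
        static long estimateRows(long sampledFileBytes, int sampledPartitions,
                                 int totalPartitions, int[] slotSizesPerColumn) {
            long rowSize = 0;
            for (int s : slotSizesPerColumn) {
                rowSize += s;
            }
            if (rowSize == 0) {
                return 1; // same guard as the patch: never divide by zero
            }
            long totalBytes = sampledFileBytes;
            if (sampledPartitions < totalPartitions) {
                // Scale the sampled file size up to the whole table before dividing.
                totalBytes = totalBytes * totalPartitions / sampledPartitions;
            }
            return totalBytes / rowSize;
        }

        public static void main(String[] args) {
            // Hypothetical input: 3,000 sampled partitions out of 87,491, 12 GiB of sampled files,
            // and an estimated 40-byte row (four columns of 8 + 8 + 16 + 8 bytes).
            System.out.println(estimateRows(12L << 30, 3000, 87491, new int[] {8, 8, 16, 8}));
        }
    }
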
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 8038b86b93..ab49c8e2ac 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2023,4 +2023,9 @@ public class Config extends ConfigBase {
             "是否禁止使用 WITH REOSOURCE 语句创建 Catalog。",
             "Whether to disable creating catalog with WITH RESOURCE statement."})
     public static boolean disallow_create_catalog_with_resource = true;
+
+    @ConfField(mutable = true, masterOnly = false, description = {
+            "Hive行数估算分区采样数",
+            "Sample size for hive row count estimation."})
+    public static int hive_stats_partition_sample_size = 3000;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index c9f03fd80c..1aee1803ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -79,9 +79,7 @@ public class HMSExternalTable extends ExternalTable {
 
     private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
     private static final String TBL_PROP_INSERT_ONLY = "insert_only";
-
-    public static final String NUM_ROWS = "numRows";
-    public static final String NUM_FILES = "numFiles";
+    private static final String NUM_ROWS = "numRows";
 
     static {
         SUPPORTED_HIVE_FILE_FORMATS = Sets.newHashSet();
@@ -269,7 +267,24 @@ public class HMSExternalTable extends ExternalTable {
 
     @Override
     public long getRowCount() {
-        return 0;
+        makeSureInitialized();
+        long rowCount;
+        switch (dlaType) {
+            case HIVE:
+                rowCount = StatisticsUtil.getHiveRowCount(this);
+                break;
+            case ICEBERG:
+                rowCount = StatisticsUtil.getIcebergRowCount(this);
+                break;
+            default:
+                LOG.warn("getRowCount for dlaType {} is not supported.", dlaType);
+                rowCount = -1;
+        }
+        if (rowCount == -1) {
+            LOG.debug("Will estimate row count from file list.");
+            rowCount = StatisticsUtil.getRowCountFromFileList(this);
+        }
+        return rowCount;
     }
 
     @Override
@@ -416,10 +431,12 @@ public class HMSExternalTable extends ExternalTable {
             Optional<TableStatistic> tableStatistics = Env.getCurrentEnv().getStatisticsCache().getTableStatistics(
                     catalog.getId(), catalog.getDbOrAnalysisException(dbName).getId(), id);
             if (tableStatistics.isPresent()) {
-                return tableStatistics.get().rowCount;
+                long rowCount = tableStatistics.get().rowCount;
+                LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);
+                return rowCount;
             }
         } catch (Exception e) {
-            LOG.warn(String.format("Fail to get row count for table %s", name), e);
+            LOG.warn("Fail to get row count for table {}", name, e);
         }
         return 1;
     }
@@ -576,6 +593,5 @@ public class HMSExternalTable extends ExternalTable {
 
             builder.setMaxValue(Double.MAX_VALUE);
         }
     }
-
 }
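
In the HMSExternalTable hunk above, getRowCount() resolves the count in two stages: metadata first (HMS numRows for Hive, summed Iceberg record counts), then, if that yields -1, the file-size estimate. A rough standalone illustration of just that fallback order; the suppliers are stand-ins, not Doris classes.

    import java.util.function.LongSupplier;

    // Only the fallback order mirrors the patch; the real logic lives in HMSExternalTable.getRowCount().
    public final class RowCountFallback {

        // Each source returns -1 when it cannot produce a count, matching the patch's convention.
        static long resolve(LongSupplier metadataCount, LongSupplier fileSizeEstimate) {
            long fromMetadata = metadataCount.getAsLong(); // e.g. HMS "numRows" or Iceberg record counts
            if (fromMetadata != -1) {
                return fromMetadata;
            }
            return fileSizeEstimate.getAsLong(); // last resort: estimate from the file listing
        }

        public static void main(String[] args) {
            // Hypothetical sources: metadata has no count, so the size-based estimate is used.
            System.out.println(resolve(() -> -1L, () -> 9_400_000_000L));
        }
    }
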
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
index 89167d64f2..eb9572bb74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java
@@ -44,6 +44,8 @@ public class StatisticConstants {
 
     public static final int STATISTICS_CACHE_REFRESH_INTERVAL = 24 * 2;
 
+    public static final int ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS = 12;
+
     /**
      * Bucket count fot column_statistics and analysis_job table.
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
index 2963cbea00..f46e19f529 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java
@@ -72,7 +72,7 @@ public class StatisticsCache {
     private final AsyncLoadingCache<StatisticsCacheKey, Optional<TableStatistic>> tableStatisticsCache =
             Caffeine.newBuilder()
                     .maximumSize(Config.stats_cache_size)
-                    .expireAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_VALID_DURATION_IN_HOURS))
+                    .expireAfterWrite(Duration.ofHours(StatisticConstants.ROW_COUNT_CACHE_VALID_DURATION_IN_HOURS))
                     .executor(threadPool)
                     .buildAsync(tableStatisticsCacheLoader);
 
@@ -143,8 +143,9 @@ public class StatisticsCache {
         StatisticsCacheKey k = new StatisticsCacheKey(catalogId, dbId, tableId);
         try {
             CompletableFuture<Optional<TableStatistic>> f = tableStatisticsCache.get(k);
-            // Synchronous return the cache value for table row count.
-            return f.get();
+            if (f.isDone()) {
+                return f.get();
+            }
         } catch (Exception e) {
             LOG.warn("Unexpected exception while returning Histogram", e);
         }
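
The StatisticsCache change above stops blocking on f.get(): the cached row count is returned only when the asynchronous load has already finished; otherwise the method falls through to whatever default the surrounding code (not shown in the hunk) returns. A minimal sketch of that pattern with Caffeine's AsyncLoadingCache; the key/value types and the loader body are placeholders, not the Doris ones.

    import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    import java.time.Duration;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    public final class NonBlockingCacheRead {

        private static final AsyncLoadingCache<Long, Optional<Long>> ROW_COUNTS = Caffeine.newBuilder()
                .maximumSize(10_000)
                .expireAfterWrite(Duration.ofHours(12))
                .buildAsync(tableId -> Optional.of(loadRowCountSlowly(tableId)));

        static Optional<Long> getIfReady(long tableId) {
            CompletableFuture<Optional<Long>> future = ROW_COUNTS.get(tableId); // kicks off the async load
            if (future.isDone()) {
                try {
                    return future.get(); // already loaded, so this does not block
                } catch (Exception e) {
                    // fall through: treat a failed load like "not ready yet"
                }
            }
            return Optional.empty(); // still loading: caller should use a default row count
        }

        private static long loadRowCountSlowly(long tableId) {
            // Stand-in for the expensive metadata lookup / file listing.
            return 42L;
        }
    }
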
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java
index 6847dd6b97..817e74540f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatisticsCacheLoader.java
@@ -49,7 +49,7 @@ public class TableStatisticsCacheLoader extends StatisticsCacheLoader<Optional<T
             long rowCount = table.getRowCount();
             long lastAnalyzeTimeInMs = System.currentTimeMillis();
             String updateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(lastAnalyzeTimeInMs));
-            Optional.of(new TableStatistic(rowCount, lastAnalyzeTimeInMs, updateTime));
+            return Optional.of(new TableStatistic(rowCount, lastAnalyzeTimeInMs, updateTime));
         } catch (Exception e) {
             LOG.warn(String.format("Fail to get row count for table %d", key.tableId), e);
         }
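
The loader above now actually returns the freshly built TableStatistic (the return statement was previously missing, so the computed value was dropped). One more detail worth noting before the StatisticsUtil.java diff below: getRowCountFromFileList scales the sampled size with totalSize * totalPartitionSize / samplePartitionSize, multiplying before dividing so the partition ratio is not truncated by integer division. A tiny demo with a hypothetical sampled size and the partition count quoted in the commit message.

    public final class ScaleUpOrder {
        public static void main(String[] args) {
            long sampledBytes = 1_000_000_000L; // hypothetical: bytes seen in the sampled partitions
            int totalPartitions = 87_491;       // partition count from the commit message
            int samplePartitions = 3_000;       // default hive_stats_partition_sample_size

            // Order used in getRowCountFromFileList: multiply first, then divide.
            long scaled = sampledBytes * totalPartitions / samplePartitions;      // 29163666666
            // Dividing first would truncate 87491 / 3000 down to 29 and underestimate the table.
            long truncated = sampledBytes * (totalPartitions / samplePartitions); // 29000000000

            System.out.println(scaled + " vs " + truncated);
        }
    }
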
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 2ee244fbe6..0bdf736aab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -32,17 +32,25 @@ import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
+import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
 import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
 import org.apache.doris.qe.AutoCloseConnectContext;
@@ -59,9 +67,15 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.StringSubstitutor;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
 import java.text.SimpleDateFormat;
@@ -80,10 +94,14 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class StatisticsUtil {
+    private static final Logger LOG = LogManager.getLogger(StatisticsUtil.class);
 
     private static final String ID_DELIMITER = "-";
     private static final String VALUES_DELIMITER = ",";
 
+    private static final String TOTAL_SIZE = "totalSize";
+    private static final String NUM_ROWS = "numRows";
+    private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
 
     public static List<ResultRow> executeQuery(String template, Map<String, String> params) {
@@ -461,4 +479,119 @@
             return (int) (healthCoefficient * 100.0);
         }
     }
+
+    /**
+     * Estimate hive table row count.
+     * First get it from remote table parameters. If not found, estimate it : totalSize/estimatedRowSize
+     * @param table Hive HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getHiveRowCount(HMSExternalTable table) {
+        Map<String, String> parameters = table.getRemoteTable().getParameters();
+        if (parameters == null) {
+            return -1;
+        }
+        // Table parameters contains row count, simply get and return it.
+        if (parameters.containsKey(NUM_ROWS)) {
+            return Long.parseLong(parameters.get(NUM_ROWS));
+        }
+        if (!parameters.containsKey(TOTAL_SIZE)) {
+            return -1;
+        }
+        // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize
+        long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE));
+        long estimatedRowSize = 0;
+        for (Column column : table.getFullSchema()) {
+            estimatedRowSize += column.getDataType().getSlotSize();
+        }
+        if (estimatedRowSize == 0) {
+            return 1;
+        }
+        return totalSize / estimatedRowSize;
+    }
+
+    /**
+     * Estimate iceberg table row count.
+     * Get the row count by adding all task file recordCount.
+     * @param table Iceberg HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getIcebergRowCount(HMSExternalTable table) {
+        long rowCount = 0;
+        try {
+            Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table);
+            TableScan tableScan = icebergTable.newScan().includeColumnStats();
+            for (FileScanTask task : tableScan.planFiles()) {
+                rowCount += task.file().recordCount();
+            }
+            return rowCount;
+        } catch (Exception e) {
+            LOG.warn("Fail to collect row count for db {} table {}", table.getDbName(), table.getName(), e);
+        }
+        return -1;
+    }
+
+    /**
+     * Estimate hive table row count : totalFileSize/estimatedRowSize
+     * @param table Hive HMSExternalTable to estimate row count.
+     * @return estimated row count
+     */
+    public static long getRowCountFromFileList(HMSExternalTable table) {
+        HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                .getMetaStoreCache((HMSExternalCatalog) table.getCatalog());
+        List<Type> partitionColumnTypes = table.getPartitionColumnTypes();
+        HiveMetaStoreCache.HivePartitionValues partitionValues = null;
+        List<HivePartition> hivePartitions = Lists.newArrayList();
+        int samplePartitionSize = Config.hive_stats_partition_sample_size;
+        int totalPartitionSize = 1;
+        // Get table partitions from cache.
+        if (!partitionColumnTypes.isEmpty()) {
+            partitionValues = cache.getPartitionValues(table.getDbName(), table.getName(), partitionColumnTypes);
+        }
+        if (partitionValues != null) {
+            Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
+            totalPartitionSize = idToPartitionItem.size();
+            Collection<PartitionItem> partitionItems;
+            List<List<String>> partitionValuesList;
+            // If partition number is too large, randomly choose part of them to estimate the whole table.
+            if (samplePartitionSize < totalPartitionSize) {
+                List<PartitionItem> items = new ArrayList<>(idToPartitionItem.values());
+                Collections.shuffle(items);
+                partitionItems = items.subList(0, samplePartitionSize);
+                partitionValuesList = Lists.newArrayListWithCapacity(samplePartitionSize);
+            } else {
+                partitionItems = idToPartitionItem.values();
+                partitionValuesList = Lists.newArrayListWithCapacity(totalPartitionSize);
+            }
+            for (PartitionItem item : partitionItems) {
+                partitionValuesList.add(((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringList());
+            }
+            hivePartitions = cache.getAllPartitions(table.getDbName(), table.getName(), partitionValuesList);
+        } else {
+            hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
+                    table.getRemoteTable().getSd().getInputFormat(),
+                    table.getRemoteTable().getSd().getLocation(), null));
+        }
+        // Get files for all partitions.
+        List<HiveMetaStoreCache.FileCacheValue> filesByPartitions = cache.getFilesByPartitions(hivePartitions, true);
+        long totalSize = 0;
+        // Calculate the total file size.
+        for (HiveMetaStoreCache.FileCacheValue files : filesByPartitions) {
+            for (HiveMetaStoreCache.HiveFileStatus file : files.getFiles()) {
+                totalSize += file.getLength();
+            }
+        }
+        // Estimate row count: totalSize/estimatedRowSize
+        long estimatedRowSize = 0;
+        for (Column column : table.getFullSchema()) {
+            estimatedRowSize += column.getDataType().getSlotSize();
+        }
+        if (estimatedRowSize == 0) {
+            return 1;
+        }
+        if (samplePartitionSize < totalPartitionSize) {
+            totalSize = totalSize * totalPartitionSize / samplePartitionSize;
+        }
+        return totalSize / estimatedRowSize;
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org