This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 9c6c2f736ed [Improvement](statistics)Improve stats sample strategy (#26435)
9c6c2f736ed is described below

commit 9c6c2f736edcae50ac8779c874dde2cd2e546c4f
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Mon Nov 13 15:52:21 2023 +0800

    [Improvement](statistics)Improve stats sample strategy (#26435)

    Improve the accuracy of sample stats collection. For non-distribution
    columns, estimate NDV with the DUJ1 formula `n*d / (n - f1 + f1*n/N)`,
    where `f1` is the number of distinct values that occur exactly once in a
    sample of `n` rows (out of `N` total rows) and `d` is the total number
    of distinct values in the sample. For distribution columns, scale the
    sampled NDV linearly: `ndv(sample) * (total rows / sampled rows)`. When
    the tablets to sample are very large, use LIMIT to control the total
    number of rows scanned (non-key columns only, because key columns are
    sorted and LIMIT would make their statistics inaccurate).
---
 docs/en/docs/query-acceleration/statistics.md      |  11 +-
 docs/zh-CN/docs/query-acceleration/statistics.md   |  11 +-
 .../java/org/apache/doris/catalog/OlapTable.java   |  14 ++
 .../java/org/apache/doris/catalog/TableIf.java     |   8 +
 .../doris/catalog/external/HMSExternalTable.java   |   6 +
 .../org/apache/doris/statistics/AnalysisJob.java   |  13 ++
 .../apache/doris/statistics/AnalysisManager.java   |   2 +-
 .../apache/doris/statistics/BaseAnalysisTask.java  | 158 ++++++++++------
 .../org/apache/doris/statistics/ColStatsData.java  |  16 +-
 .../apache/doris/statistics/ColumnStatistic.java   |  40 ++--
 .../apache/doris/statistics/HMSAnalysisTask.java   | 177 ++++++++++--------
 .../apache/doris/statistics/JdbcAnalysisTask.java  |   9 +-
 .../apache/doris/statistics/OlapAnalysisTask.java  | 149 ++++++++++-----
 .../doris/statistics/StatisticConstants.java       |   4 +-
 .../org/apache/doris/statistics/AnalyzeTest.java   |   2 +-
 .../doris/statistics/BaseAnalysisTaskTest.java     |  63 +++++++
 .../doris/statistics/HMSAnalysisTaskTest.java      | 141 ++++++++++++++
 .../doris/statistics/OlapAnalysisTaskTest.java     | 204 +++++++++++++++++++++
 18 files changed, 827 insertions(+), 201 deletions(-)

diff --git a/docs/en/docs/query-acceleration/statistics.md b/docs/en/docs/query-acceleration/statistics.md index c25054094b6..976161b1fcb 100644 --- a/docs/en/docs/query-acceleration/statistics.md +++ b/docs/en/docs/query-acceleration/statistics.md @@ -54,8 +54,7 @@ Syntax: ```SQL ANALYZE < TABLE | DATABASE table_name | db_name > [ (column_name [, ...]) ] - [ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] [ WITH SQL ] ] - [ PROPERTIES ("key" = "value", ...) ]; + [ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] ]; ``` Where: @@ -64,7 +63,6 @@ Where: - `column_name`: The specified target column. It must be an existing column in `table_name`. You can specify multiple column names separated by commas. - `sync`: Collect statistics synchronously. Returns after collection. If not specified, it executes asynchronously and returns a JOB ID. - `sample percent | rows`: Collect statistics with sampling. You can specify a sampling percentage or a number of sampling rows. -- `sql`: Execute SQL to collect statistics for partitioned columns in external tables. By default, partitioned column information is collected from metadata, which is efficient but may not be accurate in terms of row count and data size. Users can specify using SQL to collect accurate partitioned column information.
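As a quick illustration of the two estimators described in the commit message above, here is a minimal, self-contained Java sketch; the class and method names are invented for this example and do not appear in the patch.

```java
// Illustrative sketch only, not part of the patch. It mirrors the two
// NDV estimators described in the commit message above.
public class NdvEstimatorSketch {

    // DUJ1: n*d / (n - f1 + f1*n/N), used for non-distribution columns.
    // n  = rows sampled, N = total rows in the table,
    // d  = distinct values seen in the sample,
    // f1 = distinct values occurring exactly once in the sample.
    static double duj1(long n, long d, long f1, long totalRows) {
        return (double) n * d / (n - f1 + (double) f1 * n / totalRows);
    }

    // Linear scaling, used for distribution columns: multiply the sampled
    // NDV by totalRows / sampledRows (the inverse of the sampled fraction).
    static double linear(long sampledNdv, long sampledRows, long totalRows) {
        return sampledNdv * ((double) totalRows / sampledRows);
    }

    public static void main(String[] args) {
        // Sample 10,000 of 1,000,000 rows; 8,000 distinct values in the
        // sample, 7,000 of which are singletons.
        System.out.println(duj1(10_000, 8_000, 7_000, 1_000_000)); // ~26058.6
        System.out.println(linear(8_000, 10_000, 1_000_000));      // 800000.0
    }
}
```

Note how DUJ1 stays conservative when most sampled values are singletons, while the linear estimator extrapolates aggressively; this is why the patch reserves the linear form for distribution columns, whose values are spread evenly across tablets.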
Here are some examples: @@ -90,6 +88,13 @@ The collection jobs for statistics themselves consume a certain amount of system If you are concerned about automatic collection jobs interfering with your business, you can specify a time frame for the automatic collection jobs to run during low business loads by setting the `full_auto_analyze_start_time` and `full_auto_analyze_end_time` parameters according to your needs. You can also completely disable this feature by setting the `enable_full_auto_analyze` parameter to `false`. +External catalogs do not participate in automatic collection by default. Because external catalogs often contain massive historical data, if they participate in automatic collection, it may occupy too many resources. You can turn on and off the automatic collection of external catalogs by setting the catalog's properties. + +```sql +ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='true'); // Turn on +ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='false'); // Turn off +``` + <br /> diff --git a/docs/zh-CN/docs/query-acceleration/statistics.md b/docs/zh-CN/docs/query-acceleration/statistics.md index 7700ae3db44..e93deb9d22c 100644 --- a/docs/zh-CN/docs/query-acceleration/statistics.md +++ b/docs/zh-CN/docs/query-acceleration/statistics.md @@ -56,8 +56,7 @@ Doris支持用户通过提交ANALYZE语句来手动触发统计信息的收集 ```SQL ANALYZE < TABLE | DATABASE table_name | db_name > [ (column_name [, ...]) ] - [ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] [ WITH SQL ] ] - [ PROPERTIES ("key" = "value", ...) ]; + [ [ WITH SYNC ] [ WITH SAMPLE PERCENT | ROWS ] ]; ``` 其中: @@ -66,7 +65,6 @@ ANALYZE < TABLE | DATABASE table_name | db_name > - column_name: 指定的目标列。必须是 `table_name` 中存在的列,多个列名称用逗号分隔。 - sync:同步收集统计信息。收集完后返回。若不指定则异步执行并返回JOB ID。 - sample percent | rows:抽样收集统计信息。可以指定抽样比例或者抽样行数。 -- sql:执行sql来收集外表分区列统计信息。默认从元数据收集分区列信息,这样效率比较高但是行数和数据量大小可能不准。用户可以指定使用sql来收集,这样可以收集到准确的分区列信息。 下面是一些例子 @@ -93,6 +91,13 @@ ANALYZE TABLE lineitem WITH SAMPLE ROWS 100000; 如果担心自动收集作业对业务造成干扰,可结合自身需求通过设置参数`full_auto_analyze_start_time`和参数`full_auto_analyze_end_time`指定自动收集作业在业务负载较低的时间段执行。也可以通过设置参数`enable_full_auto_analyze` 为`false`来彻底关闭本功能。 +External catalog 默认不参与自动收集。因为 external catalog 往往包含海量历史数据,如果参与自动收集,可能占用过多资源。可以通过设置 catalog 的 property 来打开和关闭 external catalog 的自动收集。 + +```sql +ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='true'); // 打开自动收集 +ALTER CATALOG external_catalog SET PROPERTIES ('enable.auto.analyze'='false'); // 关闭自动收集 +``` + <br /> ## 2. 
作业管理 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 2e0df9cb3b6..83b9079cabd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -90,6 +90,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -2371,4 +2372,17 @@ public class OlapTable extends Table { } } } + + @Override + public boolean isDistributionColumn(String columnName) { + Set<String> distributeColumns = getDistributionColumnNames() + .stream().map(String::toLowerCase).collect(Collectors.toSet()); + return distributeColumns.contains(columnName.toLowerCase(Locale.ROOT)); + } + + @Override + public boolean isPartitionColumn(String columnName) { + return getPartitionInfo().getPartitionColumns().stream() + .anyMatch(c -> c.getName().equalsIgnoreCase(columnName)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 16aa4a6d370..c325d57f575 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -254,5 +254,13 @@ public interface TableIf { // TODO: Each tableIf should impl it by itself. return 0; } + + default boolean isDistributionColumn(String columnName) { + return false; + } + + default boolean isPartitionColumn(String columnName) { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index b5e45b5bcfe..f13e27436ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -699,6 +699,12 @@ public class HMSExternalTable extends ExternalTable { } return total; } + + @Override + public boolean isDistributionColumn(String columnName) { + return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) + .collect(Collectors.toSet()).contains(columnName.toLowerCase(Locale.ROOT)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index 904dc21e337..acea77b1b72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -76,6 +76,16 @@ public class AnalysisJob { queryingTask.remove(task); buf.addAll(statsData); queryFinished.add(task); + markOneTaskDone(); + } + + public synchronized void rowCountDone(BaseAnalysisTask task) { + queryingTask.remove(task); + queryFinished.add(task); + markOneTaskDone(); + } + + protected void markOneTaskDone() { queryFinishedTaskCount += 1; if (queryFinishedTaskCount == totalTaskCount) { writeBuf(); @@ -183,6 +193,9 @@ public class AnalysisJob { protected void syncLoadStats() { long tblId = jobInfo.tblId; for (BaseAnalysisTask task : queryFinished) { + if (task.info.externalTableLevelTask) { + continue; + } String colName = task.col.getName(); if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) { 
analysisManager.removeColStatsStatus(tblId, colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index e4b1b01fae8..17e151b77da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -333,11 +333,11 @@ public class AnalysisManager implements Writable { boolean isSync = stmt.isSync(); Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>(); createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync); - constructJob(jobInfo, analysisTaskInfos.values()); if (!jobInfo.partitionOnly && stmt.isAllColumns() && StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) { createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync); } + constructJob(jobInfo, analysisTaskInfos.values()); if (isSync) { syncExecute(analysisTaskInfos.values()); updateTableStats(jobInfo); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 3fcebd6c38b..a278200e5c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -36,6 +36,7 @@ import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.text.MessageFormat; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -43,46 +44,80 @@ public abstract class BaseAnalysisTask { public static final Logger LOG = LogManager.getLogger(BaseAnalysisTask.class); - protected static final String NDV_MULTIPLY_THRESHOLD = "0.3"; - - protected static final String NDV_SAMPLE_TEMPLATE = "case when NDV(`${colName}`)/count('${colName}') < " - + NDV_MULTIPLY_THRESHOLD - + " then NDV(`${colName}`) " - + "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, " - ; + public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB + public static final double LIMIT_FACTOR = 1.2; protected static final String COLLECT_COL_STATISTICS = - "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " - + " ${catalogId} AS catalog_id, " - + " ${dbId} AS db_id, " - + " ${tblId} AS tbl_id, " - + " ${idxId} AS idx_id, " - + " '${colId}' AS col_id, " - + " NULL AS part_id, " - + " COUNT(1) AS row_count, " - + " NDV(`${colName}`) AS ndv, " - + " COUNT(1) - COUNT(${colName}) AS null_count, " - + " CAST(MIN(${colName}) AS STRING) AS min, " - + " CAST(MAX(${colName}) AS STRING) AS max, " - + " ${dataSizeFunction} AS data_size, " - + " NOW() AS update_time " - + " FROM `${dbName}`.`${tblName}`"; - - protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = - " SELECT " - + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "${idxId} AS idx_id, " - + "'${colId}' AS col_id, " - + "NULL AS part_id, " - + "${row_count} AS row_count, " - + "${ndv} AS ndv, " - + "${null_count} AS null_count, " - + "'${min}' AS min, " - + "'${max}' AS max, " - + "${data_size} AS data_size, " + "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " + + " ${catalogId} AS `catalog_id`, " + + " ${dbId} AS `db_id`, " + + " ${tblId} AS `tbl_id`, " + + " ${idxId} AS `idx_id`, " + + " '${colId}' AS `col_id`, " + + " NULL 
AS `part_id`, " + + " COUNT(1) AS `row_count`, " + + " NDV(`${colName}`) AS `ndv`, " + + " COUNT(1) - COUNT(${colName}) AS `null_count`, " + + " CAST(MIN(${colName}) AS STRING) AS `min`, " + + " CAST(MAX(${colName}) AS STRING) AS `max`, " + + " ${dataSizeFunction} AS `data_size`, " + + " NOW() AS `update_time` " + + " FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + + protected static final String LINEAR_ANALYZE_TEMPLATE = " SELECT " + + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " + + "${catalogId} AS `catalog_id`, " + + "${dbId} AS `db_id`, " + + "${tblId} AS `tbl_id`, " + + "${idxId} AS `idx_id`, " + + "'${colId}' AS `col_id`, " + + "NULL AS `part_id`, " + + "ROUND(COUNT(1) * ${scaleFactor}) AS `row_count`, " + + "ROUND(NDV(`${colName}`) * ${scaleFactor}) as `ndv`, " + + "ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor}) AS `null_count`, " + + "${min} AS `min`, " + + "${max} AS `max`, " + + "${dataSizeFunction} * ${scaleFactor} AS `data_size`, " + + "NOW() " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints} ${limit}"; + + protected static final String DUJ1_ANALYZE_TEMPLATE = "SELECT " + + "CONCAT('${tblId}', '-', '${idxId}', '-', '${colId}') AS `id`, " + + "${catalogId} AS `catalog_id`, " + + "${dbId} AS `db_id`, " + + "${tblId} AS `tbl_id`, " + + "${idxId} AS `idx_id`, " + + "'${colId}' AS `col_id`, " + + "NULL AS `part_id`, " + + "${rowCount} AS `row_count`, " + + "${ndvFunction} as `ndv`, " + + "IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.count, 0)), 0) * ${scaleFactor} as `null_count`, " + + "'${min}' AS `min`, " + + "'${max}' AS `max`, " + + "${dataSizeFunction} * ${scaleFactor} AS `data_size`, " + + "NOW() " + + "FROM ( " + + " SELECT t0.`${colName}` as column_key, COUNT(1) as `count` " + + " FROM " + + " (SELECT `${colName}` FROM `${catalogName}`.`${dbName}`.`${tblName}` " + + " ${sampleHints} ${limit}) as `t0` " + + " GROUP BY `t0`.`${colName}` " + + ") as `t1` "; + + protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = " SELECT " + + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, " + + "${catalogId} AS `catalog_id`, " + + "${dbId} AS `db_id`, " + + "${tblId} AS `tbl_id`, " + + "${idxId} AS `idx_id`, " + + "'${colId}' AS `col_id`, " + + "NULL AS `part_id`, " + + "${row_count} AS `row_count`, " + + "${ndv} AS `ndv`, " + + "${null_count} AS `null_count`, " + + "'${min}' AS `min`, " + + "'${max}' AS `max`, " + + "${data_size} AS `data_size`, " + "NOW() "; protected AnalysisInfo info; @@ -199,29 +234,51 @@ public abstract class BaseAnalysisTask { return info.jobId; } - // TODO : time cost is intolerable when column is string type, return 0 directly for now. - protected String getDataSizeFunction(Column column) { - if (column.getType().isStringType()) { - return "SUM(LENGTH(`${colName}`))"; + protected String getDataSizeFunction(Column column, boolean useDuj1) { + if (useDuj1) { + if (column.getType().isStringType()) { + return "SUM(LENGTH(`column_key`) * count)"; + } else { + return "SUM(t1.count) * " + column.getType().getSlotSize(); + } + } else { + if (column.getType().isStringType()) { + return "SUM(LENGTH(`${colName}`))"; + } else { + return "COUNT(1) * " + column.getType().getSlotSize(); + } } - return "COUNT(1) * " + column.getType().getSlotSize(); } - // Min value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan. 
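The `${...}` placeholders in these templates are filled in with Apache Commons Text's `StringSubstitutor`, the same mechanism the tasks below use; a minimal sketch, with made-up parameter values:

```java
import org.apache.commons.text.StringSubstitutor;

import java.util.HashMap;
import java.util.Map;

// Minimal sketch of how the analysis tasks turn a template plus a parameter
// map into a runnable SQL statement. All values here are made up.
public class TemplateSubstitutionSketch {
    public static void main(String[] args) {
        String template = "SELECT NDV(`${colName}`) AS `ndv` "
                + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints} ${limit}";
        Map<String, String> params = new HashMap<>();
        params.put("colName", "r_regionkey");              // hypothetical column
        params.put("catalogName", "internal");
        params.put("dbName", "tpch100");
        params.put("tblName", "region");
        params.put("sampleHints", "TABLET(10010, 10012)"); // sampled tablet ids
        params.put("limit", "limit 4194304");
        System.out.println(new StringSubstitutor(params).replace(template));
        // SELECT NDV(`r_regionkey`) AS `ndv`
        //   FROM `internal`.`tpch100`.`region` TABLET(10010, 10012) limit 4194304
    }
}
```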
protected String getMinFunction() { if (tableSample == null) { - return "CAST(MIN(`${colName}`) as ${type}) "; + return "to_base64(CAST(MIN(`${colName}`) as ${type})) "; } else { - return "NULL "; + // Min value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan. + return "NULL"; } } + protected String getNdvFunction(String totalRows) { + String sampleRows = "SUM(t1.count)"; + String onceCount = "SUM(IF(t1.count = 1, 1, 0))"; + String countDistinct = "COUNT(1)"; + // DUJ1 estimator: n*d / (n - f1 + f1*n/N) + // f1 is the count of element that appears only once in the sample. + // (https://github.com/postgres/postgres/blob/master/src/backend/commands/analyze.c) + // (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.93.8637&rep=rep1&type=pdf) + // sample_row * count_distinct / ( sample_row - once_count + once_count * sample_row / total_row) + String fn = MessageFormat.format("{0} * {1} / ({0} - {2} + {2} * {0} / {3})", sampleRows, + countDistinct, onceCount, totalRows); + return fn; + } + // Max value is not accurate while sample, so set it to NULL to avoid optimizer generate bad plan. protected String getMaxFunction() { if (tableSample == null) { - return "CAST(MAX(`${colName}`) as ${type}) "; + return "to_base64(CAST(MAX(`${colName}`) as ${type})) "; } else { - return "NULL "; + return "NULL"; } } @@ -254,12 +311,11 @@ public abstract class BaseAnalysisTask { this.job = job; } - protected void runQuery(String sql) { + protected void runQuery(String sql, boolean needEncode) { long startTime = System.currentTimeMillis(); try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) { stmtExecutor = new StmtExecutor(a.connectContext, sql); - stmtExecutor.executeInternalQuery(); - ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); + ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0), needEncode); job.appendBuf(this, Collections.singletonList(colStatsData)); } finally { LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java index 41936232afd..fa7546abd5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java @@ -56,6 +56,8 @@ public class ColStatsData { public final String updateTime; + public final boolean needEncode; + @VisibleForTesting public ColStatsData() { statsId = new StatsId(); @@ -66,9 +68,10 @@ public class ColStatsData { maxLit = null; dataSizeInBytes = 0; updateTime = null; + needEncode = true; } - public ColStatsData(ResultRow row) { + public ColStatsData(ResultRow row, boolean needEncode) { this.statsId = new StatsId(row); this.count = (long) Double.parseDouble(row.get(7)); this.ndv = (long) Double.parseDouble(row.getWithDefault(8, "0")); @@ -77,6 +80,7 @@ public class ColStatsData { this.maxLit = row.get(11); this.dataSizeInBytes = (long) Double.parseDouble(row.getWithDefault(12, "0")); this.updateTime = row.get(13); + this.needEncode = needEncode; } public String toSQL(boolean roundByParentheses) { @@ -89,10 +93,12 @@ public class ColStatsData { sj.add(String.valueOf(count)); sj.add(String.valueOf(ndv)); sj.add(String.valueOf(nullCount)); - sj.add(minLit == null ? 
"NULL" : - "'" + Base64.getEncoder().encodeToString(minLit.getBytes(StandardCharsets.UTF_8)) + "'"); - sj.add(maxLit == null ? "NULL" : - "'" + Base64.getEncoder().encodeToString(maxLit.getBytes(StandardCharsets.UTF_8)) + "'"); + sj.add(minLit == null ? "NULL" : needEncode + ? "'" + Base64.getEncoder().encodeToString(minLit.getBytes(StandardCharsets.UTF_8)) + "'" + : "'" + minLit + "'"); + sj.add(maxLit == null ? "NULL" : needEncode + ? "'" + Base64.getEncoder().encodeToString(maxLit.getBytes(StandardCharsets.UTF_8)) + "'" + : "'" + maxLit + "'"); sj.add(String.valueOf(dataSizeInBytes)); sj.add(StatisticsUtil.quote(updateTime)); return sj.toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index 1ec22cbc47b..eefaf7badeb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.collect.Sets; @@ -174,30 +175,35 @@ public class ColumnStatistic { String min = row.get(10); String max = row.get(11); if (min != null && !min.equalsIgnoreCase("NULL")) { - min = new String(Base64.getDecoder().decode(min), - StandardCharsets.UTF_8); - - try { - columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min)); - columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min)); - } catch (AnalysisException e) { - LOG.warn("Failed to deserialize column {} min value {}.", col, min, e); + min = new String(Base64.getDecoder().decode(min), StandardCharsets.UTF_8); + // Internal catalog get the min/max value using a separate SQL, + // and the value is already encoded by base64. Need to handle internal and external catalog separately. 
+ if (catalogId != InternalCatalog.INTERNAL_CATALOG_ID && min.equalsIgnoreCase("NULL")) { columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } else { + try { + columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min)); + columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min)); + } catch (AnalysisException e) { + LOG.warn("Failed to deserialize column {} min value {}.", col, min, e); + columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } } } else { columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); } if (max != null && !max.equalsIgnoreCase("NULL")) { - - max = new String(Base64.getDecoder().decode(max), - StandardCharsets.UTF_8); - - try { - columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max)); - columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max)); - } catch (AnalysisException e) { - LOG.warn("Failed to deserialize column {} max value {}.", col, max, e); + max = new String(Base64.getDecoder().decode(max), StandardCharsets.UTF_8); + if (catalogId != InternalCatalog.INTERNAL_CATALOG_ID && max.equalsIgnoreCase("NULL")) { columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); + } else { + try { + columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max)); + columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max)); + } catch (AnalysisException e) { + LOG.warn("Failed to deserialize column {} max value {}.", col, max, e); + columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); + } } } else { columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index 049e80d52fd..e4555dcd8bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.external.hive.util.HiveUtil; import org.apache.doris.statistics.util.StatisticsUtil; @@ -36,44 +37,23 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.stream.Collectors; public class HMSAnalysisTask extends BaseAnalysisTask { private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class); - // While doing sample analysis, the sampled ndv result will multiply a factor (total size/sample size) - // if ndv(col)/count(col) is greater than this threshold. 
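The sampling strategy that replaces the old threshold-based scaling works on file sizes: shuffle the table's file chunks with a fixed seed, accumulate chunks until the sample target is reached, and derive the scale factor from total size over sampled size (see `getSampleInfo` below). A condensed sketch with simplified inputs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Condensed sketch of the file-based sample sizing in getSampleInfo below.
// Target computation is simplified; the numbers are made up.
public class SampleInfoSketch {
    static double scaleFactor(List<Long> chunkSizes, long targetBytes, long seed) {
        Collections.shuffle(chunkSizes, new Random(seed)); // stable pick per seed
        long total = chunkSizes.stream().mapToLong(Long::longValue).sum();
        long cumulate = 0;
        for (long size : chunkSizes) {
            cumulate += size;
            if (cumulate >= targetBytes) {
                break;
            }
        }
        // Counts and data sizes from the sample are multiplied by this factor.
        return Math.max((double) total / cumulate, 1.0);
    }

    public static void main(String[] args) {
        List<Long> chunks = new ArrayList<>(Arrays.asList(400L, 300L, 200L, 100L));
        System.out.println(scaleFactor(chunks, 500L, 42L)); // depends on shuffle order
    }
}
```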
- - private static final String ANALYZE_TABLE_TEMPLATE = " SELECT " - + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "${idxId} AS idx_id, " - + "'${colId}' AS col_id, " - + "NULL AS part_id, " - + "ROUND(COUNT(1) * ${scaleFactor}) AS row_count, " - + NDV_SAMPLE_TEMPLATE - + "ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor}) AS null_count, " - + "to_base64(${minFunction}) AS min, " - + "to_base64(${maxFunction}) AS max, " - + "${dataSizeFunction} * ${scaleFactor} AS data_size, " - + "NOW() " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}"; - private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}"; - - private final boolean isTableLevelTask; - private final boolean isPartitionOnly; - private Set<String> partitionNames; + + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints}"; + private boolean isTableLevelTask; + private boolean isPartitionOnly; private HMSExternalTable table; + public HMSAnalysisTask() { + } + public HMSAnalysisTask(AnalysisInfo info) { super(info); isTableLevelTask = info.externalTableLevelTask; isPartitionOnly = info.partitionOnly; - partitionNames = info.partitionNames; table = (HMSExternalTable) tbl; } @@ -85,6 +65,11 @@ public class HMSAnalysisTask extends BaseAnalysisTask { } } + // For test + protected void setTable(HMSExternalTable table) { + this.table = table; + } + /** * Get table row count */ @@ -97,6 +82,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { Env.getCurrentEnv().getAnalysisManager() .updateTableStatsStatus( new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info)); + job.rowCountDone(this); } /** @@ -120,34 +106,62 @@ public class HMSAnalysisTask extends BaseAnalysisTask { return table.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName())); } + // Get ordinary column stats. Ordinary column means not partition column. 
private void getOrdinaryColumnStats() throws Exception { - // An example sql for a column stats: - // INSERT INTO __internal_schema.column_statistics - // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, - // 13002 AS catalog_id, - // 13038 AS db_id, - // 13055 AS tbl_id, - // -1 AS idx_id, - // 'r_regionkey' AS col_id, - // 'NULL' AS part_id, - // COUNT(1) AS row_count, - // NDV(`r_regionkey`) AS ndv, - // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count, - // MIN(`r_regionkey`) AS min, - // MAX(`r_regionkey`) AS max, - // 0 AS data_size, - // NOW() FROM `hive`.`tpch100`.`region` StringBuilder sb = new StringBuilder(); - sb.append(ANALYZE_TABLE_TEMPLATE); Map<String, String> params = buildStatsParams("NULL"); - params.put("dataSizeFunction", getDataSizeFunction(col)); - params.put("minFunction", getMinFunction()); - params.put("maxFunction", getMaxFunction()); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + params.put("min", getMinFunction()); + params.put("max", getMaxFunction()); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); + Pair<Double, Long> sampleInfo = getSampleInfo(); + params.put("scaleFactor", String.valueOf(sampleInfo.first)); + StringSubstitutor stringSubstitutor; + if (tableSample == null) { + // Do full analyze + LOG.debug("Will do full collection for column {}", col.getName()); + sb.append(COLLECT_COL_STATISTICS); + } else { + // Do sample analyze + LOG.debug("Will do sample collection for column {}", col.getName()); + boolean limitFlag = false; + boolean bucketFlag = false; + // If sample size is too large, use limit to control the sample size. + if (needLimit(sampleInfo.second, sampleInfo.first)) { + limitFlag = true; + long columnSize = 0; + for (Column column : table.getFullSchema()) { + columnSize += column.getDataType().getSlotSize(); + } + double targetRows = (double) sampleInfo.second / columnSize; + // Estimate the new scaleFactor based on the schema. + if (targetRows > StatisticsUtil.getHugeTableSampleRows()) { + params.put("limit", "limit " + StatisticsUtil.getHugeTableSampleRows()); + params.put("scaleFactor", + String.valueOf(sampleInfo.first * targetRows / StatisticsUtil.getHugeTableSampleRows())); + } + } + // Distribution columns don't fit for DUJ1 estimator, use linear estimator. + if (tbl.isDistributionColumn(col.getName())) { + bucketFlag = true; + sb.append(LINEAR_ANALYZE_TEMPLATE); + params.put("rowCount", "ROUND(count(1) * ${scaleFactor})"); + } else { + sb.append(DUJ1_ANALYZE_TEMPLATE); + params.put("dataSizeFunction", getDataSizeFunction(col, true)); + params.put("ndvFunction", getNdvFunction("ROUND(SUM(t1.count) * ${scaleFactor})")); + params.put("rowCount", "ROUND(SUM(t1.count) * ${scaleFactor})"); + } + LOG.info("Sample for column [{}]. Scale factor [{}], " + + "limited [{}], is distribute column [{}]", + col.getName(), params.get("scaleFactor"), limitFlag, bucketFlag); + } + stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(sb.toString()); - runQuery(sql); + runQuery(sql, true); } + // Collect the partition column stats through HMS metadata. + // Get all the partition values and calculate the stats based on the values. 
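A condensed sketch of this metadata-only idea: each partition name carries one value of the partition column, so NDV, min and max can be derived without scanning data. Parsing and comparison are simplified here (the real method below handles types, row counts and data sizes):

```java
import java.util.HashSet;
import java.util.Set;

// Condensed sketch of deriving partition-column stats from partition names.
// Lexicographic min/max only; the partition names are made up.
public class PartitionColumnStatsSketch {
    public static void main(String[] args) {
        String[] partitionNames = {"dt=2023-01-01", "dt=2023-01-02", "dt=2023-01-03"};
        Set<String> distinctValues = new HashSet<>();
        String min = null;
        String max = null;
        for (String name : partitionNames) {
            String value = name.substring(name.indexOf('=') + 1);
            distinctValues.add(value);
            min = (min == null || value.compareTo(min) < 0) ? value : min;
            max = (max == null || value.compareTo(max) > 0) ? value : max;
        }
        System.out.println(distinctValues.size()); // ndv = 3
        System.out.println(min + " .. " + max);    // 2023-01-01 .. 2023-01-03
    }
}
```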
private void getPartitionColumnStats() throws Exception { Set<String> partitionNames = table.getPartitionNames(); Set<String> ndvPartValues = Sets.newHashSet(); @@ -190,7 +204,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { params.put("data_size", String.valueOf(dataSize)); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE); - runQuery(sql); + runQuery(sql, true); } private String updateMinValue(String currentMin, String value) { @@ -235,20 +249,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask { return value.compareTo(currentMax) > 0 ? value : currentMax; } - private void getPartitionNames() { - if (partitionNames == null) { - if (info.isAllPartition) { - partitionNames = table.getPartitionNames(); - } else if (info.partitionCount > 0) { - partitionNames = table.getPartitionNames().stream() - .limit(info.partitionCount).collect(Collectors.toSet()); - } - if (partitionNames == null || partitionNames.isEmpty()) { - throw new RuntimeException("Not a partition table or no partition specified."); - } - } - } - private Map<String, String> buildStatsParams(String partId) { Map<String, String> commonParams = new HashMap<>(); String id = StatisticsUtil.constructId(tbl.getId(), -1); @@ -271,8 +271,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask { commonParams.put("catalogName", catalog.getName()); commonParams.put("dbName", db.getFullName()); commonParams.put("tblName", tbl.getName()); - commonParams.put("sampleExpr", getSampleExpression()); - commonParams.put("scaleFactor", getSampleScaleFactor()); + commonParams.put("sampleHints", getSampleHint()); + commonParams.put("limit", ""); + commonParams.put("scaleFactor", "1"); if (col != null) { commonParams.put("type", col.getType().toString()); } @@ -280,7 +281,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { return commonParams; } - protected String getSampleExpression() { + protected String getSampleHint() { if (tableSample == null) { return ""; } @@ -291,13 +292,17 @@ public class HMSAnalysisTask extends BaseAnalysisTask { } } - // Get the sample scale factor. While analyzing, the result of count, null count and data size need to - // multiply this factor to get more accurate result. - protected String getSampleScaleFactor() { + /** + * Get the pair of sample scale factor and the file size going to sample. + * While analyzing, the result of count, null count and data size need to + * multiply this scale factor to get more accurate result. + * @return Pair of sample scale factor and the file size going to sample. + */ + protected Pair<Double, Long> getSampleInfo() { if (tableSample == null) { - return "1"; + return Pair.of(1.0, 0L); } - long target = 0; + long target; // Get list of all files' size in this HMS table. List<Long> chunkSizes = table.getChunkSizes(); Collections.shuffle(chunkSizes, new Random(tableSample.getSeek())); @@ -324,7 +329,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { break; } } - return Double.toString(Math.max(((double) total) / cumulate, 1)); + return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate); } @Override @@ -336,4 +341,30 @@ public class HMSAnalysisTask extends BaseAnalysisTask { } Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName()); } + + /** + * If the size to sample is larger than LIMIT_SIZE (1GB) + * and is much larger (1.2*) than the size user want to sample, + * use limit to control the total sample size. 
+ * @param sizeToRead The file size to sample. + * @param factor sizeToRead * factor = Table total size. + * @return True if need to limit. + */ + protected boolean needLimit(long sizeToRead, double factor) { + long total = (long) (sizeToRead * factor); + long target; + if (tableSample.isPercent()) { + target = total * tableSample.getSampleValue() / 100; + } else { + int columnSize = 0; + for (Column column : table.getFullSchema()) { + columnSize += column.getDataType().getSlotSize(); + } + target = columnSize * tableSample.getSampleValue(); + } + if (sizeToRead > LIMIT_SIZE && sizeToRead > target * LIMIT_FACTOR) { + return true; + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java index 649b075c673..700710f6e42 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java @@ -63,7 +63,7 @@ public class JdbcAnalysisTask extends BaseAnalysisTask { if (isTableLevelTask) { getTableStats(); } else { - getTableColumnStats(); + getColumnStats(); } } @@ -77,12 +77,13 @@ public class JdbcAnalysisTask extends BaseAnalysisTask { String rowCount = columnResult.get(0).get(0); Env.getCurrentEnv().getAnalysisManager() .updateTableStatsStatus(new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info)); + job.rowCountDone(this); } /** * Get column statistics and insert the result to __internal_schema.column_statistics */ - private void getTableColumnStats() throws Exception { + private void getColumnStats() throws Exception { // An example sql for a column stats: // INSERT INTO __internal_schema.column_statistics // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, @@ -106,10 +107,10 @@ public class JdbcAnalysisTask extends BaseAnalysisTask { params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); params.put("colName", col.getName()); params.put("colId", info.colName); - params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(sb.toString()); - runQuery(sql); + runQuery(sql, true); } private Map<String, String> buildTableStatsParams(String partId) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index b0c4b0b6c0e..42b43f52a7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -30,8 +30,10 @@ import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.text.StringSubstitutor; +import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -44,23 +46,10 @@ import java.util.stream.Collectors; */ public class OlapAnalysisTask extends BaseAnalysisTask { - private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT " - + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "${idxId} AS idx_id, " - + "'${colId}' AS 
col_id, " - + "NULL AS part_id, " - + "ROUND(COUNT(1) * ${scaleFactor}) AS row_count, " - + NDV_SAMPLE_TEMPLATE - + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor} AS null_count, " - + "NULL AS min, " - + "NULL AS max, " - + "${dataSizeFunction} * ${scaleFactor} AS data_size, " - + "NOW() " - + "FROM `${dbName}`.`${tblName}`" - + "${tablets}"; + private static final String BASIC_STATS_TEMPLATE = "SELECT " + + "MIN(`${colName}`) as min, " + + "MAX(`${colName}`) as max " + + "FROM `${dbName}`.`${tblName}`"; @VisibleForTesting public OlapAnalysisTask() { @@ -85,46 +74,96 @@ public class OlapAnalysisTask extends BaseAnalysisTask { * 3. insert col stats and partition stats */ protected void doSample() throws Exception { - Pair<List<Long>, Long> pair = calcActualSampleTablets(); + LOG.debug("Will do sample collection for column {}", col.getName()); + Pair<List<Long>, Long> pair = calcActualSampleTablets(tbl.isPartitionColumn(col.getName())); + LOG.info("Number of tablets selected {}, rows in tablets {}", pair.first.size(), pair.second); List<Long> tabletIds = pair.first; double scaleFactor = (double) tbl.getRowCount() / (double) pair.second; // might happen if row count in fe metadata hasn't been updated yet if (Double.isInfinite(scaleFactor) || Double.isNaN(scaleFactor)) { + LOG.warn("Scale factor is infinite or Nan, will set scale factor to 1."); scaleFactor = 1; tabletIds = Collections.emptyList(); + pair.second = tbl.getRowCount(); } String tabletStr = tabletIds.stream() .map(Object::toString) .collect(Collectors.joining(", ")); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) { + // Get basic stats, including min and max. + ResultRow basicStats = collectBasicStat(r); + long rowCount = tbl.getRowCount(); + String min = Base64.getEncoder().encodeToString(basicStats.get(0).getBytes(StandardCharsets.UTF_8)); + String max = Base64.getEncoder().encodeToString(basicStats.get(1).getBytes(StandardCharsets.UTF_8)); + + boolean limitFlag = false; + long rowsToSample = pair.second; Map<String, String> params = new HashMap<>(); params.put("internalDB", FeConstants.INTERNAL_DB_NAME); params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); params.put("catalogId", String.valueOf(catalog.getId())); + params.put("catalogName", catalog.getName()); params.put("dbId", String.valueOf(db.getId())); params.put("tblId", String.valueOf(tbl.getId())); params.put("idxId", String.valueOf(info.indexId)); params.put("colId", String.valueOf(info.colName)); - params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); params.put("dbName", db.getFullName()); params.put("colName", info.colName); params.put("tblName", tbl.getName()); params.put("scaleFactor", String.valueOf(scaleFactor)); - params.put("tablets", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); + params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); + params.put("ndvFunction", getNdvFunction(String.valueOf(rowCount))); + params.put("min", min); + params.put("max", max); + params.put("rowCount", String.valueOf(rowCount)); + params.put("type", col.getType().toString()); + params.put("limit", ""); + if (needLimit()) { + // If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate + // the scaleFactor. 
+ limitFlag = true; + rowsToSample = Math.min(getSampleRows(), pair.second); + params.put("limit", "limit " + rowsToSample); + params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample)); + } StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE)); - // Scalar query only return one row - ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); - job.appendBuf(this, Collections.singletonList(colStatsData)); + String sql; + // Distribution columns don't fit for DUJ1 estimator, use linear estimator. + if (tbl.isDistributionColumn(col.getName())) { + params.put("min", StatisticsUtil.quote(min)); + params.put("max", StatisticsUtil.quote(max)); + sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE); + } else { + params.put("dataSizeFunction", getDataSizeFunction(col, true)); + sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE); + } + LOG.info("Sample for column [{}]. Total rows [{}], rows to sample [{}], scale factor [{}], " + + "limited [{}], distribute column [{}], partition column [{}], key column [{}]", + col.getName(), params.get("rowCount"), rowsToSample, params.get("scaleFactor"), + limitFlag, tbl.isDistributionColumn(col.getName()), + tbl.isPartitionColumn(col.getName()), col.isKey()); + runQuery(sql, false); } } + protected ResultRow collectBasicStat(AutoCloseConnectContext context) { + Map<String, String> params = new HashMap<>(); + params.put("dbName", db.getFullName()); + params.put("colName", info.colName); + params.put("tblName", tbl.getName()); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + stmtExecutor = new StmtExecutor(context.connectContext, stringSubstitutor.replace(BASIC_STATS_TEMPLATE)); + return stmtExecutor.executeInternalQuery().get(0); + } + /** * 1. Get stats of each partition * 2. insert partition in batch * 3. 
calculate column stats based on partition stats */ protected void doFull() throws Exception { + LOG.debug("Will do full collection for column {}", col.getName()); Set<String> partitionNames = info.colToPartitions.get(info.colName); if (partitionNames.isEmpty()) { job.appendBuf(this, Collections.emptyList()); @@ -138,37 +177,30 @@ public class OlapAnalysisTask extends BaseAnalysisTask { params.put("tblId", String.valueOf(tbl.getId())); params.put("idxId", String.valueOf(info.indexId)); params.put("colId", String.valueOf(info.colName)); - params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); + params.put("catalogName", catalog.getName()); params.put("dbName", db.getFullName()); params.put("colName", String.valueOf(info.colName)); params.put("tblName", String.valueOf(tbl.getName())); - execSQL(params); - } - - @VisibleForTesting - public void execSQL(Map<String, String> params) throws Exception { StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS); - runQuery(collectColStats); + runQuery(collectColStats, true); } // Get sample tablets id and scale up scaleFactor - protected Pair<List<Long>, Long> calcActualSampleTablets() { + protected Pair<List<Long>, Long> calcActualSampleTablets(boolean forPartitionColumn) { // Below code copied from OlapScanNode.java long sampleRows; // The total number of sample rows long totalRows = 0; // The total number of partition rows hit long totalTablet = 0; // The total number of tablets in the hit partition OlapTable olapTable = (OlapTable) tbl; - if (tableSample.isPercent()) { - sampleRows = (long) Math.max(olapTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1); - } else { - sampleRows = Math.max(tableSample.getSampleValue(), 1); - } + sampleRows = getSampleRows(); // calculate the number of tablets by each partition long avgRowsPerPartition = sampleRows / Math.max(olapTable.getPartitions().size(), 1); List<Long> sampleTabletIds = new ArrayList<>(); long actualSampledRowCount = 0; + boolean enough = false; for (Partition p : olapTable.getPartitions()) { List<Long> ids = p.getBaseIndex().getTabletIdsInOrder(); @@ -194,22 +226,55 @@ public class OlapAnalysisTask extends BaseAnalysisTask { long tabletId = ids.get(seekTid); sampleTabletIds.add(tabletId); actualSampledRowCount += baseIndex.getTablet(tabletId).getRowCount(true); + if (actualSampledRowCount >= sampleRows && !forPartitionColumn) { + enough = true; + break; + } } - totalRows += p.getBaseIndex().getRowCount(); totalTablet += ids.size(); + if (enough) { + break; + } } // all hit, direct full if (totalRows < sampleRows) { // can't fill full sample rows sampleTabletIds.clear(); - } else if (sampleTabletIds.size() == totalTablet) { - // TODO add limit + } else if (sampleTabletIds.size() == totalTablet && !enough) { sampleTabletIds.clear(); - } else if (!sampleTabletIds.isEmpty()) { - // TODO add limit } return Pair.of(sampleTabletIds, actualSampledRowCount); } + + /** + * For ordinary column (neither key column nor partition column), need to limit sample size to user specified value. + * @return Return true when need to limit. + */ + protected boolean needLimit() { + // Key column is sorted, use limit will cause the ndv not accurate enough, so skip key columns. + if (col.isKey()) { + return false; + } + // Partition column need to scan tablets from all partitions. 
+ if (tbl.isPartitionColumn(col.getName())) { + return false; + } + return true; + } + + /** + * Calculate rows to sample based on user given sample value. + * @return Rows to sample. + */ + protected long getSampleRows() { + long sampleRows; + if (tableSample.isPercent()) { + sampleRows = (long) Math.max(tbl.getRowCount() * (tableSample.getSampleValue() / 100.0), 1); + } else { + sampleRows = Math.max(tableSample.getSampleValue(), 1); + } + return sampleRows; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index f008c8fe301..e6f71cd5911 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.FeConstants; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.system.SystemInfoService; import java.util.ArrayList; @@ -68,7 +69,8 @@ public class StatisticConstants { public static final String DB_NAME = SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME; - public static final String FULL_QUALIFIED_STATS_TBL_NAME = FeConstants.INTERNAL_DB_NAME + "." + STATISTIC_TBL_NAME; + public static final String FULL_QUALIFIED_STATS_TBL_NAME = InternalCatalog.INTERNAL_CATALOG_NAME + + "." + FeConstants.INTERNAL_DB_NAME + "." + STATISTIC_TBL_NAME; public static final int STATISTIC_INTERNAL_TABLE_REPLICA_NUM = 3; diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java index 268540885da..74d52cf20f1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalyzeTest.java @@ -158,7 +158,7 @@ public class AnalyzeTest extends TestWithFeService { new MockUp<BaseAnalysisTask>() { @Mock - protected void runQuery(String sql) {} + protected void runQuery(String sql, boolean needEncode) {} }; HashMap<String, Set<String>> colToPartitions = Maps.newHashMap(); colToPartitions.put("col1", Collections.singleton("t1")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/BaseAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/BaseAnalysisTaskTest.java new file mode 100644 index 00000000000..e3d080fea0a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/BaseAnalysisTaskTest.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.analysis.TableSample; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class BaseAnalysisTaskTest { + + @Test + public void testGetFunctions() { + OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask(); + Column column = new Column("string_column", PrimitiveType.STRING); + String dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, true); + Assertions.assertEquals("SUM(LENGTH(`column_key`) * count)", dataSizeFunction); + dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, false); + Assertions.assertEquals("SUM(LENGTH(`${colName}`))", dataSizeFunction); + + column = new Column("int_column", PrimitiveType.INT); + dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, false); + Assertions.assertEquals("COUNT(1) * 4", dataSizeFunction); + dataSizeFunction = olapAnalysisTask.getDataSizeFunction(column, true); + Assertions.assertEquals("SUM(t1.count) * 4", dataSizeFunction); + + String minFunction = olapAnalysisTask.getMinFunction(); + Assertions.assertEquals("to_base64(CAST(MIN(`${colName}`) as ${type})) ", minFunction); + olapAnalysisTask.tableSample = new TableSample(true, 20L); + minFunction = olapAnalysisTask.getMinFunction(); + Assertions.assertEquals("NULL", minFunction); + + olapAnalysisTask.tableSample = null; + String maxFunction = olapAnalysisTask.getMaxFunction(); + Assertions.assertEquals("to_base64(CAST(MAX(`${colName}`) as ${type})) ", maxFunction); + olapAnalysisTask.tableSample = new TableSample(true, 20L); + maxFunction = olapAnalysisTask.getMaxFunction(); + Assertions.assertEquals("NULL", maxFunction); + + String ndvFunction = olapAnalysisTask.getNdvFunction(String.valueOf(100)); + Assertions.assertEquals("SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) " + + "+ SUM(IF(t1.count = 1, 1, 0)) * SUM(t1.count) / 100)", ndvFunction); + System.out.println(ndvFunction); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java new file mode 100644 index 00000000000..24a74053bb6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.statistics; + +import org.apache.doris.analysis.TableSample; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.external.HMSExternalTable; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.collect.Lists; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +public class HMSAnalysisTaskTest { + + @Test + public void testNeedLimit(@Mocked HMSExternalTable tableIf) + throws Exception { + + new MockUp<HMSExternalTable>() { + @Mock + public List<Column> getFullSchema() { + ArrayList<Column> objects = Lists.newArrayList(); + objects.add(new Column("int_column", PrimitiveType.INT)); + return objects; + } + }; + HMSAnalysisTask task = new HMSAnalysisTask(); + task.setTable(tableIf); + task.tableSample = new TableSample(true, 10L); + Assertions.assertFalse(task.needLimit(100, 5.0)); + + task.tableSample = new TableSample(false, 100L); + Assertions.assertFalse(task.needLimit(100, 5.0)); + Assertions.assertTrue(task.needLimit(2L * 1024 * 1024 * 1024, 5.0)); + task.tableSample = new TableSample(false, 512L * 1024 * 1024); + Assertions.assertFalse(task.needLimit(2L * 1024 * 1024 * 1024, 5.0)); + } + + @Test + public void testAutoSampleHugeTable(@Mocked HMSExternalTable tableIf) + throws Exception { + new MockUp<HMSExternalTable>() { + @Mock + public long getDataSize(boolean singleReplica) { + return 6L * 1024 * 1024 * 1024; + } + }; + HMSAnalysisTask task = new HMSAnalysisTask(); + task.tbl = tableIf; + AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder(); + analysisInfoBuilder.setJobType(AnalysisInfo.JobType.SYSTEM); + analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL); + task.info = analysisInfoBuilder.build(); + TableSample tableSample = task.getTableSample(); + Assertions.assertFalse(tableSample.isPercent()); + Assertions.assertEquals(StatisticsUtil.getHugeTableSampleRows(), tableSample.getSampleValue()); + } + + @Test + public void testAutoSampleSmallTable(@Mocked HMSExternalTable tableIf) + throws Exception { + new MockUp<HMSExternalTable>() { + @Mock + public long getDataSize(boolean singleReplica) { + return 1000; + } + }; + HMSAnalysisTask task = new HMSAnalysisTask(); + task.tbl = tableIf; + AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder(); + analysisInfoBuilder.setJobType(AnalysisInfo.JobType.SYSTEM); + analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL); + task.info = analysisInfoBuilder.build(); + TableSample tableSample = task.getTableSample(); + Assertions.assertNull(tableSample); + } + + @Test + public void testManualFull(@Mocked HMSExternalTable tableIf) + throws Exception { + new MockUp<HMSExternalTable>() { + @Mock + public long getDataSize(boolean singleReplica) { + return 1000; + } + }; + HMSAnalysisTask task = new HMSAnalysisTask(); + task.tbl = tableIf; + AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder(); + analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL); + analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.FULL); + task.info = analysisInfoBuilder.build(); + TableSample tableSample = task.getTableSample(); + Assertions.assertNull(tableSample); + } + + @Test + public void testManualSample(@Mocked HMSExternalTable tableIf) + throws Exception { + new MockUp<HMSExternalTable>() { + @Mock 
+ public long getDataSize(boolean singleReplica) { + return 1000; + } + }; + HMSAnalysisTask task = new HMSAnalysisTask(); + task.tbl = tableIf; + AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder(); + analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL); + analysisInfoBuilder.setAnalysisMethod(AnalysisInfo.AnalysisMethod.SAMPLE); + analysisInfoBuilder.setSampleRows(1000); + task.info = analysisInfoBuilder.build(); + TableSample tableSample = task.getTableSample(); + Assertions.assertNotNull(tableSample); + Assertions.assertEquals(1000, tableSample.getSampleValue()); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java index f2b9f84f0d0..0431e373d48 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java @@ -18,20 +18,35 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.TableSample; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionType; +import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.Pair; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + public class OlapAnalysisTaskTest { // test manual @@ -98,7 +113,196 @@ public class OlapAnalysisTaskTest { olapAnalysisTask.tbl = tbl; TableSample tableSample = olapAnalysisTask.getTableSample(); Assertions.assertNull(tableSample); + } + + @Test + public void testManualSampleNonDistributeKey(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf) + throws Exception { + + new Expectations() { + { + tableIf.getRowCount(); + result = 500; + tableIf.getId(); + result = 30001; + catalogIf.getId(); + result = 10001; + catalogIf.getName(); + result = "catalogName"; + databaseIf.getId(); + result = 20001; + } + }; + + new MockUp<OlapAnalysisTask>() { + @Mock + public Pair<List<Long>, Long> calcActualSampleTablets() { + return Pair.of(Lists.newArrayList(), 100L); + } + + @Mock + public ResultRow collectBasicStat(AutoCloseConnectContext context) { + List<String> values = Lists.newArrayList(); + values.add("1"); + values.add("2"); + return new ResultRow(values); + } + + @Mock + public void runQuery(String sql, boolean needEncode) { + Assertions.assertFalse(needEncode); + Assertions.assertEquals("SELECT CONCAT('30001', '-', '-1', '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, 500 AS `row_count`, SUM(t1.count) * COUNT(1) / (SUM(t1.count) - SUM(IF(t1.count = 1, 1, 0)) + SUM(IF(t1.count = 1, 1, 0)) * SUM(t1.count) 
+                return;
+            }
+        };
+
+        new MockUp<StatisticsUtil>() {
+            @Mock
+            public AutoCloseConnectContext buildConnectContext(boolean scanLimit) {
+                return null;
+            }
+        };
+
+        new MockUp<OlapTable>() {
+            @Mock
+            public Set<String> getDistributionColumnNames() {
+                return Sets.newHashSet();
+            }
+        };
+
+        OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+        olapAnalysisTask.col = new Column("test", PrimitiveType.STRING);
+        olapAnalysisTask.tbl = tableIf;
+        AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
+        analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL);
+        olapAnalysisTask.info = analysisInfoBuilder.build();
+        olapAnalysisTask.catalog = catalogIf;
+        olapAnalysisTask.db = databaseIf;
+        olapAnalysisTask.tableSample = new TableSample(false, 100L);
+        olapAnalysisTask.doSample();
+    }
+
+    @Test
+    public void testManualSampleDistributeKey(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf)
+            throws Exception {
+        new Expectations() {
+            {
+                tableIf.getRowCount();
+                result = 500;
+                tableIf.getId();
+                result = 30001;
+                catalogIf.getId();
+                result = 10001;
+                catalogIf.getName();
+                result = "catalogName";
+                databaseIf.getId();
+                result = 20001;
+            }
+        };
+
+        new MockUp<OlapAnalysisTask>() {
+            @Mock
+            public Pair<List<Long>, Long> calcActualSampleTablets() {
+                return Pair.of(Lists.newArrayList(), 100L);
+            }
+
+            @Mock
+            public ResultRow collectBasicStat(AutoCloseConnectContext context) {
+                List<String> values = Lists.newArrayList();
+                values.add("1");
+                values.add("2");
+                return new ResultRow(values);
+            }
+
+            @Mock
+            public void runQuery(String sql, boolean needEncode) {
+                Assertions.assertFalse(needEncode);
+                Assertions.assertEquals(" SELECT CONCAT(30001, '-', -1, '-', 'null') AS `id`, 10001 AS `catalog_id`, 20001 AS `db_id`, 30001 AS `tbl_id`, -1 AS `idx_id`, 'null' AS `col_id`, NULL AS `part_id`, ROUND(COUNT(1) * 5.0) AS `row_count`, ROUND(NDV(`${colName}`) * 5.0) as `ndv`, ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * 5.0) AS `null_count`, 'MQ==' AS `min`, 'Mg==' AS `max`, SUM(LENGTH(`${colName}`)) * 5.0 AS `data_size`, NOW() FROM `catalogName`.`${dbName}`. [...]
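+                // For a distribution column, the expected SQL above applies NDV() to the
+                // sampled tablets and scales the result linearly: ndv, row_count and
+                // null_count are multiplied by 5.0, which again matches total rows /
+                // sampled rows (500 / 100) from the mocks above.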
+                return;
+            }
+        };
+
+        new MockUp<StatisticsUtil>() {
+            @Mock
+            public AutoCloseConnectContext buildConnectContext(boolean scanLimit) {
+                return null;
+            }
+        };
+
+        new MockUp<OlapTable>() {
+            @Mock
+            public Set<String> getDistributionColumnNames() {
+                HashSet<String> cols = Sets.newHashSet();
+                cols.add("test");
+                return cols;
+            }
+
+            @Mock
+            public boolean isDistributionColumn(String columnName) {
+                return true;
+            }
+        };
+
+        OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+        olapAnalysisTask.col = new Column("test", PrimitiveType.STRING);
+        olapAnalysisTask.tbl = tableIf;
+        AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
+        analysisInfoBuilder.setJobType(AnalysisInfo.JobType.MANUAL);
+        olapAnalysisTask.info = analysisInfoBuilder.build();
+        olapAnalysisTask.catalog = catalogIf;
+        olapAnalysisTask.db = databaseIf;
+        olapAnalysisTask.tableSample = new TableSample(false, 100L);
+        olapAnalysisTask.doSample();
+    }
+
+    @Test
+    public void testNeedLimitFalse(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf)
+            throws Exception {
+
+        new MockUp<OlapTable>() {
+            @Mock
+            public PartitionInfo getPartitionInfo() {
+                ArrayList<Column> columns = Lists.newArrayList();
+                columns.add(new Column("test", PrimitiveType.STRING));
+                return new PartitionInfo(PartitionType.RANGE, columns);
+            }
+
+            @Mock
+            public boolean isPartitionColumn(String columnName) {
+                return true;
+            }
+        };
+
+        OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+        olapAnalysisTask.col = new Column("test", Type.fromPrimitiveType(PrimitiveType.STRING),
+                true, null, null, null);
+        olapAnalysisTask.tbl = tableIf;
+        Assertions.assertFalse(olapAnalysisTask.needLimit());
+
+        olapAnalysisTask.col = new Column("test", Type.fromPrimitiveType(PrimitiveType.STRING),
+                false, null, null, null);
+        Assertions.assertFalse(olapAnalysisTask.needLimit());
+    }
+
+    @Test
+    public void testNeedLimitTrue(@Mocked CatalogIf catalogIf, @Mocked DatabaseIf databaseIf, @Mocked OlapTable tableIf)
+            throws Exception {
+
+        new MockUp<OlapTable>() {
+            @Mock
+            public PartitionInfo getPartitionInfo() {
+                ArrayList<Column> columns = Lists.newArrayList();
+                columns.add(new Column("NOFOUND", PrimitiveType.STRING));
+                return new PartitionInfo(PartitionType.RANGE, columns);
+            }
+        };
+
+        OlapAnalysisTask olapAnalysisTask = new OlapAnalysisTask();
+        olapAnalysisTask.tbl = tableIf;
+        olapAnalysisTask.col = new Column("test", Type.fromPrimitiveType(PrimitiveType.STRING),
+                false, null, null, null);
+        Assertions.assertTrue(olapAnalysisTask.needLimit());
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org