This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 9bcf79178e [Improvement](statistics, multi catalog)Support iceberg table stats collection (#21481)

9bcf79178e is described below

commit 9bcf79178ed4021bfb9c784f340c60b2c7428a8b
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Fri Jul 7 09:18:37 2023 +0800

    [Improvement](statistics, multi catalog)Support iceberg table stats collection (#21481)

    Fetch Iceberg table stats automatically while querying a table.
    Collect accurate statistics for Iceberg tables by running an analyze SQL
    statement in Doris (the collect-by-meta option is removed).
---
 .../main/java/org/apache/doris/common/Config.java  |   7 -
 .../doris/catalog/external/HMSExternalTable.java   |  25 +-
 .../catalog/external/IcebergExternalTable.java     |  10 +
 .../apache/doris/external/hive/util/HiveUtil.java  |   2 +-
 .../apache/doris/statistics/HMSAnalysisTask.java   | 239 ++++++++++++-
 .../apache/doris/statistics/HiveAnalysisTask.java  | 370 ---------------------
 .../doris/statistics/IcebergAnalysisTask.java      | 121 -------
 .../doris/statistics/util/StatisticsUtil.java      |  49 +++
 8 files changed, 298 insertions(+), 525 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index e4a61788ec..c683db9bc5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1742,13 +1742,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static boolean use_fuzzy_session_variable = false;
 
-    /**
-     * Collect external table statistic info by running sql when set to true.
-     * Otherwise, use external catalog metadata.
-     */
-    @ConfField(mutable = true)
-    public static boolean collect_external_table_stats_by_sql = true;
-
     /**
      * Max num of same name meta information in catalog recycle bin.
      * Default is 3.
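With the collect_external_table_stats_by_sql switch removed, both Hive and Iceberg statistics are now collected through SQL generated by the new HMSAnalysisTask further down. That task builds its statements by filling the ${...} placeholders in its templates with org.apache.commons.text.StringSubstitutor. A minimal, self-contained sketch of that mechanism (the catalog, db and table values are the illustrative ones from the patch's own example comment, not live data):

    import org.apache.commons.text.StringSubstitutor;

    import java.util.HashMap;
    import java.util.Map;

    public class TemplateSketch {
        // Same placeholder style as ANALYZE_TABLE_COUNT_TEMPLATE in the patch.
        private static final String COUNT_TEMPLATE =
                "SELECT COUNT(1) as rowCount FROM `${catalogName}`.`${dbName}`.`${tblName}`";

        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("catalogName", "hive");   // illustrative values only
            params.put("dbName", "tpch100");
            params.put("tblName", "region");
            // StringSubstitutor replaces each ${key} with the mapped value.
            String sql = new StringSubstitutor(params).replace(COUNT_TEMPLATE);
            // Prints: SELECT COUNT(1) as rowCount FROM `hive`.`tpch100`.`region`
            System.out.println(sql);
        }
    }

The same substitution drives ANALYZE_SQL_TABLE_TEMPLATE; for partition-level work the task appends "where ..." predicates to the template before the map is applied.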
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 1aee1803ae..b985e4cddd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -29,8 +29,7 @@ import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.ColumnStatisticBuilder; -import org.apache.doris.statistics.HiveAnalysisTask; -import org.apache.doris.statistics.IcebergAnalysisTask; +import org.apache.doris.statistics.HMSAnalysisTask; import org.apache.doris.statistics.TableStatistic; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.THiveTable; @@ -322,14 +321,7 @@ public class HMSExternalTable extends ExternalTable { @Override public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { makeSureInitialized(); - switch (dlaType) { - case HIVE: - return new HiveAnalysisTask(info); - case ICEBERG: - return new IcebergAnalysisTask(info); - default: - throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported."); - } + return new HMSAnalysisTask(info); } public String getViewText() { @@ -473,6 +465,19 @@ public class HMSExternalTable extends ExternalTable { @Override public Optional<ColumnStatistic> getColumnStatistic(String colName) { + makeSureInitialized(); + switch (dlaType) { + case HIVE: + return getHiveColumnStats(colName); + case ICEBERG: + return StatisticsUtil.getIcebergColumnStats(colName, HiveMetaStoreClientHelper.getIcebergTable(this)); + default: + LOG.warn("get column stats for dlaType {} is not supported.", dlaType); + } + return Optional.empty(); + } + + private Optional<ColumnStatistic> getHiveColumnStats(String colName) { List<ColumnStatisticsObj> tableStats = getHiveTableColumnStats(Lists.newArrayList(colName)); if (tableStats == null || tableStats.isEmpty()) { LOG.debug(String.format("No table stats found in Hive metastore for column %s in table %s.", diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java index bf29d93eed..c2be8b90a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java @@ -22,6 +22,8 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TIcebergTable; import org.apache.doris.thrift.TTableDescriptor; @@ -33,6 +35,7 @@ import org.apache.iceberg.types.Types; import java.util.HashMap; import java.util.List; +import java.util.Optional; public class IcebergExternalTable extends ExternalTable { @@ -134,4 +137,11 @@ public class IcebergExternalTable extends ExternalTable { return tTableDescriptor; } } + + @Override + public Optional<ColumnStatistic> getColumnStatistic(String colName) { + makeSureInitialized(); + return StatisticsUtil.getIcebergColumnStats(colName, + 
((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java index d42d8c3d7d..704d0fadf8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java @@ -197,7 +197,7 @@ public final class HiveUtil { method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class); break; } catch (NoSuchMethodException ignored) { - LOG.warn("Class {} doesn't contain isSplitable method.", clazz); + LOG.debug("Class {} doesn't contain isSplitable method.", clazz); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index b5bee03f0b..a7b45c13cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -17,41 +17,248 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.external.HMSExternalTable; -import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.util.InternalQueryResult; +import org.apache.doris.statistics.util.StatisticsUtil; -import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; public class HMSAnalysisTask extends BaseAnalysisTask { + private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class); + + public static final String TOTAL_SIZE = "totalSize"; + public static final String NUM_ROWS = "numRows"; + public static final String NUM_FILES = "numFiles"; + public static final String TIMESTAMP = "transient_lastDdlTime"; + + private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " SELECT " + + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + + "${catalogId} AS catalog_id, " + + "${dbId} AS db_id, " + + "${tblId} AS tbl_id, " + + "${idxId} AS idx_id, " + + "'${colId}' AS col_id, " + + "${partId} AS part_id, " + + "COUNT(1) AS row_count, " + + "NDV(`${colName}`) AS ndv, " + + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " + + "MIN(`${colName}`) AS min, " + + "MAX(`${colName}`) AS max, " + + "${dataSizeFunction} AS data_size, " + + "NOW() " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; - protected HMSExternalTable table; + private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + + private final boolean isTableLevelTask; + private final boolean isSamplingPartition; + private final boolean isPartitionOnly; + private final Set<String> partitionNames; + private HMSExternalTable table; public HMSAnalysisTask(AnalysisInfo info) { 
super(info); + isTableLevelTask = info.externalTableLevelTask; + isSamplingPartition = info.samplingPartition; + isPartitionOnly = info.partitionOnly; + partitionNames = info.partitionNames; table = (HMSExternalTable) tbl; } + public void execute() throws Exception { + if (isTableLevelTask) { + getTableStats(); + } else { + getTableColumnStats(); + } + } + /** - * Collect the column level stats for external table through metadata. + * Get table row count and insert the result to __internal_schema.table_statistics */ - protected void getStatsByMeta() throws Exception { - throw new NotImplementedException("Code is not implemented"); + private void getTableStats() throws Exception { + // Get table level information. An example sql for table stats: + // INSERT INTO __internal_schema.table_statistics VALUES + // ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW()) + Map<String, String> parameters = table.getRemoteTable().getParameters(); + if (isPartitionOnly) { + for (String partId : partitionNames) { + StringBuilder sb = new StringBuilder(); + sb.append(ANALYZE_TABLE_COUNT_TEMPLATE); + sb.append(" where "); + String[] splits = partId.split("/"); + for (int i = 0; i < splits.length; i++) { + String value = splits[i].split("=")[1]; + splits[i] = splits[i].replace(value, "\'" + value + "\'"); + } + sb.append(StringUtils.join(splits, " and ")); + Map<String, String> params = buildTableStatsParams(partId); + setParameterData(parameters, params); + List<InternalQueryResult.ResultRow> columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(sb.toString())); + String rowCount = columnResult.get(0).getColumnValue("rowCount"); + params.put("rowCount", rowCount); + StatisticsRepository.persistTableStats(params); + } + } else { + Map<String, String> params = buildTableStatsParams("NULL"); + List<InternalQueryResult.ResultRow> columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(ANALYZE_TABLE_COUNT_TEMPLATE)); + String rowCount = columnResult.get(0).getColumnValue("rowCount"); + params.put("rowCount", rowCount); + StatisticsRepository.persistTableStats(params); + } } /** - * Collect the stats for external table through sql. 
- * @return ColumnStatistics + * Get column statistics and insert the result to __internal_schema.column_statistics */ - protected void getStatsBySql() throws Exception { - throw new NotImplementedException("getColumnStatsBySql is not implemented"); + private void getTableColumnStats() throws Exception { + // An example sql for a column stats: + // INSERT INTO __internal_schema.column_statistics + // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, + // 13002 AS catalog_id, + // 13038 AS db_id, + // 13055 AS tbl_id, + // -1 AS idx_id, + // 'r_regionkey' AS col_id, + // 'NULL' AS part_id, + // COUNT(1) AS row_count, + // NDV(`r_regionkey`) AS ndv, + // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count, + // MIN(`r_regionkey`) AS min, + // MAX(`r_regionkey`) AS max, + // 0 AS data_size, + // NOW() FROM `hive`.`tpch100`.`region` + if (isPartitionOnly) { + for (String partId : partitionNames) { + StringBuilder sb = new StringBuilder(); + sb.append(ANALYZE_SQL_TABLE_TEMPLATE); + sb.append(" where "); + String[] splits = partId.split("/"); + for (int i = 0; i < splits.length; i++) { + String value = splits[i].split("=")[1]; + splits[i] = splits[i].replace(value, "\'" + value + "\'"); + } + sb.append(StringUtils.join(splits, " and ")); + Map<String, String> params = buildTableStatsParams(partId); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("colName", col.getName()); + params.put("colId", info.colName); + params.put("dataSizeFunction", getDataSizeFunction(col)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(sb.toString()); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } + } + } else { + StringBuilder sb = new StringBuilder(); + sb.append(ANALYZE_SQL_TABLE_TEMPLATE); + if (isSamplingPartition) { + sb.append(" where 1=1 "); + String[] splitExample = partitionNames.stream().findFirst().get().split("/"); + int parts = splitExample.length; + List<String> partNames = new ArrayList<>(); + for (String split : splitExample) { + partNames.add(split.split("=")[0]); + } + List<List<String>> valueLists = new ArrayList<>(); + for (int i = 0; i < parts; i++) { + valueLists.add(new ArrayList<>()); + } + for (String partId : partitionNames) { + String[] partIds = partId.split("/"); + for (int i = 0; i < partIds.length; i++) { + valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'"); + } + } + for (int i = 0; i < parts; i++) { + sb.append(" and "); + sb.append(partNames.get(i)); + sb.append(" in ("); + sb.append(StringUtils.join(valueLists.get(i), ",")); + sb.append(") "); + } + } + Map<String, String> params = buildTableStatsParams("NULL"); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("colName", col.getName()); + params.put("colId", info.colName); + params.put("dataSizeFunction", getDataSizeFunction(col)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(sb.toString()); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + 
this.stmtExecutor.execute(); + } + Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName()); + } } - @Override - public void execute() throws Exception { - if (Config.collect_external_table_stats_by_sql) { - getStatsBySql(); - } else { - getStatsByMeta(); + private Map<String, String> buildTableStatsParams(String partId) { + Map<String, String> commonParams = new HashMap<>(); + String id = StatisticsUtil.constructId(tbl.getId(), -1); + if (!partId.equals("NULL")) { + id = StatisticsUtil.constructId(id, partId); + } + commonParams.put("id", id); + commonParams.put("catalogId", String.valueOf(catalog.getId())); + commonParams.put("dbId", String.valueOf(db.getId())); + commonParams.put("tblId", String.valueOf(tbl.getId())); + commonParams.put("indexId", "-1"); + commonParams.put("idxId", "-1"); + commonParams.put("partId", "\'" + partId + "\'"); + commonParams.put("catalogName", catalog.getName()); + commonParams.put("dbName", db.getFullName()); + commonParams.put("tblName", tbl.getName()); + if (col != null) { + commonParams.put("type", col.getType().toString()); + } + commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); + return commonParams; + } + + private void setParameterData(Map<String, String> parameters, Map<String, String> params) { + String numRows = ""; + String timestamp = ""; + if (parameters.containsKey(NUM_ROWS)) { + numRows = parameters.get(NUM_ROWS); + } + if (parameters.containsKey(TIMESTAMP)) { + timestamp = parameters.get(TIMESTAMP); } + params.put("numRows", numRows); + params.put("rowCount", numRows); + params.put("update_time", TimeUtils.DATETIME_FORMAT.format( + LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000), + ZoneId.systemDefault()))); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java deleted file mode 100644 index 8ae74206de..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java +++ /dev/null @@ -1,370 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.util.InternalQueryResult; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.text.StringSubstitutor; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; -import org.apache.hadoop.hive.metastore.api.Decimal; -import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; -import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; -import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; -import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class HiveAnalysisTask extends HMSAnalysisTask { - private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class); - - public static final String TOTAL_SIZE = "totalSize"; - public static final String NUM_ROWS = "numRows"; - public static final String NUM_FILES = "numFiles"; - public static final String TIMESTAMP = "transient_lastDdlTime"; - - private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT " - + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "${idxId} AS idx_id, " - + "'${colId}' AS col_id, " - + "${partId} AS part_id, " - + "COUNT(1) AS row_count, " - + "NDV(`${colName}`) AS ndv, " - + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " - + "MIN(`${colName}`) AS min, " - + "MAX(`${colName}`) AS max, " - + "${dataSizeFunction} AS data_size, " - + "NOW() " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; - - private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; - - private final boolean isTableLevelTask; - private final boolean isSamplingPartition; - private final boolean isPartitionOnly; - private final Set<String> partitionNames; - - public HiveAnalysisTask(AnalysisInfo info) { - super(info); - isTableLevelTask = info.externalTableLevelTask; - isSamplingPartition = info.samplingPartition; - isPartitionOnly = info.partitionOnly; - partitionNames = info.partitionNames; - } - - private static final String ANALYZE_META_TABLE_COLUMN_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, " - + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; - - private static final String ANALYZE_META_PARTITION_COLUMN_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', " - + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; - - 
private static final String ANALYZE_META_TABLE_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, " - + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')"; - - /** - * Collect the stats for external table through sql. - */ - @Override - protected void getStatsBySql() throws Exception { - if (isTableLevelTask) { - getTableStatsBySql(); - } else { - getTableColumnStatsBySql(); - } - } - - /** - * Get table row count and insert the result to __internal_schema.table_statistics - */ - private void getTableStatsBySql() throws Exception { - // Get table level information. An example sql for table stats: - // INSERT INTO __internal_schema.table_statistics VALUES - // ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW()) - Map<String, String> parameters = table.getRemoteTable().getParameters(); - if (isPartitionOnly) { - for (String partId : partitionNames) { - StringBuilder sb = new StringBuilder(); - sb.append(ANALYZE_TABLE_COUNT_TEMPLATE); - sb.append(" where "); - String[] splits = partId.split("/"); - for (int i = 0; i < splits.length; i++) { - String value = splits[i].split("=")[1]; - splits[i] = splits[i].replace(value, "\'" + value + "\'"); - } - sb.append(StringUtils.join(splits, " and ")); - Map<String, String> params = buildTableStatsParams(partId); - setParameterData(parameters, params); - List<InternalQueryResult.ResultRow> columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(sb.toString())); - String rowCount = columnResult.get(0).getColumnValue("rowCount"); - params.put("rowCount", rowCount); - StatisticsRepository.persistTableStats(params); - } - } else { - Map<String, String> params = buildTableStatsParams("NULL"); - List<InternalQueryResult.ResultRow> columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(ANALYZE_TABLE_COUNT_TEMPLATE)); - String rowCount = columnResult.get(0).getColumnValue("rowCount"); - params.put("rowCount", rowCount); - StatisticsRepository.persistTableStats(params); - } - } - - /** - * Get column statistics and insert the result to __internal_schema.column_statistics - */ - private void getTableColumnStatsBySql() throws Exception { - // An example sql for a column stats: - // INSERT INTO __internal_schema.column_statistics - // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, - // 13002 AS catalog_id, - // 13038 AS db_id, - // 13055 AS tbl_id, - // -1 AS idx_id, - // 'r_regionkey' AS col_id, - // 'NULL' AS part_id, - // COUNT(1) AS row_count, - // NDV(`r_regionkey`) AS ndv, - // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count, - // MIN(`r_regionkey`) AS min, - // MAX(`r_regionkey`) AS max, - // 0 AS data_size, - // NOW() FROM `hive`.`tpch100`.`region` - if (isPartitionOnly) { - for (String partId : partitionNames) { - StringBuilder sb = new StringBuilder(); - sb.append(ANALYZE_SQL_TABLE_TEMPLATE); - sb.append(" where "); - String[] splits = partId.split("/"); - for (int i = 0; i < splits.length; i++) { - String value = splits[i].split("=")[1]; - splits[i] = splits[i].replace(value, "\'" + value + "\'"); - } - sb.append(StringUtils.join(splits, " and ")); - Map<String, String> params = buildTableStatsParams(partId); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("colName", col.getName()); - params.put("colId", info.colName); - 
params.put("dataSizeFunction", getDataSizeFunction(col)); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(sb.toString()); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); - } - } - } else { - StringBuilder sb = new StringBuilder(); - sb.append(ANALYZE_SQL_TABLE_TEMPLATE); - if (isSamplingPartition) { - sb.append(" where 1=1 "); - String[] splitExample = partitionNames.stream().findFirst().get().split("/"); - int parts = splitExample.length; - List<String> partNames = new ArrayList<>(); - for (String split : splitExample) { - partNames.add(split.split("=")[0]); - } - List<List<String>> valueLists = new ArrayList<>(); - for (int i = 0; i < parts; i++) { - valueLists.add(new ArrayList<>()); - } - for (String partId : partitionNames) { - String[] partIds = partId.split("/"); - for (int i = 0; i < partIds.length; i++) { - valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'"); - } - } - for (int i = 0; i < parts; i++) { - sb.append(" and "); - sb.append(partNames.get(i)); - sb.append(" in ("); - sb.append(StringUtils.join(valueLists.get(i), ",")); - sb.append(") "); - } - } - Map<String, String> params = buildTableStatsParams("NULL"); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("colName", col.getName()); - params.put("colId", info.colName); - params.put("dataSizeFunction", getDataSizeFunction(col)); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(sb.toString()); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); - } - Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName()); - } - } - - private Map<String, String> buildTableStatsParams(String partId) { - Map<String, String> commonParams = new HashMap<>(); - String id = StatisticsUtil.constructId(tbl.getId(), -1); - if (!partId.equals("NULL")) { - id = StatisticsUtil.constructId(id, partId); - } - commonParams.put("id", id); - commonParams.put("catalogId", String.valueOf(catalog.getId())); - commonParams.put("dbId", String.valueOf(db.getId())); - commonParams.put("tblId", String.valueOf(tbl.getId())); - commonParams.put("indexId", "-1"); - commonParams.put("idxId", "-1"); - commonParams.put("partId", "\'" + partId + "\'"); - commonParams.put("catalogName", catalog.getName()); - commonParams.put("dbName", db.getFullName()); - commonParams.put("tblName", tbl.getName()); - if (col != null) { - commonParams.put("type", col.getType().toString()); - } - commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); - return commonParams; - } - - @Override - protected void getStatsByMeta() throws Exception { - // To be removed. - } - - private void getStatData(ColumnStatisticsData data, Map<String, String> params, long rowCount) { - long ndv = 0; - long nulls = 0; - String min = ""; - String max = ""; - long colSize = 0; - if (!data.isSetStringStats()) { - colSize = rowCount * col.getType().getSlotSize(); - } - // Collect ndv, nulls, min and max for different data type. 
- if (data.isSetLongStats()) { - LongColumnStatsData longStats = data.getLongStats(); - ndv = longStats.getNumDVs(); - nulls = longStats.getNumNulls(); - min = String.valueOf(longStats.getLowValue()); - max = String.valueOf(longStats.getHighValue()); - } else if (data.isSetStringStats()) { - StringColumnStatsData stringStats = data.getStringStats(); - ndv = stringStats.getNumDVs(); - nulls = stringStats.getNumNulls(); - double avgColLen = stringStats.getAvgColLen(); - colSize = Math.round(avgColLen * rowCount); - } else if (data.isSetDecimalStats()) { - DecimalColumnStatsData decimalStats = data.getDecimalStats(); - ndv = decimalStats.getNumDVs(); - nulls = decimalStats.getNumNulls(); - if (decimalStats.isSetLowValue()) { - Decimal lowValue = decimalStats.getLowValue(); - if (lowValue != null) { - BigDecimal lowDecimal = new BigDecimal(new BigInteger(lowValue.getUnscaled()), lowValue.getScale()); - min = lowDecimal.toString(); - } - } - if (decimalStats.isSetHighValue()) { - Decimal highValue = decimalStats.getHighValue(); - if (highValue != null) { - BigDecimal highDecimal = new BigDecimal( - new BigInteger(highValue.getUnscaled()), highValue.getScale()); - max = highDecimal.toString(); - } - } - } else if (data.isSetDoubleStats()) { - DoubleColumnStatsData doubleStats = data.getDoubleStats(); - ndv = doubleStats.getNumDVs(); - nulls = doubleStats.getNumNulls(); - min = String.valueOf(doubleStats.getLowValue()); - max = String.valueOf(doubleStats.getHighValue()); - } else if (data.isSetDateStats()) { - DateColumnStatsData dateStats = data.getDateStats(); - ndv = dateStats.getNumDVs(); - nulls = dateStats.getNumNulls(); - if (dateStats.isSetLowValue()) { - org.apache.hadoop.hive.metastore.api.Date lowValue = dateStats.getLowValue(); - if (lowValue != null) { - LocalDate lowDate = LocalDate.ofEpochDay(lowValue.getDaysSinceEpoch()); - min = lowDate.toString(); - } - } - if (dateStats.isSetHighValue()) { - org.apache.hadoop.hive.metastore.api.Date highValue = dateStats.getHighValue(); - if (highValue != null) { - LocalDate highDate = LocalDate.ofEpochDay(highValue.getDaysSinceEpoch()); - max = highDate.toString(); - } - } - } else { - throw new RuntimeException("Not supported data type."); - } - params.put("ndv", String.valueOf(ndv)); - params.put("nulls", String.valueOf(nulls)); - params.put("min", min); - params.put("max", max); - params.put("dataSize", String.valueOf(colSize)); - } - - private void setParameterData(Map<String, String> parameters, Map<String, String> params) { - String numRows = ""; - String timestamp = ""; - if (parameters.containsKey(NUM_ROWS)) { - numRows = parameters.get(NUM_ROWS); - } - if (parameters.containsKey(TIMESTAMP)) { - timestamp = parameters.get(TIMESTAMP); - } - params.put("numRows", numRows); - params.put("rowCount", numRows); - params.put("update_time", TimeUtils.DATETIME_FORMAT.format( - LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000), - ZoneId.systemDefault()))); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java deleted file mode 100644 index 105ef758f0..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java +++ /dev/null @@ -1,121 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics; - -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.property.constants.HMSProperties; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.commons.text.StringSubstitutor; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.types.Types; - -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; - -public class IcebergAnalysisTask extends HMSAnalysisTask { - - private long numRows = 0; - private long dataSize = 0; - private long numNulls = 0; - - public IcebergAnalysisTask(AnalysisInfo info) { - super(info); - } - - private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, " - + "${numRows}, 0, ${nulls}, '0', '0', ${dataSize}, '${update_time}')"; - - - @Override - protected void getStatsByMeta() throws Exception { - Table icebergTable = getIcebergTable(); - TableScan tableScan = icebergTable.newScan().includeColumnStats(); - for (FileScanTask task : tableScan.planFiles()) { - processDataFile(task.file(), task.spec()); - } - updateStats(); - } - - private Table getIcebergTable() { - org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); - Configuration conf = new HdfsConfiguration(); - for (Map.Entry<String, String> entry : table.getHadoopProperties().entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - hiveCatalog.setConf(conf); - Map<String, String> catalogProperties = new HashMap<>(); - catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, table.getMetastoreUri()); - catalogProperties.put("uri", table.getMetastoreUri()); - hiveCatalog.initialize("hive", catalogProperties); - return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName())); - } - - private void processDataFile(DataFile dataFile, PartitionSpec partitionSpec) { - int colId = -1; - for (Types.NestedField column : partitionSpec.schema().columns()) { - if (column.name().equals(col.getName())) { - colId = column.fieldId(); - break; - } - } - if (colId == -1) { - throw new RuntimeException(String.format("Column %s not exist.", col.getName())); - } - dataSize += dataFile.columnSizes().get(colId); - numRows += dataFile.recordCount(); - numNulls += dataFile.nullValueCounts().get(colId); - } - - private void updateStats() throws Exception { - Map<String, String> params = new 
HashMap<>(); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("id", tbl.getId() + "-" + col.getName()); - params.put("catalogId", String.valueOf(catalog.getId())); - params.put("dbId", String.valueOf(db.getId())); - params.put("tblId", String.valueOf(tbl.getId())); - params.put("colId", String.valueOf(col.getName())); - params.put("numRows", String.valueOf(numRows)); - params.put("nulls", String.valueOf(numNulls)); - params.put("dataSize", String.valueOf(dataSize)); - params.put("update_time", TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now())); - - // Update table level stats info of this column. - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 0bdf736aab..6e773fc430 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -60,6 +60,7 @@ import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.ColumnStatisticBuilder; import org.apache.doris.statistics.Histogram; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; @@ -71,9 +72,12 @@ import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; +import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; +import org.apache.iceberg.types.Types; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -594,4 +598,49 @@ public class StatisticsUtil { } return totalSize / estimatedRowSize; } + + /** + * Get Iceberg column statistics. + * @param colName + * @param table Iceberg table. + * @return Optional Column statistic for the given column. 
+ */ + public static Optional<ColumnStatistic> getIcebergColumnStats(String colName, org.apache.iceberg.Table table) { + TableScan tableScan = table.newScan().includeColumnStats(); + ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder(); + columnStatisticBuilder.setCount(0); + columnStatisticBuilder.setMaxValue(Double.MAX_VALUE); + columnStatisticBuilder.setMinValue(Double.MIN_VALUE); + columnStatisticBuilder.setDataSize(0); + columnStatisticBuilder.setAvgSizeByte(0); + columnStatisticBuilder.setNumNulls(0); + for (FileScanTask task : tableScan.planFiles()) { + processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder); + } + if (columnStatisticBuilder.getCount() > 0) { + columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize() + / columnStatisticBuilder.getCount()); + } + return Optional.of(columnStatisticBuilder.build()); + } + + private static void processDataFile(DataFile dataFile, PartitionSpec partitionSpec, + String colName, ColumnStatisticBuilder columnStatisticBuilder) { + int colId = -1; + for (Types.NestedField column : partitionSpec.schema().columns()) { + if (column.name().equals(colName)) { + colId = column.fieldId(); + break; + } + } + if (colId == -1) { + throw new RuntimeException(String.format("Column %s does not exist.", colName)); + } + // Update the data size, count and num of nulls in columnStatisticBuilder. + // TODO: Get min max value. + columnStatisticBuilder.setDataSize(columnStatisticBuilder.getDataSize() + dataFile.columnSizes().get(colId)); + columnStatisticBuilder.setCount(columnStatisticBuilder.getCount() + dataFile.recordCount()); + columnStatisticBuilder.setNumNulls(columnStatisticBuilder.getNumNulls() + + dataFile.nullValueCounts().get(colId)); + } }
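The getIcebergColumnStats/processDataFile pair added above derives column statistics purely from Iceberg file-level metadata, without reading any data. A condensed sketch of the same aggregation, assuming an already-loaded org.apache.iceberg.Table handle (catalog wiring omitted) and, as the patch does, that per-column size and null-count metrics exist in the file metadata; unlike the patch, it resolves the field id once from the table schema instead of per file:

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.types.Types;

    public final class IcebergColumnStatsSketch {
        /** Returns {rowCount, dataSize, nullCount} summed over all data files. */
        public static long[] aggregate(Table table, String colName) {
            // Resolve the column name to its Iceberg field id.
            Types.NestedField field = table.schema().findField(colName);
            if (field == null) {
                throw new IllegalArgumentException("Column " + colName + " does not exist.");
            }
            int fieldId = field.fieldId();
            long rows = 0;
            long dataSize = 0;
            long nulls = 0;
            // includeColumnStats() keeps per-column metrics on each DataFile.
            for (FileScanTask task : table.newScan().includeColumnStats().planFiles()) {
                DataFile file = task.file();
                rows += file.recordCount();                   // rows in this file
                dataSize += file.columnSizes().get(fieldId);  // on-disk bytes for the column
                nulls += file.nullValueCounts().get(fieldId);
            }
            return new long[] {rows, dataSize, nulls};
        }
    }

This also explains the TODO in the patch: only count, data size and null count can be summed from the manifests, min/max would need the per-file bounds, and per-file metadata cannot be combined into an exact NDV.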