This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/master by this push:
     new 2d1f597413 [Fix](statistics)Fix hive table statistic bug (#19365)
2d1f597413 is described below

commit 2d1f597413ecb70597cc54436dab5c8a2eaf55b8
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Thu May 11 07:48:58 2023 +0800

    [Fix](statistics)Fix hive table statistic bug (#19365)

    Fix hive table statistic bug. Collect table/partition level statistics.
---
 .../doris/catalog/external/HMSExternalTable.java   | 15 ++++
 .../apache/doris/statistics/AnalysisManager.java   | 26 +++++++
 .../apache/doris/statistics/AnalysisTaskInfo.java  |  7 +-
 .../doris/statistics/AnalysisTaskInfoBuilder.java  | 12 +++-
 .../apache/doris/statistics/BaseAnalysisTask.java  |  4 ++
 .../apache/doris/statistics/ColumnStatistic.java   |  7 +-
 .../apache/doris/statistics/HMSAnalysisTask.java   |  8 +--
 .../apache/doris/statistics/HiveAnalysisTask.java  | 84 +++++++++++++++++-----
 .../doris/statistics/IcebergAnalysisTask.java      |  2 +-
 9 files changed, 134 insertions(+), 31 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 8aecb96bb3..b94804ac61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -18,12 +18,15 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
 import org.apache.doris.statistics.AnalysisTaskInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ColumnStatistic;
 import
org.apache.doris.statistics.HiveAnalysisTask; import org.apache.doris.statistics.IcebergAnalysisTask; import org.apache.doris.thrift.THiveTable; @@ -322,6 +325,18 @@ public class HMSExternalTable extends ExternalTable { return columns; } + @Override + public long estimatedRowCount() { + ColumnStatistic cache = Config.enable_stats + ? Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(id, "") + : ColumnStatistic.UNKNOWN; + if (cache == ColumnStatistic.UNKNOWN) { + return 1; + } else { + return (long) cache.count; + } + } + private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) { Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this); Schema schema = icebergTable.schema(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 688924949d..d12baf0bb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -124,6 +124,7 @@ public class AnalysisManager { Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>(); createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync); createTaskForMVIdx(jobInfo, analysisTaskInfos, isSync); + createTaskForExternalTable(jobInfo, analysisTaskInfos, isSync); ConnectContext ctx = ConnectContext.get(); if (!isSync || ctx.getSessionVariable().enableSaveStatisticsSyncJob) { @@ -425,6 +426,31 @@ public class AnalysisManager { } } + private void createTaskForExternalTable(AnalysisTaskInfo jobInfo, + Map<Long, BaseAnalysisTask> analysisTasks, + boolean isSync) throws DdlException { + TableIf table; + try { + table = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName); + } catch (Throwable e) { + LOG.warn(e.getMessage()); + return; + } + if (jobInfo.analysisType == AnalysisType.HISTOGRAM || table.getType() != TableType.HMS_EXTERNAL_TABLE) { + return; + 
} + AnalysisTaskInfoBuilder colTaskInfoBuilder = new AnalysisTaskInfoBuilder(jobInfo); + long taskId = Env.getCurrentEnv().getNextId(); + AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setIndexId(-1L) + .setTaskId(taskId).setExternalTableLevelTask(true).build(); + analysisTasks.put(taskId, createTask(analysisTaskInfo)); + try { + StatisticsRepository.persistAnalysisTask(analysisTaskInfo); + } catch (Exception e) { + throw new DdlException("Failed to create analysis task", e); + } + } + public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) { if (analysisJobIdToTaskMap.get(info.jobId) == null) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java index 8690682ea2..2860a472c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java @@ -104,11 +104,15 @@ public class AnalysisTaskInfo { public String message; + // True means this task is a table level task for external table. + // This kind of task is mainly to collect the number of rows of a table. 
+ public boolean externalTableLevelTask; + public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName, Map<String, Set<String>> colToPartitions, String colName, Long indexId, JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType, int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message, - long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) { + long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask) { this.jobId = jobId; this.taskId = taskId; this.catalogName = catalogName; @@ -129,6 +133,7 @@ public class AnalysisTaskInfo { this.lastExecTimeInMs = lastExecTimeInMs; this.state = state; this.scheduleType = scheduleType; + this.externalTableLevelTask = isExternalTableLevelTask; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java index 5a6e6b41ad..acaae0baab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java @@ -47,6 +47,7 @@ public class AnalysisTaskInfoBuilder { private AnalysisState state; private ScheduleType scheduleType; private String message; + private boolean externalTableLevelTask; public AnalysisTaskInfoBuilder() { } @@ -174,10 +175,16 @@ public class AnalysisTaskInfoBuilder { return this; } + public AnalysisTaskInfoBuilder setExternalTableLevelTask(boolean isTableLevel) { + this.externalTableLevelTask = isTableLevel; + return this; + } + public AnalysisTaskInfo build() { return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, - sampleRows, maxBucketNum, periodTimeInMs, message, 
lastExecTimeInMs, state, scheduleType); + sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType, + externalTableLevelTask); } public AnalysisTaskInfoBuilder copy() { @@ -201,6 +208,7 @@ public class AnalysisTaskInfoBuilder { .setMessage(message) .setLastExecTimeInMs(lastExecTimeInMs) .setState(state) - .setScheduleType(scheduleType); + .setScheduleType(scheduleType) + .setExternalTableLevelTask(false); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 0f62a39396..a528229012 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -142,6 +142,10 @@ public abstract class BaseAnalysisTask { info, AnalysisState.FAILED, String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis()); } + // External Table level task doesn't contain a column. Don't need to do the column related analyze. 
+ if (info.externalTableLevelTask) { + return; + } if (info.analysisType != null && (info.analysisType.equals(AnalysisType.COLUMN) || info.analysisType.equals(AnalysisType.HISTOGRAM))) { col = tbl.getColumn(info.colName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index ef5da59d81..3dfc66adbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -138,10 +138,9 @@ public class ColumnStatistic { String colName = resultRow.getColumnValue("col_id"); Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName); if (col == null) { - LOG.warn("Failed to deserialize column statistics, ctlId: {} dbId: {}" - + "tblId: {} column: {} not exists", - catalogId, dbID, tblId, colName); - return ColumnStatistic.UNKNOWN; + // Col is null indicates this information is external table level info, + // which doesn't have a column. + return columnStatisticBuilder.build(); } String min = resultRow.getColumnValue("min"); String max = resultRow.getColumnValue("max"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index 5651f2617a..8bfaa1ca8a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -34,7 +34,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { /** * Collect the column level stats for external table through metadata. 
*/ - protected void getColumnStatsByMeta() throws Exception { + protected void getStatsByMeta() throws Exception { throw new NotImplementedException("Code is not implemented"); } @@ -42,16 +42,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask { * Collect the stats for external table through sql. * @return ColumnStatistics */ - protected void getColumnStatsBySql() { + protected void getStatsBySql() { throw new NotImplementedException("getColumnStatsBySql is not implemented"); } @Override public void execute() throws Exception { if (Config.collect_external_table_stats_by_sql) { - getColumnStatsBySql(); + getStatsBySql(); } else { - getColumnStatsByMeta(); + getStatsByMeta(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java index 3f96c1bccc..76f14a5a68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java @@ -57,21 +57,63 @@ public class HiveAnalysisTask extends HMSAnalysisTask { public static final String TIMESTAMP = "transient_lastDdlTime"; public static final String DELIMITER = "-"; + private final boolean isTableLevelTask; + public HiveAnalysisTask(AnalysisTaskInfo info) { super(info); + isTableLevelTask = info.externalTableLevelTask; } - private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO " + private static final String ANALYZE_TABLE_COLUMN_SQL_TEMPLATE = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, " + + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; + + private static final String ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', " + 
"${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" - + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, " - + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; + + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, " + + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')"; @Override + protected void getStatsByMeta() throws Exception { + if (isTableLevelTask) { + getTableStatsByMeta(); + } else { + getColumnStatsByMeta(); + } + } + + protected void getTableStatsByMeta() throws Exception { + Map<String, String> params = new HashMap<>(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("catalogId", String.valueOf(catalog.getId())); + params.put("dbId", String.valueOf(db.getId())); + params.put("tblId", String.valueOf(tbl.getId())); + params.put("colId", ""); + + // Get table level information. + Map<String, String> parameters = table.getRemoteTable().getParameters(); + // Collect table level row count, null number and timestamp. 
+ setParameterData(parameters, params); + if (parameters.containsKey(TOTAL_SIZE)) { + params.put("dataSize", parameters.get(TOTAL_SIZE)); + } + params.put("id", genColumnStatId(tbl.getId(), -1, "", null)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } + } + protected void getColumnStatsByMeta() throws Exception { List<String> columns = new ArrayList<>(); columns.add(col.getName()); @@ -89,16 +131,17 @@ public class HiveAnalysisTask extends HMSAnalysisTask { setParameterData(parameters, params); params.put("id", genColumnStatId(tbl.getId(), -1, col.getName(), null)); List<ColumnStatisticsObj> tableStats = table.getHiveTableColumnStats(columns); + long rowCount = parameters.containsKey(NUM_ROWS) ? Long.parseLong(parameters.get(NUM_ROWS)) : 0; // Collect table level ndv, nulls, min and max. tableStats contains at most 1 item; for (ColumnStatisticsObj tableStat : tableStats) { if (!tableStat.isSetStatsData()) { continue; } ColumnStatisticsData data = tableStat.getStatsData(); - getStatData(data, params); + getStatData(data, params, rowCount); } StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE); + String sql = stringSubstitutor.replace(ANALYZE_TABLE_COLUMN_SQL_TEMPLATE); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); this.stmtExecutor = new StmtExecutor(r.connectContext, sql); @@ -128,11 +171,12 @@ public class HiveAnalysisTask extends HMSAnalysisTask { if (!stat.isSetStatsData()) { continue; } + rowCount = parameters.containsKey(NUM_ROWS) ? 
Long.parseLong(parameters.get(NUM_ROWS)) : 0; // Collect ndv, nulls, min and max for different data type. ColumnStatisticsData data = stat.getStatsData(); - getStatData(data, params); + getStatData(data, params, rowCount); stringSubstitutor = new StringSubstitutor(params); - partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); + partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE)); } // Update partition level stats for this column. for (String partitionSql : partitionAnalysisSQLs) { @@ -145,11 +189,15 @@ public class HiveAnalysisTask extends HMSAnalysisTask { Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName()); } - private void getStatData(ColumnStatisticsData data, Map<String, String> params) { + private void getStatData(ColumnStatisticsData data, Map<String, String> params, long rowCount) { long ndv = 0; long nulls = 0; String min = ""; String max = ""; + long colSize = 0; + if (!data.isSetStringStats()) { + colSize = rowCount * col.getType().getSlotSize(); + } // Collect ndv, nulls, min and max for different data type. 
if (data.isSetLongStats()) { LongColumnStatsData longStats = data.getLongStats(); @@ -161,6 +209,8 @@ public class HiveAnalysisTask extends HMSAnalysisTask { StringColumnStatsData stringStats = data.getStringStats(); ndv = stringStats.getNumDVs(); nulls = stringStats.getNumNulls(); + double avgColLen = stringStats.getAvgColLen(); + colSize = Math.round(avgColLen * rowCount); } else if (data.isSetDecimalStats()) { DecimalColumnStatsData decimalStats = data.getDecimalStats(); ndv = decimalStats.getNumDVs(); @@ -211,25 +261,21 @@ public class HiveAnalysisTask extends HMSAnalysisTask { params.put("nulls", String.valueOf(nulls)); params.put("min", min); params.put("max", max); + params.put("dataSize", String.valueOf(colSize)); } private void setParameterData(Map<String, String> parameters, Map<String, String> params) { - long numRows = 0; - long timestamp = 0; - long dataSize = 0; + String numRows = ""; + String timestamp = ""; if (parameters.containsKey(NUM_ROWS)) { - numRows = Long.parseLong(parameters.get(NUM_ROWS)); + numRows = parameters.get(NUM_ROWS); } if (parameters.containsKey(TIMESTAMP)) { - timestamp = Long.parseLong(parameters.get(TIMESTAMP)); - } - if (parameters.containsKey(TOTAL_SIZE)) { - dataSize = Long.parseLong(parameters.get(TOTAL_SIZE)); + timestamp = parameters.get(TIMESTAMP); } - params.put("dataSize", String.valueOf(dataSize)); - params.put("numRows", String.valueOf(numRows)); + params.put("numRows", numRows); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - params.put("update_time", sdf.format(new Date(timestamp * 1000))); + params.put("update_time", sdf.format(new Date(Long.parseLong(timestamp) * 1000))); } private String genColumnStatId(long tableId, long indexId, String columnName, String partitionName) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java index 7c41954dc0..61b3f6ea1c 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java @@ -56,7 +56,7 @@ public class IcebergAnalysisTask extends HMSAnalysisTask { @Override - protected void getColumnStatsByMeta() throws Exception { + protected void getStatsByMeta() throws Exception { Table icebergTable = getIcebergTable(); TableScan tableScan = icebergTable.newScan().includeColumnStats(); for (FileScanTask task : tableScan.planFiles()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org