This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d1f597413 [Fix](statistics)Fix hive table statistic bug (#19365)
2d1f597413 is described below

commit 2d1f597413ecb70597cc54436dab5c8a2eaf55b8
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Thu May 11 07:48:58 2023 +0800

    [Fix](statistics)Fix hive table statistic bug (#19365)
    
    Fix a Hive table statistics bug. Collect table-level statistics (row count and total data size) in addition to partition-level column statistics.
---
 .../doris/catalog/external/HMSExternalTable.java   | 15 ++++
 .../apache/doris/statistics/AnalysisManager.java   | 26 +++++++
 .../apache/doris/statistics/AnalysisTaskInfo.java  |  7 +-
 .../doris/statistics/AnalysisTaskInfoBuilder.java  | 12 +++-
 .../apache/doris/statistics/BaseAnalysisTask.java  |  4 ++
 .../apache/doris/statistics/ColumnStatistic.java   |  7 +-
 .../apache/doris/statistics/HMSAnalysisTask.java   |  8 +--
 .../apache/doris/statistics/HiveAnalysisTask.java  | 84 +++++++++++++++++-----
 .../doris/statistics/IcebergAnalysisTask.java      |  2 +-
 9 files changed, 134 insertions(+), 31 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 8aecb96bb3..b94804ac61 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -18,12 +18,15 @@
 package org.apache.doris.catalog.external;
 
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.Type;
+import org.apache.doris.common.Config;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
 import org.apache.doris.statistics.AnalysisTaskInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.HiveAnalysisTask;
 import org.apache.doris.statistics.IcebergAnalysisTask;
 import org.apache.doris.thrift.THiveTable;
@@ -322,6 +325,18 @@ public class HMSExternalTable extends ExternalTable {
         return columns;
     }
 
+    @Override
+    public long estimatedRowCount() {
+        ColumnStatistic cache = Config.enable_stats
+                ? 
Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(id, "")
+                : ColumnStatistic.UNKNOWN;
+        if (cache == ColumnStatistic.UNKNOWN) {
+            return 1;
+        } else {
+            return (long) cache.count;
+        }
+    }
+
     private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
         Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this);
         Schema schema = icebergTable.schema();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index 688924949d..d12baf0bb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -124,6 +124,7 @@ public class AnalysisManager {
         Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
         createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
         createTaskForMVIdx(jobInfo, analysisTaskInfos, isSync);
+        createTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
 
         ConnectContext ctx = ConnectContext.get();
         if (!isSync || ctx.getSessionVariable().enableSaveStatisticsSyncJob) {
@@ -425,6 +426,31 @@ public class AnalysisManager {
         }
     }
 
+    private void createTaskForExternalTable(AnalysisTaskInfo jobInfo,
+                                            Map<Long, BaseAnalysisTask> 
analysisTasks,
+                                            boolean isSync) throws 
DdlException {
+        TableIf table;
+        try {
+            table = StatisticsUtil.findTable(jobInfo.catalogName, 
jobInfo.dbName, jobInfo.tblName);
+        } catch (Throwable e) {
+            LOG.warn(e.getMessage());
+            return;
+        }
+        if (jobInfo.analysisType == AnalysisType.HISTOGRAM || table.getType() 
!= TableType.HMS_EXTERNAL_TABLE) {
+            return;
+        }
+        AnalysisTaskInfoBuilder colTaskInfoBuilder = new 
AnalysisTaskInfoBuilder(jobInfo);
+        long taskId = Env.getCurrentEnv().getNextId();
+        AnalysisTaskInfo analysisTaskInfo = colTaskInfoBuilder.setIndexId(-1L)
+                .setTaskId(taskId).setExternalTableLevelTask(true).build();
+        analysisTasks.put(taskId, createTask(analysisTaskInfo));
+        try {
+            StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+        } catch (Exception e) {
+            throw new DdlException("Failed to create analysis task", e);
+        }
+    }
+
     public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState 
jobState, String message, long time) {
         if (analysisJobIdToTaskMap.get(info.jobId) == null) {
             return;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
index 8690682ea2..2860a472c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
@@ -104,11 +104,15 @@ public class AnalysisTaskInfo {
 
     public String message;
 
+    // True means this task is a table level task for external table.
+    // This kind of task is mainly to collect the number of rows of a table.
+    public boolean externalTableLevelTask;
+
     public AnalysisTaskInfo(long jobId, long taskId, String catalogName, 
String dbName, String tblName,
             Map<String, Set<String>> colToPartitions, String colName, Long 
indexId, JobType jobType,
             AnalysisMode analysisMode, AnalysisMethod analysisMethod, 
AnalysisType analysisType,
             int samplePercent, int sampleRows, int maxBucketNum, long 
periodTimeInMs, String message,
-            long lastExecTimeInMs, AnalysisState state, ScheduleType 
scheduleType) {
+            long lastExecTimeInMs, AnalysisState state, ScheduleType 
scheduleType, boolean isExternalTableLevelTask) {
         this.jobId = jobId;
         this.taskId = taskId;
         this.catalogName = catalogName;
@@ -129,6 +133,7 @@ public class AnalysisTaskInfo {
         this.lastExecTimeInMs = lastExecTimeInMs;
         this.state = state;
         this.scheduleType = scheduleType;
+        this.externalTableLevelTask = isExternalTableLevelTask;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
index 5a6e6b41ad..acaae0baab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java
@@ -47,6 +47,7 @@ public class AnalysisTaskInfoBuilder {
     private AnalysisState state;
     private ScheduleType scheduleType;
     private String message;
+    private boolean externalTableLevelTask;
 
     public AnalysisTaskInfoBuilder() {
     }
@@ -174,10 +175,16 @@ public class AnalysisTaskInfoBuilder {
         return this;
     }
 
+    public AnalysisTaskInfoBuilder setExternalTableLevelTask(boolean 
isTableLevel) {
+        this.externalTableLevelTask = isTableLevel;
+        return this;
+    }
+
     public AnalysisTaskInfo build() {
         return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, 
tblName, colToPartitions,
                 colName, indexId, jobType, analysisMode, analysisMethod, 
analysisType, samplePercent,
-                sampleRows, maxBucketNum, periodTimeInMs, message, 
lastExecTimeInMs, state, scheduleType);
+                sampleRows, maxBucketNum, periodTimeInMs, message, 
lastExecTimeInMs, state, scheduleType,
+                externalTableLevelTask);
     }
 
     public AnalysisTaskInfoBuilder copy() {
@@ -201,6 +208,7 @@ public class AnalysisTaskInfoBuilder {
                 .setMessage(message)
                 .setLastExecTimeInMs(lastExecTimeInMs)
                 .setState(state)
-                .setScheduleType(scheduleType);
+                .setScheduleType(scheduleType)
+                .setExternalTableLevelTask(false);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 0f62a39396..a528229012 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -142,6 +142,10 @@ public abstract class BaseAnalysisTask {
                     info, AnalysisState.FAILED,
                     String.format("Table with name %s not exists", 
info.tblName), System.currentTimeMillis());
         }
+        // External Table level task doesn't contain a column. Don't need to 
do the column related analyze.
+        if (info.externalTableLevelTask) {
+            return;
+        }
         if (info.analysisType != null && 
(info.analysisType.equals(AnalysisType.COLUMN)
                 || info.analysisType.equals(AnalysisType.HISTOGRAM))) {
             col = tbl.getColumn(info.colName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
index ef5da59d81..3dfc66adbd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java
@@ -138,10 +138,9 @@ public class ColumnStatistic {
             String colName = resultRow.getColumnValue("col_id");
             Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, 
idxId, colName);
             if (col == null) {
-                LOG.warn("Failed to deserialize column statistics, ctlId: {} 
dbId: {}"
-                                + "tblId: {} column: {} not exists",
-                        catalogId, dbID, tblId, colName);
-                return ColumnStatistic.UNKNOWN;
+                // Col is null indicates this information is external table 
level info,
+                // which doesn't have a column.
+                return columnStatisticBuilder.build();
             }
             String min = resultRow.getColumnValue("min");
             String max = resultRow.getColumnValue("max");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 5651f2617a..8bfaa1ca8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -34,7 +34,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
     /**
      * Collect the column level stats for external table through metadata.
      */
-    protected void getColumnStatsByMeta() throws Exception {
+    protected void getStatsByMeta() throws Exception {
         throw new NotImplementedException("Code is not implemented");
     }
 
@@ -42,16 +42,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
      * Collect the stats for external table through sql.
      * @return ColumnStatistics
      */
-    protected void getColumnStatsBySql() {
+    protected void getStatsBySql() {
         throw new NotImplementedException("getColumnStatsBySql is not 
implemented");
     }
 
     @Override
     public void execute() throws Exception {
         if (Config.collect_external_table_stats_by_sql) {
-            getColumnStatsBySql();
+            getStatsBySql();
         } else {
-            getColumnStatsByMeta();
+            getStatsByMeta();
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
index 3f96c1bccc..76f14a5a68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
@@ -57,21 +57,63 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
     public static final String TIMESTAMP = "transient_lastDdlTime";
     public static final String DELIMITER = "-";
 
+    private final boolean isTableLevelTask;
+
     public HiveAnalysisTask(AnalysisTaskInfo info) {
         super(info);
+        isTableLevelTask = info.externalTableLevelTask;
     }
 
-    private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
+    private static final String ANALYZE_TABLE_COLUMN_SQL_TEMPLATE = "INSERT 
INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', 
'${colId}', NULL, "
+            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, 
'${update_time}')";
+
+    private static final String ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE = 
"INSERT INTO "
             + "${internalDB}.${columnStatTbl}"
             + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', 
'${colId}', '${partId}', "
             + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, 
'${update_time}')";
 
     private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO "
             + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', 
'${colId}', NULL, "
-            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, 
'${update_time}')";
+            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', 
'', NULL, "
+            + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";
 
     @Override
+    protected void getStatsByMeta() throws Exception {
+        if (isTableLevelTask) {
+            getTableStatsByMeta();
+        } else {
+            getColumnStatsByMeta();
+        }
+    }
+
+    protected void getTableStatsByMeta() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+        params.put("catalogId", String.valueOf(catalog.getId()));
+        params.put("dbId", String.valueOf(db.getId()));
+        params.put("tblId", String.valueOf(tbl.getId()));
+        params.put("colId", "");
+
+        // Get table level information.
+        Map<String, String> parameters = 
table.getRemoteTable().getParameters();
+        // Collect table level row count, null number and timestamp.
+        setParameterData(parameters, params);
+        if (parameters.containsKey(TOTAL_SIZE)) {
+            params.put("dataSize", parameters.get(TOTAL_SIZE));
+        }
+        params.put("id", genColumnStatId(tbl.getId(), -1, "", null));
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
+            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            this.stmtExecutor.execute();
+        }
+    }
+
     protected void getColumnStatsByMeta() throws Exception {
         List<String> columns = new ArrayList<>();
         columns.add(col.getName());
@@ -89,16 +131,17 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
         setParameterData(parameters, params);
         params.put("id", genColumnStatId(tbl.getId(), -1, col.getName(), 
null));
         List<ColumnStatisticsObj> tableStats = 
table.getHiveTableColumnStats(columns);
+        long rowCount = parameters.containsKey(NUM_ROWS) ? 
Long.parseLong(parameters.get(NUM_ROWS)) : 0;
         // Collect table level ndv, nulls, min and max. tableStats contains at 
most 1 item;
         for (ColumnStatisticsObj tableStat : tableStats) {
             if (!tableStat.isSetStatsData()) {
                 continue;
             }
             ColumnStatisticsData data = tableStat.getStatsData();
-            getStatData(data, params);
+            getStatData(data, params, rowCount);
         }
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-        String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
+        String sql = 
stringSubstitutor.replace(ANALYZE_TABLE_COLUMN_SQL_TEMPLATE);
         try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) 
{
             r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
             this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
@@ -128,11 +171,12 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
             if (!stat.isSetStatsData()) {
                 continue;
             }
+            rowCount = parameters.containsKey(NUM_ROWS) ? 
Long.parseLong(parameters.get(NUM_ROWS)) : 0;
             // Collect ndv, nulls, min and max for different data type.
             ColumnStatisticsData data = stat.getStatsData();
-            getStatData(data, params);
+            getStatData(data, params, rowCount);
             stringSubstitutor = new StringSubstitutor(params);
-            
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
+            
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE));
         }
         // Update partition level stats for this column.
         for (String partitionSql : partitionAnalysisSQLs) {
@@ -145,11 +189,15 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
         
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, 
col.getName());
     }
 
-    private void getStatData(ColumnStatisticsData data, Map<String, String> 
params) {
+    private void getStatData(ColumnStatisticsData data, Map<String, String> 
params, long rowCount) {
         long ndv = 0;
         long nulls = 0;
         String min = "";
         String max = "";
+        long colSize = 0;
+        if (!data.isSetStringStats()) {
+            colSize = rowCount * col.getType().getSlotSize();
+        }
         // Collect ndv, nulls, min and max for different data type.
         if (data.isSetLongStats()) {
             LongColumnStatsData longStats = data.getLongStats();
@@ -161,6 +209,8 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
             StringColumnStatsData stringStats = data.getStringStats();
             ndv = stringStats.getNumDVs();
             nulls = stringStats.getNumNulls();
+            double avgColLen = stringStats.getAvgColLen();
+            colSize = Math.round(avgColLen * rowCount);
         } else if (data.isSetDecimalStats()) {
             DecimalColumnStatsData decimalStats = data.getDecimalStats();
             ndv = decimalStats.getNumDVs();
@@ -211,25 +261,21 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
         params.put("nulls", String.valueOf(nulls));
         params.put("min", min);
         params.put("max", max);
+        params.put("dataSize", String.valueOf(colSize));
     }
 
     private void setParameterData(Map<String, String> parameters, Map<String, 
String> params) {
-        long numRows = 0;
-        long timestamp = 0;
-        long dataSize = 0;
+        String numRows = "";
+        String timestamp = "";
         if (parameters.containsKey(NUM_ROWS)) {
-            numRows = Long.parseLong(parameters.get(NUM_ROWS));
+            numRows = parameters.get(NUM_ROWS);
         }
         if (parameters.containsKey(TIMESTAMP)) {
-            timestamp = Long.parseLong(parameters.get(TIMESTAMP));
-        }
-        if (parameters.containsKey(TOTAL_SIZE)) {
-            dataSize = Long.parseLong(parameters.get(TOTAL_SIZE));
+            timestamp = parameters.get(TIMESTAMP);
         }
-        params.put("dataSize", String.valueOf(dataSize));
-        params.put("numRows", String.valueOf(numRows));
+        params.put("numRows", numRows);
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        params.put("update_time", sdf.format(new Date(timestamp * 1000)));
+        params.put("update_time", sdf.format(new 
Date(Long.parseLong(timestamp) * 1000)));
     }
 
     private String genColumnStatId(long tableId, long indexId, String 
columnName, String partitionName) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
index 7c41954dc0..61b3f6ea1c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
@@ -56,7 +56,7 @@ public class IcebergAnalysisTask extends HMSAnalysisTask {
 
 
     @Override
-    protected void getColumnStatsByMeta() throws Exception {
+    protected void getStatsByMeta() throws Exception {
         Table icebergTable = getIcebergTable();
         TableScan tableScan = icebergTable.newScan().includeColumnStats();
         for (FileScanTask task : tableScan.planFiles()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to