This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bcf79178e [Improvement](statistics, multi catalog) Support Iceberg table stats collection (#21481)
9bcf79178e is described below

commit 9bcf79178ed4021bfb9c784f340c60b2c7428a8b
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Fri Jul 7 09:18:37 2023 +0800

    [Improvement](statistics, multi catalog) Support Iceberg table stats collection (#21481)

    Fetch Iceberg table stats automatically when querying a table.
    Collect accurate statistics for Iceberg tables by running analyze SQL in
    Doris (removing the collect-by-meta option).
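
    An example of the column-statistics statement the new HMSAnalysisTask
    generates (copied from the code comments in this patch; the ids and the
    `hive`.`tpch100`.`region` names are illustrative):

        INSERT INTO __internal_schema.column_statistics
        SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
               13002 AS catalog_id, 13038 AS db_id, 13055 AS tbl_id,
               -1 AS idx_id, 'r_regionkey' AS col_id, 'NULL' AS part_id,
               COUNT(1) AS row_count, NDV(`r_regionkey`) AS ndv,
               SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
               MIN(`r_regionkey`) AS min, MAX(`r_regionkey`) AS max,
               0 AS data_size, NOW()
        FROM `hive`.`tpch100`.`region`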
---
 .../main/java/org/apache/doris/common/Config.java  |   7 -
 .../doris/catalog/external/HMSExternalTable.java   |  25 +-
 .../catalog/external/IcebergExternalTable.java     |  10 +
 .../apache/doris/external/hive/util/HiveUtil.java  |   2 +-
 .../apache/doris/statistics/HMSAnalysisTask.java   | 239 ++++++++++++-
 .../apache/doris/statistics/HiveAnalysisTask.java  | 370 ---------------------
 .../doris/statistics/IcebergAnalysisTask.java      | 121 -------
 .../doris/statistics/util/StatisticsUtil.java      |  49 +++
 8 files changed, 298 insertions(+), 525 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index e4a61788ec..c683db9bc5 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1742,13 +1742,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static boolean use_fuzzy_session_variable = false;
 
-    /**
-     * Collect external table statistic info by running sql when set to true.
-     * Otherwise, use external catalog metadata.
-     */
-    @ConfField(mutable = true)
-    public static boolean collect_external_table_stats_by_sql = true;
-
     /**
      * Max num of same name meta information in catalog recycle bin.
      * Default is 3.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 1aee1803ae..b985e4cddd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -29,8 +29,7 @@ import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.ColumnStatisticBuilder;
-import org.apache.doris.statistics.HiveAnalysisTask;
-import org.apache.doris.statistics.IcebergAnalysisTask;
+import org.apache.doris.statistics.HMSAnalysisTask;
 import org.apache.doris.statistics.TableStatistic;
 import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.THiveTable;
@@ -322,14 +321,7 @@ public class HMSExternalTable extends ExternalTable {
     @Override
     public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
         makeSureInitialized();
-        switch (dlaType) {
-            case HIVE:
-                return new HiveAnalysisTask(info);
-            case ICEBERG:
-                return new IcebergAnalysisTask(info);
-            default:
-                throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
-        }
+        return new HMSAnalysisTask(info);
     }
 
     public String getViewText() {
@@ -473,6 +465,19 @@ public class HMSExternalTable extends ExternalTable {
 
     @Override
     public Optional<ColumnStatistic> getColumnStatistic(String colName) {
+        makeSureInitialized();
+        switch (dlaType) {
+            case HIVE:
+                return getHiveColumnStats(colName);
+            case ICEBERG:
+                return StatisticsUtil.getIcebergColumnStats(colName, HiveMetaStoreClientHelper.getIcebergTable(this));
+            default:
+                LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
+        }
+        return Optional.empty();
+    }
+
+    private Optional<ColumnStatistic> getHiveColumnStats(String colName) {
         List<ColumnStatisticsObj> tableStats = getHiveTableColumnStats(Lists.newArrayList(colName));
         if (tableStats == null || tableStats.isEmpty()) {
             LOG.debug(String.format("No table stats found in Hive metastore for column %s in table %s.",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index bf29d93eed..c2be8b90a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.util.StatisticsUtil;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TIcebergTable;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -33,6 +35,7 @@ import org.apache.iceberg.types.Types;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 
 public class IcebergExternalTable extends ExternalTable {
 
@@ -134,4 +137,11 @@ public class IcebergExternalTable extends ExternalTable {
             return tTableDescriptor;
         }
     }
+
+    @Override
+    public Optional<ColumnStatistic> getColumnStatistic(String colName) {
+        makeSureInitialized();
+        return StatisticsUtil.getIcebergColumnStats(colName,
+            ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name));
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
index d42d8c3d7d..704d0fadf8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/hive/util/HiveUtil.java
@@ -197,7 +197,7 @@ public final class HiveUtil {
                 method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
                 break;
             } catch (NoSuchMethodException ignored) {
-                LOG.warn("Class {} doesn't contain isSplitable method.", 
clazz);
+                LOG.debug("Class {} doesn't contain isSplitable method.", 
clazz);
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index b5bee03f0b..a7b45c13cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -17,41 +17,248 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.InternalQueryResult;
+import org.apache.doris.statistics.util.StatisticsUtil;
 
-import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class HMSAnalysisTask extends BaseAnalysisTask {
+    private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);
+
+    public static final String TOTAL_SIZE = "totalSize";
+    public static final String NUM_ROWS = "numRows";
+    public static final String NUM_FILES = "numFiles";
+    public static final String TIMESTAMP = "transient_lastDdlTime";
+
+    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "${idxId} AS idx_id, "
+            + "'${colId}' AS col_id, "
+            + "${partId} AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(`${colName}`) AS ndv, "
+            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
+            + "MIN(`${colName}`) AS min, "
+            + "MAX(`${colName}`) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW() "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
 
-    protected HMSExternalTable table;
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
+    private final boolean isTableLevelTask;
+    private final boolean isSamplingPartition;
+    private final boolean isPartitionOnly;
+    private final Set<String> partitionNames;
+    private HMSExternalTable table;
 
     public HMSAnalysisTask(AnalysisInfo info) {
         super(info);
+        isTableLevelTask = info.externalTableLevelTask;
+        isSamplingPartition = info.samplingPartition;
+        isPartitionOnly = info.partitionOnly;
+        partitionNames = info.partitionNames;
         table = (HMSExternalTable) tbl;
     }
 
+    public void execute() throws Exception {
+        if (isTableLevelTask) {
+            getTableStats();
+        } else {
+            getTableColumnStats();
+        }
+    }
+
     /**
-     * Collect the column level stats for external table through metadata.
+     * Get table row count and insert the result to __internal_schema.table_statistics
      */
-    protected void getStatsByMeta() throws Exception {
-        throw new NotImplementedException("Code is not implemented");
+    private void getTableStats() throws Exception {
+        // Get table level information. An example sql for table stats:
+        // INSERT INTO __internal_schema.table_statistics VALUES
+        //   ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW())
+        Map<String, String> parameters = table.getRemoteTable().getParameters();
+        if (isPartitionOnly) {
+            for (String partId : partitionNames) {
+                StringBuilder sb = new StringBuilder();
+                sb.append(ANALYZE_TABLE_COUNT_TEMPLATE);
+                sb.append(" where ");
+                String[] splits = partId.split("/");
+                for (int i = 0; i < splits.length; i++) {
+                    String value = splits[i].split("=")[1];
+                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
+                }
+                sb.append(StringUtils.join(splits, " and "));
+                Map<String, String> params = buildTableStatsParams(partId);
+                setParameterData(parameters, params);
+                List<InternalQueryResult.ResultRow> columnResult =
+                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
+                        .replace(sb.toString()));
+                String rowCount = columnResult.get(0).getColumnValue("rowCount");
+                params.put("rowCount", rowCount);
+                StatisticsRepository.persistTableStats(params);
+            }
+        } else {
+            Map<String, String> params = buildTableStatsParams("NULL");
+            List<InternalQueryResult.ResultRow> columnResult =
+                    StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
+                    .replace(ANALYZE_TABLE_COUNT_TEMPLATE));
+            String rowCount = columnResult.get(0).getColumnValue("rowCount");
+            params.put("rowCount", rowCount);
+            StatisticsRepository.persistTableStats(params);
+        }
     }
 
     /**
-     * Collect the stats for external table through sql.
-     * @return ColumnStatistics
+     * Get column statistics and insert the result to __internal_schema.column_statistics
      */
-    protected void getStatsBySql() throws Exception {
-        throw new NotImplementedException("getColumnStatsBySql is not 
implemented");
+    private void getTableColumnStats() throws Exception {
+        // An example sql for a column stats:
+        // INSERT INTO __internal_schema.column_statistics
+        //   SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
+        //   13002 AS catalog_id,
+        //   13038 AS db_id,
+        //   13055 AS tbl_id,
+        //   -1 AS idx_id,
+        //   'r_regionkey' AS col_id,
+        //   'NULL' AS part_id,
+        //   COUNT(1) AS row_count,
+        //   NDV(`r_regionkey`) AS ndv,
+        //   SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
+        //   MIN(`r_regionkey`) AS min,
+        //   MAX(`r_regionkey`) AS max,
+        //   0 AS data_size,
+        //   NOW() FROM `hive`.`tpch100`.`region`
+        if (isPartitionOnly) {
+            for (String partId : partitionNames) {
+                StringBuilder sb = new StringBuilder();
+                sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
+                sb.append(" where ");
+                String[] splits = partId.split("/");
+                for (int i = 0; i < splits.length; i++) {
+                    String value = splits[i].split("=")[1];
+                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
+                }
+                sb.append(StringUtils.join(splits, " and "));
+                Map<String, String> params = buildTableStatsParams(partId);
+                params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+                params.put("columnStatTbl", 
StatisticConstants.STATISTIC_TBL_NAME);
+                params.put("colName", col.getName());
+                params.put("colId", info.colName);
+                params.put("dataSizeFunction", getDataSizeFunction(col));
+                StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
+                String sql = stringSubstitutor.replace(sb.toString());
+                try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext()) {
+                    
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+                    this.stmtExecutor = new StmtExecutor(r.connectContext, 
sql);
+                    this.stmtExecutor.execute();
+                }
+            }
+        } else {
+            StringBuilder sb = new StringBuilder();
+            sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
+            if (isSamplingPartition) {
+                sb.append(" where 1=1 ");
+                String[] splitExample = partitionNames.stream().findFirst().get().split("/");
+                int parts = splitExample.length;
+                List<String> partNames = new ArrayList<>();
+                for (String split : splitExample) {
+                    partNames.add(split.split("=")[0]);
+                }
+                List<List<String>> valueLists = new ArrayList<>();
+                for (int i = 0; i < parts; i++) {
+                    valueLists.add(new ArrayList<>());
+                }
+                for (String partId : partitionNames) {
+                    String[] partIds = partId.split("/");
+                    for (int i = 0; i < partIds.length; i++) {
+                        valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'");
+                    }
+                }
+                for (int i = 0; i < parts; i++) {
+                    sb.append(" and ");
+                    sb.append(partNames.get(i));
+                    sb.append(" in (");
+                    sb.append(StringUtils.join(valueLists.get(i), ","));
+                    sb.append(") ");
+                }
+            }
+            Map<String, String> params = buildTableStatsParams("NULL");
+            params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+            params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+            params.put("colName", col.getName());
+            params.put("colId", info.colName);
+            params.put("dataSizeFunction", getDataSizeFunction(col));
+            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+            String sql = stringSubstitutor.replace(sb.toString());
+            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+                this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+                this.stmtExecutor.execute();
+            }
+            Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
+        }
     }
 
-    @Override
-    public void execute() throws Exception {
-        if (Config.collect_external_table_stats_by_sql) {
-            getStatsBySql();
-        } else {
-            getStatsByMeta();
+    private Map<String, String> buildTableStatsParams(String partId) {
+        Map<String, String> commonParams = new HashMap<>();
+        String id = StatisticsUtil.constructId(tbl.getId(), -1);
+        if (!partId.equals("NULL")) {
+            id = StatisticsUtil.constructId(id, partId);
+        }
+        commonParams.put("id", id);
+        commonParams.put("catalogId", String.valueOf(catalog.getId()));
+        commonParams.put("dbId", String.valueOf(db.getId()));
+        commonParams.put("tblId", String.valueOf(tbl.getId()));
+        commonParams.put("indexId", "-1");
+        commonParams.put("idxId", "-1");
+        commonParams.put("partId", "\'" + partId + "\'");
+        commonParams.put("catalogName", catalog.getName());
+        commonParams.put("dbName", db.getFullName());
+        commonParams.put("tblName", tbl.getName());
+        if (col != null) {
+            commonParams.put("type", col.getType().toString());
+        }
+        commonParams.put("lastAnalyzeTimeInMs", 
String.valueOf(System.currentTimeMillis()));
+        return commonParams;
+    }
+
+    private void setParameterData(Map<String, String> parameters, Map<String, 
String> params) {
+        String numRows = "";
+        String timestamp = "";
+        if (parameters.containsKey(NUM_ROWS)) {
+            numRows = parameters.get(NUM_ROWS);
+        }
+        if (parameters.containsKey(TIMESTAMP)) {
+            timestamp = parameters.get(TIMESTAMP);
         }
+        params.put("numRows", numRows);
+        params.put("rowCount", numRows);
+        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
+                        ZoneId.systemDefault())));
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
deleted file mode 100644
index 8ae74206de..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
+++ /dev/null
@@ -1,370 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.util.InternalQueryResult;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.text.StringSubstitutor;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.Decimal;
-import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
-import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HiveAnalysisTask extends HMSAnalysisTask {
-    private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class);
-
-    public static final String TOTAL_SIZE = "totalSize";
-    public static final String NUM_ROWS = "numRows";
-    public static final String NUM_FILES = "numFiles";
-    public static final String TIMESTAMP = "transient_lastDdlTime";
-
-    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " SELECT "
-            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
-            + "${catalogId} AS catalog_id, "
-            + "${dbId} AS db_id, "
-            + "${tblId} AS tbl_id, "
-            + "${idxId} AS idx_id, "
-            + "'${colId}' AS col_id, "
-            + "${partId} AS part_id, "
-            + "COUNT(1) AS row_count, "
-            + "NDV(`${colName}`) AS ndv, "
-            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS 
null_count, "
-            + "MIN(`${colName}`) AS min, "
-            + "MAX(`${colName}`) AS max, "
-            + "${dataSizeFunction} AS data_size, "
-            + "NOW() "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
-
-    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
-            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
-
-    private final boolean isTableLevelTask;
-    private final boolean isSamplingPartition;
-    private final boolean isPartitionOnly;
-    private final Set<String> partitionNames;
-
-    public HiveAnalysisTask(AnalysisInfo info) {
-        super(info);
-        isTableLevelTask = info.externalTableLevelTask;
-        isSamplingPartition = info.samplingPartition;
-        isPartitionOnly = info.partitionOnly;
-        partitionNames = info.partitionNames;
-    }
-
-    private static final String ANALYZE_META_TABLE_COLUMN_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
-            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
-
-    private static final String ANALYZE_META_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', "
-            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
-
-    private static final String ANALYZE_META_TABLE_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', 
'', NULL, "
-            + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";
-
-    /**
-     * Collect the stats for external table through sql.
-     */
-    @Override
-    protected void getStatsBySql() throws Exception {
-        if (isTableLevelTask) {
-            getTableStatsBySql();
-        } else {
-            getTableColumnStatsBySql();
-        }
-    }
-
-    /**
-     * Get table row count and insert the result to __internal_schema.table_statistics
-     */
-    private void getTableStatsBySql() throws Exception {
-        // Get table level information. An example sql for table stats:
-        // INSERT INTO __internal_schema.table_statistics VALUES
-        //   ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW())
-        Map<String, String> parameters = table.getRemoteTable().getParameters();
-        if (isPartitionOnly) {
-            for (String partId : partitionNames) {
-                StringBuilder sb = new StringBuilder();
-                sb.append(ANALYZE_TABLE_COUNT_TEMPLATE);
-                sb.append(" where ");
-                String[] splits = partId.split("/");
-                for (int i = 0; i < splits.length; i++) {
-                    String value = splits[i].split("=")[1];
-                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
-                }
-                sb.append(StringUtils.join(splits, " and "));
-                Map<String, String> params = buildTableStatsParams(partId);
-                setParameterData(parameters, params);
-                List<InternalQueryResult.ResultRow> columnResult =
-                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
-                        .replace(sb.toString()));
-                String rowCount = columnResult.get(0).getColumnValue("rowCount");
-                params.put("rowCount", rowCount);
-                StatisticsRepository.persistTableStats(params);
-            }
-        } else {
-            Map<String, String> params = buildTableStatsParams("NULL");
-            List<InternalQueryResult.ResultRow> columnResult =
-                    StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
-                    .replace(ANALYZE_TABLE_COUNT_TEMPLATE));
-            String rowCount = columnResult.get(0).getColumnValue("rowCount");
-            params.put("rowCount", rowCount);
-            StatisticsRepository.persistTableStats(params);
-        }
-    }
-
-    /**
-     * Get column statistics and insert the result to __internal_schema.column_statistics
-     */
-    private void getTableColumnStatsBySql() throws Exception {
-        // An example sql for a column stats:
-        // INSERT INTO __internal_schema.column_statistics
-        //   SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
-        //   13002 AS catalog_id,
-        //   13038 AS db_id,
-        //   13055 AS tbl_id,
-        //   -1 AS idx_id,
-        //   'r_regionkey' AS col_id,
-        //   'NULL' AS part_id,
-        //   COUNT(1) AS row_count,
-        //   NDV(`r_regionkey`) AS ndv,
-        //   SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
-        //   MIN(`r_regionkey`) AS min,
-        //   MAX(`r_regionkey`) AS max,
-        //   0 AS data_size,
-        //   NOW() FROM `hive`.`tpch100`.`region`
-        if (isPartitionOnly) {
-            for (String partId : partitionNames) {
-                StringBuilder sb = new StringBuilder();
-                sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
-                sb.append(" where ");
-                String[] splits = partId.split("/");
-                for (int i = 0; i < splits.length; i++) {
-                    String value = splits[i].split("=")[1];
-                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
-                }
-                sb.append(StringUtils.join(splits, " and "));
-                Map<String, String> params = buildTableStatsParams(partId);
-                params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
-                params.put("columnStatTbl", 
StatisticConstants.STATISTIC_TBL_NAME);
-                params.put("colName", col.getName());
-                params.put("colId", info.colName);
-                params.put("dataSizeFunction", getDataSizeFunction(col));
-                StringSubstitutor stringSubstitutor = new 
StringSubstitutor(params);
-                String sql = stringSubstitutor.replace(sb.toString());
-                try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext()) {
-                    
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-                    this.stmtExecutor = new StmtExecutor(r.connectContext, 
sql);
-                    this.stmtExecutor.execute();
-                }
-            }
-        } else {
-            StringBuilder sb = new StringBuilder();
-            sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
-            if (isSamplingPartition) {
-                sb.append(" where 1=1 ");
-                String[] splitExample = partitionNames.stream().findFirst().get().split("/");
-                int parts = splitExample.length;
-                List<String> partNames = new ArrayList<>();
-                for (String split : splitExample) {
-                    partNames.add(split.split("=")[0]);
-                }
-                List<List<String>> valueLists = new ArrayList<>();
-                for (int i = 0; i < parts; i++) {
-                    valueLists.add(new ArrayList<>());
-                }
-                for (String partId : partitionNames) {
-                    String[] partIds = partId.split("/");
-                    for (int i = 0; i < partIds.length; i++) {
-                        valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'");
-                    }
-                }
-                for (int i = 0; i < parts; i++) {
-                    sb.append(" and ");
-                    sb.append(partNames.get(i));
-                    sb.append(" in (");
-                    sb.append(StringUtils.join(valueLists.get(i), ","));
-                    sb.append(") ");
-                }
-            }
-            Map<String, String> params = buildTableStatsParams("NULL");
-            params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
-            params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
-            params.put("colName", col.getName());
-            params.put("colId", info.colName);
-            params.put("dataSizeFunction", getDataSizeFunction(col));
-            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-            String sql = stringSubstitutor.replace(sb.toString());
-            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-                this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-                this.stmtExecutor.execute();
-            }
-            Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
-        }
-    }
-
-    private Map<String, String> buildTableStatsParams(String partId) {
-        Map<String, String> commonParams = new HashMap<>();
-        String id = StatisticsUtil.constructId(tbl.getId(), -1);
-        if (!partId.equals("NULL")) {
-            id = StatisticsUtil.constructId(id, partId);
-        }
-        commonParams.put("id", id);
-        commonParams.put("catalogId", String.valueOf(catalog.getId()));
-        commonParams.put("dbId", String.valueOf(db.getId()));
-        commonParams.put("tblId", String.valueOf(tbl.getId()));
-        commonParams.put("indexId", "-1");
-        commonParams.put("idxId", "-1");
-        commonParams.put("partId", "\'" + partId + "\'");
-        commonParams.put("catalogName", catalog.getName());
-        commonParams.put("dbName", db.getFullName());
-        commonParams.put("tblName", tbl.getName());
-        if (col != null) {
-            commonParams.put("type", col.getType().toString());
-        }
-        commonParams.put("lastAnalyzeTimeInMs", 
String.valueOf(System.currentTimeMillis()));
-        return commonParams;
-    }
-
-    @Override
-    protected void getStatsByMeta() throws Exception {
-        // To be removed.
-    }
-
-    private void getStatData(ColumnStatisticsData data, Map<String, String> params, long rowCount) {
-        long ndv = 0;
-        long nulls = 0;
-        String min = "";
-        String max = "";
-        long colSize = 0;
-        if (!data.isSetStringStats()) {
-            colSize = rowCount * col.getType().getSlotSize();
-        }
-        // Collect ndv, nulls, min and max for different data type.
-        if (data.isSetLongStats()) {
-            LongColumnStatsData longStats = data.getLongStats();
-            ndv = longStats.getNumDVs();
-            nulls = longStats.getNumNulls();
-            min = String.valueOf(longStats.getLowValue());
-            max = String.valueOf(longStats.getHighValue());
-        } else if (data.isSetStringStats()) {
-            StringColumnStatsData stringStats = data.getStringStats();
-            ndv = stringStats.getNumDVs();
-            nulls = stringStats.getNumNulls();
-            double avgColLen = stringStats.getAvgColLen();
-            colSize = Math.round(avgColLen * rowCount);
-        } else if (data.isSetDecimalStats()) {
-            DecimalColumnStatsData decimalStats = data.getDecimalStats();
-            ndv = decimalStats.getNumDVs();
-            nulls = decimalStats.getNumNulls();
-            if (decimalStats.isSetLowValue()) {
-                Decimal lowValue = decimalStats.getLowValue();
-                if (lowValue != null) {
-                    BigDecimal lowDecimal = new BigDecimal(new BigInteger(lowValue.getUnscaled()), lowValue.getScale());
-                    min = lowDecimal.toString();
-                }
-            }
-            if (decimalStats.isSetHighValue()) {
-                Decimal highValue = decimalStats.getHighValue();
-                if (highValue != null) {
-                    BigDecimal highDecimal = new BigDecimal(
-                            new BigInteger(highValue.getUnscaled()), highValue.getScale());
-                    max = highDecimal.toString();
-                }
-            }
-        } else if (data.isSetDoubleStats()) {
-            DoubleColumnStatsData doubleStats = data.getDoubleStats();
-            ndv = doubleStats.getNumDVs();
-            nulls = doubleStats.getNumNulls();
-            min = String.valueOf(doubleStats.getLowValue());
-            max = String.valueOf(doubleStats.getHighValue());
-        } else if (data.isSetDateStats()) {
-            DateColumnStatsData dateStats = data.getDateStats();
-            ndv = dateStats.getNumDVs();
-            nulls = dateStats.getNumNulls();
-            if (dateStats.isSetLowValue()) {
-                org.apache.hadoop.hive.metastore.api.Date lowValue = dateStats.getLowValue();
-                if (lowValue != null) {
-                    LocalDate lowDate = LocalDate.ofEpochDay(lowValue.getDaysSinceEpoch());
-                    min = lowDate.toString();
-                }
-            }
-            if (dateStats.isSetHighValue()) {
-                org.apache.hadoop.hive.metastore.api.Date highValue = dateStats.getHighValue();
-                if (highValue != null) {
-                    LocalDate highDate = LocalDate.ofEpochDay(highValue.getDaysSinceEpoch());
-                    max = highDate.toString();
-                }
-            }
-        } else {
-            throw new RuntimeException("Not supported data type.");
-        }
-        params.put("ndv", String.valueOf(ndv));
-        params.put("nulls", String.valueOf(nulls));
-        params.put("min", min);
-        params.put("max", max);
-        params.put("dataSize", String.valueOf(colSize));
-    }
-
-    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
-        String numRows = "";
-        String timestamp = "";
-        if (parameters.containsKey(NUM_ROWS)) {
-            numRows = parameters.get(NUM_ROWS);
-        }
-        if (parameters.containsKey(TIMESTAMP)) {
-            timestamp = parameters.get(TIMESTAMP);
-        }
-        params.put("numRows", numRows);
-        params.put("rowCount", numRows);
-        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
-                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
-                        ZoneId.systemDefault())));
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
deleted file mode 100644
index 105ef758f0..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java
+++ /dev/null
@@ -1,121 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.statistics;
-
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.constants.HMSProperties;
-import org.apache.doris.qe.AutoCloseConnectContext;
-import org.apache.doris.qe.StmtExecutor;
-import org.apache.doris.statistics.util.StatisticsUtil;
-
-import org.apache.commons.text.StringSubstitutor;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-
-import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.Map;
-
-public class IcebergAnalysisTask extends HMSAnalysisTask {
-
-    private long numRows = 0;
-    private long dataSize = 0;
-    private long numNulls = 0;
-
-    public IcebergAnalysisTask(AnalysisInfo info) {
-        super(info);
-    }
-
-    private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO "
-            + "${internalDB}.${columnStatTbl}"
-            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', 
'${colId}', NULL, "
-            + "${numRows}, 0, ${nulls}, '0', '0', ${dataSize}, 
'${update_time}')";
-
-
-    @Override
-    protected void getStatsByMeta() throws Exception {
-        Table icebergTable = getIcebergTable();
-        TableScan tableScan = icebergTable.newScan().includeColumnStats();
-        for (FileScanTask task : tableScan.planFiles()) {
-            processDataFile(task.file(), task.spec());
-        }
-        updateStats();
-    }
-
-    private Table getIcebergTable() {
-        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
-        Configuration conf = new HdfsConfiguration();
-        for (Map.Entry<String, String> entry : 
table.getHadoopProperties().entrySet()) {
-            conf.set(entry.getKey(), entry.getValue());
-        }
-        hiveCatalog.setConf(conf);
-        Map<String, String> catalogProperties = new HashMap<>();
-        catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, table.getMetastoreUri());
-        catalogProperties.put("uri", table.getMetastoreUri());
-        hiveCatalog.initialize("hive", catalogProperties);
-        return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
-    }
-
-    private void processDataFile(DataFile dataFile, PartitionSpec partitionSpec) {
-        int colId = -1;
-        for (Types.NestedField column : partitionSpec.schema().columns()) {
-            if (column.name().equals(col.getName())) {
-                colId = column.fieldId();
-                break;
-            }
-        }
-        if (colId == -1) {
-            throw new RuntimeException(String.format("Column %s not exist.", 
col.getName()));
-        }
-        dataSize += dataFile.columnSizes().get(colId);
-        numRows += dataFile.recordCount();
-        numNulls += dataFile.nullValueCounts().get(colId);
-    }
-
-    private void updateStats() throws Exception {
-        Map<String, String> params = new HashMap<>();
-        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
-        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
-        params.put("id", tbl.getId() + "-" + col.getName());
-        params.put("catalogId", String.valueOf(catalog.getId()));
-        params.put("dbId", String.valueOf(db.getId()));
-        params.put("tblId", String.valueOf(tbl.getId()));
-        params.put("colId", String.valueOf(col.getName()));
-        params.put("numRows", String.valueOf(numRows));
-        params.put("nulls", String.valueOf(numNulls));
-        params.put("dataSize", String.valueOf(dataSize));
-        params.put("update_time", 
TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
-
-        // Update table level stats info of this column.
-        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-        String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE);
-        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
-            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
-            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
-            this.stmtExecutor.execute();
-        }
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
index 0bdf736aab..6e773fc430 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java
@@ -60,6 +60,7 @@ import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.ColumnStatistic;
+import org.apache.doris.statistics.ColumnStatisticBuilder;
 import org.apache.doris.statistics.Histogram;
 import org.apache.doris.statistics.StatisticConstants;
 import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
@@ -71,9 +72,12 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.text.StringSubstitutor;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.types.Types;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -594,4 +598,49 @@ public class StatisticsUtil {
         }
         return totalSize / estimatedRowSize;
     }
+
+    /**
+     * Get Iceberg column statistics.
+     * @param colName Name of the column to collect statistics for.
+     * @param table Iceberg table.
+     * @return Optional column statistic for the given column.
+     */
+    public static Optional<ColumnStatistic> getIcebergColumnStats(String colName, org.apache.iceberg.Table table) {
+        TableScan tableScan = table.newScan().includeColumnStats();
+        ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder();
+        columnStatisticBuilder.setCount(0);
+        columnStatisticBuilder.setMaxValue(Double.MAX_VALUE);
+        columnStatisticBuilder.setMinValue(Double.MIN_VALUE);
+        columnStatisticBuilder.setDataSize(0);
+        columnStatisticBuilder.setAvgSizeByte(0);
+        columnStatisticBuilder.setNumNulls(0);
+        for (FileScanTask task : tableScan.planFiles()) {
+            processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder);
+        }
+        if (columnStatisticBuilder.getCount() > 0) {
+            columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
+                    / columnStatisticBuilder.getCount());
+        }
+        return Optional.of(columnStatisticBuilder.build());
+    }
+
+    private static void processDataFile(DataFile dataFile, PartitionSpec partitionSpec,
+                                        String colName, ColumnStatisticBuilder columnStatisticBuilder) {
+        int colId = -1;
+        for (Types.NestedField column : partitionSpec.schema().columns()) {
+            if (column.name().equals(colName)) {
+                colId = column.fieldId();
+                break;
+            }
+        }
+        if (colId == -1) {
+            throw new RuntimeException(String.format("Column %s not exist.", 
colName));
+        }
+        // Update the data size, count and num of nulls in columnStatisticBuilder.
+        // TODO: Get min max value.
+        columnStatisticBuilder.setDataSize(columnStatisticBuilder.getDataSize() + dataFile.columnSizes().get(colId));
+        columnStatisticBuilder.setCount(columnStatisticBuilder.getCount() + dataFile.recordCount());
+        columnStatisticBuilder.setNumNulls(columnStatisticBuilder.getNumNulls()
+                + dataFile.nullValueCounts().get(colId));
+    }
 }
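
The new StatisticsUtil.getIcebergColumnStats above folds each Iceberg data
file's per-column metadata into a single ColumnStatistic. Below is a minimal,
self-contained sketch of that fold; the FileStats class is a hypothetical
stand-in for org.apache.iceberg.DataFile (only recordCount(), columnSizes()
and nullValueCounts() are modeled), not Doris or Iceberg code:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class IcebergColumnStatsSketch {

        // Hypothetical stand-in for org.apache.iceberg.DataFile; only the
        // three per-file metrics read by this patch are modeled here.
        static final class FileStats {
            final long recordCount;               // DataFile.recordCount()
            final Map<Integer, Long> columnSizes; // DataFile.columnSizes(), keyed by field id
            final Map<Integer, Long> nullCounts;  // DataFile.nullValueCounts(), keyed by field id

            FileStats(long recordCount, Map<Integer, Long> columnSizes, Map<Integer, Long> nullCounts) {
                this.recordCount = recordCount;
                this.columnSizes = columnSizes;
                this.nullCounts = nullCounts;
            }
        }

        public static void main(String[] args) {
            int colId = 1; // field id resolved from the schema by column name
            List<FileStats> files = Arrays.asList(
                    new FileStats(1000, Map.of(colId, 8000L), Map.of(colId, 10L)),
                    new FileStats(500, Map.of(colId, 4200L), Map.of(colId, 0L)));

            // Same accumulation as StatisticsUtil.processDataFile: sum the
            // column size, row count and null count across all data files.
            long dataSize = 0;
            long rowCount = 0;
            long numNulls = 0;
            for (FileStats f : files) {
                dataSize += f.columnSizes.get(colId);
                rowCount += f.recordCount;
                numNulls += f.nullCounts.get(colId);
            }

            // Average byte size is derived only when at least one row was
            // seen, mirroring the getCount() > 0 guard in getIcebergColumnStats.
            double avgSizeByte = rowCount > 0 ? (double) dataSize / rowCount : 0;

            System.out.printf("rows=%d nulls=%d dataSize=%d avgSizeByte=%.2f%n",
                    rowCount, numNulls, dataSize, avgSizeByte);
        }
    }

Min and max stay at their sentinel defaults in the patch (see the TODO), so
file metadata currently yields only size, row count and null count.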

