morningman commented on code in PR #26435: URL: https://github.com/apache/doris/pull/26435#discussion_r1390322603
########## fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java: ########## @@ -174,30 +174,33 @@ public static ColumnStatistic fromResultRow(ResultRow row) { String min = row.get(10); String max = row.get(11); if (min != null && !min.equalsIgnoreCase("NULL")) { - min = new String(Base64.getDecoder().decode(min), - StandardCharsets.UTF_8); - - try { - columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min)); - columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min)); - } catch (AnalysisException e) { - LOG.warn("Failed to deserialize column {} min value {}.", col, min, e); + min = new String(Base64.getDecoder().decode(min), StandardCharsets.UTF_8); + if (catalogId != 0 && min.equalsIgnoreCase("NULL")) { Review Comment: Why check `catalogId != 0` here? Do you mean to check whether this is an internal catalog? If yes, two questions: 1. Why do we need to check whether this is an internal catalog? 2. Better to use `catalogId != InternalCatalog.INTERNAL_CATALOG_ID` ########## fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java: ########## @@ -120,34 +108,64 @@ private boolean isPartitionColumn() { return table.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName())); } + // Get ordinary column stats. Ordinary column means not partition column. private void getOrdinaryColumnStats() throws Exception { - // An example sql for a column stats: - // INSERT INTO __internal_schema.column_statistics - // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, - // 13002 AS catalog_id, - // 13038 AS db_id, - // 13055 AS tbl_id, - // -1 AS idx_id, - // 'r_regionkey' AS col_id, - // 'NULL' AS part_id, - // COUNT(1) AS row_count, - // NDV(`r_regionkey`) AS ndv, - // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count, - // MIN(`r_regionkey`) AS min, - // MAX(`r_regionkey`) AS max, - // 0 AS data_size, - // NOW() FROM `hive`.`tpch100`.`region` StringBuilder sb = new StringBuilder(); - sb.append(ANALYZE_TABLE_TEMPLATE); Map<String, String> params = buildStatsParams("NULL"); - params.put("dataSizeFunction", getDataSizeFunction(col)); - params.put("minFunction", getMinFunction()); - params.put("maxFunction", getMaxFunction()); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + params.put("min", getMinFunction()); + params.put("max", getMaxFunction()); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); + Pair<Double, Long> sampleInfo = getSampleInfo(); + params.put("scaleFactor", String.valueOf(sampleInfo.first)); + StringSubstitutor stringSubstitutor; + if (tableSample == null) { + // Do full analyze + LOG.info(String.format("Will do full collection for column %s", col.getName())); Review Comment: Can this be `debug` level? ########## fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java: ########## @@ -85,46 +75,95 @@ public void doExecute() throws Exception { * 3. 
insert col stats and partition stats */ protected void doSample() throws Exception { - Pair<List<Long>, Long> pair = calcActualSampleTablets(); + LOG.info(String.format("Will do sample collection for column %s", col.getName())); + Pair<List<Long>, Long> pair = calcActualSampleTablets(isPartitionColumn()); + LOG.info(String.format("Number of tablets selected %d, rows in tablets %d", pair.first.size(), pair.second)); List<Long> tabletIds = pair.first; double scaleFactor = (double) tbl.getRowCount() / (double) pair.second; // might happen if row count in fe metadata hasn't been updated yet if (Double.isInfinite(scaleFactor) || Double.isNaN(scaleFactor)) { + LOG.warn("Scale factor is infinite or Nan, will set scale factor to 1."); Review Comment: Under what circumstances does this problem occur? ########## fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java: ########## @@ -85,46 +75,95 @@ public void doExecute() throws Exception { * 3. insert col stats and partition stats */ protected void doSample() throws Exception { - Pair<List<Long>, Long> pair = calcActualSampleTablets(); + LOG.info(String.format("Will do sample collection for column %s", col.getName())); Review Comment: ```suggestion LOG.debug(String.format("Will do sample collection for column %s", col.getName())); ``` ########## fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java: ########## @@ -85,46 +75,95 @@ public void doExecute() throws Exception { * 3. insert col stats and partition stats */ protected void doSample() throws Exception { - Pair<List<Long>, Long> pair = calcActualSampleTablets(); + LOG.info(String.format("Will do sample collection for column %s", col.getName())); + Pair<List<Long>, Long> pair = calcActualSampleTablets(isPartitionColumn()); + LOG.info(String.format("Number of tablets selected %d, rows in tablets %d", pair.first.size(), pair.second)); Review Comment: ```suggestion LOG.debug(String.format("Number of tablets selected %d, rows in tablets %d", pair.first.size(), pair.second)); ``` ########## fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java: ########## @@ -174,30 +174,33 @@ public static ColumnStatistic fromResultRow(ResultRow row) { String min = row.get(10); String max = row.get(11); if (min != null && !min.equalsIgnoreCase("NULL")) { - min = new String(Base64.getDecoder().decode(min), - StandardCharsets.UTF_8); - - try { - columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min)); - columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min)); - } catch (AnalysisException e) { - LOG.warn("Failed to deserialize column {} min value {}.", col, min, e); + min = new String(Base64.getDecoder().decode(min), StandardCharsets.UTF_8); + if (catalogId != 0 && min.equalsIgnoreCase("NULL")) { columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } else { + try { + columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min)); + columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min)); + } catch (AnalysisException e) { + LOG.warn("Failed to deserialize column {} min value {}.", col, min, e); + columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } } } else { columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); } if (max != null && !max.equalsIgnoreCase("NULL")) { - - max = new String(Base64.getDecoder().decode(max), - StandardCharsets.UTF_8); - - try { - 
columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max)); - columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max)); - } catch (AnalysisException e) { - LOG.warn("Failed to deserialize column {} max value {}.", col, max, e); + max = new String(Base64.getDecoder().decode(max), StandardCharsets.UTF_8); + if (catalogId != 0 && max.equalsIgnoreCase("NULL")) { Review Comment: ditto ########## fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java: ########## @@ -120,34 +108,64 @@ private boolean isPartitionColumn() { return table.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName())); } + // Get ordinary column stats. Ordinary column means not partition column. private void getOrdinaryColumnStats() throws Exception { - // An example sql for a column stats: - // INSERT INTO __internal_schema.column_statistics - // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, - // 13002 AS catalog_id, - // 13038 AS db_id, - // 13055 AS tbl_id, - // -1 AS idx_id, - // 'r_regionkey' AS col_id, - // 'NULL' AS part_id, - // COUNT(1) AS row_count, - // NDV(`r_regionkey`) AS ndv, - // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count, - // MIN(`r_regionkey`) AS min, - // MAX(`r_regionkey`) AS max, - // 0 AS data_size, - // NOW() FROM `hive`.`tpch100`.`region` StringBuilder sb = new StringBuilder(); - sb.append(ANALYZE_TABLE_TEMPLATE); Map<String, String> params = buildStatsParams("NULL"); - params.put("dataSizeFunction", getDataSizeFunction(col)); - params.put("minFunction", getMinFunction()); - params.put("maxFunction", getMaxFunction()); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + params.put("min", getMinFunction()); + params.put("max", getMaxFunction()); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); + Pair<Double, Long> sampleInfo = getSampleInfo(); + params.put("scaleFactor", String.valueOf(sampleInfo.first)); + StringSubstitutor stringSubstitutor; + if (tableSample == null) { + // Do full analyze + LOG.info(String.format("Will do full collection for column %s", col.getName())); + sb.append(COLLECT_COL_STATISTICS); + } else { + // Do sample analyze + LOG.info(String.format("Will do sample collection for column %s", col.getName())); Review Comment: ```suggestion LOG.debug(String.format("Will do sample collection for column %s", col.getName())); ``` ########## fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java: ########## @@ -85,46 +75,95 @@ public void doExecute() throws Exception { * 3. 
insert col stats and partition stats */ protected void doSample() throws Exception { - Pair<List<Long>, Long> pair = calcActualSampleTablets(); + LOG.info(String.format("Will do sample collection for column %s", col.getName())); + Pair<List<Long>, Long> pair = calcActualSampleTablets(isPartitionColumn()); + LOG.info(String.format("Number of tablets selected %d, rows in tablets %d", pair.first.size(), pair.second)); List<Long> tabletIds = pair.first; double scaleFactor = (double) tbl.getRowCount() / (double) pair.second; // might happen if row count in fe metadata hasn't been updated yet if (Double.isInfinite(scaleFactor) || Double.isNaN(scaleFactor)) { + LOG.warn("Scale factor is infinite or Nan, will set scale factor to 1."); scaleFactor = 1; tabletIds = Collections.emptyList(); + pair.second = tbl.getRowCount(); } String tabletStr = tabletIds.stream() .map(Object::toString) .collect(Collectors.joining(", ")); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) { + // Get basic stats, including min and max. + ResultRow basicStats = collectBasicStat(r); + long rowCount = tbl.getRowCount(); + String min = Base64.getEncoder().encodeToString(basicStats.get(0).getBytes(StandardCharsets.UTF_8)); + String max = Base64.getEncoder().encodeToString(basicStats.get(1).getBytes(StandardCharsets.UTF_8)); + + boolean limitFlag = false; + long rowsToSample = pair.second; Map<String, String> params = new HashMap<>(); params.put("internalDB", FeConstants.INTERNAL_DB_NAME); params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); params.put("catalogId", String.valueOf(catalog.getId())); + params.put("catalogName", catalog.getName()); params.put("dbId", String.valueOf(db.getId())); params.put("tblId", String.valueOf(tbl.getId())); params.put("idxId", String.valueOf(info.indexId)); params.put("colId", String.valueOf(info.colName)); - params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); params.put("dbName", db.getFullName()); params.put("colName", info.colName); params.put("tblName", tbl.getName()); params.put("scaleFactor", String.valueOf(scaleFactor)); - params.put("tablets", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); + params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); + params.put("ndvFunction", getNdvFunction(String.valueOf(rowCount))); + params.put("min", min); + params.put("max", max); + params.put("rowCount", String.valueOf(rowCount)); + params.put("type", col.getType().toString()); + params.put("limit", ""); + if (needLimit()) { + // If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate + // the scaleFactor. + limitFlag = true; + rowsToSample = Math.min(getSampleRows(), pair.second); + params.put("limit", "limit " + rowsToSample); + params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample)); + } StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE)); - // Scalar query only return one row - ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); - job.appendBuf(this, Collections.singletonList(colStatsData)); + String sql; + // Distribution columns don't fit for DUJ1 estimator, use linear estimator. 
+ if (isDistributionColumn()) { + params.put("min", StatisticsUtil.quote(min)); + params.put("max", StatisticsUtil.quote(max)); + sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE); + } else { + params.put("dataSizeFunction", getDataSizeFunction(col, true)); + sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE); + } + LOG.info(String.format("Sample for column [%s]. Total rows [%s], rows to sample [%d], scale factor [%s], " + + "limited [%b], distribute column [%b], partition column [%b], key column [%b]", + col.getName(), params.get("rowCount"), rowsToSample, params.get("scaleFactor"), + limitFlag, isDistributionColumn(), isPartitionColumn(), col.isKey())); + runQuery(sql, false); } } + protected ResultRow collectBasicStat(AutoCloseConnectContext context) { + Map<String, String> params = new HashMap<>(); + params.put("dbName", db.getFullName()); + params.put("colName", info.colName); + params.put("tblName", tbl.getName()); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + stmtExecutor = new StmtExecutor(context.connectContext, stringSubstitutor.replace(BASIC_STATS_TEMPLATE)); + return stmtExecutor.executeInternalQuery().get(0); + } + /** * 1. Get stats of each partition * 2. insert partition in batch * 3. calculate column stats based on partition stats */ protected void doFull() throws Exception { + LOG.info(String.format("Will do full collection for column %s", col.getName())); Review Comment: ```suggestion LOG.debug(String.format("Will do full collection for column %s", col.getName())); ``` ########## fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java: ########## @@ -194,22 +226,66 @@ protected Pair<List<Long>, Long> calcActualSampleTablets() { long tabletId = ids.get(seekTid); sampleTabletIds.add(tabletId); actualSampledRowCount += baseIndex.getTablet(tabletId).getRowCount(true); + if (actualSampledRowCount >= sampleRows && !forPartitionColumn) { + enough = true; + break; + } } - totalRows += p.getBaseIndex().getRowCount(); totalTablet += ids.size(); + if (enough) { + break; + } } // all hit, direct full if (totalRows < sampleRows) { // can't fill full sample rows sampleTabletIds.clear(); - } else if (sampleTabletIds.size() == totalTablet) { - // TODO add limit + } else if (sampleTabletIds.size() == totalTablet && !enough) { sampleTabletIds.clear(); - } else if (!sampleTabletIds.isEmpty()) { - // TODO add limit } return Pair.of(sampleTabletIds, actualSampledRowCount); } + + /** + * For ordinary column (neither key column nor partition column), need to limit sample size to user specified value. + * @return Return true when need to limit. + */ + protected boolean needLimit() { + // Key column is sorted, use limit will cause the ndv not accurate enough, so skip key columns. + if (col.isKey()) { + return false; + } + // Partition column need to scan tablets from all partitions. + if (isPartitionColumn()) { + return false; + } + return true; + } + + protected boolean isDistributionColumn() { Review Comment: I think we can add 2 new interfaces: `isDistributionColumn(String colName)` and `isPartitionColumn(String colName)` in `TableIf`? That way, when implementing other types of AnalysisTask, we can just use those interfaces. 
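For illustration, a minimal sketch of what such default methods on `TableIf` could look like; this is an assumption about how the suggestion might be realized, not the actual Doris code, and the default bodies would be overridden by table types (e.g. OLAP or HMS tables) that actually carry partition and distribution metadata:

```java
// Hypothetical sketch only: default methods on TableIf so every AnalysisTask
// can ask the table directly instead of keeping a private helper per task type.
public interface TableIf {
    // ... existing TableIf methods ...

    // Table types without the concept simply inherit the defaults.
    default boolean isPartitionColumn(String colName) {
        return false;
    }

    default boolean isDistributionColumn(String colName) {
        return false;
    }
}
```

An analysis task could then call `tbl.isDistributionColumn(col.getName())` and drop its task-local helper.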
########## fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java: ########## @@ -194,22 +226,66 @@ protected Pair<List<Long>, Long> calcActualSampleTablets() { long tabletId = ids.get(seekTid); sampleTabletIds.add(tabletId); actualSampledRowCount += baseIndex.getTablet(tabletId).getRowCount(true); + if (actualSampledRowCount >= sampleRows && !forPartitionColumn) { + enough = true; + break; + } } - totalRows += p.getBaseIndex().getRowCount(); totalTablet += ids.size(); + if (enough) { + break; + } } // all hit, direct full if (totalRows < sampleRows) { // can't fill full sample rows sampleTabletIds.clear(); - } else if (sampleTabletIds.size() == totalTablet) { - // TODO add limit + } else if (sampleTabletIds.size() == totalTablet && !enough) { sampleTabletIds.clear(); - } else if (!sampleTabletIds.isEmpty()) { - // TODO add limit } return Pair.of(sampleTabletIds, actualSampledRowCount); } + + /** + * For ordinary column (neither key column nor partition column), need to limit sample size to user specified value. + * @return Return true when need to limit. + */ + protected boolean needLimit() { + // Key column is sorted, use limit will cause the ndv not accurate enough, so skip key columns. + if (col.isKey()) { Review Comment: We may implement a `non-sorted` table whose data is not sorted by key. So we need to take care of that. ########## fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java: ########## @@ -85,46 +75,95 @@ public void doExecute() throws Exception { * 3. insert col stats and partition stats */ protected void doSample() throws Exception { - Pair<List<Long>, Long> pair = calcActualSampleTablets(); + LOG.info(String.format("Will do sample collection for column %s", col.getName())); + Pair<List<Long>, Long> pair = calcActualSampleTablets(isPartitionColumn()); + LOG.info(String.format("Number of tablets selected %d, rows in tablets %d", pair.first.size(), pair.second)); List<Long> tabletIds = pair.first; double scaleFactor = (double) tbl.getRowCount() / (double) pair.second; // might happen if row count in fe metadata hasn't been updated yet if (Double.isInfinite(scaleFactor) || Double.isNaN(scaleFactor)) { + LOG.warn("Scale factor is infinite or Nan, will set scale factor to 1."); scaleFactor = 1; tabletIds = Collections.emptyList(); + pair.second = tbl.getRowCount(); } String tabletStr = tabletIds.stream() .map(Object::toString) .collect(Collectors.joining(", ")); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) { + // Get basic stats, including min and max. 
+ ResultRow basicStats = collectBasicStat(r); + long rowCount = tbl.getRowCount(); + String min = Base64.getEncoder().encodeToString(basicStats.get(0).getBytes(StandardCharsets.UTF_8)); + String max = Base64.getEncoder().encodeToString(basicStats.get(1).getBytes(StandardCharsets.UTF_8)); + + boolean limitFlag = false; + long rowsToSample = pair.second; Map<String, String> params = new HashMap<>(); params.put("internalDB", FeConstants.INTERNAL_DB_NAME); params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); params.put("catalogId", String.valueOf(catalog.getId())); + params.put("catalogName", catalog.getName()); params.put("dbId", String.valueOf(db.getId())); params.put("tblId", String.valueOf(tbl.getId())); params.put("idxId", String.valueOf(info.indexId)); params.put("colId", String.valueOf(info.colName)); - params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("dataSizeFunction", getDataSizeFunction(col, false)); params.put("dbName", db.getFullName()); params.put("colName", info.colName); params.put("tblName", tbl.getName()); params.put("scaleFactor", String.valueOf(scaleFactor)); - params.put("tablets", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); + params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); + params.put("ndvFunction", getNdvFunction(String.valueOf(rowCount))); + params.put("min", min); + params.put("max", max); + params.put("rowCount", String.valueOf(rowCount)); + params.put("type", col.getType().toString()); + params.put("limit", ""); + if (needLimit()) { + // If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate + // the scaleFactor. + limitFlag = true; + rowsToSample = Math.min(getSampleRows(), pair.second); + params.put("limit", "limit " + rowsToSample); + params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample)); + } StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE)); - // Scalar query only return one row - ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); - job.appendBuf(this, Collections.singletonList(colStatsData)); + String sql; + // Distribution columns don't fit for DUJ1 estimator, use linear estimator. + if (isDistributionColumn()) { + params.put("min", StatisticsUtil.quote(min)); + params.put("max", StatisticsUtil.quote(max)); + sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE); + } else { + params.put("dataSizeFunction", getDataSizeFunction(col, true)); + sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE); + } + LOG.info(String.format("Sample for column [%s]. Total rows [%s], rows to sample [%d], scale factor [%s], " Review Comment: ```suggestion LOG.debug(String.format("Sample for column [%s]. Total rows [%s], rows to sample [%d], scale factor [%s], " ```
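As a footnote to the repeated `LOG.info` → `LOG.debug` suggestions above, here is a small hedged sketch of the usual ways to keep such per-column logging cheap once it is at debug level, assuming a log4j2 `Logger`; the class and variable names are made up for illustration and are not part of the PR:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// Hypothetical demo class; only the logging pattern is the point.
public class SampleCollectionLogDemo {
    private static final Logger LOG = LogManager.getLogger(SampleCollectionLogDemo.class);

    void logSampleInfo(String colName, int tabletCount, long rowsInTablets) {
        // Parameterized logging: the message is only assembled when debug is enabled,
        // so there is no formatting cost on the normal info-level path.
        LOG.debug("Will do sample collection for column {}", colName);

        // For messages that are expensive to build, an explicit guard works too.
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Number of tablets selected %d, rows in tablets %d",
                    tabletCount, rowsInTablets));
        }
    }
}
```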