This is an automated email from the ASF dual-hosted git repository. lijibing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 03a73d3f39c [Refactor](statistics)Refactor of statistics buildConnectContext. (#41553) 03a73d3f39c is described below commit 03a73d3f39c33939d714cce8da7a6b23e47ea5fe Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Mon Oct 14 09:39:46 2024 +0800 [Refactor](statistics)Refactor of statistics buildConnectContext. (#41553) 1. Remove useless session variable enableScanRunSerial. 2. Refactor StatisticsUtil.buildConnectContext 3. Remove nested call of buildConnectContext. 4. Not record metrics for internal statistics sql. --- .../doris/datasource/jdbc/JdbcExternalTable.java | 2 +- .../java/org/apache/doris/qe/AuditLogHelper.java | 22 ++-- .../java/org/apache/doris/qe/ConnectProcessor.java | 2 +- .../org/apache/doris/statistics/AnalysisJob.java | 2 +- .../apache/doris/statistics/BaseAnalysisTask.java | 4 +- .../apache/doris/statistics/OlapAnalysisTask.java | 115 ++++++++++----------- .../doris/statistics/util/StatisticsUtil.java | 14 +-- .../doris/statistics/OlapAnalysisTaskTest.java | 6 +- 8 files changed, 84 insertions(+), 83 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java index 495311bc087..d60006af709 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java @@ -119,7 +119,7 @@ public class JdbcExternalTable extends ExternalTable { params.put("tblName", name); switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) { case JdbcResource.MYSQL: - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false, false)) { + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) { StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL); StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index c9016940c0d..0bbc37cd47b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -211,8 +211,10 @@ public class AuditLogHelper { .setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables()); if (ctx.getState().isQuery()) { - MetricRepo.COUNTER_QUERY_ALL.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); + if (!ctx.getSessionVariable().internalSession) { + MetricRepo.COUNTER_QUERY_ALL.increase(1L); + MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); + } try { if (Config.isCloudMode()) { cloudCluster = ctx.getCloudCluster(false); @@ -225,15 +227,19 @@ public class AuditLogHelper { if (ctx.getState().getStateType() == MysqlStateType.ERR && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { // err query - MetricRepo.COUNTER_QUERY_ERR.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); - MetricRepo.increaseClusterQueryErr(cloudCluster); + if (!ctx.getSessionVariable().internalSession) { + MetricRepo.COUNTER_QUERY_ERR.increase(1L); + MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); + MetricRepo.increaseClusterQueryErr(cloudCluster); + } } else if (ctx.getState().getStateType() == MysqlStateType.OK || ctx.getState().getStateType() == MysqlStateType.EOF) { // ok query - MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); - MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); - MetricRepo.updateClusterQueryLatency(cloudCluster, elapseMs); + if (!ctx.getSessionVariable().internalSession) { + MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); + MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); + MetricRepo.updateClusterQueryLatency(cloudCluster, elapseMs); + } if (elapseMs > Config.qe_slow_log_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 42724eebbe1..6e495217c29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -251,7 +251,7 @@ public abstract class ConnectProcessor { } public void executeQuery(String originStmt) throws Exception { - if (MetricRepo.isInit) { + if (MetricRepo.isInit && !ctx.getSessionVariable().internalSession) { MetricRepo.COUNTER_REQUEST_ALL.increase(1L); if (Config.isCloudMode()) { try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index a2a094cbeca..acfd457d8a2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -126,7 +126,7 @@ public class AnalysisJob { values.add(data.toSQL(true)); } insertStmt += values.toString(); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) { stmtExecutor = new StmtExecutor(r.connectContext, insertStmt); executeWithExceptionOnFail(stmtExecutor); } catch (Exception t) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 91cddb333f4..7d637d97e06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -481,7 +481,7 @@ public abstract class BaseAnalysisTask { protected void runQuery(String sql) { long startTime = System.currentTimeMillis(); String queryId = ""; - try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) { + try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext(false)) { stmtExecutor = new StmtExecutor(a.connectContext, sql); ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); // Update index row count after analyze. @@ -509,7 +509,7 @@ public abstract class BaseAnalysisTask { } protected void runInsert(String sql) throws Exception { - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) { stmtExecutor = new StmtExecutor(r.connectContext, sql); try { stmtExecutor.execute(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index 5abf6120f01..99a29c601db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -32,7 +32,6 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; @@ -113,63 +112,60 @@ public class OlapAnalysisTask extends BaseAnalysisTask { String tabletStr = tabletIds.stream() .map(Object::toString) .collect(Collectors.joining(", ")); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext( - info.jobType.equals(JobType.SYSTEM), false)) { - // Get basic stats, including min and max. - ResultRow basicStats = collectBasicStat(r); - String min = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 0 - ? basicStats.get(0) : null); - String max = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 1 - ? basicStats.get(1) : null); + // Get basic stats, including min and max. + ResultRow basicStats = collectBasicStat(); + String min = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 0 + ? basicStats.get(0) : null); + String max = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 1 + ? basicStats.get(1) : null); - boolean limitFlag = false; - long rowsToSample = pair.second; - Map<String, String> params = buildSqlParams(); - params.put("scaleFactor", String.valueOf(scaleFactor)); - params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); - params.put("ndvFunction", getNdvFunction(String.valueOf(totalRowCount))); - params.put("min", StatisticsUtil.quote(min)); - params.put("max", StatisticsUtil.quote(max)); - params.put("rowCount", String.valueOf(totalRowCount)); - params.put("type", col.getType().toString()); - params.put("limit", ""); - if (needLimit()) { - // If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate - // the scaleFactor. - rowsToSample = Math.min(getSampleRows(), pair.second); - // Empty table doesn't need to limit. - if (rowsToSample > 0) { - limitFlag = true; - params.put("limit", "limit " + rowsToSample); - params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample)); - } + boolean limitFlag = false; + long rowsToSample = pair.second; + Map<String, String> params = buildSqlParams(); + params.put("scaleFactor", String.valueOf(scaleFactor)); + params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr)); + params.put("ndvFunction", getNdvFunction(String.valueOf(totalRowCount))); + params.put("min", StatisticsUtil.quote(min)); + params.put("max", StatisticsUtil.quote(max)); + params.put("rowCount", String.valueOf(totalRowCount)); + params.put("type", col.getType().toString()); + params.put("limit", ""); + if (needLimit()) { + // If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate + // the scaleFactor. + rowsToSample = Math.min(getSampleRows(), pair.second); + // Empty table doesn't need to limit. + if (rowsToSample > 0) { + limitFlag = true; + params.put("limit", "limit " + rowsToSample); + params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample)); } - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql; - if (useLinearAnalyzeTemplate()) { - // For single unique key, use count as ndv. - if (isSingleUniqueKey()) { - params.put("ndvFunction", String.valueOf(totalRowCount)); - } else { - params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})"); - } - sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE); + } + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql; + if (useLinearAnalyzeTemplate()) { + // For single unique key, use count as ndv. + if (isSingleUniqueKey()) { + params.put("ndvFunction", String.valueOf(totalRowCount)); } else { - params.put("dataSizeFunction", getDataSizeFunction(col, true)); - params.put("subStringColName", getStringTypeColName(col)); - sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE); + params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})"); } - LOG.info("Sample for column [{}]. Total rows [{}], rows to sample [{}], scale factor [{}], " - + "limited [{}], distribute column [{}], partition column [{}], key column [{}], " - + "is single unique key [{}]", - col.getName(), params.get("rowCount"), rowsToSample, params.get("scaleFactor"), - limitFlag, tbl.isDistributionColumn(col.getName()), - tbl.isPartitionColumn(col.getName()), col.isKey(), isSingleUniqueKey()); - runQuery(sql); + sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE); + } else { + params.put("dataSizeFunction", getDataSizeFunction(col, true)); + params.put("subStringColName", getStringTypeColName(col)); + sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE); } + LOG.info("Sample for column [{}]. Total rows [{}], rows to sample [{}], scale factor [{}], " + + "limited [{}], distribute column [{}], partition column [{}], key column [{}], " + + "is single unique key [{}]", + col.getName(), params.get("rowCount"), rowsToSample, params.get("scaleFactor"), + limitFlag, tbl.isDistributionColumn(col.getName()), + tbl.isPartitionColumn(col.getName()), col.isKey(), isSingleUniqueKey()); + runQuery(sql); } - protected ResultRow collectBasicStat(AutoCloseConnectContext context) { + protected ResultRow collectBasicStat() { // Agg table value columns has no zone map. // For these columns, skip collecting min and max value to avoid scan whole table. if (((OlapTable) tbl).getKeysType().equals(KeysType.AGG_KEYS) && !col.isKey()) { @@ -181,14 +177,17 @@ public class OlapAnalysisTask extends BaseAnalysisTask { Map<String, String> params = buildSqlParams(); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); String sql = stringSubstitutor.replace(BASIC_STATS_TEMPLATE); - stmtExecutor = new StmtExecutor(context.connectContext, sql); - ResultRow resultRow = stmtExecutor.executeInternalQuery().get(0); - if (LOG.isDebugEnabled()) { - LOG.debug("Cost time in millisec: " + (System.currentTimeMillis() - startTime) - + " Min max SQL: " + sql + " QueryId: " + DebugUtil.printId(stmtExecutor.getContext().queryId())); + ResultRow resultRow = null; + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) { + stmtExecutor = new StmtExecutor(r.connectContext, sql); + resultRow = stmtExecutor.executeInternalQuery().get(0); + if (LOG.isDebugEnabled()) { + LOG.debug("Cost time in millisec: " + (System.currentTimeMillis() - startTime) + " Min max SQL: " + + sql + " QueryId: " + DebugUtil.printId(stmtExecutor.getContext().queryId())); + } + // Release the reference to stmtExecutor, reduce memory usage. + stmtExecutor = null; } - // Release the reference to stmtExecutor, reduce memory usage. - stmtExecutor = null; return resultRow; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 39904871ed7..d51281eb0e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -147,7 +147,7 @@ public class StatisticsUtil { return Collections.emptyList(); } boolean useFileCacheForStat = (enableFileCache && Config.allow_analyze_statistics_info_polluting_file_cache); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false, useFileCacheForStat)) { + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(useFileCacheForStat)) { if (Config.isCloudMode()) { try { r.connectContext.getCloudCluster(); @@ -164,7 +164,7 @@ public class StatisticsUtil { public static QueryState execUpdate(String sql) throws Exception { StmtExecutor stmtExecutor = null; - AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(); + AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false); try { stmtExecutor = new StmtExecutor(r.connectContext, sql); r.connectContext.setExecutor(stmtExecutor); @@ -202,11 +202,7 @@ public class StatisticsUtil { return PartitionColumnStatistic.fromResultRow(resultBatches); } - public static AutoCloseConnectContext buildConnectContext() { - return buildConnectContext(false, false); - } - - public static AutoCloseConnectContext buildConnectContext(boolean limitScan, boolean useFileCacheForStat) { + public static AutoCloseConnectContext buildConnectContext(boolean useFileCacheForStat) { ConnectContext connectContext = new ConnectContext(); SessionVariable sessionVariable = connectContext.getSessionVariable(); sessionVariable.internalSession = true; @@ -247,7 +243,7 @@ public class StatisticsUtil { } public static void analyze(StatementBase statementBase) throws UserException { - try (AutoCloseConnectContext r = buildConnectContext()) { + try (AutoCloseConnectContext r = buildConnectContext(false)) { Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), r.connectContext); statementBase.analyze(analyzer); } @@ -495,7 +491,7 @@ public class StatisticsUtil { LOG.info("there are no available backends"); return false; } - try (AutoCloseConnectContext r = buildConnectContext()) { + try (AutoCloseConnectContext r = buildConnectContext(false)) { try { r.connectContext.getCloudCluster(); } catch (ComputeGroupException e) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java index b4e835c1bc5..565005dde76 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/OlapAnalysisTaskTest.java @@ -118,7 +118,7 @@ public class OlapAnalysisTaskTest { } @Mock - public ResultRow collectBasicStat(AutoCloseConnectContext context) { + public ResultRow collectBasicStat() { List<String> values = Lists.newArrayList(); values.add("1"); values.add("2"); @@ -198,7 +198,7 @@ public class OlapAnalysisTaskTest { } @Mock - public ResultRow collectBasicStat(AutoCloseConnectContext context) { + public ResultRow collectBasicStat() { List<String> values = Lists.newArrayList(); values.add("1"); values.add("2"); @@ -279,7 +279,7 @@ public class OlapAnalysisTaskTest { } @Mock - public ResultRow collectBasicStat(AutoCloseConnectContext context) { + public ResultRow collectBasicStat() { List<String> values = Lists.newArrayList(); values.add("1"); values.add("2"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org