This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 31bffdb5fc [enhancement](stats) audit for stats collection #24074 31bffdb5fc is described below commit 31bffdb5fc307a970880c483227d58795e860799 Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Mon Sep 11 09:26:12 2023 +0900 [enhancement](stats) audit for stats collection #24074 log stats collection sqls in audit log --- .../apache/doris/blockrule/SqlBlockRuleMgr.java | 8 ++ .../java/org/apache/doris/qe/AuditLogHelper.java | 120 +++++++++++++++++++++ .../java/org/apache/doris/qe/ConnectProcessor.java | 84 +-------------- .../java/org/apache/doris/qe/StmtExecutor.java | 2 + .../apache/doris/statistics/BaseAnalysisTask.java | 25 +++-- 5 files changed, 147 insertions(+), 92 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java index 1bc5505edf..ca4e68a6ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java @@ -30,6 +30,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SqlBlockUtil; import org.apache.doris.metric.MetricRepo; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -224,6 +225,10 @@ public class SqlBlockRuleMgr implements Writable { * Match SQL according to rules. 
**/ public void matchSql(String originSql, String sqlHash, String user) throws AnalysisException { + if (ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().internalSession) { + return; + } // match global rule List<SqlBlockRule> globalRules = nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList()); @@ -260,6 +265,9 @@ public class SqlBlockRuleMgr implements Writable { **/ public void checkLimitations(Long partitionNum, Long tabletNum, Long cardinality, String user) throws AnalysisException { + if (ConnectContext.get().getSessionVariable().internalSession) { + return; + } // match global rule List<SqlBlockRule> globalRules = nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java new file mode 100644 index 0000000000..40f870eee1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.qe; + +import org.apache.doris.analysis.InsertStmt; +import org.apache.doris.analysis.Queriable; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.catalog.Env; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.plugin.AuditEvent.EventType; +import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.service.FrontendOptions; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import org.apache.commons.codec.digest.DigestUtils; + +public class AuditLogHelper { + + public static void logAuditLog(ConnectContext ctx, String origStmt, StatementBase parsedStmt, + org.apache.doris.proto.Data.PQueryStatistics statistics, boolean printFuzzyVariables) { + origStmt = origStmt.replace("\n", " "); + // slow query + long endTime = System.currentTimeMillis(); + long elapseMs = endTime - ctx.getStartTime(); + SpanContext spanContext = Span.fromContext(Context.current()).getSpanContext(); + + ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) + .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) + .setState(ctx.getState().toString()) + .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode()) + .setErrorMessage((ctx.getState().getErrorMessage() == null ? "" : + ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " "))) + .setQueryTime(elapseMs) + .setScanBytes(statistics == null ? 0 : statistics.getScanBytes()) + .setScanRows(statistics == null ? 0 : statistics.getScanRows()) + .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs()) + .setPeakMemoryBytes(statistics == null ? 
0 : statistics.getMaxPeakMemoryBytes()) + .setReturnRows(ctx.getReturnRows()) + .setStmtId(ctx.getStmtId()) + .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId())) + .setTraceId(spanContext.isValid() ? spanContext.getTraceId() : "") + .setWorkloadGroup(ctx.getWorkloadGroupName()) + .setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables()); + + if (ctx.getState().isQuery()) { + MetricRepo.COUNTER_QUERY_ALL.increase(1L); + MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); + if (ctx.getState().getStateType() == MysqlStateType.ERR + && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { + // err query + MetricRepo.COUNTER_QUERY_ERR.increase(1L); + MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); + } else if (ctx.getState().getStateType() == MysqlStateType.OK + || ctx.getState().getStateType() == MysqlStateType.EOF) { + // ok query + MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); + MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); + + if (elapseMs > Config.qe_slow_log_ms) { + String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); + ctx.getAuditEventBuilder().setSqlDigest(sqlDigest); + } + } + ctx.getAuditEventBuilder().setIsQuery(true); + if (ctx.getQueryDetail() != null) { + ctx.getQueryDetail().setEventTime(endTime); + ctx.getQueryDetail().setEndTime(endTime); + ctx.getQueryDetail().setLatency(elapseMs); + ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED); + QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail()); + ctx.setQueryDetail(null); + } + } else { + ctx.getAuditEventBuilder().setIsQuery(false); + } + ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids); + + ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress()); + + // We put origin query stmt at the end of audit log, for parsing the log more convenient. 
+ if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) { + ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql()); + } else { + if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager() + && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) { + // INSERT INTO VALUES may be very long, so we only log at most 1K bytes. + int length = Math.min(1024, origStmt.length()); + ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length)); + } else { + ctx.getAuditEventBuilder().setStmt(origStmt); + } + } + if (!Env.getCurrentEnv().isMaster()) { + if (ctx.executor.isForwardToMaster()) { + ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus()); + } + } + Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 73c6debb69..80a4eb80e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.KillStmt; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.NullLiteral; -import org.apache.doris.analysis.Queriable; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; @@ -34,7 +33,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -56,17 +54,14 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter; import 
org.apache.doris.nereids.minidump.MinidumpUtils; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.stats.StatsErrorEstimator; -import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.proto.Data; import org.apache.doris.qe.QueryState.MysqlStateType; -import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -279,84 +274,7 @@ public class ConnectProcessor { private void auditAfterExec(String origStmt, StatementBase parsedStmt, Data.PQueryStatistics statistics, boolean printFuzzyVariables) { - origStmt = origStmt.replace("\n", " "); - // slow query - long endTime = System.currentTimeMillis(); - long elapseMs = endTime - ctx.getStartTime(); - SpanContext spanContext = Span.fromContext(Context.current()).getSpanContext(); - - ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) - .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) - .setState(ctx.getState().toString()) - .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode()) - .setErrorMessage((ctx.getState().getErrorMessage() == null ? "" : - ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " "))) - .setQueryTime(elapseMs) - .setScanBytes(statistics == null ? 0 : statistics.getScanBytes()) - .setScanRows(statistics == null ? 0 : statistics.getScanRows()) - .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs()) - .setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes()) - .setReturnRows(ctx.getReturnRows()) - .setStmtId(ctx.getStmtId()) - .setQueryId(ctx.queryId() == null ? 
"NaN" : DebugUtil.printId(ctx.queryId())) - .setTraceId(spanContext.isValid() ? spanContext.getTraceId() : "") - .setWorkloadGroup(ctx.getWorkloadGroupName()) - .setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables()); - - if (ctx.getState().isQuery()) { - MetricRepo.COUNTER_QUERY_ALL.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); - if (ctx.getState().getStateType() == MysqlStateType.ERR - && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { - // err query - MetricRepo.COUNTER_QUERY_ERR.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); - } else if (ctx.getState().getStateType() == MysqlStateType.OK - || ctx.getState().getStateType() == MysqlStateType.EOF) { - // ok query - MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); - MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); - - if (elapseMs > Config.qe_slow_log_ms) { - String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); - ctx.getAuditEventBuilder().setSqlDigest(sqlDigest); - } - } - ctx.getAuditEventBuilder().setIsQuery(true); - if (ctx.getQueryDetail() != null) { - ctx.getQueryDetail().setEventTime(endTime); - ctx.getQueryDetail().setEndTime(endTime); - ctx.getQueryDetail().setLatency(elapseMs); - ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED); - QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail()); - ctx.setQueryDetail(null); - } - } else { - ctx.getAuditEventBuilder().setIsQuery(false); - } - ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids); - - ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress()); - - // We put origin query stmt at the end of audit log, for parsing the log more convenient. 
- if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) { - ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql()); - } else { - if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager() - && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) { - // INSERT INTO VALUES may be very long, so we only log at most 1K bytes. - int length = Math.min(1024, origStmt.length()); - ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length)); - } else { - ctx.getAuditEventBuilder().setStmt(origStmt); - } - } - if (!Env.getCurrentEnv().isMaster()) { - if (ctx.executor.isForwardToMaster()) { - ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus()); - } - } - Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build()); + AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics, printFuzzyVariables); } // Process COM_QUERY statement, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3ab4f31623..a4d1e1880a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2622,6 +2622,8 @@ public class StmtExecutor { fetchResultSpan.end(); } } finally { + AuditLogHelper.logAuditLog(context, originStmt.toString(), parsedStmt, getQueryStatisticsForAuditLog(), + true); QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 0323929081..48f2e0e86a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; 
import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.qe.AuditLogHelper; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; @@ -188,7 +189,7 @@ public abstract class BaseAnalysisTask { protected void setTaskStateToRunning() { Env.getCurrentEnv().getAnalysisManager() - .updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis()); + .updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis()); } public void cancel() { @@ -228,8 +229,8 @@ public abstract class BaseAnalysisTask { @Override public String toString() { return String.format("Job id [%d], Task id [%d], catalog [%s], db [%s], table [%s], column [%s]", - info.jobId, info.taskId, catalog.getName(), db.getFullName(), tbl.getName(), - col == null ? "TableRowCount" : col.getName()); + info.jobId, info.taskId, catalog.getName(), db.getFullName(), tbl.getName(), + col == null ? 
"TableRowCount" : col.getName()); } protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception { @@ -237,12 +238,18 @@ public abstract class BaseAnalysisTask { return; } LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt()); - stmtExecutor.execute(); - QueryState queryState = stmtExecutor.getContext().getState(); - if (queryState.getStateType().equals(MysqlStateType.ERR)) { - throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", - info.catalogName, info.dbName, info.colName, stmtExecutor.getOriginStmt().toString(), - queryState.getErrorMessage())); + try { + stmtExecutor.execute(); + QueryState queryState = stmtExecutor.getContext().getState(); + if (queryState.getStateType().equals(MysqlStateType.ERR)) { + throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s", + info.catalogName, info.dbName, info.colName, stmtExecutor.getOriginStmt().toString(), + queryState.getErrorMessage())); + } + } finally { + AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(), + stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(), + true); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org