This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 31bffdb5fc [enhancement](stats) audit for stats collection #24074
31bffdb5fc is described below

commit 31bffdb5fc307a970880c483227d58795e860799
Author: AKIRA <33112463+kikyou1...@users.noreply.github.com>
AuthorDate: Mon Sep 11 09:26:12 2023 +0900

    [enhancement](stats) audit for stats collection #24074
    
    log stats collection SQLs in audit log
---
 .../apache/doris/blockrule/SqlBlockRuleMgr.java    |   8 ++
 .../java/org/apache/doris/qe/AuditLogHelper.java   | 120 +++++++++++++++++++++
 .../java/org/apache/doris/qe/ConnectProcessor.java |  84 +--------------
 .../java/org/apache/doris/qe/StmtExecutor.java     |   2 +
 .../apache/doris/statistics/BaseAnalysisTask.java  |  25 +++--
 5 files changed, 147 insertions(+), 92 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
index 1bc5505edf..ca4e68a6ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SqlBlockUtil;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -224,6 +225,10 @@ public class SqlBlockRuleMgr implements Writable {
      * Match SQL according to rules.
      **/
     public void matchSql(String originSql, String sqlHash, String user) throws 
AnalysisException {
+        if (ConnectContext.get() != null
+                && ConnectContext.get().getSessionVariable().internalSession) {
+            return;
+        }
         // match global rule
         List<SqlBlockRule> globalRules =
                 
nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList());
@@ -260,6 +265,9 @@ public class SqlBlockRuleMgr implements Writable {
      **/
     public void checkLimitations(Long partitionNum, Long tabletNum, Long 
cardinality, String user)
             throws AnalysisException {
+        if (ConnectContext.get().getSessionVariable().internalSession) {
+            return;
+        }
         // match global rule
         List<SqlBlockRule> globalRules =
                 
nameToSqlBlockRuleMap.values().stream().filter(SqlBlockRule::getGlobal).collect(Collectors.toList());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
new file mode 100644
index 0000000000..40f870eee1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.InsertStmt;
+import org.apache.doris.analysis.Queriable;
+import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.service.FrontendOptions;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.context.Context;
+import org.apache.commons.codec.digest.DigestUtils;
+
+public class AuditLogHelper {
+
+    public static void logAuditLog(ConnectContext ctx, String origStmt, 
StatementBase parsedStmt,
+            org.apache.doris.proto.Data.PQueryStatistics statistics, boolean 
printFuzzyVariables) {
+        origStmt = origStmt.replace("\n", " ");
+        // slow query
+        long endTime = System.currentTimeMillis();
+        long elapseMs = endTime - ctx.getStartTime();
+        SpanContext spanContext = 
Span.fromContext(Context.current()).getSpanContext();
+
+        ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
+                .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
+                .setState(ctx.getState().toString())
+                .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : 
ctx.getState().getErrorCode().getCode())
+                .setErrorMessage((ctx.getState().getErrorMessage() == null ? 
"" :
+                        ctx.getState().getErrorMessage().replace("\n", " 
").replace("\t", " ")))
+                .setQueryTime(elapseMs)
+                .setScanBytes(statistics == null ? 0 : 
statistics.getScanBytes())
+                .setScanRows(statistics == null ? 0 : statistics.getScanRows())
+                .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
+                .setPeakMemoryBytes(statistics == null ? 0 : 
statistics.getMaxPeakMemoryBytes())
+                .setReturnRows(ctx.getReturnRows())
+                .setStmtId(ctx.getStmtId())
+                .setQueryId(ctx.queryId() == null ? "NaN" : 
DebugUtil.printId(ctx.queryId()))
+                .setTraceId(spanContext.isValid() ? spanContext.getTraceId() : 
"")
+                .setWorkloadGroup(ctx.getWorkloadGroupName())
+                .setFuzzyVariables(!printFuzzyVariables ? "" : 
ctx.getSessionVariable().printFuzzyVariables());
+
+        if (ctx.getState().isQuery()) {
+            MetricRepo.COUNTER_QUERY_ALL.increase(1L);
+            
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
+            if (ctx.getState().getStateType() == MysqlStateType.ERR
+                    && ctx.getState().getErrType() != 
QueryState.ErrType.ANALYSIS_ERR) {
+                // err query
+                MetricRepo.COUNTER_QUERY_ERR.increase(1L);
+                
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
+            } else if (ctx.getState().getStateType() == MysqlStateType.OK
+                    || ctx.getState().getStateType() == MysqlStateType.EOF) {
+                // ok query
+                MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
+                
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
+
+                if (elapseMs > Config.qe_slow_log_ms) {
+                    String sqlDigest = DigestUtils.md5Hex(((Queriable) 
parsedStmt).toDigest());
+                    ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
+                }
+            }
+            ctx.getAuditEventBuilder().setIsQuery(true);
+            if (ctx.getQueryDetail() != null) {
+                ctx.getQueryDetail().setEventTime(endTime);
+                ctx.getQueryDetail().setEndTime(endTime);
+                ctx.getQueryDetail().setLatency(elapseMs);
+                
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
+                QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
+                ctx.setQueryDetail(null);
+            }
+        } else {
+            ctx.getAuditEventBuilder().setIsQuery(false);
+        }
+        ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
+
+        
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
+
+        // We put origin query stmt at the end of audit log, for parsing the 
log more convenient.
+        if (!ctx.getState().isQuery() && (parsedStmt != null && 
parsedStmt.needAuditEncryption())) {
+            ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
+        } else {
+            if (parsedStmt instanceof InsertStmt && !((InsertStmt) 
parsedStmt).needLoadManager()
+                    && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
+                // INSERT INTO VALUES may be very long, so we only log at most 
1K bytes.
+                int length = Math.min(1024, origStmt.length());
+                ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, 
length));
+            } else {
+                ctx.getAuditEventBuilder().setStmt(origStmt);
+            }
+        }
+        if (!Env.getCurrentEnv().isMaster()) {
+            if (ctx.executor.isForwardToMaster()) {
+                
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
+            }
+        }
+        
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 73c6debb69..80a4eb80e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.KillStmt;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
-import org.apache.doris.analysis.Queriable;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
@@ -34,7 +33,6 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -56,17 +54,14 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.minidump.MinidumpUtils;
 import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
-import org.apache.doris.plugin.AuditEvent.EventType;
 import org.apache.doris.proto.Data;
 import org.apache.doris.qe.QueryState.MysqlStateType;
-import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Strings;
 import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
@@ -279,84 +274,7 @@ public class ConnectProcessor {
 
     private void auditAfterExec(String origStmt, StatementBase parsedStmt,
                     Data.PQueryStatistics statistics, boolean 
printFuzzyVariables) {
-        origStmt = origStmt.replace("\n", " ");
-        // slow query
-        long endTime = System.currentTimeMillis();
-        long elapseMs = endTime - ctx.getStartTime();
-        SpanContext spanContext = 
Span.fromContext(Context.current()).getSpanContext();
-
-        ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY)
-                .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase()))
-                .setState(ctx.getState().toString())
-                .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : 
ctx.getState().getErrorCode().getCode())
-                .setErrorMessage((ctx.getState().getErrorMessage() == null ? 
"" :
-                        ctx.getState().getErrorMessage().replace("\n", " 
").replace("\t", " ")))
-                .setQueryTime(elapseMs)
-                .setScanBytes(statistics == null ? 0 : 
statistics.getScanBytes())
-                .setScanRows(statistics == null ? 0 : statistics.getScanRows())
-                .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs())
-                .setPeakMemoryBytes(statistics == null ? 0 : 
statistics.getMaxPeakMemoryBytes())
-                .setReturnRows(ctx.getReturnRows())
-                .setStmtId(ctx.getStmtId())
-                .setQueryId(ctx.queryId() == null ? "NaN" : 
DebugUtil.printId(ctx.queryId()))
-                .setTraceId(spanContext.isValid() ? spanContext.getTraceId() : 
"")
-                .setWorkloadGroup(ctx.getWorkloadGroupName())
-                .setFuzzyVariables(!printFuzzyVariables ? "" : 
ctx.getSessionVariable().printFuzzyVariables());
-
-        if (ctx.getState().isQuery()) {
-            MetricRepo.COUNTER_QUERY_ALL.increase(1L);
-            
MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L);
-            if (ctx.getState().getStateType() == MysqlStateType.ERR
-                    && ctx.getState().getErrType() != 
QueryState.ErrType.ANALYSIS_ERR) {
-                // err query
-                MetricRepo.COUNTER_QUERY_ERR.increase(1L);
-                
MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L);
-            } else if (ctx.getState().getStateType() == MysqlStateType.OK
-                    || ctx.getState().getStateType() == MysqlStateType.EOF) {
-                // ok query
-                MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs);
-                
MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs);
-
-                if (elapseMs > Config.qe_slow_log_ms) {
-                    String sqlDigest = DigestUtils.md5Hex(((Queriable) 
parsedStmt).toDigest());
-                    ctx.getAuditEventBuilder().setSqlDigest(sqlDigest);
-                }
-            }
-            ctx.getAuditEventBuilder().setIsQuery(true);
-            if (ctx.getQueryDetail() != null) {
-                ctx.getQueryDetail().setEventTime(endTime);
-                ctx.getQueryDetail().setEndTime(endTime);
-                ctx.getQueryDetail().setLatency(elapseMs);
-                
ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED);
-                QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail());
-                ctx.setQueryDetail(null);
-            }
-        } else {
-            ctx.getAuditEventBuilder().setIsQuery(false);
-        }
-        ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids);
-
-        
ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress());
-
-        // We put origin query stmt at the end of audit log, for parsing the 
log more convenient.
-        if (!ctx.getState().isQuery() && (parsedStmt != null && 
parsedStmt.needAuditEncryption())) {
-            ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql());
-        } else {
-            if (parsedStmt instanceof InsertStmt && !((InsertStmt) 
parsedStmt).needLoadManager()
-                    && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) {
-                // INSERT INTO VALUES may be very long, so we only log at most 
1K bytes.
-                int length = Math.min(1024, origStmt.length());
-                ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, 
length));
-            } else {
-                ctx.getAuditEventBuilder().setStmt(origStmt);
-            }
-        }
-        if (!Env.getCurrentEnv().isMaster()) {
-            if (ctx.executor.isForwardToMaster()) {
-                
ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus());
-            }
-        }
-        
Env.getCurrentAuditEventProcessor().handleAuditEvent(ctx.getAuditEventBuilder().build());
+        AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics, 
printFuzzyVariables);
     }
 
     // Process COM_QUERY statement,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3ab4f31623..a4d1e1880a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2622,6 +2622,8 @@ public class StmtExecutor {
                 fetchResultSpan.end();
             }
         } finally {
+            AuditLogHelper.logAuditLog(context, originStmt.toString(), 
parsedStmt, getQueryStatisticsForAuditLog(),
+                    true);
             QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 0323929081..48f2e0e86a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.qe.AuditLogHelper;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
@@ -188,7 +189,7 @@ public abstract class BaseAnalysisTask {
 
     protected void setTaskStateToRunning() {
         Env.getCurrentEnv().getAnalysisManager()
-            .updateTaskStatus(info, AnalysisState.RUNNING, "", 
System.currentTimeMillis());
+                .updateTaskStatus(info, AnalysisState.RUNNING, "", 
System.currentTimeMillis());
     }
 
     public void cancel() {
@@ -228,8 +229,8 @@ public abstract class BaseAnalysisTask {
     @Override
     public String toString() {
         return String.format("Job id [%d], Task id [%d], catalog [%s], db 
[%s], table [%s], column [%s]",
-            info.jobId, info.taskId, catalog.getName(), db.getFullName(), 
tbl.getName(),
-            col == null ? "TableRowCount" : col.getName());
+                info.jobId, info.taskId, catalog.getName(), db.getFullName(), 
tbl.getName(),
+                col == null ? "TableRowCount" : col.getName());
     }
 
     protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) 
throws Exception {
@@ -237,12 +238,18 @@ public abstract class BaseAnalysisTask {
             return;
         }
         LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
-        stmtExecutor.execute();
-        QueryState queryState = stmtExecutor.getContext().getState();
-        if (queryState.getStateType().equals(MysqlStateType.ERR)) {
-            throw new RuntimeException(String.format("Failed to analyze 
%s.%s.%s, error: %s sql: %s",
-                    info.catalogName, info.dbName, info.colName, 
stmtExecutor.getOriginStmt().toString(),
-                    queryState.getErrorMessage()));
+        try {
+            stmtExecutor.execute();
+            QueryState queryState = stmtExecutor.getContext().getState();
+            if (queryState.getStateType().equals(MysqlStateType.ERR)) {
+                throw new RuntimeException(String.format("Failed to analyze 
%s.%s.%s, error: %s sql: %s",
+                        info.catalogName, info.dbName, info.colName, 
stmtExecutor.getOriginStmt().toString(),
+                        queryState.getErrorMessage()));
+            }
+        } finally {
+            AuditLogHelper.logAuditLog(stmtExecutor.getContext(), 
stmtExecutor.getOriginStmt().toString(),
+                    stmtExecutor.getParsedStmt(), 
stmtExecutor.getQueryStatisticsForAuditLog(),
+                    true);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to