This is an automated email from the ASF dual-hosted git repository.

cambyzju pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new cc1770ca359 [fix](audit) duplicate audit log for multi-statements at the same query (#37933) cc1770ca359 is described below commit cc1770ca359b2d6680c92a51946f69ccd256bf5d Author: camby <camby...@tencent.com> AuthorDate: Wed Jul 17 19:32:41 2024 +0800 [fix](audit) duplicate audit log for multi-statements at the same query (#37933) --- .../java/org/apache/doris/qe/AuditLogHelper.java | 33 ++++---- .../java/org/apache/doris/qe/ConnectProcessor.java | 90 +--------------------- 2 files changed, 21 insertions(+), 102 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java index dcfedc26792..b0a038dbf5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java @@ -25,6 +25,7 @@ import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.service.FrontendOptions; @@ -44,7 +45,13 @@ public class AuditLogHelper { long elapseMs = endTime - ctx.getStartTime(); SpanContext spanContext = Span.fromContext(Context.current()).getSpanContext(); - ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) + AuditEventBuilder auditEventBuilder = ctx.getAuditEventBuilder(); + auditEventBuilder.reset(); + auditEventBuilder.setTimestamp(ctx.getStartTime()) + .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString()) + .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) + .setSqlHash(ctx.getSqlHash()) + .setEventType(EventType.AFTER_QUERY) 
.setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) .setState(ctx.getState().toString()) .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode()) @@ -78,10 +85,10 @@ public class AuditLogHelper { if (elapseMs > Config.qe_slow_log_ms) { String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); - ctx.getAuditEventBuilder().setSqlDigest(sqlDigest); + auditEventBuilder.setSqlDigest(sqlDigest); } } - ctx.getAuditEventBuilder().setIsQuery(true); + auditEventBuilder.setIsQuery(true); if (ctx.getQueryDetail() != null) { ctx.getQueryDetail().setEventTime(endTime); ctx.getQueryDetail().setEndTime(endTime); @@ -91,35 +98,35 @@ public class AuditLogHelper { ctx.setQueryDetail(null); } } else { - ctx.getAuditEventBuilder().setIsQuery(false); + auditEventBuilder.setIsQuery(false); } - ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids); + auditEventBuilder.setIsNereids(ctx.getState().isNereids); - ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress()); + auditEventBuilder.setFeIp(FrontendOptions.getLocalHostAddress()); // We put origin query stmt at the end of audit log, for parsing the log more convenient. if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) { - ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql()); + auditEventBuilder.setStmt(parsedStmt.toSql()); } else { if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager() && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) { // INSERT INTO VALUES may be very long, so we only log at most 1K bytes. 
int length = Math.min(1024, origStmt.length()); - ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length)); + auditEventBuilder.setStmt(origStmt.substring(0, length)); } else { - ctx.getAuditEventBuilder().setStmt(origStmt); + auditEventBuilder.setStmt(origStmt); } } if (!Env.getCurrentEnv().isMaster()) { if (ctx.executor.isForwardToMaster()) { - ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus()); + auditEventBuilder.setState(ctx.executor.getProxyStatus()); int proxyStatusCode = ctx.executor.getProxyStatusCode(); if (proxyStatusCode != 0) { - ctx.getAuditEventBuilder().setErrorCode(proxyStatusCode); - ctx.getAuditEventBuilder().setErrorMessage(ctx.executor.getProxyErrMsg()); + auditEventBuilder.setErrorCode(proxyStatusCode); + auditEventBuilder.setErrorMessage(ctx.executor.getProxyErrMsg()); } } } - Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(ctx.getAuditEventBuilder().build()); + Env.getCurrentEnv().getWorkloadRuntimeStatusMgr().submitFinishQueryToAudit(auditEventBuilder.build()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index ba4fbfa1fca..d306a533bb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.KillStmt; import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.NullLiteral; -import org.apache.doris.analysis.Queriable; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; @@ -34,7 +33,6 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; -import 
org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -56,10 +54,8 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.minidump.MinidumpUtils; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.stats.StatsErrorEstimator; -import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.proto.Data; import org.apache.doris.qe.QueryState.MysqlStateType; -import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; @@ -68,7 +64,6 @@ import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; import com.google.common.collect.Maps; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; @@ -276,84 +271,7 @@ public class ConnectProcessor { private void auditAfterExec(String origStmt, StatementBase parsedStmt, Data.PQueryStatistics statistics, boolean printFuzzyVariables) { - origStmt = origStmt.replace("\n", " "); - // slow query - long endTime = System.currentTimeMillis(); - long elapseMs = endTime - ctx.getStartTime(); - SpanContext spanContext = Span.fromContext(Context.current()).getSpanContext(); - - ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) - .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) - .setState(ctx.getState().toString()) - .setErrorCode(ctx.getState().getErrorCode() == null ? 0 : ctx.getState().getErrorCode().getCode()) - .setErrorMessage((ctx.getState().getErrorMessage() == null ? "" : - ctx.getState().getErrorMessage().replace("\n", " ").replace("\t", " "))) - .setQueryTime(elapseMs) - .setScanBytes(statistics == null ? 
0 : statistics.getScanBytes()) - .setScanRows(statistics == null ? 0 : statistics.getScanRows()) - .setCpuTimeMs(statistics == null ? 0 : statistics.getCpuMs()) - .setPeakMemoryBytes(statistics == null ? 0 : statistics.getMaxPeakMemoryBytes()) - .setReturnRows(ctx.getReturnRows()) - .setStmtId(ctx.getStmtId()) - .setQueryId(ctx.queryId() == null ? "NaN" : DebugUtil.printId(ctx.queryId())) - .setTraceId(spanContext.isValid() ? spanContext.getTraceId() : "") - .setWorkloadGroup(ctx.getWorkloadGroupName()) - .setFuzzyVariables(!printFuzzyVariables ? "" : ctx.getSessionVariable().printFuzzyVariables()); - - if (ctx.getState().isQuery()) { - MetricRepo.COUNTER_QUERY_ALL.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ALL.getOrAdd(ctx.getQualifiedUser()).increase(1L); - if (ctx.getState().getStateType() == MysqlStateType.ERR - && ctx.getState().getErrType() != QueryState.ErrType.ANALYSIS_ERR) { - // err query - MetricRepo.COUNTER_QUERY_ERR.increase(1L); - MetricRepo.USER_COUNTER_QUERY_ERR.getOrAdd(ctx.getQualifiedUser()).increase(1L); - } else if (ctx.getState().getStateType() == MysqlStateType.OK - || ctx.getState().getStateType() == MysqlStateType.EOF) { - // ok query - MetricRepo.HISTO_QUERY_LATENCY.update(elapseMs); - MetricRepo.USER_HISTO_QUERY_LATENCY.getOrAdd(ctx.getQualifiedUser()).update(elapseMs); - - if (elapseMs > Config.qe_slow_log_ms) { - String sqlDigest = DigestUtils.md5Hex(((Queriable) parsedStmt).toDigest()); - ctx.getAuditEventBuilder().setSqlDigest(sqlDigest); - } - } - ctx.getAuditEventBuilder().setIsQuery(true); - if (ctx.getQueryDetail() != null) { - ctx.getQueryDetail().setEventTime(endTime); - ctx.getQueryDetail().setEndTime(endTime); - ctx.getQueryDetail().setLatency(elapseMs); - ctx.getQueryDetail().setState(QueryDetail.QueryMemState.FINISHED); - QueryDetailQueue.addOrUpdateQueryDetail(ctx.getQueryDetail()); - ctx.setQueryDetail(null); - } - } else { - ctx.getAuditEventBuilder().setIsQuery(false); - } - 
ctx.getAuditEventBuilder().setIsNereids(ctx.getState().isNereids); - - ctx.getAuditEventBuilder().setFeIp(FrontendOptions.getLocalHostAddress()); - - // We put origin query stmt at the end of audit log, for parsing the log more convenient. - if (!ctx.getState().isQuery() && (parsedStmt != null && parsedStmt.needAuditEncryption())) { - ctx.getAuditEventBuilder().setStmt(parsedStmt.toSql()); - } else { - if (parsedStmt instanceof InsertStmt && !((InsertStmt) parsedStmt).needLoadManager() - && ((InsertStmt) parsedStmt).isValuesOrConstantSelect()) { - // INSERT INTO VALUES may be very long, so we only log at most 1K bytes. - int length = Math.min(1024, origStmt.length()); - ctx.getAuditEventBuilder().setStmt(origStmt.substring(0, length)); - } else { - ctx.getAuditEventBuilder().setStmt(origStmt); - } - } - if (!Env.getCurrentEnv().isMaster()) { - if (ctx.executor.isForwardToMaster()) { - ctx.getAuditEventBuilder().setState(ctx.executor.getProxyStatus()); - } - } - AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, null, true); + AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics, printFuzzyVariables); } // Process COM_QUERY statement, @@ -372,12 +290,6 @@ public class ConnectProcessor { String sqlHash = DigestUtils.md5Hex(originStmt); ctx.setSqlHash(sqlHash); - ctx.getAuditEventBuilder().reset(); - ctx.getAuditEventBuilder() - .setTimestamp(System.currentTimeMillis()) - .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString()) - .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) - .setSqlHash(ctx.getSqlHash()); List<StatementBase> stmts = null; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org