This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2cdb8e20305607a069672bdc97bdcff9e6714966 Author: wxy <dut.xian...@gmail.com> AuthorDate: Wed Dec 7 13:54:21 2022 +0800 [fix](audit) fix duplicate audit log. (#14246) fix duplicate audit log. --- .../org/apache/doris/parser/DorisSqlSeparator.g4} | 65 +++++---- .../org/apache/doris/common/util/SqlUtils.java | 56 +++++++ .../java/org/apache/doris/qe/ConnectProcessor.java | 162 ++++++++++++--------- .../java/org/apache/doris/qe/StmtExecutor.java | 8 +- .../apache/doris/common/util/SqlUtilsTest.java} | 34 ++--- .../org/apache/doris/qe/ConnectProcessorTest.java | 86 +++++++++++ 6 files changed, 286 insertions(+), 125 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java b/fe/fe-core/src/main/antlr4/org/apache/doris/parser/DorisSqlSeparator.g4 similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java copy to fe/fe-core/src/main/antlr4/org/apache/doris/parser/DorisSqlSeparator.g4 index 6690883209..ef9bc65a6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/parser/DorisSqlSeparator.g4 @@ -15,33 +15,38 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.util; - -import com.google.common.base.Strings; - -public class SqlUtils { - public static String escapeUnquote(String ident) { - return ident.replaceAll("``", "`"); - } - - public static String getIdentSql(String ident) { - StringBuilder sb = new StringBuilder(); - sb.append('`'); - for (char ch : ident.toCharArray()) { - if (ch == '`') { - sb.append("``"); - } else { - sb.append(ch); - } - } - sb.append('`'); - return sb.toString(); - } - - public static String escapeQuota(String str) { - if (Strings.isNullOrEmpty(str)) { - return str; - } - return str.replaceAll("\"", "\\\\\""); - } -} +grammar DorisSqlSeparator; + +statements : statement (SEPARATOR statement)* EOF ; + +statement + : (comment | string | quoteIdentifier | ws | someText)+ + | // empty statement + ; + +quoteIdentifier + : '`' (~('`') | '``')* '`' + ; + +string + : SINGLE_QUOTE_STRING + | DOUBLE_QUOTE_STRING + ; + +comment + : TRADITIONAL_COMMENT + | END_OF_LINE_COMMENT + ; + +ws: WHITESPACE+; +someText: NON_SEPARATOR+; + +WHITESPACE: ' ' | '\t' | '\f' | LINE_TERMINATOR; +SINGLE_QUOTE_STRING: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''; +DOUBLE_QUOTE_STRING: '"' ( ~('"'|'\\') | ('\\' .) )* '"'; +TRADITIONAL_COMMENT: '/*' .*? '*/' ; +END_OF_LINE_COMMENT: '--' (~[\r\n])* LINE_TERMINATOR? ; +NON_SEPARATOR: (~';'); +SEPARATOR: ';'; + +fragment LINE_TERMINATOR: '\r' | '\n' | '\r\n'; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java index 6690883209..096f94a4af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java @@ -17,7 +17,20 @@ package org.apache.doris.common.util; +import org.apache.doris.parser.DorisSqlSeparatorLexer; +import org.apache.doris.parser.DorisSqlSeparatorParser; + import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.ParserRuleContext; +import org.antlr.v4.runtime.atn.PredictionMode; +import org.antlr.v4.runtime.misc.ParseCancellationException; +import org.antlr.v4.runtime.tree.ParseTree; + +import java.util.Collections; +import java.util.List; public class SqlUtils { public static String escapeUnquote(String ident) { @@ -44,4 +57,47 @@ public class SqlUtils { } return str.replaceAll("\"", "\\\\\""); } + + public static List<String> splitMultiStmts(String sql) { + DorisSqlSeparatorLexer lexer = new DorisSqlSeparatorLexer(CharStreams.fromString(sql)); + CommonTokenStream tokenStream = new CommonTokenStream(lexer); + DorisSqlSeparatorParser parser = new DorisSqlSeparatorParser(tokenStream); + ParserRuleContext tree; + + try { + // first, try parsing with potentially faster SLL mode + parser.getInterpreter().setPredictionMode(PredictionMode.SLL); + tree = parser.statements(); + } catch (ParseCancellationException ex) { + // if we fail, parse with LL mode + tokenStream.seek(0); + parser.reset(); + + parser.getInterpreter().setPredictionMode(PredictionMode.LL); + tree = parser.statement(); + } + + DorisSqlSeparatorParser.StatementsContext stmt = (DorisSqlSeparatorParser.StatementsContext) tree; + List<String> singleStmtList = Lists.newArrayList(); + for (DorisSqlSeparatorParser.StatementContext statementContext : stmt.statement()) { + if (!isEmptySql(statementContext)) { + singleStmtList.add(statementContext.getText()); + } + } + + return Collections.unmodifiableList(singleStmtList); + } + + private static boolean isEmptySql(DorisSqlSeparatorParser.StatementContext statementContext) { + if (statementContext.children == null) { + return true; + } + for (ParseTree child : statementContext.children) { + if (!(child instanceof DorisSqlSeparatorParser.CommentContext) + && !(child instanceof DorisSqlSeparatorParser.WsContext)) { + return false; + } + } + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index a3e78fab70..2715aa3a6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -36,11 +36,11 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.telemetry.Telemetry; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.metric.MetricRepo; @@ -61,7 +61,6 @@ import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; @@ -171,12 +170,14 @@ public class ConnectProcessor { } private void auditAfterExec(String origStmt, StatementBase parsedStmt, Data.PQueryStatistics statistics) { + origStmt = origStmt.replace("\n", " "); // slow query long endTime = System.currentTimeMillis(); long elapseMs = endTime - ctx.getStartTime(); SpanContext spanContext = Span.fromContext(Context.current()).getSpanContext(); ctx.getAuditEventBuilder().setEventType(EventType.AFTER_QUERY) + .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) .setState(ctx.getState().toString()).setQueryTime(elapseMs) .setScanBytes(statistics == null ? 0 : statistics.getScanBytes()) .setScanRows(statistics == null ? 0 : statistics.getScanRows()) @@ -254,94 +255,117 @@ public class ConnectProcessor { .setTimestamp(System.currentTimeMillis()) .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString()) .setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser())) - .setDb(ClusterNamespace.getNameFromFullName(ctx.getDatabase())) .setSqlHash(ctx.getSqlHash()); - StatementBase parsedStmt = null; - List<Pair<StatementBase, Data.PQueryStatistics>> auditInfoList = Lists.newArrayList(); - boolean alreadyAddedToAuditInfoList = false; - try { - List<StatementBase> stmts = null; - Exception nereidsParseException = null; - // ctx could be null in some unit tests - if (ctx != null && ctx.getSessionVariable().isEnableNereidsPlanner()) { - NereidsParser nereidsParser = new NereidsParser(); - try { - stmts = nereidsParser.parseSQL(originStmt); - } catch (Exception e) { - // TODO: We should catch all exception here until we support all query syntax. - nereidsParseException = e; - LOG.info(" Fallback to stale planner." - + " Nereids cannot process this statement: \"{}\".", originStmt); - } + + Exception nereidsParseException = null; + List<StatementBase> stmts = null; + + if (ctx.getSessionVariable().isEnableNereidsPlanner()) { + try { + stmts = new NereidsParser().parseSQL(originStmt); + } catch (Exception e) { + // TODO: We should catch all exception here until we support all query syntax. + nereidsParseException = e; + LOG.info(" Fallback to stale planner." + + " Nereids cannot process this statement: \"{}\".", originStmt); } - // stmts == null when Nereids cannot planner this query or Nereids is disabled. - if (stmts == null) { + } + + // stmts == null when Nereids cannot planner this query or Nereids is disabled. + if (stmts == null) { + try { stmts = parse(originStmt); + } catch (Throwable throwable) { + // Parse sql failed, audit it and return + handleQueryException(throwable, originStmt, null, null); + return; } - for (int i = 0; i < stmts.size(); ++i) { - alreadyAddedToAuditInfoList = false; - ctx.getState().reset(); - if (i > 0) { - ctx.resetReturnRows(); - } - parsedStmt = stmts.get(i); - if (parsedStmt instanceof SelectStmt && nereidsParseException != null - && ctx.getSessionVariable().isEnableNereidsPlanner() - && !ctx.getSessionVariable().enableFallbackToOriginalPlanner) { - throw new Exception(String.format("nereids cannot anaylze sql, and fall-back disabled: %s", - parsedStmt.toSql()), nereidsParseException); - } - parsedStmt.setOrigStmt(new OriginStatement(originStmt, i)); - parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); - executor = new StmtExecutor(ctx, parsedStmt); - ctx.setExecutor(executor); - executor.execute(); + } + List<String> origSingleStmtList = null; + // if stmts.size() > 1, split originStmt to multi singleStmts + if (stmts.size() > 1) { + try { + origSingleStmtList = SqlUtils.splitMultiStmts(originStmt); + } catch (Exception ignore) { + LOG.warn("Try to parse multi origSingleStmt failed, originStmt: \"{}\"", originStmt); + } + } + + boolean usingOrigSingleStmt = origSingleStmtList != null && origSingleStmtList.size() == stmts.size(); + for (int i = 0; i < stmts.size(); ++i) { + String auditStmt = usingOrigSingleStmt ? origSingleStmtList.get(i) : originStmt; + + ctx.getState().reset(); + if (i > 0) { + ctx.resetReturnRows(); + } + + StatementBase parsedStmt = stmts.get(i); + if (parsedStmt instanceof SelectStmt && nereidsParseException != null + && ctx.getSessionVariable().isEnableNereidsPlanner() + && !ctx.getSessionVariable().enableFallbackToOriginalPlanner) { + Exception exception = new Exception( + String.format("nereids cannot anaylze sql, and fall-back disabled: %s", + parsedStmt.toSql()), nereidsParseException); + // audit it and break + handleQueryException(exception, auditStmt, null, null); + break; + } + + parsedStmt.setOrigStmt(new OriginStatement(originStmt, i)); + parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); + executor = new StmtExecutor(ctx, parsedStmt); + ctx.setExecutor(executor); + + try { + executor.execute(); if (i != stmts.size() - 1) { ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS; finalizeCommand(); } - auditInfoList.add(Pair.of(executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog())); - alreadyAddedToAuditInfoList = true; + auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog()); + // execute failed, skip remaining stmts + if (ctx.getState().getStateType() == MysqlStateType.ERR) { + break; + } + } catch (Throwable throwable) { + handleQueryException(throwable, auditStmt, executor.getParsedStmt(), + executor.getQueryStatisticsForAuditLog()); + // execute failed, skip remaining stmts + break; + } finally { + executor.addProfileToSpan(); } - } catch (IOException e) { + + } + + } + + // Use a handler for exception to avoid big try catch block which is a little hard to understand + private void handleQueryException(Throwable throwable, String origStmt, + StatementBase parsedStmt, Data.PQueryStatistics statistics) { + if (throwable instanceof IOException) { // Client failed. - LOG.warn("Process one query failed because IOException: ", e); + LOG.warn("Process one query failed because IOException: ", throwable); ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed"); - } catch (UserException e) { - LOG.warn("Process one query failed because.", e); - ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage()); - // set is as ANALYSIS_ERR so that it won't be treated as a query failure. + } else if (throwable instanceof UserException) { + LOG.warn("Process one query failed because.", throwable); + ctx.getState().setError(((UserException) throwable).getMysqlErrorCode(), throwable.getMessage()); + // set it as ANALYSIS_ERR so that it won't be treated as a query failure. ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR); - } catch (Throwable e) { + } else { // Catch all throwable. // If reach here, maybe palo bug. - LOG.warn("Process one query failed because unknown reason: ", e); + LOG.warn("Process one query failed because unknown reason: ", throwable); ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, - e.getClass().getSimpleName() + ", msg: " + e.getMessage()); + throwable.getClass().getSimpleName() + ", msg: " + throwable.getMessage()); if (parsedStmt instanceof KillStmt) { // ignore kill stmt execute err(not monitor it) ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR); } } - - // that means execute some statement failed - if (!alreadyAddedToAuditInfoList && executor != null) { - auditInfoList.add(Pair.of(executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog())); - } - - // audit after exec, analysis query would not be recorded - if (!auditInfoList.isEmpty()) { - for (Pair<StatementBase, Data.PQueryStatistics> audit : auditInfoList) { - auditAfterExec(originStmt.replace("\n", " "), audit.first, audit.second); - } - } else if (QueryState.ErrType.ANALYSIS_ERR != ctx.getState().getErrType()) { - // auditInfoList can be empty if we encounter error. - auditAfterExec(originStmt.replace("\n", " "), null, null); - } - if (executor != null) { - executor.addProfileToSpan(); - } + auditAfterExec(origStmt, parsedStmt, statistics); } // analyze the origin stmt and return multi-statements diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5e3f8bed4f..3acaa2c824 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -421,8 +421,11 @@ public class StmtExecutor implements ProfileWriter { plannerProfile.setQueryBeginTime(); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); - context.setQueryId(queryId); + // set isQuery first otherwise this state will be lost if some error occurs + if (parsedStmt instanceof QueryStmt || parsedStmt instanceof LogicalPlanAdapter) { + context.getState().setIsQuery(true); + } try { if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt) @@ -478,7 +481,6 @@ public class StmtExecutor implements ProfileWriter { } if (parsedStmt instanceof QueryStmt || parsedStmt instanceof LogicalPlanAdapter) { - context.getState().setIsQuery(true); if (!parsedStmt.isExplain()) { // sql/sqlHash block try { @@ -1771,7 +1773,7 @@ public class StmtExecutor implements ProfileWriter { if (!statisticsForAuditLog.hasScanRows()) { statisticsForAuditLog.setScanRows(0L); } - if (statisticsForAuditLog.hasReturnedRows()) { + if (!statisticsForAuditLog.hasReturnedRows()) { statisticsForAuditLog.setReturnedRows(0L); } if (!statisticsForAuditLog.hasCpuMs()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/SqlUtilsTest.java similarity index 54% copy from fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java copy to fe/fe-core/src/test/java/org/apache/doris/common/util/SqlUtilsTest.java index 6690883209..6c52d7242f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/SqlUtils.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/SqlUtilsTest.java @@ -17,31 +17,19 @@ package org.apache.doris.common.util; -import com.google.common.base.Strings; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; -public class SqlUtils { - public static String escapeUnquote(String ident) { - return ident.replaceAll("``", "`"); - } +import java.util.List; - public static String getIdentSql(String ident) { - StringBuilder sb = new StringBuilder(); - sb.append('`'); - for (char ch : ident.toCharArray()) { - if (ch == '`') { - sb.append("``"); - } else { - sb.append(ch); - } - } - sb.append('`'); - return sb.toString(); - } +public class SqlUtilsTest { - public static String escapeQuota(String str) { - if (Strings.isNullOrEmpty(str)) { - return str; - } - return str.replaceAll("\"", "\\\\\""); + @Test + public void testSplitMultiStmts() { + List<String> stmtList = SqlUtils.splitMultiStmts("select `AD``D` from t1 where a = 1;" + + "explain graph select `AD``D` from t1 where a = 1;"); + Assertions.assertTrue(stmtList.size() == 2); + Assertions.assertTrue("select `AD``D` from t1 where a = 1".equals(stmtList.get(0))); + Assertions.assertTrue("explain graph select `AD``D` from t1 where a = 1".equals(stmtList.get(1))); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java index 6283378535..0b62673a3b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java @@ -48,6 +48,7 @@ public class ConnectProcessorTest { private static ByteBuffer pingPacket; private static ByteBuffer quitPacket; private static ByteBuffer queryPacket; + private static ByteBuffer multiQueryPacket; private static ByteBuffer fieldListPacket; private static AuditEventBuilder auditBuilder = new AuditEventBuilder(); private static ConnectContext myContext; @@ -89,6 +90,14 @@ public class ConnectProcessorTest { queryPacket = serializer.toByteBuffer(); } // CHECKSTYLE IGNORE THIS LINE + // Multi query packet + { // CHECKSTYLE IGNORE THIS LINE + MysqlSerializer serializer = MysqlSerializer.newInstance(); + serializer.writeInt1(3); + serializer.writeEofString("select * from a;select * from b;drop table a"); + multiQueryPacket = serializer.toByteBuffer(); + } // CHECKSTYLE IGNORE THIS LINE + // Field list packet { // CHECKSTYLE IGNORE THIS LINE MysqlSerializer serializer = MysqlSerializer.newInstance(); @@ -108,6 +117,7 @@ public class ConnectProcessorTest { pingPacket.clear(); quitPacket.clear(); queryPacket.clear(); + multiQueryPacket.clear(); fieldListPacket.clear(); // Mock MysqlChannel channel = new MysqlChannel(socketChannel); @@ -385,6 +395,82 @@ public class ConnectProcessorTest { Assert.assertTrue(myContext.getState().toResponsePacket() instanceof MysqlErrPacket); } + @Test + public void testMultiQuery(@Mocked StmtExecutor executor) throws Exception { + ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog()); + + ConnectProcessor processor = new ConnectProcessor(ctx); + + // Mock statement executor + new Expectations() { + { + executor.getQueryStatisticsForAuditLog(); + minTimes = 0; + result = statistics; + + executor.execute(); + times = 3; + } + }; + + processor.processOnce(); + Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); + Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.OK); + Assert.assertEquals("drop table a", ctx.getAuditEventBuilder().build().stmt); + } + + @Test + public void testMultiQueryFail1(@Mocked StmtExecutor executor) throws Exception { + ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog()); + + ConnectProcessor processor = new ConnectProcessor(ctx); + + // Mock statement executor + new Expectations() { + { + executor.getQueryStatisticsForAuditLog(); + minTimes = 0; + result = statistics; + + executor.execute(); + times = 1; + result = new IOException("Fail"); + } + }; + + processor.processOnce(); + Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); + Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.ERR); + Assert.assertEquals(QueryState.MysqlStateType.ERR.name(), ctx.getAuditEventBuilder().build().state); + Assert.assertEquals("select * from a", ctx.getAuditEventBuilder().build().stmt); + } + + @Test + public void testMultiQueryFail2(@Mocked StmtExecutor executor) throws Exception { + ConnectContext ctx = initMockContext(mockChannel(multiQueryPacket), AccessTestUtil.fetchAdminCatalog()); + + ConnectProcessor processor = new ConnectProcessor(ctx); + + // Mock statement executor + new Expectations() { + { + executor.getQueryStatisticsForAuditLog(); + minTimes = 0; + result = statistics; + + executor.execute(); + result = null; + result = new IOException("Fail"); + } + }; + + processor.processOnce(); + Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); + Assert.assertEquals(myContext.getState().getStateType(), QueryState.MysqlStateType.ERR); + Assert.assertEquals(QueryState.MysqlStateType.ERR.name(), ctx.getAuditEventBuilder().build().state); + Assert.assertEquals("select * from b", ctx.getAuditEventBuilder().build().stmt); + } + @Test public void testFieldList() throws Exception { ConnectContext ctx = initMockContext(mockChannel(fieldListPacket), AccessTestUtil.fetchAdminCatalog()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org