This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cf68a6485a8 [opt](audit) use one line in audit log and origin
statement in audit table (#52032) (#52205)
cf68a6485a8 is described below
commit cf68a6485a84b432481f2ee6aab64fdb395b1b53
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Jun 24 19:43:32 2025 +0800
[opt](audit) use one line in audit log and origin statement in audit table
(#52032) (#52205)
bp #52032
---
.../java/org/apache/doris/plugin/AuditEvent.java | 116 ++++++++++++---------
.../org/apache/doris/plugin/audit/AuditLoader.java | 66 ++++++------
.../apache/doris/plugin/audit/AuditLogBuilder.java | 6 ++
.../doris/plugin/audit/AuditStreamLoader.java | 2 +
.../java/org/apache/doris/qe/AuditLogHelper.java | 7 +-
.../doris/plugin/audit/AuditLogBuilderTest.java | 24 ++---
6 files changed, 119 insertions(+), 102 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 20c05d982f8..7a2f48d4e6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -42,77 +42,95 @@ public class AuditEvent {
}
@Retention(RetentionPolicy.RUNTIME)
- public static @interface AuditField {
+ public @interface AuditField {
String value() default "";
+
+ String colName() default "";
}
public EventType type;
- // all fields which is about to be audit should be annotated by
"@AuditField"
+ // all fields that are about to be audited should be annotated with "@AuditField"
// make them all "public" so that easy to visit.
- @AuditField(value = "Timestamp")
+
+ // uuid and time
+ @AuditField(value = "QueryId", colName = "query_id")
+ public String queryId = "";
+ @AuditField(value = "Timestamp", colName = "time")
public long timestamp = -1;
- @AuditField(value = "Client")
+
+ // cs info
+ @AuditField(value = "Client", colName = "client_ip")
public String clientIp = "";
- @AuditField(value = "User")
+ @AuditField(value = "User", colName = "user")
public String user = "";
- @AuditField(value = "Ctl")
+ @AuditField(value = "FeIp", colName = "frontend_ip")
+ public String feIp = "";
+
+ // default ctl and db
+ @AuditField(value = "Ctl", colName = "catalog")
public String ctl = "";
- @AuditField(value = "Db")
+ @AuditField(value = "Db", colName = "db")
public String db = "";
- @AuditField(value = "CommandType")
- public String commandType = "";
- @AuditField(value = "State")
+
+ // query state
+ @AuditField(value = "State", colName = "state")
public String state = "";
- @AuditField(value = "ErrorCode")
+ @AuditField(value = "ErrorCode", colName = "error_code")
public int errorCode = 0;
- @AuditField(value = "ErrorMessage")
+ @AuditField(value = "ErrorMessage", colName = "error_message")
public String errorMessage = "";
- @AuditField(value = "Time(ms)")
+
+ // execution info
+ @AuditField(value = "Time(ms)", colName = "query_time")
public long queryTime = -1;
- @AuditField(value = "ScanBytes")
+ @AuditField(value = "CpuTimeMS", colName = "cpu_time_ms")
+ public long cpuTimeMs = -1;
+ @AuditField(value = "PeakMemoryBytes", colName = "peak_memory_bytes")
+ public long peakMemoryBytes = -1;
+ @AuditField(value = "ScanBytes", colName = "scan_bytes")
public long scanBytes = -1;
- @AuditField(value = "ScanRows")
+ @AuditField(value = "ScanRows", colName = "scan_rows")
public long scanRows = -1;
- @AuditField(value = "ReturnRows")
+ @AuditField(value = "ReturnRows", colName = "return_rows")
public long returnRows = -1;
- @AuditField(value = "StmtId")
- public long stmtId = -1;
- @AuditField(value = "QueryId")
- public String queryId = "";
- @AuditField(value = "IsQuery")
- public boolean isQuery = false;
- @AuditField(value = "IsNereids")
- public boolean isNereids = false;
- @AuditField(value = "FeIp")
- public String feIp = "";
- @AuditField(value = "StmtType")
- public String stmtType = "";
- @AuditField(value = "Stmt")
- public String stmt = "";
- @AuditField(value = "CpuTimeMS")
- public long cpuTimeMs = -1;
- @AuditField(value = "ShuffleSendBytes")
- public long shuffleSendBytes = -1;
- @AuditField(value = "ShuffleSendRows")
+ @AuditField(value = "ShuffleSendRows", colName = "shuffle_send_rows")
public long shuffleSendRows = -1;
- @AuditField(value = "SqlHash")
+ @AuditField(value = "ShuffleSendBytes", colName = "shuffle_send_bytes")
+ public long shuffleSendBytes = -1;
+ @AuditField(value = "ScanBytesFromLocalStorage", colName =
"scan_bytes_from_local_storage")
+ public long scanBytesFromLocalStorage = -1;
+ @AuditField(value = "ScanBytesFromRemoteStorage", colName =
"scan_bytes_from_remote_storage")
+ public long scanBytesFromRemoteStorage = -1;
+
+ @AuditField(value = "FuzzyVariables")
+ public String fuzzyVariables = "";
+
+ // type and digest
+ @AuditField(value = "CommandType")
+ public String commandType = "";
+ @AuditField(value = "StmtType", colName = "stmt_type")
+ public String stmtType = "";
+ @AuditField(value = "StmtId", colName = "stmt_id")
+ public long stmtId = -1;
+ @AuditField(value = "SqlHash", colName = "sql_hash")
public String sqlHash = "";
- @AuditField(value = "PeakMemoryBytes")
- public long peakMemoryBytes = -1;
- @AuditField(value = "SqlDigest")
+ @AuditField(value = "SqlDigest", colName = "sql_digest")
public String sqlDigest = "";
- @AuditField(value = "ComputeGroupName")
- public String cloudClusterName = "";
- @AuditField(value = "WorkloadGroup")
+ @AuditField(value = "IsQuery", colName = "is_query")
+ public boolean isQuery = false;
+ @AuditField(value = "IsNereids", colName = "is_nereids")
+ public boolean isNereids = false;
+
+ // resource
+ @AuditField(value = "WorkloadGroup", colName = "workload_group")
public String workloadGroup = "";
- // note: newly added fields should be always before fuzzyVariables
- @AuditField(value = "FuzzyVariables")
- public String fuzzyVariables = "";
- @AuditField(value = "ScanBytesFromLocalStorage")
- public long scanBytesFromLocalStorage = -1;
- @AuditField(value = "ScanBytesFromRemoteStorage")
- public long scanBytesFromRemoteStorage = -1;
+ @AuditField(value = "ComputeGroupName", colName = "compute_group")
+ public String cloudClusterName = "";
+
+ // stmt should be last one
+ @AuditField(value = "Stmt", colName = "stmt")
+ public String stmt = "";
public long pushToAuditLogQueueTime;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 722ab48669b..c1047bec1b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -46,6 +46,14 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
public static final String AUDIT_LOG_TABLE = "audit_log";
+ // U+001F (unit separator) and U+001E (record separator) separate columns and lines in audit log data
+ public static final String AUDIT_TABLE_COL_SEPARATOR = "\u001F";
+ public static final String AUDIT_TABLE_LINE_DELIMITER = "\u001E";
+ // the "\\x1F" and "\\x1E" are used to specify the column separator and line delimiter in the stream load
+ // request; they correspond to the raw U+001F and U+001E characters written into the audit log data.
+ public static final String AUDIT_TABLE_COL_SEPARATOR_STR = "\\x1F";
+ public static final String AUDIT_TABLE_LINE_DELIMITER_STR = "\\x1E";
+
private StringBuilder auditLogBuffer = new StringBuilder();
private int auditLogNum = 0;
private long lastLoadTimeAuditLog = 0;
@@ -139,40 +147,40 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
// should be same order as InternalSchema.AUDIT_SCHEMA
- logBuffer.append(event.queryId).append("\t");
- logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append("\t");
- logBuffer.append(event.clientIp).append("\t");
- logBuffer.append(event.user).append("\t");
- logBuffer.append(event.ctl).append("\t");
- logBuffer.append(event.db).append("\t");
- logBuffer.append(event.state).append("\t");
- logBuffer.append(event.errorCode).append("\t");
- logBuffer.append(event.errorMessage).append("\t");
- logBuffer.append(event.queryTime).append("\t");
- logBuffer.append(event.scanBytes).append("\t");
- logBuffer.append(event.scanRows).append("\t");
- logBuffer.append(event.returnRows).append("\t");
- logBuffer.append(event.shuffleSendRows).append("\t");
- logBuffer.append(event.shuffleSendBytes).append("\t");
- logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
- logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
- logBuffer.append(event.stmtId).append("\t");
- logBuffer.append(event.stmtType).append("\t");
- logBuffer.append(event.isQuery ? 1 : 0).append("\t");
- logBuffer.append(event.isNereids ? 1 : 0).append("\t");
- logBuffer.append(event.feIp).append("\t");
- logBuffer.append(event.cpuTimeMs).append("\t");
- logBuffer.append(event.sqlHash).append("\t");
- logBuffer.append(event.sqlDigest).append("\t");
- logBuffer.append(event.peakMemoryBytes).append("\t");
- logBuffer.append(event.workloadGroup).append("\t");
- logBuffer.append(event.cloudClusterName).append("\t");
+ logBuffer.append(event.queryId).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(TimeUtils.longToTimeStringWithms(event.timestamp)).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.clientIp).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.user).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.ctl).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.db).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.state).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.errorCode).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.errorMessage).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.queryTime).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.scanBytes).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.scanRows).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.returnRows).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.shuffleSendRows).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.shuffleSendBytes).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.scanBytesFromLocalStorage).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.scanBytesFromRemoteStorage).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.stmtId).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.stmtType).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.isQuery ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.isNereids ? 1 : 0).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.feIp).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.cpuTimeMs).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.sqlHash).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.sqlDigest).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.peakMemoryBytes).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.workloadGroup).append(AUDIT_TABLE_COL_SEPARATOR);
+ logBuffer.append(event.cloudClusterName).append(AUDIT_TABLE_COL_SEPARATOR);
// already trim the query in org.apache.doris.qe.AuditLogHelper#logAuditLog
String stmt = event.stmt;
if (LOG.isDebugEnabled()) {
LOG.debug("receive audit event with stmt: {}", stmt);
}
- logBuffer.append(stmt).append("\n");
+ logBuffer.append(stmt).append(AUDIT_TABLE_LINE_DELIMITER);
}
// public for external call.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
index 4208d5def2e..94d7973f294 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLogBuilder.java
@@ -121,6 +121,12 @@ public class AuditLogBuilder extends Plugin implements
AuditPlugin {
}
}
+ // replace new line characters with escaped characters to make
sure the stmt in one line
+ if (af.value().equals("Stmt")) {
+ fieldValue = ((String) fieldValue).replace("\n", "\\n")
+ .replace("\r", "\\r");
+ }
+
sb.append("|").append(af.value()).append("=").append(fieldValue);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
index 0b70e9591d5..d2576937d98 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
@@ -68,6 +68,8 @@ public class AuditStreamLoader {
InternalSchema.AUDIT_SCHEMA.stream().map(c ->
c.getName()).collect(
Collectors.joining(",")));
conn.addRequestProperty("redirect-policy", "random-be");
+ conn.addRequestProperty("column_separator",
AuditLoader.AUDIT_TABLE_COL_SEPARATOR_STR);
+ conn.addRequestProperty("line_delimiter",
AuditLoader.AUDIT_TABLE_LINE_DELIMITER_STR);
conn.setDoOutput(true);
conn.setDoInput(true);
return conn;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index f1470d444b9..ecfd08aaa71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -101,9 +101,7 @@ public class AuditLogHelper {
int maxLen = GlobalVariable.auditPluginMaxSqlLength;
origStmt = truncateByBytes(origStmt, maxLen, " ... /* truncated.
audit_plugin_max_sql_length=" + maxLen
+ " */");
- return origStmt.replace("\n", "\\n")
- .replace("\t", "\\t")
- .replace("\r", "\\r");
+ return origStmt;
}
private static Optional<String> handleInsertStmt(String origStmt,
StatementBase parsedStmt) {
@@ -134,9 +132,6 @@ public class AuditLogHelper {
Math.min(GlobalVariable.auditPluginMaxInsertStmtLength,
GlobalVariable.auditPluginMaxSqlLength));
origStmt = truncateByBytes(origStmt, maxLen, " ... /* total " +
rowCnt
+ " rows, truncated. audit_plugin_max_insert_stmt_length="
+ maxLen + " */");
- origStmt = origStmt.replace("\n", "\\n")
- .replace("\t", "\\t")
- .replace("\r", "\\r");
return Optional.of(origStmt);
} else {
return Optional.empty();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
index 8c678447c3a..f3e71c248df 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLogBuilderTest.java
@@ -80,12 +80,9 @@ public class AuditLogBuilderTest {
// 4. Test statement with newlines, tabs, carriage returns
String stmtWithSpecialChars = "SELECT *\nFROM table1\tWHERE id =
1\r";
result = AuditLogHelper.handleStmt(stmtWithSpecialChars,
nonInsertStmt);
- Assert.assertTrue("Should escape newlines",
result.contains("\\n"));
- Assert.assertTrue("Should escape tabs", result.contains("\\t"));
- Assert.assertTrue("Should escape carriage returns",
result.contains("\\r"));
- Assert.assertFalse("Should not contain actual newlines",
result.contains("\n"));
- Assert.assertFalse("Should not contain actual tabs",
result.contains("\t"));
- Assert.assertFalse("Should not contain actual carriage returns",
result.contains("\r"));
+ Assert.assertTrue("Should contain actual newlines",
result.contains("\n"));
+ Assert.assertTrue("Should contain actual tabs",
result.contains("\t"));
+ Assert.assertTrue("Should contain actual carriage returns",
result.contains("\r"));
// 5. Test long statement with Chinese characters truncation
String chineseStmt
@@ -118,12 +115,6 @@ public class AuditLogBuilderTest {
String emptyStmt = "";
result = AuditLogHelper.handleStmt(emptyStmt, nonInsertStmt);
Assert.assertEquals("Empty string should remain empty", "",
result);
-
- // 9. Test statement with only special characters
- String specialCharsStmt = "\n\t\r\n\t\r";
- result = AuditLogHelper.handleStmt(specialCharsStmt,
nonInsertStmt);
- Assert.assertEquals("Should escape all special characters",
"\\n\\t\\r\\n\\t\\r", result);
-
} finally {
// Restore original values
GlobalVariable.auditPluginMaxSqlLength = originalMaxSqlLength;
@@ -172,12 +163,9 @@ public class AuditLogBuilderTest {
result = AuditLogHelper.handleStmt(insertWithSpecialChars,
insertStmt);
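- // Verify special characters are properly escaped
+ // Verify special characters are preserved as-is (escaping now happens only when writing the audit log line)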
- Assert.assertTrue("Should escape newlines in INSERT",
result.contains("\\n"));
- Assert.assertTrue("Should escape tabs in INSERT",
result.contains("\\t"));
- Assert.assertTrue("Should escape carriage returns in INSERT",
result.contains("\\r"));
- Assert.assertFalse("Should not contain actual newlines",
result.contains("\n"));
- Assert.assertFalse("Should not contain actual tabs",
result.contains("\t"));
- Assert.assertFalse("Should not contain actual carriage returns",
result.contains("\r"));
+ Assert.assertTrue("Should contain actual newlines",
result.contains("\n"));
+ Assert.assertTrue("Should contain actual tabs",
result.contains("\t"));
+ Assert.assertTrue("Should contain actual carriage returns",
result.contains("\r"));
// 4. Test comparison: same length statements, different handling
for INSERT vs non-INSERT
// Create a statement with length between 80-200
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]