This is an automated email from the ASF dual-hosted git repository.
hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 38430219e62 [fix](audit) serialize audit loader batch assembly (#65107)
38430219e62 is described below
commit 38430219e625b491f47920bf9925c687117c8076
Author: shuke <[email protected]>
AuthorDate: Thu Jul 2 10:17:23 2026 +0800
[fix](audit) serialize audit loader batch assembly (#65107)
### What problem does this PR solve?
`AuditLoader.loadIfNecessary()` is synchronized and resets
`auditLogBuffer` after stream load, but `assembleAudit()` appended
events to the same shared `StringBuilder` without holding the same
monitor.
When `call flush_audit_log()` forces a load while the audit loader
worker is assembling a new event, the worker may append to the old
buffer after the payload has already been materialized for stream load
and before `resetBatch()` replaces the buffer. That event is then
neither included in the current load nor retained for the next load.
This can make `flush_audit_log()` miss audit events that have already
reached the audit plugin pipeline, which matches the flaky
`test_audit_log_queue_time` symptom.
Related: DORIS-25958
### Check List
- [x] Added test
- [ ] This is a behavior change and it is documented
- [ ] This is a new feature and it is documented
- [ ] This needs upgrade
- [ ] This needs downgrade
### Release note
None
### Testing
- `git diff --check`
- Not run: `run-fe-ut.sh --run
org.apache.doris.plugin.audit.AuditLoaderTest` because local
`thirdparty/installed/bin/protoc` is missing, and `fe/AGENTS.md`
requires stopping FE build/test in that case.
---
.../org/apache/doris/plugin/audit/AuditLoader.java | 3 +-
.../apache/doris/plugin/audit/AuditLoaderTest.java | 82 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 6f43ecdd574..6a09fdfa1fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -140,7 +140,7 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
}
}
- private void assembleAudit(AuditEvent event) {
+ private synchronized void assembleAudit(AuditEvent event) {
fillLogBuffer(event, auditLogBuffer);
++auditLogNum;
}
@@ -287,4 +287,3 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
}
}
}
-
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLoaderTest.java
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLoaderTest.java
new file mode 100644
index 00000000000..63bab9c3f95
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/plugin/audit/AuditLoaderTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.plugin.audit;
+
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.plugin.AuditEvent;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class AuditLoaderTest {
+
+ @Test
+ public void testAssembleAuditIsSerializedWithLoadLock() throws Exception {
+ AuditLoader auditLoader = new AuditLoader();
+ AuditEvent auditEvent = new AuditEvent.AuditEventBuilder()
+ .setQueryId("query-in-shared-monitor-test")
+ .setTimestamp(1L)
+ .setStmt("select 1")
+ .build();
+
+ CountDownLatch started = new CountDownLatch(1);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ Thread assembleThread = new Thread(() -> {
+ started.countDown();
+ try {
+ Deencapsulation.invoke(auditLoader, "assembleAudit",
auditEvent);
+ } catch (Throwable t) {
+ error.set(t);
+ }
+ });
+
+ synchronized (auditLoader) {
+ assembleThread.start();
+ Assert.assertTrue(started.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(waitForBlocked(assembleThread));
+
Assert.assertFalse(getAuditLogBuffer(auditLoader).contains(auditEvent.queryId));
+ }
+
+ assembleThread.join(5000);
+ Assert.assertFalse(assembleThread.isAlive());
+ if (error.get() != null) {
+ throw new AssertionError("failed to assemble audit event",
error.get());
+ }
+
Assert.assertTrue(getAuditLogBuffer(auditLoader).contains(auditEvent.queryId));
+ }
+
+ private boolean waitForBlocked(Thread thread) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + 5000;
+ while (System.currentTimeMillis() < deadline) {
+ if (thread.getState() == Thread.State.BLOCKED) {
+ return true;
+ }
+ Thread.sleep(10);
+ }
+ return false;
+ }
+
+ private String getAuditLogBuffer(AuditLoader auditLoader) {
+ StringBuilder buffer = Deencapsulation.getField(auditLoader,
"auditLogBuffer");
+ return buffer.toString();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]