This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new a0e85ae559 [enhancement](AuditLoaderPlugin): add audit queue capacity configurat… (#12887)
a0e85ae559 is described below

commit a0e85ae559e3a011d1d160ebbc776393cfff711a
Author: wxy <dut.xian...@gmail.com>
AuthorDate: Tue Sep 27 08:50:30 2022 +0800

    [enhancement](AuditLoaderPlugin): add audit queue capacity configurat… (#12887)
---
 fe_plugins/auditloader/src/main/assembly/plugin.conf    |  3 +++
 .../apache/doris/plugin/audit/AuditLoaderPlugin.java    | 17 ++++++++++++-----
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf b/fe_plugins/auditloader/src/main/assembly/plugin.conf
index 4013920938..14fdd32b98 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.conf
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf
@@ -26,6 +26,9 @@ max_batch_interval_sec=60
 # the max stmt length to be loaded in audit table, default is 4096
 max_stmt_length=4096
 
+# the capacity of audit queue, default is 1000
+max_queue_size=1000
+
 # Doris FE host for loading the audit, default is 127.0.0.1:8030.
 # this should be the host port for stream load
 frontend_host_port=127.0.0.1:8030
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index ea5c05ee66..8ebd5ac6dd 100755
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.plugin.audit;
 
+import com.google.common.collect.Queues;
 import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.plugin.AuditPlugin;
 import org.apache.doris.plugin.Plugin;
@@ -42,7 +43,6 @@ import java.util.Date;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -53,12 +53,13 @@ import java.util.stream.Collectors;
 public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
     private final static Logger LOG = LogManager.getLogger(AuditLoaderPlugin.class);
 
-    private static SimpleDateFormat DATETIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    private static final ThreadLocal<SimpleDateFormat> dateFormatContainer = ThreadLocal.withInitial(
+            () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
 
     private StringBuilder auditBuffer = new StringBuilder();
     private long lastLoadTime = 0;
 
-    private BlockingQueue<AuditEvent> auditEventQueue = new LinkedBlockingDeque<AuditEvent>(1);
+    private BlockingQueue<AuditEvent> auditEventQueue;
     private DorisStreamLoader streamLoader;
     private Thread loadThread;
 
@@ -78,6 +79,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
 
             loadConfig(ctx, info.getProperties());
 
+            this.auditEventQueue = Queues.newLinkedBlockingDeque(conf.maxQueueSize);
             this.streamLoader = new DorisStreamLoader(conf);
             this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
             this.loadThread.start();
@@ -206,6 +208,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
     public static class AuditLoaderConf {
         public static final String PROP_MAX_BATCH_SIZE = "max_batch_size";
         public static final String PROP_MAX_BATCH_INTERVAL_SEC = "max_batch_interval_sec";
+        public static final String PROP_MAX_QUEUE_SIZE = "max_queue_size";
         public static final String PROP_FRONTEND_HOST_PORT = "frontend_host_port";
         public static final String PROP_USER = "user";
         public static final String PROP_PASSWORD = "password";
@@ -216,6 +219,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
 
         public long maxBatchSize = 50 * 1024 * 1024;
         public long maxBatchIntervalSec = 60;
+        public int maxQueueSize = 1000;
         public String frontendHostPort = "127.0.0.1:8030";
         public String user = "root";
         public String password = "";
@@ -233,6 +237,9 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
                 if (properties.containsKey(PROP_MAX_BATCH_INTERVAL_SEC)) {
                     maxBatchIntervalSec = Long.valueOf(properties.get(PROP_MAX_BATCH_INTERVAL_SEC));
                 }
+                if (properties.containsKey(PROP_MAX_QUEUE_SIZE)) {
+                    maxQueueSize = Integer.valueOf(properties.get(PROP_MAX_QUEUE_SIZE));
+                }
                 if (properties.containsKey(PROP_FRONTEND_HOST_PORT)) {
                     frontendHostPort = properties.get(PROP_FRONTEND_HOST_PORT);
                 }
@@ -281,10 +288,10 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin {
         }
     }
 
-    public static synchronized String longToTimeString(long timeStamp) {
+    public static String longToTimeString(long timeStamp) {
         if (timeStamp <= 0L) {
             return "1900-01-01 00:00:00";
         }
-        return DATETIME_FORMAT.format(new Date(timeStamp));
+        return dateFormatContainer.get().format(new Date(timeStamp));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to