This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c4b6d4d839 [enhancement](AuditLoaderPlugin): add audit queue capacity configurat… (#12887) c4b6d4d839 is described below commit c4b6d4d83963af2294882c57816c6814f9825af0 Author: wxy <dut.xian...@gmail.com> AuthorDate: Tue Sep 27 08:50:30 2022 +0800 [enhancement](AuditLoaderPlugin): add audit queue capacity configurat… (#12887) --- fe_plugins/auditloader/src/main/assembly/plugin.conf | 3 +++ .../apache/doris/plugin/audit/AuditLoaderPlugin.java | 17 ++++++++++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf b/fe_plugins/auditloader/src/main/assembly/plugin.conf index 4013920938..14fdd32b98 100755 --- a/fe_plugins/auditloader/src/main/assembly/plugin.conf +++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf @@ -26,6 +26,9 @@ max_batch_interval_sec=60 # the max stmt length to be loaded in audit table, default is 4096 max_stmt_length=4096 +# the capacity of audit queue, default is 1000 +max_queue_size=1000 + # Doris FE host for loading the audit, default is 127.0.0.1:8030. 
# this should be the host port for stream load frontend_host_port=127.0.0.1:8030 diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index b706c08430..9497af68ee 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -17,6 +17,7 @@ package org.apache.doris.plugin.audit; +import com.google.common.collect.Queues; import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditPlugin; import org.apache.doris.plugin.Plugin; @@ -42,7 +43,6 @@ import java.util.Date; import java.util.Map; import java.util.Properties; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; @@ -53,12 +53,13 @@ import java.util.stream.Collectors; public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private final static Logger LOG = LogManager.getLogger(AuditLoaderPlugin.class); - private static SimpleDateFormat DATETIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + private static final ThreadLocal<SimpleDateFormat> dateFormatContainer = ThreadLocal.withInitial( + () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); private StringBuilder auditBuffer = new StringBuilder(); private long lastLoadTime = 0; - private BlockingQueue<AuditEvent> auditEventQueue = new LinkedBlockingDeque<AuditEvent>(1); + private BlockingQueue<AuditEvent> auditEventQueue; private DorisStreamLoader streamLoader; private Thread loadThread; @@ -78,6 +79,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { loadConfig(ctx, info.getProperties()); + this.auditEventQueue = Queues.newLinkedBlockingDeque(conf.maxQueueSize); 
this.streamLoader = new DorisStreamLoader(conf); this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread"); this.loadThread.start(); @@ -207,6 +209,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { public static class AuditLoaderConf { public static final String PROP_MAX_BATCH_SIZE = "max_batch_size"; public static final String PROP_MAX_BATCH_INTERVAL_SEC = "max_batch_interval_sec"; + public static final String PROP_MAX_QUEUE_SIZE = "max_queue_size"; public static final String PROP_FRONTEND_HOST_PORT = "frontend_host_port"; public static final String PROP_USER = "user"; public static final String PROP_PASSWORD = "password"; @@ -217,6 +220,7 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { public long maxBatchSize = 50 * 1024 * 1024; public long maxBatchIntervalSec = 60; + public int maxQueueSize = 1000; public String frontendHostPort = "127.0.0.1:8030"; public String user = "root"; public String password = ""; @@ -234,6 +238,9 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { if (properties.containsKey(PROP_MAX_BATCH_INTERVAL_SEC)) { maxBatchIntervalSec = Long.valueOf(properties.get(PROP_MAX_BATCH_INTERVAL_SEC)); } + if (properties.containsKey(PROP_MAX_QUEUE_SIZE)) { + maxQueueSize = Integer.valueOf(properties.get(PROP_MAX_QUEUE_SIZE)); + } if (properties.containsKey(PROP_FRONTEND_HOST_PORT)) { frontendHostPort = properties.get(PROP_FRONTEND_HOST_PORT); } @@ -282,10 +289,10 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { } } - public static synchronized String longToTimeString(long timeStamp) { + public static String longToTimeString(long timeStamp) { if (timeStamp <= 0L) { return "1900-01-01 00:00:00"; } - return DATETIME_FORMAT.format(new Date(timeStamp)); + return dateFormatContainer.get().format(new Date(timeStamp)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@doris.apache.org. For additional commands, e-mail: commits-h...@doris.apache.org.