This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6297ef10e9 [enhancement](plugin) import audit logs for slow queries 
into a separate table (#14100)
6297ef10e9 is described below

commit 6297ef10e90dee1403d03b2d146acef486696a2a
Author: yongjinhou <109586248+yongjin...@users.noreply.github.com>
AuthorDate: Fri Nov 11 09:06:01 2022 +0800

    [enhancement](plugin) import audit logs for slow queries into a separate 
table (#14100)
    
    * import audit logs for slow queries into a separate table
---
 docs/en/docs/ecosystem/audit-plugin.md             |  42 +++++++-
 docs/zh-CN/docs/ecosystem/audit-plugin.md          |  40 ++++++-
 .../auditloader/src/main/assembly/plugin.conf      |  10 +-
 .../doris/plugin/audit/AuditLoaderPlugin.java      | 118 ++++++++++++++-------
 .../doris/plugin/audit/DorisStreamLoader.java      |  24 +++--
 5 files changed, 183 insertions(+), 51 deletions(-)

diff --git a/docs/en/docs/ecosystem/audit-plugin.md 
b/docs/en/docs/ecosystem/audit-plugin.md
index 559656455c..2e533d07c7 100644
--- a/docs/en/docs/ecosystem/audit-plugin.md
+++ b/docs/en/docs/ecosystem/audit-plugin.md
@@ -52,10 +52,46 @@ You can place this file on an http download server or 
copy(or unzip) it to the s
 
 ### Installation
 
-After deployment is complete, and before installing the plugin, you need to 
create the audit database and tables previously specified in `plugin.conf`. The 
table creation statement is as follows:
+After deployment is complete, and before installing the plugin, you need to 
create the audit database and tables previously specified in `plugin.conf`. The 
database and table creation statement is as follows:
 
 ```
-create table doris_audit_tbl__
+create database doris_audit_db__;
+
+create table doris_audit_db__.doris_audit_log_tbl__
+(
+    query_id varchar(48) comment "Unique query id",
+    `time` datetime not null comment "Query start time",
+    client_ip varchar(32) comment "Client IP",
+    user varchar(64) comment "User name",
+    db varchar(96) comment "Database of this query",
+    state varchar(8) comment "Query result state. EOF, ERR, OK",
+    query_time bigint comment "Query execution time in millisecond",
+    scan_bytes bigint comment "Total scan bytes of this query",
+    scan_rows bigint comment "Total scan rows of this query",
+    return_rows bigint comment "Returned rows of this query",
+    stmt_id int comment "An incremental id of statement",
+    is_query tinyint comment "Is this statemt a query. 1 or 0",
+    frontend_ip varchar(32) comment "Frontend ip of executing this statement",
+    cpu_time_ms bigint comment "Total scan cpu time in millisecond of this 
query",
+    sql_hash varchar(48) comment "Hash value for this query",
+    sql_digest varchar(48) comment "Sql digest for this query",
+    peak_memory_bytes bigint comment "Peak memory bytes used on all backends 
of this query",
+    stmt string comment "The original statement, trimed if longer than 2G "
+) engine=OLAP
+duplicate key(query_id, `time`, client_ip)
+partition by range(`time`) ()
+distributed by hash(query_id) buckets 1
+properties(
+    "dynamic_partition.time_unit" = "DAY",
+    "dynamic_partition.start" = "-30",
+    "dynamic_partition.end" = "3",
+    "dynamic_partition.prefix" = "p",
+    "dynamic_partition.buckets" = "1",
+    "dynamic_partition.enable" = "true",
+    "replication_num" = "3"
+);
+
+create table doris_audit_db__.doris_slow_log_tbl__
 (
     query_id varchar(48) comment "Unique query id",
     `time` datetime not null comment "Query start time",
@@ -71,7 +107,7 @@ create table doris_audit_tbl__
     is_query tinyint comment "Is this statemt a query. 1 or 0",
     frontend_ip varchar(32) comment "Frontend ip of executing this statement",
     cpu_time_ms bigint comment "Total scan cpu time in millisecond of this 
query",
-    sql_hash varchar(50) comment "Hash value for this query",
+    sql_hash varchar(48) comment "Hash value for this query",
     sql_digest varchar(48) comment "Sql digest for this query",
     peak_memory_bytes bigint comment "Peak memory bytes used on all backends 
of this query",
     stmt string comment "The original statement, trimed if longer than 2G"
diff --git a/docs/zh-CN/docs/ecosystem/audit-plugin.md 
b/docs/zh-CN/docs/ecosystem/audit-plugin.md
index e363f50413..aa1d752428 100644
--- a/docs/zh-CN/docs/ecosystem/audit-plugin.md
+++ b/docs/zh-CN/docs/ecosystem/audit-plugin.md
@@ -52,10 +52,46 @@ auditloader 
plugin的配置位于`${DORIS}/fe_plugins/auditloader/src/main/assem
 
 ### 安装
 
-部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建表语句如下:
+部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建库与建表语句如下:
 
 ```
-create table doris_audit_tbl__
+create database doris_audit_db__;
+
+create table doris_audit_db__.doris_audit_log_tbl__
+(
+    query_id varchar(48) comment "Unique query id",
+    `time` datetime not null comment "Query start time",
+    client_ip varchar(32) comment "Client IP",
+    user varchar(64) comment "User name",
+    db varchar(96) comment "Database of this query",
+    state varchar(8) comment "Query result state. EOF, ERR, OK",
+    query_time bigint comment "Query execution time in millisecond",
+    scan_bytes bigint comment "Total scan bytes of this query",
+    scan_rows bigint comment "Total scan rows of this query",
+    return_rows bigint comment "Returned rows of this query",
+    stmt_id int comment "An incremental id of statement",
+    is_query tinyint comment "Is this statemt a query. 1 or 0",
+    frontend_ip varchar(32) comment "Frontend ip of executing this statement",
+    cpu_time_ms bigint comment "Total scan cpu time in millisecond of this 
query",
+    sql_hash varchar(48) comment "Hash value for this query",
+    sql_digest varchar(48) comment "Sql digest for this query",
+    peak_memory_bytes bigint comment "Peak memory bytes used on all backends 
of this query",
+    stmt string comment "The original statement, trimed if longer than 2G"
+) engine=OLAP
+duplicate key(query_id, `time`, client_ip)
+partition by range(`time`) ()
+distributed by hash(query_id) buckets 1
+properties(
+    "dynamic_partition.time_unit" = "DAY",
+    "dynamic_partition.start" = "-30",
+    "dynamic_partition.end" = "3",
+    "dynamic_partition.prefix" = "p",
+    "dynamic_partition.buckets" = "1",
+    "dynamic_partition.enable" = "true",
+    "replication_num" = "3"
+);
+
+create table doris_audit_db__.doris_slow_log_tbl__
 (
     query_id varchar(48) comment "Unique query id",
     `time` datetime not null comment "Query start time",
diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf 
b/fe_plugins/auditloader/src/main/assembly/plugin.conf
index 14fdd32b98..31f7bd3f35 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.conf
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf
@@ -36,8 +36,14 @@ frontend_host_port=127.0.0.1:8030
 # Database of the audit table
 database=doris_audit_db__
 
-# Audit table name, to save the audit data.
-table=doris_audit_tbl__
+# Audit table name, to save the audit log data.
+audit_log_table=doris_audit_log_tbl__
+
+# Audit table name, to save the slow log data.
+slow_log_table=doris_slow_log_tbl__
+
+# Whether import slow logs into a separate slow table, default is false
+enable_slow_log=false
 
 # Doris user. This user must have LOAD_PRIV to the audit table.
 user=root
diff --git 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 9497af68ee..992b35f686 100755
--- 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -24,6 +24,7 @@ import org.apache.doris.plugin.Plugin;
 import org.apache.doris.plugin.PluginContext;
 import org.apache.doris.plugin.PluginException;
 import org.apache.doris.plugin.PluginInfo;
+import org.apache.doris.common.Config;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -56,8 +57,10 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
     private static final ThreadLocal<SimpleDateFormat> dateFormatContainer = 
ThreadLocal.withInitial(
             () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
 
-    private StringBuilder auditBuffer = new StringBuilder();
-    private long lastLoadTime = 0;
+    private StringBuilder auditLogBuffer = new StringBuilder();
+    private StringBuilder slowLogBuffer = new StringBuilder();
+    private long lastLoadTimeAuditLog = 0;
+    private long lastLoadTimeSlowLog = 0;
 
     private BlockingQueue<AuditEvent> auditEventQueue;
     private DorisStreamLoader streamLoader;
@@ -75,7 +78,8 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
             if (isInit) {
                 return;
             }
-            this.lastLoadTime = System.currentTimeMillis();
+            this.lastLoadTimeAuditLog = System.currentTimeMillis();
+            this.lastLoadTimeSlowLog = System.currentTimeMillis();
 
             loadConfig(ctx, info.getProperties());
 
@@ -146,28 +150,35 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
     }
 
     private void assembleAudit(AuditEvent event) {
-        auditBuffer.append(event.queryId).append("\t");
-        auditBuffer.append(longToTimeString(event.timestamp)).append("\t");
-        auditBuffer.append(event.clientIp).append("\t");
-        auditBuffer.append(event.user).append("\t");
-        auditBuffer.append(event.db).append("\t");
-        auditBuffer.append(event.state).append("\t");
-        auditBuffer.append(event.queryTime).append("\t");
-        auditBuffer.append(event.scanBytes).append("\t");
-        auditBuffer.append(event.scanRows).append("\t");
-        auditBuffer.append(event.returnRows).append("\t");
-        auditBuffer.append(event.stmtId).append("\t");
-        auditBuffer.append(event.isQuery ? 1 : 0).append("\t");
-        auditBuffer.append(event.feIp).append("\t");
-        auditBuffer.append(event.cpuTimeMs).append("\t");
-        auditBuffer.append(event.sqlHash).append("\t");
-        auditBuffer.append(event.sqlDigest).append("\t");
-        auditBuffer.append(event.peakMemoryBytes).append("\t");
+        if (conf.enableSlowLog && event.queryTime > Config.qe_slow_log_ms) {
+            fillLogBuffer(event, slowLogBuffer);
+        }
+        fillLogBuffer(event, auditLogBuffer);
+    }
+
+    private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) {
+        logBuffer.append(event.queryId).append("\t");
+        logBuffer.append(longToTimeString(event.timestamp)).append("\t");
+        logBuffer.append(event.clientIp).append("\t");
+        logBuffer.append(event.user).append("\t");
+        logBuffer.append(event.db).append("\t");
+        logBuffer.append(event.state).append("\t");
+        logBuffer.append(event.queryTime).append("\t");
+        logBuffer.append(event.scanBytes).append("\t");
+        logBuffer.append(event.scanRows).append("\t");
+        logBuffer.append(event.returnRows).append("\t");
+        logBuffer.append(event.stmtId).append("\t");
+        logBuffer.append(event.isQuery ? 1 : 0).append("\t");
+        logBuffer.append(event.feIp).append("\t");
+        logBuffer.append(event.cpuTimeMs).append("\t");
+        logBuffer.append(event.sqlHash).append("\t");
+        logBuffer.append(event.sqlDigest).append("\t");
+        logBuffer.append(event.peakMemoryBytes).append("\t");
         // trim the query to avoid too long
         // use `getBytes().length` to get real byte length
         String stmt = truncateByBytes(event.stmt).replace("\n", " 
").replace("\t", " ");
         LOG.debug("receive audit event with stmt: {}", stmt);
-        auditBuffer.append(stmt).append("\n");
+        logBuffer.append(stmt).append("\n");
     }
 
     private String truncateByBytes(String str) {
@@ -186,21 +197,34 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         return new String(charBuffer.array(), 0, charBuffer.position());
     }
 
-    private void loadIfNecessary(DorisStreamLoader loader) {
-        if (auditBuffer.length() < conf.maxBatchSize && 
System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
-            return;
+    private void loadIfNecessary(DorisStreamLoader loader, boolean slowLog) {
+        StringBuilder logBuffer = slowLog ? slowLogBuffer : auditLogBuffer;
+        long lastLoadTime = slowLog ? lastLoadTimeSlowLog : 
lastLoadTimeAuditLog;
+        long currentTime = System.currentTimeMillis();
+        
+        if (logBuffer.length() >= conf.maxBatchSize || currentTime - 
lastLoadTime >= conf.maxBatchIntervalSec * 1000) {         
+            // begin to load
+            try {
+                DorisStreamLoader.LoadResponse response = 
loader.loadBatch(logBuffer, slowLog);
+                LOG.debug("audit loader response: {}", response);
+            } catch (Exception e) {
+                LOG.debug("encounter exception when putting current audit 
batch, discard current batch", e);
+            } finally {
+                // make a new string builder to receive following events.
+                resetLogBufferAndLastLoadTime(currentTime, slowLog);
+            }
         }
 
-        lastLoadTime = System.currentTimeMillis();
-        // begin to load
-        try {
-            DorisStreamLoader.LoadResponse response = 
loader.loadBatch(auditBuffer);
-            LOG.debug("audit loader response: {}", response);
-        } catch (Exception e) {
-            LOG.debug("encounter exception when putting current audit batch, 
discard current batch", e);
-        } finally {
-            // make a new string builder to receive following events.
-            this.auditBuffer = new StringBuilder();
+        return;
+    }
+
+    private void resetLogBufferAndLastLoadTime(long currentTime, boolean 
slowLog) {
+        if (slowLog) {
+            this.slowLogBuffer = new StringBuilder();
+            lastLoadTimeSlowLog = currentTime;
+        } else {
+            this.auditLogBuffer = new StringBuilder();
+            lastLoadTimeAuditLog = currentTime;
         }
 
         return;
@@ -215,6 +239,9 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         public static final String PROP_PASSWORD = "password";
         public static final String PROP_DATABASE = "database";
         public static final String PROP_TABLE = "table";
+        public static final String PROP_AUDIT_LOG_TABLE = "audit_log_table";
+        public static final String PROP_SLOW_LOG_TABLE = "slow_log_table";
+        public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log";
         // the max stmt length to be loaded in audit table.
         public static final String MAX_STMT_LENGTH = "max_stmt_length";
 
@@ -225,7 +252,9 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         public String user = "root";
         public String password = "";
         public String database = "doris_audit_db__";
-        public String table = "doris_audit_tbl__";
+        public String auditLogTable = "doris_audit_log_tbl__";
+        public String slowLogTable = "doris_slow_log_tbl__";
+        public boolean enableSlowLog = false;
         // the identity of FE which run this plugin
         public String feIdentity = "";
         public int max_stmt_length = 4096;
@@ -253,8 +282,18 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
                 if (properties.containsKey(PROP_DATABASE)) {
                     database = properties.get(PROP_DATABASE);
                 }
+                // If plugin.conf is not changed, the audit logs are imported 
to previous table
                 if (properties.containsKey(PROP_TABLE)) {
-                    table = properties.get(PROP_TABLE);
+                    auditLogTable = properties.get(PROP_TABLE);
+                }
+                if (properties.containsKey(PROP_AUDIT_LOG_TABLE)) {
+                    auditLogTable = properties.get(PROP_AUDIT_LOG_TABLE);
+                }
+                if (properties.containsKey(PROP_SLOW_LOG_TABLE)) {
+                    slowLogTable = properties.get(PROP_SLOW_LOG_TABLE);
+                }
+                if (properties.containsKey(PROP_ENABLE_SLOW_LOG)) {
+                    enableSlowLog = 
Boolean.valueOf(properties.get(PROP_ENABLE_SLOW_LOG));
                 }
                 if (properties.containsKey(MAX_STMT_LENGTH)) {
                     max_stmt_length = 
Integer.parseInt(properties.get(MAX_STMT_LENGTH));
@@ -278,7 +317,12 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
                     AuditEvent event = auditEventQueue.poll(5, 
TimeUnit.SECONDS);
                     if (event != null) {
                         assembleAudit(event);
-                        loadIfNecessary(loader);
+                        // process slow audit logs
+                        if (conf.enableSlowLog) {
+                            loadIfNecessary(loader, true);
+                        }
+                        // process all audit logs
+                        loadIfNecessary(loader, false);
                     }
                 } catch (InterruptedException ie) {
                     LOG.debug("encounter exception when loading current audit 
batch", ie);
diff --git 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
index 28763bbdca..844ca04892 100644
--- 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
+++ 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
@@ -38,21 +38,25 @@ public class DorisStreamLoader {
     private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";;
     private String hostPort;
     private String db;
-    private String tbl;
+    private String auditLogTbl;
+    private String slowLogTbl;
     private String user;
     private String passwd;
-    private String loadUrlStr;
+    private String auditLogLoadUrlStr;
+    private String slowLogLoadUrlStr;
     private String authEncoding;
     private String feIdentity;
 
     public DorisStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) {
         this.hostPort = conf.frontendHostPort;
         this.db = conf.database;
-        this.tbl = conf.table;
+        this.auditLogTbl = conf.auditLogTable;
+        this.slowLogTbl = conf.slowLogTable;
         this.user = conf.user;
         this.passwd = conf.password;
 
-        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
+        this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, 
auditLogTbl);
+        this.slowLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, 
slowLogTbl);
         this.authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
         // currently, FE identity is FE's IP, so we replace the "." in IP to 
make it suitable for label
         this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
@@ -112,9 +116,9 @@ public class DorisStreamLoader {
         return response.toString();
     }
 
-    public LoadResponse loadBatch(StringBuilder sb) {
+    public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
         Calendar calendar = Calendar.getInstance();
-        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
+        String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, 
calendar.get(Calendar.DAY_OF_MONTH),
                 calendar.get(Calendar.HOUR_OF_DAY), 
calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 feIdentity);
@@ -123,7 +127,13 @@ public class DorisStreamLoader {
         HttpURLConnection beConn = null;
         try {
             // build request and send to fe
-            feConn = getConnection(loadUrlStr, label);
+            if (slowLog) {
+                label = "slow" + label;
+                feConn = getConnection(slowLogLoadUrlStr, label);
+            } else {
+                label = "audit" + label;
+                feConn = getConnection(auditLogLoadUrlStr, label);
+            }
             int status = feConn.getResponseCode();
             // fe send back http response code TEMPORARY_REDIRECT 307 and new 
be location
             if (status != 307) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to