This is an automated email from the ASF dual-hosted git repository. yangzhg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6297ef10e9 [enhancement](plugin) import audit logs for slow queries into a separate table (#14100) 6297ef10e9 is described below commit 6297ef10e90dee1403d03b2d146acef486696a2a Author: yongjinhou <109586248+yongjin...@users.noreply.github.com> AuthorDate: Fri Nov 11 09:06:01 2022 +0800 [enhancement](plugin) import audit logs for slow queries into a separate table (#14100) * import audit logs for slow queries into a separate table --- docs/en/docs/ecosystem/audit-plugin.md | 42 +++++++- docs/zh-CN/docs/ecosystem/audit-plugin.md | 40 ++++++- .../auditloader/src/main/assembly/plugin.conf | 10 +- .../doris/plugin/audit/AuditLoaderPlugin.java | 118 ++++++++++++++------- .../doris/plugin/audit/DorisStreamLoader.java | 24 +++-- 5 files changed, 183 insertions(+), 51 deletions(-) diff --git a/docs/en/docs/ecosystem/audit-plugin.md b/docs/en/docs/ecosystem/audit-plugin.md index 559656455c..2e533d07c7 100644 --- a/docs/en/docs/ecosystem/audit-plugin.md +++ b/docs/en/docs/ecosystem/audit-plugin.md @@ -52,10 +52,46 @@ You can place this file on an http download server or copy(or unzip) it to the s ### Installation -After deployment is complete, and before installing the plugin, you need to create the audit database and tables previously specified in `plugin.conf`. The table creation statement is as follows: +After deployment is complete, and before installing the plugin, you need to create the audit database and tables previously specified in `plugin.conf`. The database and table creation statement is as follows: ``` -create table doris_audit_tbl__ +create database doris_audit_db__; + +create table doris_audit_db__.doris_audit_log_tbl__ +( + query_id varchar(48) comment "Unique query id", + `time` datetime not null comment "Query start time", + client_ip varchar(32) comment "Client IP", + user varchar(64) comment "User name", + db varchar(96) comment "Database of this query", + state varchar(8) comment "Query result state. EOF, ERR, OK", + query_time bigint comment "Query execution time in millisecond", + scan_bytes bigint comment "Total scan bytes of this query", + scan_rows bigint comment "Total scan rows of this query", + return_rows bigint comment "Returned rows of this query", + stmt_id int comment "An incremental id of statement", + is_query tinyint comment "Is this statemt a query. 1 or 0", + frontend_ip varchar(32) comment "Frontend ip of executing this statement", + cpu_time_ms bigint comment "Total scan cpu time in millisecond of this query", + sql_hash varchar(48) comment "Hash value for this query", + sql_digest varchar(48) comment "Sql digest for this query", + peak_memory_bytes bigint comment "Peak memory bytes used on all backends of this query", + stmt string comment "The original statement, trimed if longer than 2G " +) engine=OLAP +duplicate key(query_id, `time`, client_ip) +partition by range(`time`) () +distributed by hash(query_id) buckets 1 +properties( + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.start" = "-30", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "1", + "dynamic_partition.enable" = "true", + "replication_num" = "3" +); + +create table doris_audit_db__.doris_slow_log_tbl__ ( query_id varchar(48) comment "Unique query id", `time` datetime not null comment "Query start time", @@ -71,7 +107,7 @@ create table doris_audit_tbl__ is_query tinyint comment "Is this statemt a query. 1 or 0", frontend_ip varchar(32) comment "Frontend ip of executing this statement", cpu_time_ms bigint comment "Total scan cpu time in millisecond of this query", - sql_hash varchar(50) comment "Hash value for this query", + sql_hash varchar(48) comment "Hash value for this query", sql_digest varchar(48) comment "Sql digest for this query", peak_memory_bytes bigint comment "Peak memory bytes used on all backends of this query", stmt string comment "The original statement, trimed if longer than 2G" diff --git a/docs/zh-CN/docs/ecosystem/audit-plugin.md b/docs/zh-CN/docs/ecosystem/audit-plugin.md index e363f50413..aa1d752428 100644 --- a/docs/zh-CN/docs/ecosystem/audit-plugin.md +++ b/docs/zh-CN/docs/ecosystem/audit-plugin.md @@ -52,10 +52,46 @@ auditloader plugin的配置位于`${DORIS}/fe_plugins/auditloader/src/main/assem ### 安装 -部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建表语句如下: +部署完成后,安装插件前,需要创建之前在 `plugin.conf` 中指定的审计数据库和表。其中建库与建表语句如下: ``` -create table doris_audit_tbl__ +create database doris_audit_db__; + +create table doris_audit_db__.doris_audit_log_tbl__ +( + query_id varchar(48) comment "Unique query id", + `time` datetime not null comment "Query start time", + client_ip varchar(32) comment "Client IP", + user varchar(64) comment "User name", + db varchar(96) comment "Database of this query", + state varchar(8) comment "Query result state. EOF, ERR, OK", + query_time bigint comment "Query execution time in millisecond", + scan_bytes bigint comment "Total scan bytes of this query", + scan_rows bigint comment "Total scan rows of this query", + return_rows bigint comment "Returned rows of this query", + stmt_id int comment "An incremental id of statement", + is_query tinyint comment "Is this statemt a query. 1 or 0", + frontend_ip varchar(32) comment "Frontend ip of executing this statement", + cpu_time_ms bigint comment "Total scan cpu time in millisecond of this query", + sql_hash varchar(48) comment "Hash value for this query", + sql_digest varchar(48) comment "Sql digest for this query", + peak_memory_bytes bigint comment "Peak memory bytes used on all backends of this query", + stmt string comment "The original statement, trimed if longer than 2G" +) engine=OLAP +duplicate key(query_id, `time`, client_ip) +partition by range(`time`) () +distributed by hash(query_id) buckets 1 +properties( + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.start" = "-30", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "1", + "dynamic_partition.enable" = "true", + "replication_num" = "3" +); + +create table doris_audit_db__.doris_slow_log_tbl__ ( query_id varchar(48) comment "Unique query id", `time` datetime not null comment "Query start time", diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf b/fe_plugins/auditloader/src/main/assembly/plugin.conf index 14fdd32b98..31f7bd3f35 100755 --- a/fe_plugins/auditloader/src/main/assembly/plugin.conf +++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf @@ -36,8 +36,14 @@ frontend_host_port=127.0.0.1:8030 # Database of the audit table database=doris_audit_db__ -# Audit table name, to save the audit data. -table=doris_audit_tbl__ +# Audit table name, to save the audit log data. +audit_log_table=doris_audit_log_tbl__ + +# Audit table name, to save the slow log data. +slow_log_table=doris_slow_log_tbl__ + +# Whether import slow logs into a separate slow table, default is false +enable_slow_log=false # Doris user. This user must have LOAD_PRIV to the audit table. user=root diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 9497af68ee..992b35f686 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -24,6 +24,7 @@ import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginContext; import org.apache.doris.plugin.PluginException; import org.apache.doris.plugin.PluginInfo; +import org.apache.doris.common.Config; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -56,8 +57,10 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { private static final ThreadLocal<SimpleDateFormat> dateFormatContainer = ThreadLocal.withInitial( () -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); - private StringBuilder auditBuffer = new StringBuilder(); - private long lastLoadTime = 0; + private StringBuilder auditLogBuffer = new StringBuilder(); + private StringBuilder slowLogBuffer = new StringBuilder(); + private long lastLoadTimeAuditLog = 0; + private long lastLoadTimeSlowLog = 0; private BlockingQueue<AuditEvent> auditEventQueue; private DorisStreamLoader streamLoader; @@ -75,7 +78,8 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { if (isInit) { return; } - this.lastLoadTime = System.currentTimeMillis(); + this.lastLoadTimeAuditLog = System.currentTimeMillis(); + this.lastLoadTimeSlowLog = System.currentTimeMillis(); loadConfig(ctx, info.getProperties()); @@ -146,28 +150,35 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { } private void assembleAudit(AuditEvent event) { - auditBuffer.append(event.queryId).append("\t"); - auditBuffer.append(longToTimeString(event.timestamp)).append("\t"); - auditBuffer.append(event.clientIp).append("\t"); - auditBuffer.append(event.user).append("\t"); - auditBuffer.append(event.db).append("\t"); - auditBuffer.append(event.state).append("\t"); - auditBuffer.append(event.queryTime).append("\t"); - auditBuffer.append(event.scanBytes).append("\t"); - auditBuffer.append(event.scanRows).append("\t"); - auditBuffer.append(event.returnRows).append("\t"); - auditBuffer.append(event.stmtId).append("\t"); - auditBuffer.append(event.isQuery ? 1 : 0).append("\t"); - auditBuffer.append(event.feIp).append("\t"); - auditBuffer.append(event.cpuTimeMs).append("\t"); - auditBuffer.append(event.sqlHash).append("\t"); - auditBuffer.append(event.sqlDigest).append("\t"); - auditBuffer.append(event.peakMemoryBytes).append("\t"); + if (conf.enableSlowLog && event.queryTime > Config.qe_slow_log_ms) { + fillLogBuffer(event, slowLogBuffer); + } + fillLogBuffer(event, auditLogBuffer); + } + + private void fillLogBuffer(AuditEvent event, StringBuilder logBuffer) { + logBuffer.append(event.queryId).append("\t"); + logBuffer.append(longToTimeString(event.timestamp)).append("\t"); + logBuffer.append(event.clientIp).append("\t"); + logBuffer.append(event.user).append("\t"); + logBuffer.append(event.db).append("\t"); + logBuffer.append(event.state).append("\t"); + logBuffer.append(event.queryTime).append("\t"); + logBuffer.append(event.scanBytes).append("\t"); + logBuffer.append(event.scanRows).append("\t"); + logBuffer.append(event.returnRows).append("\t"); + logBuffer.append(event.stmtId).append("\t"); + logBuffer.append(event.isQuery ? 1 : 0).append("\t"); + logBuffer.append(event.feIp).append("\t"); + logBuffer.append(event.cpuTimeMs).append("\t"); + logBuffer.append(event.sqlHash).append("\t"); + logBuffer.append(event.sqlDigest).append("\t"); + logBuffer.append(event.peakMemoryBytes).append("\t"); // trim the query to avoid too long // use `getBytes().length` to get real byte length String stmt = truncateByBytes(event.stmt).replace("\n", " ").replace("\t", " "); LOG.debug("receive audit event with stmt: {}", stmt); - auditBuffer.append(stmt).append("\n"); + logBuffer.append(stmt).append("\n"); } private String truncateByBytes(String str) { @@ -186,21 +197,34 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { return new String(charBuffer.array(), 0, charBuffer.position()); } - private void loadIfNecessary(DorisStreamLoader loader) { - if (auditBuffer.length() < conf.maxBatchSize && System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) { - return; + private void loadIfNecessary(DorisStreamLoader loader, boolean slowLog) { + StringBuilder logBuffer = slowLog ? slowLogBuffer : auditLogBuffer; + long lastLoadTime = slowLog ? lastLoadTimeSlowLog : lastLoadTimeAuditLog; + long currentTime = System.currentTimeMillis(); + + if (logBuffer.length() >= conf.maxBatchSize || currentTime - lastLoadTime >= conf.maxBatchIntervalSec * 1000) { + // begin to load + try { + DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog); + LOG.debug("audit loader response: {}", response); + } catch (Exception e) { + LOG.debug("encounter exception when putting current audit batch, discard current batch", e); + } finally { + // make a new string builder to receive following events. + resetLogBufferAndLastLoadTime(currentTime, slowLog); + } } - lastLoadTime = System.currentTimeMillis(); - // begin to load - try { - DorisStreamLoader.LoadResponse response = loader.loadBatch(auditBuffer); - LOG.debug("audit loader response: {}", response); - } catch (Exception e) { - LOG.debug("encounter exception when putting current audit batch, discard current batch", e); - } finally { - // make a new string builder to receive following events. - this.auditBuffer = new StringBuilder(); + return; + } + + private void resetLogBufferAndLastLoadTime(long currentTime, boolean slowLog) { + if (slowLog) { + this.slowLogBuffer = new StringBuilder(); + lastLoadTimeSlowLog = currentTime; + } else { + this.auditLogBuffer = new StringBuilder(); + lastLoadTimeAuditLog = currentTime; } return; @@ -215,6 +239,9 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { public static final String PROP_PASSWORD = "password"; public static final String PROP_DATABASE = "database"; public static final String PROP_TABLE = "table"; + public static final String PROP_AUDIT_LOG_TABLE = "audit_log_table"; + public static final String PROP_SLOW_LOG_TABLE = "slow_log_table"; + public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log"; // the max stmt length to be loaded in audit table. public static final String MAX_STMT_LENGTH = "max_stmt_length"; @@ -225,7 +252,9 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { public String user = "root"; public String password = ""; public String database = "doris_audit_db__"; - public String table = "doris_audit_tbl__"; + public String auditLogTable = "doris_audit_log_tbl__"; + public String slowLogTable = "doris_slow_log_tbl__"; + public boolean enableSlowLog = false; // the identity of FE which run this plugin public String feIdentity = ""; public int max_stmt_length = 4096; @@ -253,8 +282,18 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { if (properties.containsKey(PROP_DATABASE)) { database = properties.get(PROP_DATABASE); } + // If plugin.conf is not changed, the audit logs are imported to previous table if (properties.containsKey(PROP_TABLE)) { - table = properties.get(PROP_TABLE); + auditLogTable = properties.get(PROP_TABLE); + } + if (properties.containsKey(PROP_AUDIT_LOG_TABLE)) { + auditLogTable = properties.get(PROP_AUDIT_LOG_TABLE); + } + if (properties.containsKey(PROP_SLOW_LOG_TABLE)) { + slowLogTable = properties.get(PROP_SLOW_LOG_TABLE); + } + if (properties.containsKey(PROP_ENABLE_SLOW_LOG)) { + enableSlowLog = Boolean.valueOf(properties.get(PROP_ENABLE_SLOW_LOG)); } if (properties.containsKey(MAX_STMT_LENGTH)) { max_stmt_length = Integer.parseInt(properties.get(MAX_STMT_LENGTH)); @@ -278,7 +317,12 @@ public class AuditLoaderPlugin extends Plugin implements AuditPlugin { AuditEvent event = auditEventQueue.poll(5, TimeUnit.SECONDS); if (event != null) { assembleAudit(event); - loadIfNecessary(loader); + // process slow audit logs + if (conf.enableSlowLog) { + loadIfNecessary(loader, true); + } + // process all audit logs + loadIfNecessary(loader, false); } } catch (InterruptedException ie) { LOG.debug("encounter exception when loading current audit batch", ie); diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java index 28763bbdca..844ca04892 100644 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java @@ -38,21 +38,25 @@ public class DorisStreamLoader { private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"; private String hostPort; private String db; - private String tbl; + private String auditLogTbl; + private String slowLogTbl; private String user; private String passwd; - private String loadUrlStr; + private String auditLogLoadUrlStr; + private String slowLogLoadUrlStr; private String authEncoding; private String feIdentity; public DorisStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) { this.hostPort = conf.frontendHostPort; this.db = conf.database; - this.tbl = conf.table; + this.auditLogTbl = conf.auditLogTable; + this.slowLogTbl = conf.slowLogTable; this.user = conf.user; this.passwd = conf.password; - this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); + this.auditLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, auditLogTbl); + this.slowLogLoadUrlStr = String.format(loadUrlPattern, hostPort, db, slowLogTbl); this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); // currently, FE identity is FE's IP, so we replace the "." in IP to make it suitable for label this.feIdentity = conf.feIdentity.replaceAll("\\.", "_"); @@ -112,9 +116,9 @@ public class DorisStreamLoader { return response.toString(); } - public LoadResponse loadBatch(StringBuilder sb) { + public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) { Calendar calendar = Calendar.getInstance(); - String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s", + String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), feIdentity); @@ -123,7 +127,13 @@ public class DorisStreamLoader { HttpURLConnection beConn = null; try { // build request and send to fe - feConn = getConnection(loadUrlStr, label); + if (slowLog) { + label = "slow" + label; + feConn = getConnection(slowLogLoadUrlStr, label); + } else { + label = "audit" + label; + feConn = getConnection(auditLogLoadUrlStr, label); + } int status = feConn.getResponseCode(); // fe send back http response code TEMPORARY_REDIRECT 307 and new be location if (status != 307) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org