This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a41629   [Audit] Support builtin load audit function to record successful bulk load job (#5183)
7a41629 is described below

commit 7a41629fbe8995c5688ba3221470f72b862028e3
Author: caiconghui <55968745+caicong...@users.noreply.github.com>
AuthorDate: Wed Mar 3 17:01:02 2021 +0800

     [Audit] Support builtin load audit function to record successful bulk load job (#5183)
    
    * [Audit] Support builtin load audit function to record successful bulk load job
    
    Co-authored-by: caiconghui [蔡聪辉] <caicong...@xiaomi.com>
---
 .../java/org/apache/doris/common/AuditLog.java     |   5 +
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 .../org/apache/doris/load/loadv2/BulkLoadJob.java  |  43 +++++++
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  75 +++++++-----
 .../java/org/apache/doris/plugin/AuditEvent.java   |   3 +-
 .../org/apache/doris/plugin/LoadAuditEvent.java    | 130 +++++++++++++++++++++
 .../java/org/apache/doris/qe/AuditLogBuilder.java  |  96 ++++++++++-----
 7 files changed, 293 insertions(+), 61 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java
index 0e1b609..235d783 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java
@@ -25,6 +25,7 @@ public class AuditLog {
 
     public static final AuditLog SLOW_AUDIT = new AuditLog("audit.slow_query");
     public static final AuditLog QUERY_AUDIT = new AuditLog("audit.query");
+    public static final AuditLog LOAD_AUDIT = new AuditLog("audit.load");
 
     private Logger logger;
 
@@ -36,6 +37,10 @@ public class AuditLog {
         return SLOW_AUDIT;
     }
 
+    public static AuditLog getLoadAudit() {
+        return LOAD_AUDIT;
+    }
+
     public AuditLog(String auditName) {
         logger = LogManager.getLogger(auditName);   
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 3e1ed6d..91000e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -103,7 +103,7 @@ public class Config extends ConfigBase {
      */
     @ConfField public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log";
     @ConfField public static int audit_log_roll_num = 90;
-    @ConfField public static String[] audit_log_modules = {"slow_query", "query"};
+    @ConfField public static String[] audit_log_modules = {"slow_query", "query", "load"};
     @ConfField(mutable = true) public static long qe_slow_log_ms = 5000;
     @ConfField public static String audit_log_roll_interval = "DAY";
     @ConfField public static String audit_log_delete_age = "30d";
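
For reference, the new "load" entry in audit_log_modules is what routes the builtin load audit into its own audit.load module alongside slow_query and query. A minimal fe.conf override sketch (illustrative only; it assumes the usual key = value syntax of FE config files and comma-separated values for string-array options):

    # enable the new load audit module in addition to the existing ones (illustrative)
    audit_log_modules = slow_query, query, load
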
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index 35334ed..77d7e6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.LoadStmt;
@@ -39,6 +40,8 @@ import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.LoadAuditEvent;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
@@ -361,4 +364,44 @@ public abstract class BulkLoadJob extends LoadJob {
     public UserIdentity getUserInfo() {
         return userInfo;
     }
+
+    @Override
+    protected void auditFinishedLoadJob() {
+        try {
+            String dbName = getDb().getFullName();
+            String tableListName = StringUtils.join(getTableNames(), ",");
+            List<String> filePathList = Lists.newArrayList();
+            for (List<BrokerFileGroup> brokerFileGroups : fileGroupAggInfo.getAggKeyToFileGroups().values()) {
+                for (BrokerFileGroup brokerFileGroup : brokerFileGroups) {
+                    filePathList.add("(" + StringUtils.join(brokerFileGroup.getFilePaths(), ",") + ")");
+                }
+            }
+            String filePathListName = StringUtils.join(filePathList, ",");
+            String brokerUserName = getBrokerUserName();
+            AuditEvent auditEvent = new LoadAuditEvent.AuditEventBuilder().setEventType(AuditEvent.EventType.LOAD_SUCCEED)
+                    .setJobId(id).setLabel(label).setLoadType(jobType.name()).setDb(dbName).setTableList(tableListName)
+                    .setFilePathList(filePathListName).setBrokerUser(brokerUserName).setTimestamp(createTimestamp)
+                    .setLoadStartTime(loadStartTimestamp).setLoadFinishTime(finishTimestamp)
+                    .setScanRows(loadStatistic.getScannedRows()).setScanBytes(loadStatistic.totalFileSizeB)
+                    .setFileNumber(loadStatistic.fileNum)
+                    .build();
+            Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent);
+        } catch (Exception e) {
+            LOG.warn("audit finished load job info failed", e);
+        }
+    }
+
+    private String getBrokerUserName() {
+        Map<String, String> properties = brokerDesc.getProperties();
+        if (properties.containsKey("kerberos_principal")) {
+            return properties.get("kerberos_principal");
+        } else if (properties.containsKey("username")) {
+            return properties.get("username");
+        } else if (properties.containsKey("bos_accesskey")) {
+            return properties.get("bos_accesskey");
+        } else if (properties.containsKey("fs.s3a.access.key")) {
+            return properties.get("fs.s3a.access.key");
+        }
+        return null;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index db79fa2..75bc2f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -163,7 +163,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
 
         public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
-                long rows, boolean isDone) {
+                                                    long rows, boolean isDone) {
             if (counterTbl.contains(loadId, fragmentId)) {
                 counterTbl.put(loadId, fragmentId, rows);
             }
@@ -172,6 +172,14 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             }
         }
 
+        public synchronized long getScannedRows() {
+            long total = 0;
+            for (long rows : counterTbl.values()) {
+                total += rows;
+            }
+            return total;
+        }
+
         public synchronized String toJson() {
             long total = 0;
             for (long rows : counterTbl.values()) {
@@ -243,6 +251,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     public long getDbId() {
         return dbId;
     }
+
     public String getLabel() {
         return label;
     }
@@ -305,6 +314,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
      * The method is invoked by 'checkAuth' when authorization info is null in job.
      * Also it is invoked by 'gatherAuthInfo' which saves the auth info in the constructor of job.
      * Throw MetaNofFoundException when table name could not be found.
+     *
      * @return
      */
     abstract Set<String> getTableNames() throws MetaNotFoundException;
@@ -385,10 +395,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
      * create pending task for load job and add pending task into pool
      * if job has been cancelled, this step will be ignored
      *
-     * @throws LabelAlreadyUsedException the job is duplicated
-     * @throws BeginTransactionException the limit of load job is exceeded
-     * @throws AnalysisException there are error params in job
-     * @throws DuplicatedRequestException 
+     * @throws LabelAlreadyUsedException  the job is duplicated
+     * @throws BeginTransactionException  the limit of load job is exceeded
+     * @throws AnalysisException          there are error params in job
+     * @throws DuplicatedRequestException
      */
     public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException,
             DuplicatedRequestException, LoadException {
@@ -513,15 +523,15 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             }
             if (isCommitting) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
-                                 .add("error_msg", "The txn which belongs to job is committing. "
-                                         + "The job could not be cancelled in this step").build());
+                        .add("error_msg", "The txn which belongs to job is committing. "
+                                + "The job could not be cancelled in this step").build());
                 throw new DdlException("Job could not be cancelled while txn is committing");
             }
             if (isTxnDone()) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
-                                 .add("state", state)
-                                 .add("error_msg", "Job could not be cancelled when job is " + state)
-                                 .build());
+                        .add("state", state)
+                        .add("error_msg", "Job could not be cancelled when job is " + state)
+                        .build());
                 throw new DdlException("Job could not be cancelled when job is finished or cancelled");
             }
 
@@ -539,9 +549,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             return;
         }
         if (!Catalog.getCurrentCatalog().getAuth().checkPrivByAuthInfo(ConnectContext.get(), authorizationInfo,
-                                                                       PrivPredicate.LOAD)) {
+                PrivPredicate.LOAD)) {
             ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                                           PaloPrivilege.LOAD_PRIV);
+                    PaloPrivilege.LOAD_PRIV);
         }
     }
 
@@ -563,18 +573,18 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             if (tableNames.isEmpty()) {
                 // forward compatibility
                 if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), db.getFullName(),
-                                                                       PrivPredicate.LOAD)) {
+                        PrivPredicate.LOAD)) {
                     ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
-                                                   PaloPrivilege.LOAD_PRIV);
+                            PaloPrivilege.LOAD_PRIV);
                 }
             } else {
                 for (String tblName : tableNames) {
                     if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), db.getFullName(),
-                                                                            tblName, PrivPredicate.LOAD)) {
+                            tblName, PrivPredicate.LOAD)) {
                         ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
-                                                       command,
-                                                       ConnectContext.get().getQualifiedUser(),
-                                                       ConnectContext.get().getRemoteIP(), tblName);
+                                command,
+                                ConnectContext.get().getQualifiedUser(),
+                                ConnectContext.get().getRemoteIP(), tblName);
                     }
                 }
             }
@@ -587,8 +597,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
      * This method will cancel job without edit log and lock
      *
      * @param failMsg
-     * @param abortTxn
-     *            true: abort txn when cancel job, false: only change the state of job and ignore abort txn
+     * @param abortTxn true: abort txn when cancel job, false: only change the state of job and ignore abort txn
      */
     protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
         LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id).add("transaction_id", transactionId)
@@ -600,8 +609,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         // get load ids of all loading tasks, we will cancel their coordinator process later
         List<TUniqueId> loadIds = Lists.newArrayList();
         for (LoadTask loadTask : idToTasks.values()) {
-            if (loadTask instanceof LoadLoadingTask ) {
-                loadIds.add(((LoadLoadingTask)loadTask).getLoadId());
+            if (loadTask instanceof LoadLoadingTask) {
+                loadIds.add(((LoadLoadingTask) loadTask).getLoadId());
             }
         }
         idToTasks.clear();
@@ -622,9 +631,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             // abort txn
             try {
                 LOG.debug(new LogBuilder(LogKey.LOAD_JOB, id)
-                                  .add("transaction_id", transactionId)
-                                  .add("msg", "begin to abort txn")
-                                  .build());
+                        .add("transaction_id", transactionId)
+                        .add("msg", "begin to abort txn")
+                        .build());
                 Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, transactionId, failMsg.getMsg());
             } catch (UserException e) {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
@@ -633,7 +642,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
                         .build());
             }
         }
-        
+
         // cancel all running coordinators, so that the scheduler's worker thread will be released
         for (TUniqueId loadId : loadIds) {
             Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
@@ -677,7 +686,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     protected void logFinalOperation() {
         Catalog.getCurrentCatalog().getEditLog().logEndLoadJob(
                 new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, finishTimestamp,
-                                          state, failMsg));
+                        state, failMsg));
     }
 
     public void unprotectReadEndOperation(LoadJobFinalOperation loadJobFinalOperation) {
@@ -731,7 +740,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             // task info
             jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout()
                                 + "; max_filter_ratio:" + getMaxFilterRatio());
-
             // error msg
             if (failMsg == null) {
                 jobInfo.add(FeConstants.null_string);
@@ -904,6 +912,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
         replayTxnAttachment(txnState);
         updateState(JobState.FINISHED);
+        auditFinishedLoadJob();
     }
 
     @Override
@@ -951,10 +960,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         LoadJob other = (LoadJob) obj;
 
         return this.id == other.id
-        && this.dbId == other.dbId
-        && this.label.equals(other.label)
-        && this.state.equals(other.state)
-        && this.jobType.equals(other.jobType);
+                && this.dbId == other.dbId
+                && this.label.equals(other.label)
+                && this.state.equals(other.state)
+                && this.jobType.equals(other.jobType);
     }
 
     @Override
@@ -1098,6 +1107,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         loadStartTimestamp = info.getLoadStartTimestamp();
     }
 
+    protected void auditFinishedLoadJob() {}
+
     public static class LoadJobStateUpdateInfo implements Writable {
         @SerializedName(value = "jobId")
         private long jobId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 1e9f9a7..c02cdd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -35,7 +35,8 @@ public class AuditEvent {
         CONNECTION,
         DISCONNECTION,
         BEFORE_QUERY,
-        AFTER_QUERY
+        AFTER_QUERY,
+        LOAD_SUCCEED
     }
 
     @Retention(RetentionPolicy.RUNTIME)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java
new file mode 100644
index 0000000..a79fa80
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/LoadAuditEvent.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.plugin;
+
+public class LoadAuditEvent extends AuditEvent {
+
+    @AuditField(value = "JobId")
+    public long jobId = -1;
+    @AuditField(value = "Label")
+    public String label = "";
+    @AuditField(value = "LoadType")
+    public String loadType = "";
+    @AuditField(value = "TableList")
+    public String tableList = "";
+    @AuditField(value = "FilePathList")
+    public String filePathList = "";
+    // for Baidu HDFS/AFS it is username
+    // for BOS, it is bos_accesskey
+    // for Apache HDFS, it is username or kerberos_principal
+    // for Amazon S3, it is fs.s3a.access.key
+    @AuditField(value = "BrokerUser")
+    public String brokerUser = "";
+    @AuditField(value = "LoadStartTime")
+    public long loadStartTime = -1;
+    @AuditField(value = "LoadFinishTime")
+    public long loadFinishTime = -1;
+    @AuditField(value = "FileNumber")
+    public long fileNumber = -1;
+
+    public static class AuditEventBuilder {
+
+        private LoadAuditEvent auditEvent = new LoadAuditEvent();
+
+        public AuditEventBuilder() {
+        }
+
+        public void reset() {
+            auditEvent = new LoadAuditEvent();
+        }
+
+        public AuditEventBuilder setEventType(EventType eventType) {
+            auditEvent.type = eventType;
+            return this;
+        }
+
+        public AuditEventBuilder setJobId(long jobId) {
+            auditEvent.jobId = jobId;
+            return this;
+        }
+
+        public AuditEventBuilder setLabel(String label) {
+            auditEvent.label = label;
+            return this;
+        }
+
+        public AuditEventBuilder setLoadType(String loadType) {
+            auditEvent.loadType = loadType;
+            return this;
+        }
+
+        public AuditEventBuilder setDb(String db) {
+            auditEvent.db = db;
+            return this;
+        }
+
+        public AuditEventBuilder setTableList(String tableList) {
+            auditEvent.tableList = tableList;
+            return this;
+        }
+
+        public AuditEventBuilder setFilePathList(String filePathList) {
+            auditEvent.filePathList = filePathList;
+            return this;
+        }
+
+        public AuditEventBuilder setBrokerUser(String brokerUser) {
+            auditEvent.brokerUser = brokerUser;
+            return this;
+        }
+
+        public AuditEventBuilder setTimestamp(long timestamp) {
+            auditEvent.timestamp = timestamp;
+            return this;
+        }
+
+        public AuditEventBuilder setLoadStartTime(long loadStartTime) {
+            auditEvent.loadStartTime = loadStartTime;
+            return this;
+        }
+
+        public AuditEventBuilder setLoadFinishTime(long loadFinishTime) {
+            auditEvent.loadFinishTime = loadFinishTime;
+            return this;
+        }
+
+        public AuditEventBuilder setScanRows(long scanRows) {
+            auditEvent.scanRows = scanRows;
+            return this;
+        }
+
+        public AuditEventBuilder setScanBytes(long scanBytes) {
+            auditEvent.scanBytes = scanBytes;
+            return this;
+        }
+
+        public AuditEventBuilder setFileNumber(long fileNumber) {
+            auditEvent.fileNumber = fileNumber;
+            return this;
+        }
+
+        public AuditEvent build() {
+            return this.auditEvent;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
index 29214c8..3fd1b0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.qe;
 
+import avro.shaded.com.google.common.collect.Maps;
+import avro.shaded.com.google.common.collect.Sets;
 import org.apache.doris.common.AuditLog;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.DigitalVersion;
@@ -33,6 +35,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
 
 // A builtin Audit plugin, registered when FE start.
 // it will receive "AFTER_QUERY" AuditEvent and print it as a log in fe.audit.log
@@ -41,10 +45,17 @@ public class AuditLogBuilder extends Plugin implements AuditPlugin {
 
     private PluginInfo pluginInfo;
 
+    private final String[] LOAD_ANNONATION_NAMES = {"JobId", "Label", "LoadType", "Db", "TableList",
+        "FilePathList", "BrokerUser", "Timestamp", "LoadStartTime", "LoadFinishTime", "ScanRows",
+        "ScanBytes", "FileNumber"};
+
+    private Set<String> loadAnnotationSet;
+
     public AuditLogBuilder() {
         pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "AuditLogBuilder", PluginType.AUDIT,
                 "builtin audit logger", DigitalVersion.fromString("0.12.0"),
                 DigitalVersion.fromString("1.8.31"), AuditLogBuilder.class.getName(), null, null);
+        loadAnnotationSet = Sets.newHashSet(LOAD_ANNONATION_NAMES);
     }
 
     public PluginInfo getPluginInfo() {
@@ -53,41 +64,72 @@ public class AuditLogBuilder extends Plugin implements AuditPlugin {
 
     @Override
     public boolean eventFilter(EventType type) {
-        return type == EventType.AFTER_QUERY;
+        return type == EventType.AFTER_QUERY || type == EventType.LOAD_SUCCEED;
     }
 
     @Override
     public void exec(AuditEvent event) {
         try {
-            StringBuilder sb = new StringBuilder();
-            long queryTime = 0;
-            // get each field with annotation "AuditField" in AuditEvent
-            // and assemble them into a string.
-            Field[] fields = event.getClass().getFields();
-            for (Field f : fields) {
-                AuditField af = f.getAnnotation(AuditField.class);
-                if (af == null) {
-                    continue;
-                }
-
-                if (af.value().equals("Timestamp")) {
-                    continue;
-                }
-
-                if (af.value().equals("Time")) {
-                    queryTime = (long) f.get(event);
-                }
-                sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
+           switch (event.type) {
+               case AFTER_QUERY:
+                   auditQueryLog(event);
+                   break;
+               case LOAD_SUCCEED:
+                   auditLoadLog(event);
+                   break;
+               default:
+                   break;
+           }
+        } catch (Exception e) {
+            LOG.debug("failed to process audit event", e);
+        }
+    }
+
+    private void auditQueryLog(AuditEvent event) throws IllegalAccessException {
+        StringBuilder sb = new StringBuilder();
+        long queryTime = 0;
+        // get each field with annotation "AuditField" in AuditEvent
+        // and assemble them into a string.
+        Field[] fields = event.getClass().getFields();
+        for (Field f : fields) {
+            AuditField af = f.getAnnotation(AuditField.class);
+            if (af == null) {
+                continue;
             }
 
-            String auditLog = sb.toString();
-            AuditLog.getQueryAudit().log(auditLog);
-            // slow query
-            if (queryTime > Config.qe_slow_log_ms) {
-                AuditLog.getSlowAudit().log(auditLog);
+            if (af.value().equals("Timestamp")) {
+                continue;
             }
-        } catch (Exception e) {
-            LOG.debug("failed to process audit event", e);
+
+            if (af.value().equals("Time")) {
+                queryTime = (long) f.get(event);
+            }
+            sb.append("|").append(af.value()).append("=").append(String.valueOf(f.get(event)));
+        }
+
+        String auditLog = sb.toString();
+        AuditLog.getQueryAudit().log(auditLog);
+        // slow query
+        if (queryTime > Config.qe_slow_log_ms) {
+            AuditLog.getSlowAudit().log(auditLog);
+        }
+    }
+
+    private void auditLoadLog(AuditEvent event) throws IllegalAccessException {
+        Field[] fields = event.getClass().getFields();
+        Map<String, String> annotationToFieldValueMap = Maps.newHashMap();
+        for (Field f : fields) {
+            AuditField af = f.getAnnotation(AuditField.class);
+            if (af == null || !loadAnnotationSet.contains(af.value())) {
+                continue;
+            }
+            annotationToFieldValueMap.put(af.value(), String.valueOf(f.get(event)));
+        }
+        StringBuilder sb = new StringBuilder();
+        for (String annotation : LOAD_ANNONATION_NAMES) {
+            sb.append("|").append(annotation).append("=").append(annotationToFieldValueMap.get(annotation));
         }
+        String auditLog = sb.toString();
+        AuditLog.getLoadAudit().log(auditLog);
     }
 }
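
To illustrate the resulting format: auditLoadLog() walks LOAD_ANNONATION_NAMES in declaration order and appends each field as "|Name=value" before writing the string to the audit.load module. A successful broker load would therefore produce a single line roughly like the following (all values hypothetical; Db is whatever getDb().getFullName() returns, and the time fields are epoch milliseconds):

    |JobId=10023|Label=broker_load_2021_03_03|LoadType=BROKER|Db=default_cluster:test_db|TableList=tbl1,tbl2|FilePathList=(hdfs://host:8020/user/doris/file1.csv,hdfs://host:8020/user/doris/file2.csv)|BrokerUser=hdfs_user|Timestamp=1614758462000|LoadStartTime=1614758465000|LoadFinishTime=1614758470000|ScanRows=1000000|ScanBytes=104857600|FileNumber=2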

