This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e0e472  [fix](audit-plugin) Fix audit load plugin may stopped when 
throw unexpected exceptions (#7607)
1e0e472 is described below

commit 1e0e4727842e8140dee24b3e0908ebf176dbd258
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Thu Jan 6 23:21:13 2022 +0800

    [fix](audit-plugin) Fix audit load plugin may stopped when throw unexpected 
exceptions (#7607)
    
    Fix an issue where the audit loader may stop when an unexpected exception is thrown.
---
 .../doris/plugin/audit/AuditLoaderPlugin.java      | 34 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 44d34a7..6d2b7bc 100755
--- 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -29,6 +29,11 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -36,7 +41,6 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Map;
 import java.util.Properties;
-import java.util.TimeZone;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
@@ -62,9 +66,6 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
     private volatile boolean isClosed = false;
     private volatile boolean isInit = false;
 
-    // the max auditEventQueue size to store audit_event
-    private static final int MAX_AUDIT_EVENT_SIZE = 4096;
-
     @Override
     public void init(PluginInfo info, PluginContext ctx) throws 
PluginException {
         super.init(info, ctx);
@@ -161,12 +162,27 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         auditBuffer.append(event.peakMemoryBytes).append("\t");
         // trim the query to avoid too long
         // use `getBytes().length` to get real byte length
-        int maxLen = Math.min(conf.max_stmt_length, 
event.stmt.getBytes().length);
-        String stmt = new String(event.stmt.getBytes(), 0, 
maxLen).replace("\t", " ");
+        String stmt = truncateByBytes(event.stmt).replace("\t", " ");
         LOG.debug("receive audit event with stmt: {}", stmt);
         auditBuffer.append(stmt).append("\n");
     }
 
+    private String truncateByBytes(String str) {
+        int maxLen = Math.min(conf.max_stmt_length, str.getBytes().length);
+        if (maxLen >= str.getBytes().length) {
+            return str;
+        }
+        Charset utf8Charset = Charset.forName("UTF-8");
+        CharsetDecoder decoder = utf8Charset.newDecoder();
+        byte[] sb = str.getBytes();
+        ByteBuffer buffer = ByteBuffer.wrap(sb, 0, maxLen);
+        CharBuffer charBuffer = CharBuffer.allocate(maxLen);
+        decoder.onMalformedInput(CodingErrorAction.IGNORE);
+        decoder.decode(buffer, charBuffer, true);
+        decoder.flush(charBuffer);
+        return new String(charBuffer.array(), 0, charBuffer.position());
+    }
+
     private void loadIfNecessary(DorisStreamLoader loader) {
         if (auditBuffer.length() < conf.maxBatchSize && 
System.currentTimeMillis() - lastLoadTime < conf.maxBatchIntervalSec * 1000) {
             return;
@@ -256,8 +272,10 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
                         assembleAudit(event);
                         loadIfNecessary(loader);
                     }
-                } catch (InterruptedException e) {
-                    LOG.debug("encounter exception when loading current audit 
batch", e);
+                } catch (InterruptedException ie) {
+                    LOG.debug("encounter exception when loading current audit 
batch", ie);
+                } catch (Exception e) {
+                    LOG.error("run audit logger error:", e);
                 }
             }
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to