This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ad1f6350701 [Feature](auditloader) Plugin auditloader use auth token 
to avoid using cleartext passwords in config (#26278)
ad1f6350701 is described below

commit ad1f6350701516c82509da0612574567989bd7b6
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Tue Nov 7 05:14:57 2023 -0600

    [Feature](auditloader) Plugin auditloader use auth token to avoid using 
cleartext passwords in config (#26278)
    
    Doris FE now checks whether a stream load HTTP request carries an auth
    token after the password check fails.
    The audit-log loader plugin can use an auth token if use_auth_token is
    set to true in the plugin config.
    
    Co-authored-by: Mingyu Chen <morningman....@gmail.com>
---
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 112 ++++++++++++++++++++-
 .../auditloader/src/main/assembly/plugin.conf      |   3 +
 .../doris/plugin/audit/AuditLoaderPlugin.java      |  20 +++-
 .../doris/plugin/audit/DorisStreamLoader.java      |  11 +-
 4 files changed, 138 insertions(+), 8 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 9c5902d748a..ff6e98a6c78 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
+import org.apache.doris.httpv2.exception.UnauthorizedException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
@@ -43,6 +44,7 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.servlet.view.RedirectView;
 
+import java.net.URI;
 import java.util.List;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -81,7 +83,20 @@ public class LoadAction extends RestBaseController {
             return redirectToHttps(request);
         }
 
-        executeCheckPassword(request, response);
+        try {
+            executeCheckPassword(request, response);
+        } catch (UnauthorizedException unauthorizedException) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Check password failed, going to check auth token, 
request: {}", request.toString());
+            }
+
+            if (!checkClusterToken(request)) {
+                throw unauthorizedException;
+            } else {
+                return executeWithClusterToken(request, db, table, true);
+            }
+        }
+
         return executeWithoutPassword(request, response, db, table, true);
     }
 
@@ -257,4 +272,99 @@ public class LoadAction extends RestBaseController {
         }
         return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
     }
+
+    // NOTE: This function can only be used for AuditlogPlugin stream load for now.
+    // AuditlogPlugin should be redesigned carefully; the method below focuses on
+    // temporarily addressing the users' needs for audit logs.
+    // So this function has not been widely tested under general scenarios.
+    private boolean checkClusterToken(HttpServletRequest request) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Checking cluser token, request {}", request.toString());
+        }
+
+        String authToken = request.getHeader("token");
+
+        if (Strings.isNullOrEmpty(authToken)) {
+            return false;
+        }
+
+        return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
+    }
+
+    // NOTE: This function can only be used for AuditlogPlugin stream load for now.
+    // AuditlogPlugin should be redesigned carefully; the method below focuses on
+    // temporarily addressing the users' needs for audit logs.
+    // So this function has not been widely tested under general scenarios.
+    private Object executeWithClusterToken(HttpServletRequest request, String 
db,
+                                          String table, boolean isStreamLoad) {
+        try {
+            ConnectContext ctx = new ConnectContext();
+            ctx.setEnv(Env.getCurrentEnv());
+            ctx.setThreadLocalInfo();
+            ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
+            ctx.setRemoteIP(request.getRemoteAddr());
+
+            String dbName = db;
+            String tableName = table;
+            // A 'Load' request must have 100-continue header
+            if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) {
+                return new RestBaseResult("There is no 100-continue header");
+            }
+
+            final String clusterName = ConnectContext.get().getClusterName();
+            if (Strings.isNullOrEmpty(clusterName)) {
+                return new RestBaseResult("No cluster selected.");
+            }
+
+            if (Strings.isNullOrEmpty(dbName)) {
+                return new RestBaseResult("No database selected.");
+            }
+
+            if (Strings.isNullOrEmpty(tableName)) {
+                return new RestBaseResult("No table selected.");
+            }
+
+            String label = request.getParameter(LABEL_KEY);
+            if (isStreamLoad) {
+                label = request.getHeader(LABEL_KEY);
+            }
+
+            if (!isStreamLoad && Strings.isNullOrEmpty(label)) {
+                // for stream load, the label can be generated by system 
automatically
+                return new RestBaseResult("No label selected.");
+            }
+
+            TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
+
+            LOG.info("Redirect load action with auth token to destination={},"
+                        + "stream: {}, db: {}, tbl: {}, label: {}",
+                    redirectAddr.toString(), isStreamLoad, dbName, tableName, 
label);
+
+            URI urlObj = null;
+            URI resultUriObj = null;
+            String urlStr = request.getRequestURI();
+            String userInfo = null;
+
+            try {
+                urlObj = new URI(urlStr);
+                resultUriObj = new URI("http", userInfo, 
redirectAddr.getHostname(),
+                        redirectAddr.getPort(), urlObj.getPath(), "", null);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            String redirectUrl = resultUriObj.toASCIIString();
+            if (!Strings.isNullOrEmpty(request.getQueryString())) {
+                redirectUrl += request.getQueryString();
+            }
+            LOG.info("Redirect url: {}", redirectUrl);
+            RedirectView redirectView = new RedirectView(redirectUrl);
+            redirectView.setContentType("text/html;charset=utf-8");
+            
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
+
+            return redirectView;
+        } catch (Exception e) {
+            LOG.warn("Failed to execute stream load with cluster token, {}", 
e);
+            return new RestBaseResult(e.getMessage());
+        }
+    }
 }
diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf 
b/fe_plugins/auditloader/src/main/assembly/plugin.conf
index 31f7bd3f356..aec8724fd96 100755
--- a/fe_plugins/auditloader/src/main/assembly/plugin.conf
+++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf
@@ -51,3 +51,6 @@ user=root
 # Doris user's password
 password=
 
+# Use the Doris cluster token for stream load authorization. If true, user and password will be ignored.
+use_auth_token=false
+
diff --git 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 2aa1246dd6b..3cfb0eeeaee 100755
--- 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -18,6 +18,7 @@
 package org.apache.doris.plugin.audit;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.plugin.AuditPlugin;
 import org.apache.doris.plugin.Plugin;
@@ -84,7 +85,6 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
             this.lastLoadTimeSlowLog = System.currentTimeMillis();
 
             loadConfig(ctx, info.getProperties());
-
             this.auditEventQueue = 
Queues.newLinkedBlockingDeque(conf.maxQueueSize);
             this.streamLoader = new DorisStreamLoader(conf);
             this.loadThread = new Thread(new LoadWorker(this.streamLoader), 
"audit loader thread");
@@ -209,7 +209,16 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         if (logBuffer.length() >= conf.maxBatchSize || currentTime - 
lastLoadTime >= conf.maxBatchIntervalSec * 1000) {         
             // begin to load
             try {
-                DorisStreamLoader.LoadResponse response = 
loader.loadBatch(logBuffer, slowLog);
+                String token = "";
+                if (conf.use_auth_token) {
+                    try {
+                        // Acquire token from master
+                        token = 
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+                    } catch (Exception e) {
+                        LOG.error("Failed to get auth token: {}", e);
+                    }
+                }  
+                DorisStreamLoader.LoadResponse response = 
loader.loadBatch(logBuffer, slowLog, token);
                 LOG.debug("audit loader response: {}", response);
             } catch (Exception e) {
                 LOG.debug("encounter exception when putting current audit 
batch, discard current batch", e);
@@ -248,6 +257,7 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log";
         // the max stmt length to be loaded in audit table.
         public static final String MAX_STMT_LENGTH = "max_stmt_length";
+        public static final String USE_AUTH_TOKEN = "use_auth_token";
 
         public long maxBatchSize = 50 * 1024 * 1024;
         public long maxBatchIntervalSec = 60;
@@ -262,6 +272,9 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
         // the identity of FE which run this plugin
         public String feIdentity = "";
         public int max_stmt_length = 4096;
+        // auth_token is not used by default
+        public boolean use_auth_token = false;
+        
 
         public void init(Map<String, String> properties) throws 
PluginException {
             try {
@@ -302,6 +315,9 @@ public class AuditLoaderPlugin extends Plugin implements 
AuditPlugin {
                 if (properties.containsKey(MAX_STMT_LENGTH)) {
                     max_stmt_length = 
Integer.parseInt(properties.get(MAX_STMT_LENGTH));
                 }
+                if (properties.containsKey(USE_AUTH_TOKEN)) {
+                    use_auth_token = 
Boolean.valueOf(properties.get(USE_AUTH_TOKEN));
+                }
             } catch (Exception e) {
                 throw new PluginException(e.getMessage());
             }
diff --git 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
index 2781bcc2073..d389f0dfa81 100644
--- 
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
+++ 
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java
@@ -59,11 +59,12 @@ public class DorisStreamLoader {
         this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
     }
 
-    private HttpURLConnection getConnection(String urlStr, String label) 
throws IOException {
+    private HttpURLConnection getConnection(String urlStr, String label, 
String clusterToken) throws IOException {
         URL url = new URL(urlStr);
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod("PUT");
+        conn.setRequestProperty("token", clusterToken);
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
@@ -114,7 +115,7 @@ public class DorisStreamLoader {
         return response.toString();
     }
 
-    public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
+    public LoadResponse loadBatch(StringBuilder sb, boolean slowLog, String 
clusterToken) {
         Calendar calendar = Calendar.getInstance();
         String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, 
calendar.get(Calendar.DAY_OF_MONTH),
@@ -127,10 +128,10 @@ public class DorisStreamLoader {
             // build request and send to fe
             if (slowLog) {
                 label = "slow" + label;
-                feConn = getConnection(slowLogLoadUrlStr, label);
+                feConn = getConnection(slowLogLoadUrlStr, label, clusterToken);
             } else {
                 label = "audit" + label;
-                feConn = getConnection(auditLogLoadUrlStr, label);
+                feConn = getConnection(auditLogLoadUrlStr, label, 
clusterToken);
             }
             int status = feConn.getResponseCode();
             // fe send back http response code TEMPORARY_REDIRECT 307 and new 
be location
@@ -143,7 +144,7 @@ public class DorisStreamLoader {
                 throw new Exception("redirect location is null");
             }
             // build request and send to new be location
-            beConn = getConnection(location, label);
+            beConn = getConnection(location, label, clusterToken);
             // send data to be
             try (BufferedOutputStream bos = new 
BufferedOutputStream(beConn.getOutputStream())) {
                 bos.write(sb.toString().getBytes());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to