This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 63555ea9700 [fix](mysql)Support COM_CHANGE_USER and other mysql 
command. (#40932) (#41251)
63555ea9700 is described below

commit 63555ea970005b9e905fe21805be6e065b8711cd
Author: Jibing-Li <64681310+jibing...@users.noreply.github.com>
AuthorDate: Wed Sep 25 15:37:02 2024 +0800

    [fix](mysql)Support COM_CHANGE_USER and other mysql command. (#40932) 
(#41251)
    
    backport: https://github.com/apache/doris/pull/40932
---
 .../java/org/apache/doris/mysql/MysqlCommand.java  |   2 +
 .../apache/doris/mysql/MysqlHandshakePacket.java   |   2 +-
 .../java/org/apache/doris/mysql/MysqlProto.java    |   1 +
 .../java/org/apache/doris/qe/ConnectContext.java   |  11 +++
 .../java/org/apache/doris/qe/ConnectProcessor.java |  10 ++
 .../org/apache/doris/qe/MysqlConnectProcessor.java | 109 +++++++++++++++++++++
 6 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java
index f1f1a443131..75b967aa83c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java
@@ -48,6 +48,8 @@ public enum MysqlCommand {
     COM_STMT_SEND_LONG_DATA("COM_STMT_SEND_LONG_DATA", 24),
     COM_STMT_CLOSE("COM_STMT_CLOSE", 25),
     COM_STMT_RESET("COM_STMT_RESET", 26),
+    COM_SET_OPTION("COM_RESET_CONNECTION", 27),
+    COM_STMT_FETCH("COM_RESET_CONNECTION", 28),
     COM_DAEMON("COM_DAEMON", 29),
     COM_RESET_CONNECTION("COM_RESET_CONNECTION", 31);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java
index c2ba21a23ee..566f4ac3f7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java
@@ -33,7 +33,7 @@ public class MysqlHandshakePacket extends MysqlPacket {
     private static final MysqlCapability SSL_CAPABILITY = 
MysqlCapability.SSL_CAPABILITY;
     // status flags not supported in palo
     private static final int STATUS_FLAGS = 0;
-    private static final String AUTH_PLUGIN_NAME = "mysql_native_password";
+    public static final String AUTH_PLUGIN_NAME = "mysql_native_password";
 
     // connection id used in KILL statement.
     private int connectionId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java 
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
index df7919e1118..5e4509a57ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java
@@ -90,6 +90,7 @@ public class MysqlProto {
         serializer.reset();
         MysqlHandshakePacket handshakePacket = new 
MysqlHandshakePacket(context.getConnectionId());
         handshakePacket.writeTo(serializer);
+        context.setMysqlHandshakePacket(handshakePacket);
         try {
             channel.sendAndFlush(serializer.toByteBuffer());
         } catch (IOException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 2eca9fdedbc..b4a388f4b4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -45,6 +45,7 @@ import org.apache.doris.mysql.DummyMysqlChannel;
 import org.apache.doris.mysql.MysqlCapability;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
+import org.apache.doris.mysql.MysqlHandshakePacket;
 import org.apache.doris.mysql.MysqlSslContext;
 import org.apache.doris.mysql.ProxyMysqlChannel;
 import org.apache.doris.nereids.StatementContext;
@@ -243,6 +244,8 @@ public class ConnectContext {
     // it's default thread-safe
     private boolean isProxy = false;
 
+    private MysqlHandshakePacket mysqlHandshakePacket;
+
     public void setUserQueryTimeout(int queryTimeout) {
         if (queryTimeout > 0) {
             sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -1189,4 +1192,12 @@ public class ConnectContext {
     public boolean isProxy() {
         return isProxy;
     }
+
+    public void setMysqlHandshakePacket(MysqlHandshakePacket 
mysqlHandshakePacket) {
+        this.mysqlHandshakePacket = mysqlHandshakePacket;
+    }
+
+    public byte[] getAuthPluginData() {
+        return mysqlHandshakePacket == null ? null : 
mysqlHandshakePacket.getAuthPluginData();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 97ab86ff352..3a5b35e00d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -174,6 +174,16 @@ public abstract class ConnectProcessor {
         ctx.getState().setOk();
     }
 
+    // Do nothing for now.
+    protected void handleStatistics() {
+        ctx.getState().setOk();
+    }
+
+    // Do nothing for now.
+    protected void handleDebug() {
+        ctx.getState().setOk();
+    }
+
     protected void handleStmtReset() {
         ctx.getState().setOk();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
index 8b3ba580aea..acc2998d0f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
@@ -23,13 +23,22 @@ import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.PrepareStmt;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MysqlColType;
+import org.apache.doris.common.AuthenticationException;
 import org.apache.doris.common.ConnectionException;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.mysql.MysqlChannel;
 import org.apache.doris.mysql.MysqlCommand;
+import org.apache.doris.mysql.MysqlHandshakePacket;
 import org.apache.doris.mysql.MysqlProto;
+import org.apache.doris.mysql.MysqlSerializer;
+import org.apache.doris.mysql.privilege.Auth;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import org.apache.doris.nereids.trees.expressions.Placeholder;
@@ -38,6 +47,9 @@ import org.apache.doris.nereids.trees.plans.PlaceholderId;
 import org.apache.doris.nereids.trees.plans.commands.ExecuteCommand;
 import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -309,6 +321,15 @@ public class MysqlConnectProcessor extends 
ConnectProcessor {
                 // process COM_PING statement, do nothing, just return one OK 
packet.
                 handlePing();
                 break;
+            case COM_STATISTICS:
+                handleStatistics();
+                break;
+            case COM_DEBUG:
+                handleDebug();
+                break;
+            case COM_CHANGE_USER:
+                handleChangeUser();
+                break;
             case COM_STMT_RESET:
                 handleStmtReset();
                 break;
@@ -327,6 +348,94 @@ public class MysqlConnectProcessor extends 
ConnectProcessor {
         handleFieldList(tableName);
     }
 
+    private void handleChangeUser() throws IOException {
+        // Random bytes generated when creating connection.
+        byte[] authPluginData = getConnectContext().getAuthPluginData();
+        Preconditions.checkNotNull(authPluginData, "Auth plugin data is 
null.");
+        String userName = new 
String(MysqlProto.readNulTerminateString(packetBuf));
+        int passwordLen = MysqlProto.readInt1(packetBuf);
+        byte[] password = MysqlProto.readFixedString(packetBuf, passwordLen);
+        String db = new String(MysqlProto.readNulTerminateString(packetBuf));
+        // Read the character set.
+        MysqlProto.readInt2(packetBuf);
+        String authPluginName = new 
String(MysqlProto.readNulTerminateString(packetBuf));
+
+        // Send Protocol::AuthSwitchRequest to client if auth plugin name is 
not mysql_native_password
+        if (!MysqlHandshakePacket.AUTH_PLUGIN_NAME.equals(authPluginName)) {
+            MysqlChannel channel = ctx.mysqlChannel;
+            MysqlSerializer serializer = MysqlSerializer.newInstance();
+            serializer.writeInt1((byte) 0xfe);
+            
serializer.writeNulTerminateString(MysqlHandshakePacket.AUTH_PLUGIN_NAME);
+            serializer.writeBytes(authPluginData);
+            serializer.writeInt1(0);
+            channel.sendAndFlush(serializer.toByteBuffer());
+            // Server receive auth switch response packet from client.
+            ByteBuffer authSwitchResponse = channel.fetchOnePacket();
+            int length = authSwitchResponse.limit();
+            password = new byte[length];
+            System.arraycopy(authSwitchResponse.array(), 0, password, 0, 
length);
+        }
+
+        // For safety, not allowed to change to root or admin.
+        if (Auth.ROOT_USER.equals(userName) || 
Auth.ADMIN_USER.equals(userName)) {
+            ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Change 
to root or admin is forbidden");
+            return;
+        }
+
+        // Check password.
+        List<UserIdentity> currentUserIdentity = Lists.newArrayList();
+        try {
+            Env.getCurrentEnv().getAuth()
+                .checkPassword(userName, ctx.remoteIP, password, 
authPluginData, currentUserIdentity);
+        } catch (AuthenticationException e) {
+            ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, 
"Authentication failed.");
+            return;
+        }
+        ctx.setCurrentUserIdentity(currentUserIdentity.get(0));
+        ctx.setQualifiedUser(userName);
+
+        // Change default db if set.
+        if (Strings.isNullOrEmpty(db)) {
+            ctx.changeDefaultCatalog(InternalCatalog.INTERNAL_CATALOG_NAME);
+        } else {
+            String catalogName = null;
+            String dbName = null;
+            String[] dbNames = db.split("\\.");
+            if (dbNames.length == 1) {
+                dbName = db;
+            } else if (dbNames.length == 2) {
+                catalogName = dbNames[0];
+                dbName = dbNames[1];
+            } else if (dbNames.length > 2) {
+                ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "Only one 
dot can be in the name: " + db);
+                return;
+            }
+
+            // check catalog and db exists
+            if (catalogName != null) {
+                CatalogIf catalogIf = 
ctx.getEnv().getCatalogMgr().getCatalog(catalogName);
+                if (catalogIf == null) {
+                    ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No 
match catalog in doris: " + db);
+                    return;
+                }
+                if (catalogIf.getDbNullable(dbName) == null) {
+                    ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No 
match database in doris: " + db);
+                    return;
+                }
+            }
+            try {
+                if (catalogName != null) {
+                    ctx.getEnv().changeCatalog(ctx, catalogName);
+                }
+                Env.getCurrentEnv().changeDb(ctx, dbName);
+            } catch (DdlException e) {
+                ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+                return;
+            }
+        }
+        ctx.getState().setOk();
+    }
+
     // Process a MySQL request
     public void processOnce() throws IOException {
         // set status of query to OK.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to