This is an automated email from the ASF dual-hosted git repository.

lijibing pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.1 by this push: new 63555ea9700 [fix](mysql)Support COM_CHANGE_USER and other mysql command. (#40932) (#41251) 63555ea9700 is described below commit 63555ea970005b9e905fe21805be6e065b8711cd Author: Jibing-Li <64681310+jibing...@users.noreply.github.com> AuthorDate: Wed Sep 25 15:37:02 2024 +0800 [fix](mysql)Support COM_CHANGE_USER and other mysql command. (#40932) (#41251) backport: https://github.com/apache/doris/pull/40932 --- .../java/org/apache/doris/mysql/MysqlCommand.java | 2 + .../apache/doris/mysql/MysqlHandshakePacket.java | 2 +- .../java/org/apache/doris/mysql/MysqlProto.java | 1 + .../java/org/apache/doris/qe/ConnectContext.java | 11 +++ .../java/org/apache/doris/qe/ConnectProcessor.java | 10 ++ .../org/apache/doris/qe/MysqlConnectProcessor.java | 109 +++++++++++++++++++++ 6 files changed, 134 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java index f1f1a443131..75b967aa83c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlCommand.java @@ -48,6 +48,8 @@ public enum MysqlCommand { COM_STMT_SEND_LONG_DATA("COM_STMT_SEND_LONG_DATA", 24), COM_STMT_CLOSE("COM_STMT_CLOSE", 25), COM_STMT_RESET("COM_STMT_RESET", 26), + COM_SET_OPTION("COM_RESET_CONNECTION", 27), + COM_STMT_FETCH("COM_RESET_CONNECTION", 28), COM_DAEMON("COM_DAEMON", 29), COM_RESET_CONNECTION("COM_RESET_CONNECTION", 31); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java index c2ba21a23ee..566f4ac3f7e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlHandshakePacket.java @@ -33,7 +33,7 @@ public class MysqlHandshakePacket 
extends MysqlPacket { private static final MysqlCapability SSL_CAPABILITY = MysqlCapability.SSL_CAPABILITY; // status flags not supported in palo private static final int STATUS_FLAGS = 0; - private static final String AUTH_PLUGIN_NAME = "mysql_native_password"; + public static final String AUTH_PLUGIN_NAME = "mysql_native_password"; // connection id used in KILL statement. private int connectionId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java index df7919e1118..5e4509a57ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java @@ -90,6 +90,7 @@ public class MysqlProto { serializer.reset(); MysqlHandshakePacket handshakePacket = new MysqlHandshakePacket(context.getConnectionId()); handshakePacket.writeTo(serializer); + context.setMysqlHandshakePacket(handshakePacket); try { channel.sendAndFlush(serializer.toByteBuffer()); } catch (IOException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 2eca9fdedbc..b4a388f4b4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -45,6 +45,7 @@ import org.apache.doris.mysql.DummyMysqlChannel; import org.apache.doris.mysql.MysqlCapability; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; +import org.apache.doris.mysql.MysqlHandshakePacket; import org.apache.doris.mysql.MysqlSslContext; import org.apache.doris.mysql.ProxyMysqlChannel; import org.apache.doris.nereids.StatementContext; @@ -243,6 +244,8 @@ public class ConnectContext { // it's default thread-safe private boolean isProxy = false; + private MysqlHandshakePacket mysqlHandshakePacket; + public void setUserQueryTimeout(int queryTimeout) { if 
(queryTimeout > 0) { sessionVariable.setQueryTimeoutS(queryTimeout); @@ -1189,4 +1192,12 @@ public class ConnectContext { public boolean isProxy() { return isProxy; } + + public void setMysqlHandshakePacket(MysqlHandshakePacket mysqlHandshakePacket) { + this.mysqlHandshakePacket = mysqlHandshakePacket; + } + + public byte[] getAuthPluginData() { + return mysqlHandshakePacket == null ? null : mysqlHandshakePacket.getAuthPluginData(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 97ab86ff352..3a5b35e00d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -174,6 +174,16 @@ public abstract class ConnectProcessor { ctx.getState().setOk(); } + // Do nothing for now. + protected void handleStatistics() { + ctx.getState().setOk(); + } + + // Do nothing for now. + protected void handleDebug() { + ctx.getState().setOk(); + } + protected void handleStmtReset() { ctx.getState().setOk(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java index 8b3ba580aea..acc2998d0f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java @@ -23,13 +23,22 @@ import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.PrepareStmt; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.MysqlColType; +import org.apache.doris.common.AuthenticationException; import org.apache.doris.common.ConnectionException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; 
import org.apache.doris.common.ErrorReport; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.mysql.MysqlChannel; import org.apache.doris.mysql.MysqlCommand; +import org.apache.doris.mysql.MysqlHandshakePacket; import org.apache.doris.mysql.MysqlProto; +import org.apache.doris.mysql.MysqlSerializer; +import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.expressions.Placeholder; @@ -38,6 +47,9 @@ import org.apache.doris.nereids.trees.plans.PlaceholderId; import org.apache.doris.nereids.trees.plans.commands.ExecuteCommand; import org.apache.doris.nereids.trees.plans.commands.PrepareCommand; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -309,6 +321,15 @@ public class MysqlConnectProcessor extends ConnectProcessor { // process COM_PING statement, do nothing, just return one OK packet. handlePing(); break; + case COM_STATISTICS: + handleStatistics(); + break; + case COM_DEBUG: + handleDebug(); + break; + case COM_CHANGE_USER: + handleChangeUser(); + break; case COM_STMT_RESET: handleStmtReset(); break; @@ -327,6 +348,94 @@ public class MysqlConnectProcessor extends ConnectProcessor { handleFieldList(tableName); } + private void handleChangeUser() throws IOException { + // Random bytes generated when creating connection. 
+ byte[] authPluginData = getConnectContext().getAuthPluginData(); + Preconditions.checkNotNull(authPluginData, "Auth plugin data is null."); + String userName = new String(MysqlProto.readNulTerminateString(packetBuf)); + int passwordLen = MysqlProto.readInt1(packetBuf); + byte[] password = MysqlProto.readFixedString(packetBuf, passwordLen); + String db = new String(MysqlProto.readNulTerminateString(packetBuf)); + // Read the character set. + MysqlProto.readInt2(packetBuf); + String authPluginName = new String(MysqlProto.readNulTerminateString(packetBuf)); + + // Send Protocol::AuthSwitchRequest to client if auth plugin name is not mysql_native_password + if (!MysqlHandshakePacket.AUTH_PLUGIN_NAME.equals(authPluginName)) { + MysqlChannel channel = ctx.mysqlChannel; + MysqlSerializer serializer = MysqlSerializer.newInstance(); + serializer.writeInt1((byte) 0xfe); + serializer.writeNulTerminateString(MysqlHandshakePacket.AUTH_PLUGIN_NAME); + serializer.writeBytes(authPluginData); + serializer.writeInt1(0); + channel.sendAndFlush(serializer.toByteBuffer()); + // Server receive auth switch response packet from client. + ByteBuffer authSwitchResponse = channel.fetchOnePacket(); + int length = authSwitchResponse.limit(); + password = new byte[length]; + System.arraycopy(authSwitchResponse.array(), 0, password, 0, length); + } + + // For safety, not allowed to change to root or admin. + if (Auth.ROOT_USER.equals(userName) || Auth.ADMIN_USER.equals(userName)) { + ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Change to root or admin is forbidden"); + return; + } + + // Check password. 
+ List<UserIdentity> currentUserIdentity = Lists.newArrayList(); + try { + Env.getCurrentEnv().getAuth() + .checkPassword(userName, ctx.remoteIP, password, authPluginData, currentUserIdentity); + } catch (AuthenticationException e) { + ctx.getState().setError(ErrorCode.ERR_ACCESS_DENIED_ERROR, "Authentication failed."); + return; + } + ctx.setCurrentUserIdentity(currentUserIdentity.get(0)); + ctx.setQualifiedUser(userName); + + // Change default db if set. + if (Strings.isNullOrEmpty(db)) { + ctx.changeDefaultCatalog(InternalCatalog.INTERNAL_CATALOG_NAME); + } else { + String catalogName = null; + String dbName = null; + String[] dbNames = db.split("\\."); + if (dbNames.length == 1) { + dbName = db; + } else if (dbNames.length == 2) { + catalogName = dbNames[0]; + dbName = dbNames[1]; + } else if (dbNames.length > 2) { + ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "Only one dot can be in the name: " + db); + return; + } + + // check catalog and db exists + if (catalogName != null) { + CatalogIf catalogIf = ctx.getEnv().getCatalogMgr().getCatalog(catalogName); + if (catalogIf == null) { + ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match catalog in doris: " + db); + return; + } + if (catalogIf.getDbNullable(dbName) == null) { + ctx.getState().setError(ErrorCode.ERR_BAD_DB_ERROR, "No match database in doris: " + db); + return; + } + } + try { + if (catalogName != null) { + ctx.getEnv().changeCatalog(ctx, catalogName); + } + Env.getCurrentEnv().changeDb(ctx, dbName); + } catch (DdlException e) { + ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + return; + } + } + ctx.getState().setOk(); + } + // Process a MySQL request public void processOnce() throws IOException { // set status of query to OK. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org