This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 55c1d27 [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc 55c1d27 is described below commit 55c1d276746b9bbf5458ca92cb55b2aeb0bae595 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Jun 8 10:02:45 2020 +0800 [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc ### What is this PR for? User credential only works in the default jdbc interpreter. If user create a new jdbc interpreter, e.g. hive, then credential won't work. This PR fix this and also do some code refactoring to make the jdbc interpreter module more readable. ### What type of PR is it? [Bug Fix | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4863 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3806 from zjffdu/ZEPPELIN-4863 and squashes the following commits: 9fd10eeb2 [Jeff Zhang] [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc abb00b376 [Jeff Zhang] [ZEPPELIN-4863]. User credential usage in jdbc interpreter doesn't work when the interpreter name is not jdbc --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 193 +++++++++++---------- .../zeppelin/jdbc/JDBCUserConfigurations.java | 48 +++-- jdbc/src/main/resources/interpreter-setting.json | 14 +- .../apache/zeppelin/jdbc/JDBCInterpreterTest.java | 166 ++++++++++-------- .../org/apache/zeppelin/notebook/Paragraph.java | 7 +- 5 files changed, 235 insertions(+), 193 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index bdda696..5dcf4d0 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -97,7 +97,7 @@ import org.apache.zeppelin.user.UsernamePassword; * </p> */ public class JDBCInterpreter extends KerberosInterpreter { - private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JDBCInterpreter.class); static final String INTERPRETER_NAME = "jdbc"; static final String COMMON_KEY = "common"; @@ -152,7 +152,9 @@ public class JDBCInterpreter extends KerberosInterpreter { "KerberosConfigPath", "KerberosKeytabPath", "KerberosCredentialCachePath", "extraCredentials", "roles", "sessionProperties")); + // database --> Properties private final HashMap<String, Properties> basePropertiesMap; + // username --> User Configuration private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap; private final HashMap<String, SqlCompleter> sqlCompletersMap; @@ -189,23 +191,19 @@ public class JDBCInterpreter extends KerberosInterpreter { return true; } } catch (Exception e) { - logger.error("Unable to run kinit for zeppelin", e); + LOGGER.error("Unable to run kinit for zeppelin", e); } return false; } - public HashMap<String, Properties> getPropertiesMap() { - return basePropertiesMap; - } - @Override public void open() { super.open(); for (String propertyKey : properties.stringPropertyNames()) { - logger.debug("propertyKey: {}", propertyKey); + LOGGER.debug("propertyKey: {}", propertyKey); String[] keyValue = propertyKey.split("\\.", 2); if (2 == keyValue.length) { - logger.debug("key: {}, value: {}", keyValue[0], keyValue[1]); + LOGGER.debug("key: {}, value: {}", keyValue[0], keyValue[1]); Properties prefixProperties; if (basePropertiesMap.containsKey(keyValue[0])) { @@ -223,7 +221,7 @@ public class JDBCInterpreter extends KerberosInterpreter { if (!COMMON_KEY.equals(key)) { Properties properties = basePropertiesMap.get(key); if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) { - logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.", + LOGGER.error("{} will be ignored. {}.{} and {}.{} is mandatory.", key, DRIVER_KEY, key, key, URL_KEY); removeKeySet.add(key); } @@ -233,7 +231,7 @@ public class JDBCInterpreter extends KerberosInterpreter { for (String key : removeKeySet) { basePropertiesMap.remove(key); } - logger.debug("JDBC PropretiesMap: {}", basePropertiesMap); + LOGGER.debug("JDBC PropertiesMap: {}", basePropertiesMap); setMaxLineResults(); setMaxRows(); @@ -295,12 +293,12 @@ public class JDBCInterpreter extends KerberosInterpreter { // protection to release connection executorService.awaitTermination(3, TimeUnit.SECONDS); } catch (InterruptedException e) { - logger.warn("Completion timeout", e); + LOGGER.warn("Completion timeout", e); if (connection != null) { try { connection.close(); } catch (SQLException e1) { - logger.warn("Error close connection", e1); + LOGGER.warn("Error close connection", e1); } } } @@ -312,7 +310,7 @@ public class JDBCInterpreter extends KerberosInterpreter { try { configurations.initStatementMap(); } catch (Exception e) { - logger.error("Error while closing paragraphIdStatementMap statement...", e); + LOGGER.error("Error while closing paragraphIdStatementMap statement...", e); } } } @@ -322,13 +320,13 @@ public class JDBCInterpreter extends KerberosInterpreter { try { closeDBPool(key, DEFAULT_KEY); } catch (SQLException e) { - logger.error("Error while closing database pool.", e); + LOGGER.error("Error while closing database pool.", e); } try { JDBCUserConfigurations configurations = jdbcUserConfigurationsMap.get(key); configurations.initConnectionPoolMap(); } catch (SQLException e) { - logger.error("Error while closing initConnectionPoolMap.", e); + LOGGER.error("Error while closing initConnectionPoolMap.", e); } } } @@ -340,22 +338,22 @@ public class JDBCInterpreter extends KerberosInterpreter { initStatementMap(); initConnectionPoolMap(); } catch (Exception e) { - logger.error("Error while closing...", e); + LOGGER.error("Error while closing...", e); } } - private String getEntityName(String replName) { - StringBuffer entityName = new StringBuffer(); - entityName.append(INTERPRETER_NAME); - entityName.append("."); - entityName.append(replName); - return entityName.toString(); + private String getEntityName(String replName, String propertyKey) { + if ("jdbc".equals(replName)) { + return propertyKey; + } else { + return replName; + } } - private String getJDBCDriverName(String user, String propertyKey) { + private String getJDBCDriverName(String user, String dbPrefix) { StringBuffer driverName = new StringBuffer(); driverName.append(DBCP_STRING); - driverName.append(propertyKey); + driverName.append(dbPrefix); driverName.append(user); return driverName.toString(); } @@ -367,10 +365,10 @@ public class JDBCInterpreter extends KerberosInterpreter { } private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext, - String replName) { + String entity) { UserCredentials uc = interpreterContext.getAuthenticationInfo().getUserCredentials(); if (uc != null) { - return uc.getUsernamePassword(replName); + return uc.getUsernamePassword(entity); } return null; } @@ -393,36 +391,36 @@ public class JDBCInterpreter extends KerberosInterpreter { } } - private void setUserProperty(String propertyKey, InterpreterContext interpreterContext) + private void setUserProperty(String dbPrefix, InterpreterContext context) throws SQLException, IOException, InterpreterException { - String user = interpreterContext.getAuthenticationInfo().getUser(); + String user = context.getAuthenticationInfo().getUser(); JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user); - if (basePropertiesMap.get(propertyKey).containsKey(USER_KEY) && - !basePropertiesMap.get(propertyKey).getProperty(USER_KEY).isEmpty()) { - String password = getPassword(basePropertiesMap.get(propertyKey)); + if (basePropertiesMap.get(dbPrefix).containsKey(USER_KEY) && + !basePropertiesMap.get(dbPrefix).getProperty(USER_KEY).isEmpty()) { + String password = getPassword(basePropertiesMap.get(dbPrefix)); if (!isEmpty(password)) { - basePropertiesMap.get(propertyKey).setProperty(PASSWORD_KEY, password); + basePropertiesMap.get(dbPrefix).setProperty(PASSWORD_KEY, password); } } - jdbcUserConfigurations.setPropertyMap(propertyKey, basePropertiesMap.get(propertyKey)); - if (existAccountInBaseProperty(propertyKey)) { + jdbcUserConfigurations.setPropertyMap(dbPrefix, basePropertiesMap.get(dbPrefix)); + if (existAccountInBaseProperty(dbPrefix)) { return; } - jdbcUserConfigurations.cleanUserProperty(propertyKey); + jdbcUserConfigurations.cleanUserProperty(dbPrefix); - UsernamePassword usernamePassword = getUsernamePassword(interpreterContext, - getEntityName(interpreterContext.getReplName())); + UsernamePassword usernamePassword = getUsernamePassword(context, + getEntityName(context.getReplName(), dbPrefix)); if (usernamePassword != null) { - jdbcUserConfigurations.setUserProperty(propertyKey, usernamePassword); + jdbcUserConfigurations.setUserProperty(dbPrefix, usernamePassword); } else { - closeDBPool(user, propertyKey); + closeDBPool(user, dbPrefix); } } - private void createConnectionPool(String url, String user, String propertyKey, - Properties properties) throws SQLException, ClassNotFoundException, IOException { + private void createConnectionPool(String url, String user, String dbPrefix, + Properties properties) throws SQLException, ClassNotFoundException { String driverClass = properties.getProperty(DRIVER_KEY); if (driverClass != null && (driverClass.equals("com.facebook.presto.jdbc.PrestoDriver") @@ -449,62 +447,62 @@ public class JDBCInterpreter extends KerberosInterpreter { poolableConnectionFactory.setPool(connectionPool); Class.forName(driverClass); PoolingDriver driver = new PoolingDriver(); - driver.registerPool(propertyKey + user, connectionPool); - getJDBCConfiguration(user).saveDBDriverPool(propertyKey, driver); + driver.registerPool(dbPrefix + user, connectionPool); + getJDBCConfiguration(user).saveDBDriverPool(dbPrefix, driver); } - private Connection getConnectionFromPool(String url, String user, String propertyKey, - Properties properties) throws SQLException, ClassNotFoundException, IOException { - String jdbcDriver = getJDBCDriverName(user, propertyKey); + private Connection getConnectionFromPool(String url, String user, String dbPrefix, + Properties properties) throws SQLException, ClassNotFoundException { + String jdbcDriver = getJDBCDriverName(user, dbPrefix); - if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) { - createConnectionPool(url, user, propertyKey, properties); + if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(dbPrefix)) { + createConnectionPool(url, user, dbPrefix, properties); } return DriverManager.getConnection(jdbcDriver); } - public Connection getConnection(String propertyKey, InterpreterContext interpreterContext) + public Connection getConnection(String dbPrefix, InterpreterContext context) throws ClassNotFoundException, SQLException, InterpreterException, IOException { - final String user = interpreterContext.getAuthenticationInfo().getUser(); + final String user = context.getAuthenticationInfo().getUser(); Connection connection; - if (propertyKey == null || basePropertiesMap.get(propertyKey) == null) { + if (dbPrefix == null || basePropertiesMap.get(dbPrefix) == null) { return null; } JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user); - setUserProperty(propertyKey, interpreterContext); + setUserProperty(dbPrefix, context); - final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey); + final Properties properties = jdbcUserConfigurations.getPropertyMap(dbPrefix); final String url = properties.getProperty(URL_KEY); if (isEmpty(getProperty("zeppelin.jdbc.auth.type"))) { - connection = getConnectionFromPool(url, user, propertyKey, properties); + connection = getConnectionFromPool(url, user, dbPrefix, properties); } else { UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(getProperties()); - final String connectionUrl = appendProxyUserToURL(url, user, propertyKey); + final String connectionUrl = appendProxyUserToURL(url, user, dbPrefix); JDBCSecurityImpl.createSecureConfiguration(getProperties(), authType); switch (authType) { case KERBEROS: if (user == null || "false".equalsIgnoreCase( getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) { - connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties); + connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties); } else { - if (basePropertiesMap.get(propertyKey).containsKey("proxy.user.property")) { - connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties); + if (basePropertiesMap.get(dbPrefix).containsKey("proxy.user.property")) { + connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties); } else { UserGroupInformation ugi = null; try { ugi = UserGroupInformation.createProxyUser( user, UserGroupInformation.getCurrentUser()); } catch (Exception e) { - logger.error("Error in getCurrentUser", e); + LOGGER.error("Error in getCurrentUser", e); throw new InterpreterException("Error in getCurrentUser", e); } - final String poolKey = propertyKey; + final String poolKey = dbPrefix; try { connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() { @Override @@ -513,7 +511,7 @@ public class JDBCInterpreter extends KerberosInterpreter { } }); } catch (Exception e) { - logger.error("Error in doAs", e); + LOGGER.error("Error in doAs", e); throw new InterpreterException("Error in doAs", e); } } @@ -521,7 +519,7 @@ public class JDBCInterpreter extends KerberosInterpreter { break; default: - connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties); + connection = getConnectionFromPool(connectionUrl, user, dbPrefix, properties); } } @@ -538,13 +536,13 @@ public class JDBCInterpreter extends KerberosInterpreter { if (lastIndexOfUrl == -1) { lastIndexOfUrl = connectionUrl.length(); } - logger.info("Using proxy user as :" + user); - logger.info("Using proxy property for user as :" + + LOGGER.info("Using proxy user as: {}", user); + LOGGER.info("Using proxy property for user as: {}", basePropertiesMap.get(propertyKey).getProperty("proxy.user.property")); connectionUrl.insert(lastIndexOfUrl, ";" + basePropertiesMap.get(propertyKey).getProperty("proxy.user.property") + "=" + user + ";"); } else if (user != null && !user.equals("anonymous") && url.contains("hive")) { - logger.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" + + LOGGER.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" + ".org/docs/latest/interpreter/jdbc.html#apache-hive"); } @@ -570,9 +568,9 @@ public class JDBCInterpreter extends KerberosInterpreter { + properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY)); } } catch (Exception e) { - logger.error("Failed to retrieve password from JCEKS \n" + - "For file: " + properties.getProperty(JDBC_JCEKS_FILE) + - "\nFor key: " + properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY), e); + LOGGER.error("Failed to retrieve password from JCEKS \n" + + "For file: {} \nFor key: {}", properties.getProperty(JDBC_JCEKS_FILE), + properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY), e); throw e; } } @@ -661,7 +659,16 @@ public class JDBCInterpreter extends KerberosInterpreter { return sqlSplitter.splitSql(text); } - private InterpreterResult executeSql(String propertyKey, String sql, + /** + * Execute the sql statement under this dbPrefix. + * + * @param dbPrefix + * @param sql + * @param context + * @return + * @throws InterpreterException + */ + private InterpreterResult executeSql(String dbPrefix, String sql, InterpreterContext context) throws InterpreterException { Connection connection = null; Statement statement; @@ -670,13 +677,13 @@ public class JDBCInterpreter extends KerberosInterpreter { String user = context.getAuthenticationInfo().getUser(); try { - connection = getConnection(propertyKey, context); + connection = getConnection(dbPrefix, context); } catch (Exception e) { String errorMsg = ExceptionUtils.getStackTrace(e); try { - closeDBPool(user, propertyKey); + closeDBPool(user, dbPrefix); } catch (SQLException e1) { - logger.error("Cannot close DBPool for user, propertyKey: " + user + propertyKey, e1); + LOGGER.error("Cannot close DBPool for user, dbPrefix: " + user + dbPrefix, e1); } try { context.out.write(errorMsg); @@ -706,20 +713,20 @@ public class JDBCInterpreter extends KerberosInterpreter { getJDBCConfiguration(user).saveStatement(paragraphId, statement); String statementPrecode = - getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, propertyKey)); + getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, dbPrefix)); if (StringUtils.isNotBlank(statementPrecode)) { statement.execute(statementPrecode); } // start hive monitor thread if it is hive jdbc - if (getJDBCConfiguration(user).getPropertyMap(propertyKey).getProperty(URL_KEY) + if (getJDBCConfiguration(user).getPropertyMap(dbPrefix).getProperty(URL_KEY) .startsWith("jdbc:hive2://")) { HiveUtils.startHiveMonitorThread(statement, context, Boolean.parseBoolean(getProperty("hive.log.display", "true"))); } boolean isResultSetAvailable = statement.execute(sqlToExecute); - getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(propertyKey); + getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix); if (isResultSetAvailable) { resultSet = statement.getResultSet(); @@ -771,7 +778,7 @@ public class JDBCInterpreter extends KerberosInterpreter { } } } catch (Throwable e) { - logger.error("Cannot run " + sql, e); + LOGGER.error("Cannot run " + sql, e); return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); } finally { //In case user ran an insert/update/upsert statement @@ -841,11 +848,11 @@ public class JDBCInterpreter extends KerberosInterpreter { @Override public InterpreterResult internalInterpret(String cmd, InterpreterContext context) throws InterpreterException { - logger.debug("Run SQL command '{}'", cmd); - String propertyKey = getPropertyKey(context); - logger.debug("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd); + LOGGER.debug("Run SQL command '{}'", cmd); + String dbPrefix = getDBPrefix(context); + LOGGER.debug("DBPrefix: {}, SQL command: '{}'", dbPrefix, cmd); if (!isRefreshMode(context)) { - return executeSql(propertyKey, cmd.trim(), context); + return executeSql(dbPrefix, cmd.trim(), context); } else { int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval")); final String code = cmd.trim(); @@ -857,14 +864,14 @@ public class JDBCInterpreter extends KerberosInterpreter { refreshExecutor.scheduleAtFixedRate(() -> { context.out.clear(false); try { - InterpreterResult result = executeSql(propertyKey, code, context); + InterpreterResult result = executeSql(dbPrefix, code, context); context.out.flush(); interpreterResultRef.set(result); if (result.code() != Code.SUCCESS) { refreshExecutor.shutdownNow(); } } catch (Exception e) { - logger.warn("Fail to run sql", e); + LOGGER.warn("Fail to run sql", e); } }, 0, refreshInterval, TimeUnit.MILLISECONDS); @@ -872,7 +879,7 @@ public class JDBCInterpreter extends KerberosInterpreter { try { Thread.sleep(1000); } catch (InterruptedException e) { - logger.error(""); + LOGGER.error(""); } } refreshExecutorServices.remove(context.getParagraphId()); @@ -890,7 +897,7 @@ public class JDBCInterpreter extends KerberosInterpreter { public void cancel(InterpreterContext context) { if (isRefreshMode(context)) { - logger.info("Shutdown refreshExecutorService for paragraph: " + context.getParagraphId()); + LOGGER.info("Shutdown refreshExecutorService for paragraph: {}", context.getParagraphId()); ScheduledExecutorService executorService = refreshExecutorServices.get(context.getParagraphId()); if (executorService != null) { @@ -900,19 +907,25 @@ public class JDBCInterpreter extends KerberosInterpreter { return; } - logger.info("Cancel current query statement."); + LOGGER.info("Cancel current query statement."); String paragraphId = context.getParagraphId(); JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(context.getAuthenticationInfo().getUser()); try { jdbcUserConfigurations.cancelStatement(paragraphId); } catch (SQLException e) { - logger.error("Error while cancelling...", e); + LOGGER.error("Error while cancelling...", e); } } - public String getPropertyKey(InterpreterContext interpreterContext) { - Map<String, String> localProperties = interpreterContext.getLocalProperties(); + /** + * + * + * @param context + * @return + */ + public String getDBPrefix(InterpreterContext context) { + Map<String, String> localProperties = context.getLocalProperties(); // It is recommended to use this kind of format: %jdbc(db=mysql) if (localProperties.containsKey("db")) { return localProperties.get("db"); @@ -949,7 +962,7 @@ public class JDBCInterpreter extends KerberosInterpreter { public List<InterpreterCompletion> completion(String buf, int cursor, InterpreterContext interpreterContext) throws InterpreterException { List<InterpreterCompletion> candidates = new ArrayList<>(); - String propertyKey = getPropertyKey(interpreterContext); + String propertyKey = getDBPrefix(interpreterContext); String sqlCompleterKey = String.format("%s.%s", interpreterContext.getAuthenticationInfo().getUser(), propertyKey); SqlCompleter sqlCompleter = sqlCompletersMap.get(sqlCompleterKey); @@ -960,7 +973,7 @@ public class JDBCInterpreter extends KerberosInterpreter { connection = getConnection(propertyKey, interpreterContext); } } catch (ClassNotFoundException | SQLException | IOException e) { - logger.warn("SQLCompleter will created without use connection"); + LOGGER.warn("SQLCompleter will created without use connection"); } sqlCompleter = createOrUpdateSqlCompleter(sqlCompleter, connection, propertyKey, buf, cursor); diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java index 4eac9fc..223b85b 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCUserConfigurations.java @@ -29,8 +29,11 @@ import org.apache.zeppelin.user.UsernamePassword; */ public class JDBCUserConfigurations { private final Map<String, Statement> paragraphIdStatementMap; + // dbPrefix --> PoolingDriver private final Map<String, PoolingDriver> poolingDriverMap; + // dbPrefix --> Properties private final HashMap<String, Properties> propertiesMap; + // dbPrefix --> Boolean private HashMap<String, Boolean> isSuccessful; public JDBCUserConfigurations() { @@ -52,40 +55,40 @@ public class JDBCUserConfigurations { isSuccessful.clear(); } - public void setPropertyMap(String key, Properties properties) { + public void setPropertyMap(String dbPrefix, Properties properties) { Properties p = (Properties) properties.clone(); - propertiesMap.put(key, p); + propertiesMap.put(dbPrefix, p); } public Properties getPropertyMap(String key) { return propertiesMap.get(key); } - public void cleanUserProperty(String propertyKey) { - propertiesMap.get(propertyKey).remove("user"); - propertiesMap.get(propertyKey).remove("password"); + public void cleanUserProperty(String dfPrefix) { + propertiesMap.get(dfPrefix).remove("user"); + propertiesMap.get(dfPrefix).remove("password"); } - public void setUserProperty(String propertyKey, UsernamePassword usernamePassword) { - propertiesMap.get(propertyKey).setProperty("user", usernamePassword.getUsername()); - propertiesMap.get(propertyKey).setProperty("password", usernamePassword.getPassword()); + public void setUserProperty(String dbPrefix, UsernamePassword usernamePassword) { + propertiesMap.get(dbPrefix).setProperty("user", usernamePassword.getUsername()); + propertiesMap.get(dbPrefix).setProperty("password", usernamePassword.getPassword()); } - public void saveStatement(String key, Statement statement) throws SQLException { - paragraphIdStatementMap.put(key, statement); + public void saveStatement(String paragraphId, Statement statement) throws SQLException { + paragraphIdStatementMap.put(paragraphId, statement); } - public void cancelStatement(String key) throws SQLException { - paragraphIdStatementMap.get(key).cancel(); + public void cancelStatement(String paragraphId) throws SQLException { + paragraphIdStatementMap.get(paragraphId).cancel(); } - public void removeStatement(String key) { - paragraphIdStatementMap.remove(key); + public void removeStatement(String paragraphId) { + paragraphIdStatementMap.remove(paragraphId); } - public void saveDBDriverPool(String key, PoolingDriver driver) throws SQLException { - poolingDriverMap.put(key, driver); - isSuccessful.put(key, false); + public void saveDBDriverPool(String dbPrefix, PoolingDriver driver) throws SQLException { + poolingDriverMap.put(dbPrefix, driver); + isSuccessful.put(dbPrefix, false); } public PoolingDriver removeDBDriverPool(String key) throws SQLException { isSuccessful.remove(key); @@ -96,14 +99,7 @@ public class JDBCUserConfigurations { return poolingDriverMap.containsKey(key); } - public void setConnectionInDBDriverPoolSuccessful(String key) { - isSuccessful.put(key, true); - } - - public boolean isConnectionInDBDriverPoolSuccessful(String key) { - if (isSuccessful.containsKey(key)) { - return isSuccessful.get(key); - } - return false; + public void setConnectionInDBDriverPoolSuccessful(String dbPrefix) { + isSuccessful.put(dbPrefix, true); } } diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json index c84056f..5aac46e 100644 --- a/jdbc/src/main/resources/interpreter-setting.json +++ b/jdbc/src/main/resources/interpreter-setting.json @@ -25,13 +25,6 @@ "description": "The JDBC user password", "type": "password" }, - "default.completer.ttlInSeconds": { - "envName": null, - "propertyName": "default.completer.ttlInSeconds", - "defaultValue": "120", - "description": "Time to live sql completer in seconds (-1 to update everytime, 0 to disable update)", - "type": "number" - }, "default.driver": { "envName": null, "propertyName": "default.driver", @@ -39,6 +32,13 @@ "description": "JDBC Driver Name", "type": "string" }, + "default.completer.ttlInSeconds": { + "envName": null, + "propertyName": "default.completer.ttlInSeconds", + "defaultValue": "120", + "description": "Time to live sql completer in seconds (-1 to update everytime, 0 to disable update)", + "type": "number" + }, "default.completer.schemaFilters": { "envName": null, "propertyName": "default.completer.schemaFilters", diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java index f6fe108..92e0c24 100644 --- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java +++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/JDBCInterpreterTest.java @@ -14,11 +14,12 @@ */ package org.apache.zeppelin.jdbc; -import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter; -import net.jodah.concurrentunit.Waiter; + import org.apache.zeppelin.completer.CompletionType; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; + +import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; @@ -45,6 +46,9 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeoutException; +import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter; +import net.jodah.concurrentunit.Waiter; + import static java.lang.String.format; import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE; import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_DRIVER; @@ -55,12 +59,13 @@ import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL; import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER; import static org.apache.zeppelin.jdbc.JDBCInterpreter.PRECODE_KEY_TEMPLATE; import static org.apache.zeppelin.jdbc.JDBCInterpreter.STATEMENT_PRECODE_KEY_TEMPLATE; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + /** * JDBC interpreter unit tests. */ @@ -116,21 +121,21 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { InterpreterContext interpreterContext = InterpreterContext.builder() .setLocalProperties(localProperties) .build(); - assertEquals(JDBCInterpreter.DEFAULT_KEY, t.getPropertyKey(interpreterContext)); + assertEquals(JDBCInterpreter.DEFAULT_KEY, t.getDBPrefix(interpreterContext)); localProperties = new HashMap<>(); localProperties.put("db", "mysql"); interpreterContext = InterpreterContext.builder() .setLocalProperties(localProperties) .build(); - assertEquals("mysql", t.getPropertyKey(interpreterContext)); + assertEquals("mysql", t.getDBPrefix(interpreterContext)); localProperties = new HashMap<>(); localProperties.put("hive", "hive"); interpreterContext = InterpreterContext.builder() .setLocalProperties(localProperties) .build(); - assertEquals("hive", t.getPropertyKey(interpreterContext)); + assertEquals("hive", t.getDBPrefix(interpreterContext)); } @Test @@ -270,7 +275,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { InterpreterResult interpreterResult = t.interpret(sqlQuery, context); List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage(); - assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + assertEquals(interpreterResult.toString(), + InterpreterResult.Code.SUCCESS, interpreterResult.code()); assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType()); assertEquals("SOME_OTHER_NAME\na_name\n", resultMessages.get(0).getData()); } @@ -491,17 +497,21 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { assertEquals(true, completionList.contains(correctCompletionKeyword)); } - private Properties getDBProperty(String dbUser, + private Properties getDBProperty(String dbPrefix, + String dbUser, String dbPassowrd) throws IOException { Properties properties = new Properties(); properties.setProperty("common.max_count", "1000"); properties.setProperty("common.max_retry", "3"); - properties.setProperty("default.driver", "org.h2.Driver"); - properties.setProperty("default.url", getJdbcConnection()); - if (dbUser != null) { + if (!StringUtils.isBlank(dbPrefix)) { + properties.setProperty(dbPrefix + ".driver", "org.h2.Driver"); + properties.setProperty(dbPrefix + ".url", getJdbcConnection()); + properties.setProperty(dbPrefix + ".user", dbUser); + properties.setProperty(dbPrefix + ".password", dbPassowrd); + } else { + properties.setProperty("default.driver", "org.h2.Driver"); + properties.setProperty("default.url", getJdbcConnection()); properties.setProperty("default.user", dbUser); - } - if (dbPassowrd != null) { properties.setProperty("default.password", dbPassowrd); } return properties; @@ -521,75 +531,95 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter { } @Test - public void testMultiTenant() throws IOException, InterpreterException { - /* - * assume that the database user is 'dbuser' and password is 'dbpassword' - * 'jdbc1' interpreter has user('dbuser')/password('dbpassword') property - * 'jdbc2' interpreter doesn't have user/password property - * 'user1' doesn't have Credential information. - * 'user2' has 'jdbc2' Credential information that is 'user2Id' / 'user2Pw' as id and password - */ - - JDBCInterpreter jdbc1 = new JDBCInterpreter(getDBProperty("dbuser", "dbpassword")); - JDBCInterpreter jdbc2 = new JDBCInterpreter(getDBProperty("", "")); - + public void testMultiTenant_1() throws IOException, InterpreterException { + // user1 %jdbc select from default db + // user2 %jdbc select from default db + // user2 %jdbc select from from hive db + Properties properties = getDBProperty("default", "dbuser", "dbpassword"); + properties.putAll(getDBProperty("hive", "", "")); + + JDBCInterpreter jdbc = new JDBCInterpreter(properties); AuthenticationInfo user1Credential = getUserAuth("user1", null, null, null); - AuthenticationInfo user2Credential = getUserAuth("user2", "jdbc.jdbc2", "user2Id", "user2Pw"); + AuthenticationInfo user2Credential = getUserAuth("user2", "hive", "user2Id", "user2Pw"); + jdbc.open(); - // user1 runs jdbc1 - jdbc1.open(); - InterpreterContext ctx1 = InterpreterContext.builder() - .setAuthenticationInfo(user1Credential) - .setInterpreterOut(new InterpreterOutput(null)) - .setReplName("jdbc1") - .build(); - jdbc1.interpret("", ctx1); + // user1 runs default + InterpreterContext context = InterpreterContext.builder() + .setAuthenticationInfo(user1Credential) + .setInterpreterOut(new InterpreterOutput(null)) + .setReplName("jdbc") + .build(); + jdbc.interpret("", context); - JDBCUserConfigurations user1JDBC1Conf = jdbc1.getJDBCConfiguration("user1"); + JDBCUserConfigurations user1JDBC1Conf = jdbc.getJDBCConfiguration("user1"); assertEquals("dbuser", user1JDBC1Conf.getPropertyMap("default").get("user")); assertEquals("dbpassword", user1JDBC1Conf.getPropertyMap("default").get("password")); - jdbc1.close(); - - // user1 runs jdbc2 - jdbc2.open(); - InterpreterContext ctx2 = InterpreterContext.builder() - .setAuthenticationInfo(user1Credential) - .setReplName("jdbc2") - .build(); - jdbc2.interpret("", ctx2); - - JDBCUserConfigurations user1JDBC2Conf = jdbc2.getJDBCConfiguration("user1"); - assertNull(user1JDBC2Conf.getPropertyMap("default").get("user")); - assertNull(user1JDBC2Conf.getPropertyMap("default").get("password")); - jdbc2.close(); - // user2 runs jdbc1 - jdbc1.open(); - InterpreterContext ctx3 = InterpreterContext.builder() + // user2 run default + context = InterpreterContext.builder() .setAuthenticationInfo(user2Credential) .setInterpreterOut(new InterpreterOutput(null)) - .setReplName("jdbc1") + .setReplName("jdbc") .build(); - jdbc1.interpret("", ctx3); + jdbc.interpret("", context); - JDBCUserConfigurations user2JDBC1Conf = jdbc1.getJDBCConfiguration("user2"); + JDBCUserConfigurations user2JDBC1Conf = jdbc.getJDBCConfiguration("user2"); assertEquals("dbuser", user2JDBC1Conf.getPropertyMap("default").get("user")); assertEquals("dbpassword", user2JDBC1Conf.getPropertyMap("default").get("password")); - jdbc1.close(); - // user2 runs jdbc2 - jdbc2.open(); - InterpreterContext ctx4 = InterpreterContext.builder() - .setAuthenticationInfo(user2Credential) - .setInterpreterOut(new InterpreterOutput(null)) - .setReplName("jdbc2") - .build(); - jdbc2.interpret("", ctx4); + // user2 run hive + Map<String, String> localProperties = new HashMap<>(); + localProperties.put("db", "hive"); + context = InterpreterContext.builder() + .setAuthenticationInfo(user2Credential) + .setInterpreterOut(new InterpreterOutput(null)) + .setLocalProperties(localProperties) + .setReplName("jdbc") + .build(); + jdbc.interpret("", context); + + user2JDBC1Conf = jdbc.getJDBCConfiguration("user2"); + assertEquals("user2Id", user2JDBC1Conf.getPropertyMap("hive").get("user")); + assertEquals("user2Pw", user2JDBC1Conf.getPropertyMap("hive").get("password")); + + jdbc.close(); + } + + @Test + public void testMultiTenant_2() throws IOException, InterpreterException { + // user1 %hive select from default db + // user2 %hive select from default db + Properties properties = getDBProperty("default", "", ""); + JDBCInterpreter jdbc = new JDBCInterpreter(properties); + AuthenticationInfo user1Credential = getUserAuth("user1", "hive", "user1Id", "user1Pw"); + AuthenticationInfo user2Credential = getUserAuth("user2", "hive", "user2Id", "user2Pw"); + jdbc.open(); + + // user1 runs default + InterpreterContext context = InterpreterContext.builder() + .setAuthenticationInfo(user1Credential) + .setInterpreterOut(new InterpreterOutput(null)) + .setReplName("hive") + .build(); + jdbc.interpret("", context); + + JDBCUserConfigurations user1JDBC1Conf = jdbc.getJDBCConfiguration("user1"); + assertEquals("user1Id", user1JDBC1Conf.getPropertyMap("default").get("user")); + assertEquals("user1Pw", user1JDBC1Conf.getPropertyMap("default").get("password")); + + // user2 run default + context = InterpreterContext.builder() + .setAuthenticationInfo(user2Credential) + .setInterpreterOut(new InterpreterOutput(null)) + .setReplName("hive") + .build(); + jdbc.interpret("", context); + + JDBCUserConfigurations user2JDBC1Conf = jdbc.getJDBCConfiguration("user2"); + assertEquals("user2Id", user2JDBC1Conf.getPropertyMap("default").get("user")); + assertEquals("user2Pw", user2JDBC1Conf.getPropertyMap("default").get("password")); - JDBCUserConfigurations user2JDBC2Conf = jdbc2.getJDBCConfiguration("user2"); - assertEquals("user2Id", user2JDBC2Conf.getPropertyMap("default").get("user")); - assertEquals("user2Pw", user2JDBC2Conf.getPropertyMap("default").get("password")); - jdbc2.close(); + jdbc.close(); } @Test diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 30d3330..e8c02ac 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -501,10 +501,13 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen private InterpreterContext getInterpreterContext() { AngularObjectRegistry registry = null; ResourcePool resourcePool = null; - + String replName = null; if (this.interpreter != null) { registry = this.interpreter.getInterpreterGroup().getAngularObjectRegistry(); resourcePool = this.interpreter.getInterpreterGroup().getResourcePool(); + InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup) + interpreter.getInterpreterGroup()).getInterpreterSetting(); + replName = interpreterSetting.getName(); } Credentials credentials = note.getCredentials(); @@ -523,7 +526,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen .setNoteId(note.getId()) .setNoteName(note.getName()) .setParagraphId(getId()) - .setReplName(intpText) + .setReplName(replName) .setParagraphTitle(title) .setParagraphText(text) .setAuthenticationInfo(subject)