KYLIN-3044, KYLIN-3052, support SQL Server & Redshift as kylin data source
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e787d7cd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e787d7cd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e787d7cd Branch: refs/heads/master Commit: e787d7cd1b13d87ec68373893ede8c1ea534c58b Parents: ca42b92 Author: etherge <[email protected]> Authored: Mon Dec 18 14:00:26 2017 +0800 Committer: Li Yang <[email protected]> Committed: Thu Dec 21 23:06:06 2017 -0600 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 22 +- .../main/resources/kylin-defaults.properties | 9 + .../org/apache/kylin/job/JoinedFlatTable.java | 38 ++- .../kylin/job/constant/ExecutableConstants.java | 1 + pom.xml | 2 +- .../apache/kylin/rest/service/TableService.java | 9 +- source-hive/pom.xml | 25 ++ .../apache/kylin/source/jdbc/JdbcDialect.java | 26 ++ .../apache/kylin/source/jdbc/JdbcExplorer.java | 288 ++++++++----------- .../kylin/source/jdbc/JdbcHiveMRInput.java | 132 +++++++-- .../kylin/source/jdbc/JdbcTableReader.java | 32 ++- .../org/apache/kylin/source/jdbc/SqlUtil.java | 140 ++++++--- .../jdbc/metadata/DefaultJdbcMetadata.java | 76 +++++ .../source/jdbc/metadata/IJdbcMetadata.java | 33 +++ .../jdbc/metadata/JdbcMetadataFactory.java | 35 +++ .../source/jdbc/metadata/MySQLJdbcMetadata.java | 67 +++++ .../jdbc/metadata/SQLServerJdbcMetadata.java | 61 ++++ .../kylin/source/jdbc/JdbcExplorerTest.java | 156 ++++++++++ .../apache/kylin/source/jdbc/SqlUtilTest.java | 46 +++ .../jdbc/metadata/DefaultJdbcMetadataTest.java | 126 ++++++++ .../jdbc/metadata/JdbcMetadataFactoryTest.java | 35 +++ .../jdbc/metadata/MySQLJdbcMetadataTest.java | 99 +++++++ .../metadata/SQLServerJdbcMetadataTest.java | 68 +++++ 23 files changed, 1252 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 8572fa3..21362e9 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -341,7 +341,7 @@ abstract public class KylinConfigBase implements Serializable { public String getHBaseMappingAdapter() { return getOptional("kylin.metadata.hbasemapping-adapter"); } - + public boolean isCheckCopyOnWrite() { return Boolean.parseBoolean(getOptional("kylin.metadata.check-copy-on-write", "false")); } @@ -752,23 +752,23 @@ abstract public class KylinConfigBase implements Serializable { // SOURCE.JDBC // ============================================================================ - public String getJdbcConnectionUrl() { + public String getJdbcSourceConnectionUrl() { return getOptional("kylin.source.jdbc.connection-url"); } - public String getJdbcDriver() { + public String getJdbcSourceDriver() { return getOptional("kylin.source.jdbc.driver"); } - public String getJdbcDialect() { + public String getJdbcSourceDialect() { return getOptional("kylin.source.jdbc.dialect"); } - public String getJdbcUser() { + public String getJdbcSourceUser() { return getOptional("kylin.source.jdbc.user"); } - public String getJdbcPass() { + public String getJdbcSourcePass() { return getOptional("kylin.source.jdbc.pass"); } @@ -776,6 +776,14 @@ abstract public class KylinConfigBase implements Serializable { return getOptional("kylin.source.jdbc.sqoop-home"); } + public int getSqoopMapperNum() { + return Integer.parseInt(getOptional("kylin.source.jdbc.sqoop-mapper-num", "4")); + } + + public String getFieldDelimiter() { + return 
getOptional("kylin.source.jdbc.field-delimiter", "|"); + } + // ============================================================================ // STORAGE.HBASE // ============================================================================ @@ -1390,7 +1398,7 @@ abstract public class KylinConfigBase implements Serializable { public String getPerfLoggerClassName() { return getOptional("kylin.metrics.perflogger-class", "org.apache.kylin.common.metrics.perflog.PerfLogger"); } - + public boolean isShowingGuiTraceToggle() { return Boolean.valueOf(getOptional("kylin.htrace.show-gui-trace-toggle", "false")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/core-common/src/main/resources/kylin-defaults.properties ---------------------------------------------------------------------- diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties index 4f36974..c7afc19 100644 --- a/core-common/src/main/resources/kylin-defaults.properties +++ b/core-common/src/main/resources/kylin-defaults.properties @@ -282,3 +282,12 @@ kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false #kylin.query.pushdown.jdbc.pool-max-total=8 #kylin.query.pushdown.jdbc.pool-max-idle=8 #kylin.query.pushdown.jdbc.pool-min-idle=0 + +### JDBC Data Source +#kylin.source.jdbc.connection-url= +#kylin.source.jdbc.driver= +#kylin.source.jdbc.dialect= +#kylin.source.jdbc.user= +#kylin.source.jdbc.pass= +#kylin.source.jdbc.sqoop-home= +#kylin.source.jdbc.field-delimiter=| http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index d918777..27678ac 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ 
b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.engine.JobEngineConfig; @@ -60,6 +61,11 @@ public class JoinedFlatTable { public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, String format) { + return generateCreateTableStatement(flatDesc, storageDfsDir, format, "|"); + } + + public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, + String format, String fieldDelimiter) { StringBuilder ddl = new StringBuilder(); ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n"); @@ -74,7 +80,7 @@ public class JoinedFlatTable { } ddl.append(")" + "\n"); if ("TEXTFILE".equals(format)) { - ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n"); + ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + fieldDelimiter + "'\n"); } ddl.append("STORED AS " + format + "\n"); ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n"); @@ -207,15 +213,13 @@ public class JoinedFlatTable { private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { final String sep = singleLine ? 
" " : "\n"; - boolean hasCondition = false; StringBuilder whereBuilder = new StringBuilder(); - whereBuilder.append("WHERE"); + whereBuilder.append("WHERE 1=1"); DataModelDesc model = flatDesc.getDataModel(); - if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) { - whereBuilder.append(" (").append(model.getFilterCondition()).append(") "); - hasCondition = true; + if (StringUtils.isNotEmpty(model.getFilterCondition())) { + whereBuilder.append(" AND (").append(model.getFilterCondition()).append(") "); } if (flatDesc.getSegment() != null) { @@ -224,18 +228,15 @@ public class JoinedFlatTable { SegmentRange segRange = flatDesc.getSegRange(); if (segRange != null && !segRange.isInfinite()) { - whereBuilder.append(hasCondition ? " AND (" : " ("); + whereBuilder.append(" AND ("); whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, flatDesc.getSegment(), segRange)); whereBuilder.append(")" + sep); - hasCondition = true; } } } - if (hasCondition) { - sql.append(whereBuilder.toString()); - } + sql.append(whereBuilder.toString()); } private static String colName(TblColRef col) { @@ -243,10 +244,19 @@ public class JoinedFlatTable { } private static String getHiveDataType(String javaDataType) { - String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType; - hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? 
"int" : hiveDataType; + String originDataType = javaDataType.toLowerCase(); + String hiveDataType; + if (originDataType.startsWith("varchar")) { + hiveDataType = "string"; + } else if (originDataType.startsWith("integer")) { + hiveDataType = "int"; + } else if (originDataType.startsWith("bigint")) { + hiveDataType = "bigint"; + } else { + hiveDataType = originDataType; + } - return hiveDataType.toLowerCase(); + return hiveDataType; } public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) { http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 2de3efa..fae93be 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -34,6 +34,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; + public static final String STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE = "Sqoop To Flat Hive Table"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid"; http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 07096a3..7e663ee 100644 --- a/pom.xml +++ b/pom.xml @@ -95,7 +95,7 @@ 
<jetty.version>9.2.20.v20161216</jetty.version> <jamm.version>0.3.1</jamm.version> <mockito.version>2.7.14</mockito.version> - + <powermock.version>1.7.0</powermock.version> <!-- Commons --> <commons-lang3.version>3.4</commons-lang3.version> http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java index 1147ab1..6bb446c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java @@ -119,7 +119,7 @@ public class TableService extends BasicService { SetMultimap<String, String> db2tables = LinkedHashMultimap.create(); for (String fullTableName : tables) { String[] parts = HadoopUtil.parseHiveTableName(fullTableName); - db2tables.put(parts[0].toUpperCase(), parts[1].toUpperCase()); + db2tables.put(parts[0], parts[1]); } // load all tables first @@ -128,9 +128,10 @@ public class TableService extends BasicService { for (Map.Entry<String, String> entry : db2tables.entries()) { Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project); TableDesc tableDesc = pair.getFirst(); - Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey())); - Preconditions.checkState(tableDesc.getName().equals(entry.getValue())); - Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey() + "." + entry.getValue())); + Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase())); + Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase())); + Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." 
+ entry + .getValue().toUpperCase())); TableExtDesc extDesc = pair.getSecond(); Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity())); allMeta.add(pair); http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/pom.xml ---------------------------------------------------------------------- diff --git a/source-hive/pom.xml b/source-hive/pom.xml index 9a4d537..b9f87ee 100644 --- a/source-hive/pom.xml +++ b/source-hive/pom.xml @@ -77,6 +77,31 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-module-junit4</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-core</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.powermock</groupId> + <artifactId>powermock-api-mockito</artifactId> + <version>${powermock.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <!-- Powermock has conflict with newer version of Mockito, so use OLDER version here --> + <version>1.10.19</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java new file mode 100644 index 0000000..7e5ecee --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcDialect.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.jdbc; + +public class JdbcDialect { + public static final String DIALECT_VERTICA = "vertica"; + public static final String DIALECT_ORACLE = "oracle"; + public static final String DIALECT_MYSQL = "mysql"; + public static final String DIALECT_HIVE = "hive"; + public static final String DIALECT_MSSQL = "mssql"; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java index 736cf2e..1278128 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcExplorer.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -31,74 +32,153 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.kylin.common.KylinConfig; import 
org.apache.kylin.common.util.DBUtils; import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.source.ISampleDataDeployer; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeployer { private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class); - - public static final String DIALECT_VERTICA="vertica"; - public static final String DIALECT_ORACLE="oracle"; - public static final String DIALECT_MYSQL="mysql"; - public static final String DIALECT_HIVE="hive"; - - public static final String TABLE_TYPE_TABLE="TABLE"; - public static final String TABLE_TYPE_VIEW="VIEW"; - - private KylinConfig config; - private DBConnConf dbconf; - private String dialect; + + private final KylinConfig config; + private final String dialect; + private final DBConnConf dbconf; + private final IJdbcMetadata jdbcMetadataDialect; public JdbcExplorer() { config = KylinConfig.getInstanceFromEnv(); - String connectionUrl = config.getJdbcConnectionUrl(); - String driverClass = config.getJdbcDriver(); - String jdbcUser = config.getJdbcUser(); - String jdbcPass = config.getJdbcPass(); - dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); - this.dialect = config.getJdbcDialect(); + String connectionUrl = config.getJdbcSourceConnectionUrl(); + String driverClass = config.getJdbcSourceDriver(); + String jdbcUser = config.getJdbcSourceUser(); + String jdbcPass = config.getJdbcSourcePass(); 
+ this.dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); + this.dialect = config.getJdbcSourceDialect(); + this.jdbcMetadataDialect = JdbcMetadataFactory.getJdbcMetadata(dialect, dbconf); + } + + @Override + public List<String> listDatabases() throws SQLException { + return jdbcMetadataDialect.listDatabases(); + } + + @Override + public List<String> listTables(String schema) throws SQLException { + return jdbcMetadataDialect.listTables(schema); + } + + @Override + public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) + throws SQLException { + TableDesc tableDesc = new TableDesc(); + tableDesc.setDatabase(database.toUpperCase()); + tableDesc.setName(table.toUpperCase()); + tableDesc.setUuid(UUID.randomUUID().toString()); + tableDesc.setLastModified(0); + tableDesc.setSourceType(ISourceAware.ID_JDBC); + + Connection con = SqlUtil.getConnection(dbconf); + DatabaseMetaData dbmd = con.getMetaData(); + + try (ResultSet rs = jdbcMetadataDialect.getTable(dbmd, database, table)) { + String tableType = null; + while (rs.next()) { + tableType = rs.getString("TABLE_TYPE"); + } + if (tableType != null) { + tableDesc.setTableType(tableType); + } else { + throw new RuntimeException(String.format("table %s not found in schema:%s", table, database)); + } + } + + List<ColumnDesc> columns = new ArrayList<>(); + try (ResultSet rs = jdbcMetadataDialect.listColumns(dbmd, database, table)) { + while (rs.next()) { + String cname = rs.getString("COLUMN_NAME"); + int type = rs.getInt("DATA_TYPE"); + int csize = rs.getInt("COLUMN_SIZE"); + int digits = rs.getInt("DECIMAL_DIGITS"); + int pos = rs.getInt("ORDINAL_POSITION"); + String remarks = rs.getString("REMARKS"); + + ColumnDesc cdesc = new ColumnDesc(); + cdesc.setName(cname.toUpperCase()); + + String kylinType = SqlUtil.jdbcTypetoKylinDataType(type); + int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? 
csize : -1; + int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1; + + cdesc.setDatatype(new DataType(kylinType, precision, scale).toString()); + cdesc.setId(String.valueOf(pos)); + cdesc.setComment(remarks); + columns.add(cdesc); + } + } finally { + DBUtils.closeQuietly(con); + } + + tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()])); + + TableExtDesc tableExtDesc = new TableExtDesc(); + tableExtDesc.setIdentity(tableDesc.getIdentity()); + tableExtDesc.setUuid(UUID.randomUUID().toString()); + tableExtDesc.setLastModified(0); + tableExtDesc.init(prj); + + return Pair.newPair(tableDesc, tableExtDesc); } - + private String getSqlDataType(String javaDataType) { - if (DIALECT_VERTICA.equals(dialect)){ - if (javaDataType.toLowerCase().equals("double")){ + if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + if (javaDataType.toLowerCase().equals("double")) { return "float"; } } return javaDataType.toLowerCase(); } - + @Override public void createSampleDatabase(String database) throws Exception { executeSQL(generateCreateSchemaSql(database)); } - private String generateCreateSchemaSql(String schemaName){ - if (DIALECT_VERTICA.equals(dialect)){ + private String generateCreateSchemaSql(String schemaName) { + if (JdbcDialect.DIALECT_VERTICA.equals(dialect) || JdbcDialect.DIALECT_MYSQL.equals(dialect)) { return String.format("CREATE schema IF NOT EXISTS %s", schemaName); - }else{ + } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + return String.format("IF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'%s') EXEC('CREATE SCHEMA" + + " [%s] AUTHORIZATION [dbo]')", schemaName, schemaName); + } else { logger.error(String.format("unsupported dialect %s.", dialect)); return null; } } - + @Override public void loadSampleData(String tableName, String tmpDataDir) throws Exception { executeSQL(generateLoadDataSql(tableName, tmpDataDir)); } private String generateLoadDataSql(String 
tableName, String tableFileDir) { - if (DIALECT_VERTICA.equals(dialect)){ - return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName); - }else{ + if (JdbcDialect.DIALECT_VERTICA.equals(dialect)) { + return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, + tableName); + } else if (JdbcDialect.DIALECT_MYSQL.equals(dialect)) { + return String.format("LOAD DATA INFILE '%s/%s.csv' INTO %s FIELDS TERMINATED BY ',';", tableFileDir, + tableName, tableName); + } else if (JdbcDialect.DIALECT_MSSQL.equals(dialect)) { + return String.format("BULK INSERT %s FROM '%s/%s.csv' WITH(FIELDTERMINATOR = ',')", tableName, tableFileDir, + tableName); + } else { logger.error(String.format("unsupported dialect %s.", dialect)); return null; } @@ -111,7 +191,8 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye private String[] generateCreateTableSql(TableDesc tableDesc) { logger.info(String.format("gen create table sql:%s", tableDesc)); - String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase(); + String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()) + .toUpperCase(); String dropsql = "DROP TABLE IF EXISTS " + tableIdentity; String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity; @@ -147,157 +228,20 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye return new String[] { dropView, dropTable, createSql }; } - private void executeSQL(String sql) throws CommandNeedRetryException, IOException { + private void executeSQL(String sql) throws CommandNeedRetryException, IOException, SQLException { Connection con = SqlUtil.getConnection(dbconf); logger.info(String.format(sql)); SqlUtil.execUpdateSQL(con, sql); - SqlUtil.closeResources(con, null); + DBUtils.closeQuietly(con); } - private void executeSQL(String[] sqls) throws 
CommandNeedRetryException, IOException { + private void executeSQL(String[] sqls) throws CommandNeedRetryException, IOException, SQLException { Connection con = SqlUtil.getConnection(dbconf); - for (String sql : sqls){ + for (String sql : sqls) { logger.info(String.format(sql)); SqlUtil.execUpdateSQL(con, sql); } - SqlUtil.closeResources(con, null); - } - - @Override - public List<String> listDatabases() throws Exception { - Connection con = SqlUtil.getConnection(dbconf); - DatabaseMetaData dbmd = con.getMetaData(); - ResultSet rs = dbmd.getSchemas(); - List<String> ret = new ArrayList<String>(); - /* - The schema columns are: - - TABLE_SCHEM String => schema name - - TABLE_CATALOG String => catalog name (may be null) - */ - while (rs.next()){ - String schema = rs.getString(1); - String catalog = rs.getString(2); - logger.info(String.format("%s,%s", schema, catalog)); - ret.add(schema); - } - SqlUtil.closeResources(con, null); - return ret; - } - - @Override - public List<String> listTables(String database) throws Exception { - Connection con = SqlUtil.getConnection(dbconf); - DatabaseMetaData dbmd = con.getMetaData(); - ResultSet rs = dbmd.getTables(null, database, null, null); - List<String> ret = new ArrayList<String>(); - /* - - TABLE_CAT String => table catalog (may be null) - - TABLE_SCHEM String => table schema (may be null) - - TABLE_NAME String => table name - - TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW", "SYSTEM TABLE", "GLOBAL - TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM". - - REMARKS String => explanatory comment on the table - - TYPE_CAT String => the types catalog (may be null) - - TYPE_SCHEM String => the types schema (may be null) - - TYPE_NAME String => type name (may be null) - - SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a typed - table (may be null) - - REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created. 
- Values are "SYSTEM", "USER", "DERIVED". (may be null) - */ - while (rs.next()){ - String catalog = rs.getString(1); - String schema = rs.getString(2); - String name = rs.getString(3); - String type = rs.getString(4); - logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type)); - ret.add(name); - } - SqlUtil.closeResources(con, null); - return ret; - } - - @Override - public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table, String prj) throws Exception { - - TableDesc tableDesc = new TableDesc(); - tableDesc.setDatabase(database.toUpperCase()); - tableDesc.setName(table.toUpperCase()); - tableDesc.setUuid(UUID.randomUUID().toString()); - tableDesc.setLastModified(0); - - Connection con = SqlUtil.getConnection(dbconf); - DatabaseMetaData dbmd = con.getMetaData(); - ResultSet rs = dbmd.getTables(null, database, table, null); - String tableType=null; - while (rs.next()){ - tableType = rs.getString(4); - } - DBUtils.closeQuietly(rs); - if (tableType!=null){ - tableDesc.setTableType(tableType); - }else{ - logger.error(String.format("table %s not found in schema:%s", table, database)); - } - /* - - 1. TABLE_CAT String => table catalog (may be null) - - 2. TABLE_SCHEM String => table schema (may be null) - - 3. TABLE_NAME String => table name - - 4. COLUMN_NAME String => column name - - 5. DATA_TYPE int => SQL type from java.sql.Types - - 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified - - 7. COLUMN_SIZE int => column size. - - 8. BUFFER_LENGTH is not used. - - 9. DECIMAL_DIGITS int => the number of fractional digits. Null is returned for data types where DECIMAL_DIGITS is not applicable. - - 10.NUM_PREC_RADIX int => Radix (typically either 10 or 2) - - 11.NULLABLE int => is NULL allowed. 
- - columnNoNulls - might not allow NULL values - - columnNullable - definitely allows NULL values - - columnNullableUnknown - nullability unknown - - 12.REMARKS String => comment describing column (may be null) - - 13.COLUMN_DEF String => default value for the column, which should be interpreted as a string when the value is enclosed in single quotes (may be null) - - 14.SQL_DATA_TYPE int => unused - - 15.SQL_DATETIME_SUB int => unused - - 16.CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column - - 17.ORDINAL_POSITION int => index of column in table (starting at 1) - - 18.IS_NULLABLE String => ISO rules are used to determine the nullability for a column. - - YES --- if the column can include NULLs - - NO --- if the column cannot include NULLs - - empty string --- if the nullability for the column is unknown - */ - List<ColumnDesc> columns = new ArrayList<ColumnDesc>(); - rs = dbmd.getColumns(null, database, table, null); - while (rs.next()){ - String tname = rs.getString(3); - String cname = rs.getString(4); - int type=rs.getInt(5); - String typeName=rs.getString(6); - int csize=rs.getInt(7); - int digits = rs.getInt(9); - int nullable = rs.getInt(11); - String comment = rs.getString(12); - int pos = rs.getInt(17); - logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos)); - - ColumnDesc cdesc = new ColumnDesc(); - cdesc.setName(cname.toUpperCase()); - // use "double" in kylin for "float" - cdesc.setDatatype(typeName); - cdesc.setId(String.valueOf(pos)); - columns.add(cdesc); - } - DBUtils.closeQuietly(rs); DBUtils.closeQuietly(con); - - tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()])); - - TableExtDesc tableExtDesc = new TableExtDesc(); - tableExtDesc.setIdentity(tableDesc.getIdentity()); - tableExtDesc.setUuid(UUID.randomUUID().toString()); - tableExtDesc.setLastModified(0); - tableExtDesc.init(prj); - - return Pair.newPair(tableDesc, tableExtDesc); } 
@Override http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java index ddd38db..59780e6 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java @@ -18,6 +18,8 @@ package org.apache.kylin.source.jdbc; +import java.util.List; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; @@ -25,21 +27,27 @@ import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats; +import org.apache.kylin.metadata.model.TableRef; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.hive.HiveMRInput; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class JdbcHiveMRInput extends HiveMRInput { - + private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class); - + public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { return new BatchCubingInputSide(flatDesc); } public static class BatchCubingInputSide extends HiveMRInput.BatchCubingInputSide { - + public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { super(flatDesc); } @@ 
-49,42 +57,124 @@ public class JdbcHiveMRInput extends HiveMRInput { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); final String jobWorkingDir = getJobWorkingDir(jobFlow); - + jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName)); jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir)); } private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) { final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE"); - + KylinConfig config = KylinConfig.getInstanceFromEnv(); + String filedDelimiter = config.getFieldDelimiter(); + // Sqoop does not support exporting SEQUENSEFILE to Hive now SQOOP-869 + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, + "TEXTFILE", filedDelimiter); + HiveCmdStep step = new HiveCmdStep(); step.setCmd(hiveInitStatements + dropTableHql + createTableHql); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); return step; } - + + /** + * Choose a better split-by column for sqoop. The strategy is: + * 1. Prefer ClusteredBy column + * 2. Prefer DistributedBy column + * 3. Prefer Partition date column + * 4. Prefer Higher cardinality column + * 5. Prefer numeric column + * 6. 
Pick a column at first glance + * @return A column reference <code>TblColRef</code>for sqoop split-by + */ + private TblColRef determineSplitColumn() { + if (null != flatDesc.getClusterBy()) { + return flatDesc.getClusterBy(); + } + if (null != flatDesc.getDistributedBy()) { + return flatDesc.getDistributedBy(); + } + PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc(); + if (partitionDesc.isPartitioned()) { + return partitionDesc.getPartitionDateColumnRef(); + } + TblColRef splitColumn = null; + TableMetadataManager tblManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + long maxCardinality = 0; + for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) { + TableExtDesc tableExtDesc = tblManager.getTableExt(tableRef.getTableDesc()); + List<ColumnStats> columnStatses = tableExtDesc.getColumnStats(); + if (!columnStatses.isEmpty()) { + for (TblColRef colRef : tableRef.getColumns()) { + long cardinality = columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex()) + .getCardinality(); + splitColumn = cardinality > maxCardinality ? 
colRef : splitColumn; + } + } + } + if (null == splitColumn) { + for (TblColRef colRef : flatDesc.getAllColumns()) { + if (colRef.getType().isIntegerFamily()) { + return colRef; + } + } + splitColumn = flatDesc.getAllColumns().get(0); + } + + return splitColumn; + } + private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) { - KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); - String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname + KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName) + .getConfig(); + PartitionDesc partitionDesc = flatDesc.getDataModel().getPartitionDesc(); + String partCol = null; + String partitionString = null; + + if (partitionDesc.isPartitioned()) { + partCol = partitionDesc.getPartitionDateColumn();//tablename.colname + partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc, + flatDesc.getSegment(), flatDesc.getSegRange()); + } + + String splitTable; + String splitColumn; + String splitDatabase; + TblColRef splitColRef = determineSplitColumn(); + splitTable = splitColRef.getTableRef().getTableName(); + splitColumn = splitColRef.getName(); + splitDatabase = splitColRef.getColumnDesc().getTable().getDatabase(); + //using sqoop to extract data from jdbc source and dump them to hive - String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[]{partCol}); + String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol }); String hiveTable = flatDesc.getTableName(); - String connectionUrl = config.getJdbcConnectionUrl(); - String driverClass = config.getJdbcDriver(); - String jdbcUser = config.getJdbcUser(); - String jdbcPass = config.getJdbcPass(); + String connectionUrl = config.getJdbcSourceConnectionUrl(); + String driverClass = 
config.getJdbcSourceDriver(); + String jdbcUser = config.getJdbcSourceUser(); + String jdbcPass = config.getJdbcSourcePass(); String sqoopHome = config.getSqoopHome(); - String cmd= String.format(String.format("%s/sqoop import " - + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" " - + "--target-dir %s/%s --split-by %s", sqoopHome, connectionUrl, driverClass, jdbcUser, - jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol)); - logger.info(String.format("sqoop cmd:%s", cmd)); + String filedDelimiter = config.getFieldDelimiter(); + int mapperNum = config.getSqoopMapperNum(); + + String bquery = String.format("SELECT min(%s), max(%s) FROM %s.%s", splitColumn, splitColumn, splitDatabase, + splitTable); + if (partitionString != null) { + bquery += " WHERE " + partitionString; + } + + String cmd = String.format(String.format( + "%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true " + + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" " + + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' " + + "--fields-terminated-by '%s' --num-mappers %d", + sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, + splitTable, splitColumn, bquery, filedDelimiter, mapperNum)); + logger.debug(String.format("sqoop cmd:%s", cmd)); CmdStep step = new CmdStep(); step.setCmd(cmd); - step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE); return step; } - + @Override protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { // skip http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java ---------------------------------------------------------------------- diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java index b8865d6..e2616b7 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcTableReader.java @@ -23,6 +23,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.source.IReadableTable.TableReader; import org.apache.kylin.source.hive.DBConnConf; @@ -30,23 +31,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * An implementation of TableReader with HCatalog for Hive table. + * An implementation of TableReader with JDBC. */ public class JdbcTableReader implements TableReader { private static final Logger logger = LoggerFactory.getLogger(JdbcTableReader.class); - + private String dbName; private String tableName; private DBConnConf dbconf; - private String dialect; private Connection jdbcCon; private Statement statement; private ResultSet rs; private int colCount; /** - * Constructor for reading whole hive table + * Constructor for reading whole jdbc table * @param dbName * @param tableName * @throws IOException @@ -55,22 +55,20 @@ public class JdbcTableReader implements TableReader { this.dbName = dbName; this.tableName = tableName; KylinConfig config = KylinConfig.getInstanceFromEnv(); - String connectionUrl = config.getJdbcConnectionUrl(); - String driverClass = config.getJdbcDriver(); - String jdbcUser = config.getJdbcUser(); - String jdbcPass = config.getJdbcPass(); + String connectionUrl = config.getJdbcSourceConnectionUrl(); + String driverClass = config.getJdbcSourceDriver(); + String jdbcUser = config.getJdbcSourceUser(); + String jdbcPass = config.getJdbcSourcePass(); dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass); - this.dialect = 
config.getJdbcDialect(); jdbcCon = SqlUtil.getConnection(dbconf); String sql = String.format("select * from %s.%s", dbName, tableName); try { statement = jdbcCon.createStatement(); rs = statement.executeQuery(sql); colCount = rs.getMetaData().getColumnCount(); - }catch(SQLException e){ + } catch (SQLException e) { throw new IOException(String.format("error while exec %s", sql), e); } - } @Override @@ -85,11 +83,17 @@ public class JdbcTableReader implements TableReader { @Override public String[] getRow() { String[] ret = new String[colCount]; - for (int i=1; i<=colCount; i++){ + for (int i = 1; i <= colCount; i++) { try { Object o = rs.getObject(i); - ret[i-1] = (o == null? null:o.toString()); - }catch(Exception e){ + String result; + if (null == o || o instanceof byte[]) { + result = null; + } else { + result = o.toString(); + } + ret[i - 1] = result; + } catch (Exception e) { logger.error("", e); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java index a112d87..79fab7d 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java @@ -21,87 +21,145 @@ package org.apache.kylin.source.jdbc; import java.sql.Connection; import java.sql.DriverManager; import java.sql.Statement; +import java.sql.Types; import java.util.Random; -import javax.sql.DataSource; - -import org.slf4j.LoggerFactory; +import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.source.hive.DBConnConf; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SqlUtil { private static final Logger logger = LoggerFactory.getLogger(SqlUtil.class); - public static void 
closeResources(Connection con, Statement statement){ - try{ - if (statement!=null && !statement.isClosed()){ + public static void closeResources(Connection con, Statement statement) { + try { + if (statement != null && !statement.isClosed()) { statement.close(); } - }catch(Exception e){ + } catch (Exception e) { logger.error("", e); } - - try{ - if (con!=null && !con.isClosed()){ + + try { + if (con != null && !con.isClosed()) { con.close(); } - }catch(Exception e){ + } catch (Exception e) { logger.error("", e); } } - - - public static void execUpdateSQL(String sql, DataSource ds){ - Connection con = null; - try{ - con = ds.getConnection(); - execUpdateSQL(con, sql); - }catch(Exception e){ - logger.error("", e); - }finally{ - closeResources(con, null); - } - } - - public static void execUpdateSQL(Connection db, String sql){ - Statement statement=null; - try{ + + public static void execUpdateSQL(Connection db, String sql) { + Statement statement = null; + try { statement = db.createStatement(); - statement.executeUpdate(sql); - }catch(Exception e){ + statement.executeUpdate(sql); + } catch (Exception e) { logger.error("", e); - }finally{ + } finally { closeResources(null, statement); } } - - public static int tryTimes=10; - public static Connection getConnection(DBConnConf dbconf){ - if (dbconf.getUrl()==null) + + public static int tryTimes = 5; + + public static Connection getConnection(DBConnConf dbconf) { + if (dbconf.getUrl() == null) return null; Connection con = null; try { Class.forName(dbconf.getDriver()); - }catch(Exception e){ + } catch (Exception e) { logger.error("", e); } - boolean got=false; - int times=0; + boolean got = false; + int times = 0; Random r = new Random(); - while(!got && times<tryTimes){ + while (!got && times < tryTimes) { times++; try { con = DriverManager.getConnection(dbconf.getUrl(), dbconf.getUser(), dbconf.getPass()); got = true; - }catch(Exception e){ + } catch (Exception e) { logger.warn("while use:" + dbconf, e); try { int rt = 
r.nextInt(10); - Thread.sleep(rt*1000); + Thread.sleep(rt * 1000); } catch (InterruptedException e1) { } } } + if (null == con) { + throw new RuntimeException("Can not connect to the data source."); + } return con; } + + public static String jdbcTypetoKylinDataType(int sqlType) { + String result = "any"; + + switch (sqlType) { + case Types.CHAR: + result = "char"; + break; + case Types.VARCHAR: + case Types.NVARCHAR: + case Types.LONGVARCHAR: + result = "varchar"; + break; + case Types.NUMERIC: + case Types.DECIMAL: + result = "decimal"; + break; + case Types.BIT: + case Types.BOOLEAN: + result = "boolean"; + break; + case Types.TINYINT: + result = "tinyint"; + break; + case Types.SMALLINT: + result = "smallint"; + break; + case Types.INTEGER: + result = "integer"; + break; + case Types.BIGINT: + result = "bigint"; + break; + case Types.REAL: + case Types.FLOAT: + case Types.DOUBLE: + result = "double"; + break; + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + result = "byte"; + break; + case Types.DATE: + result = "date"; + break; + case Types.TIME: + result = "time"; + break; + case Types.TIMESTAMP: + result = "timestamp"; + break; + default: + //do nothing + break; + } + + return result; + } + + public static boolean isPrecisionApplicable(String typeName) { + return isScaleApplicable(typeName) || DataType.STRING_FAMILY.contains(typeName); + } + + public static boolean isScaleApplicable(String typeName) { + return DataType.NUMBER_FAMILY.contains(typeName) && !DataType.INTEGER_FAMILY.contains(typeName); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java new file mode 100644 
index 0000000..f4ffc23 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadata.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.jdbc.metadata; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.SqlUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultJdbcMetadata implements IJdbcMetadata { + private final static Logger logger = LoggerFactory.getLogger(DefaultJdbcMetadata.class); + protected DBConnConf dbconf; + + public DefaultJdbcMetadata(DBConnConf dbConnConf) { + this.dbconf = dbConnConf; + } + + @Override + public List<String> listDatabases() throws SQLException { + List<String> ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf); ResultSet rs = con.getMetaData().getSchemas()) { + while (rs.next()) { + String schema = rs.getString("TABLE_SCHEM"); + String catalog = rs.getString("TABLE_CATALOG"); + logger.info(String.format("%s,%s", 
schema, catalog)); + ret.add(schema); + } + } + return ret; + } + + @Override + public List<String> listTables(String schema) throws SQLException { + List<String> ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf); + ResultSet rs = con.getMetaData().getTables(null, schema, null, null)) { + while (rs.next()) { + String name = rs.getString("TABLE_NAME"); + ret.add(name); + } + } + return ret; + } + + @Override + public ResultSet getTable(final DatabaseMetaData dbmd, String schema, String table) throws SQLException { + return dbmd.getTables(null, schema, table, null); + } + + @Override + public ResultSet listColumns(final DatabaseMetaData dbmd, String schema, String table) throws SQLException { + return dbmd.getColumns(null, schema, table, null); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java new file mode 100644 index 0000000..169fe60 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/IJdbcMetadata.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.jdbc.metadata; + +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +public interface IJdbcMetadata { + List<String> listDatabases() throws SQLException; + + List<String> listTables(String database) throws SQLException; + + ResultSet getTable(final DatabaseMetaData dbmd, String database, String table) throws SQLException; + + ResultSet listColumns(final DatabaseMetaData dbmd, String database, String table) throws SQLException; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java new file mode 100644 index 0000000..4100f79 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.jdbc.metadata; + +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.JdbcDialect; + +public abstract class JdbcMetadataFactory { + public static IJdbcMetadata getJdbcMetadata(String dialect, final DBConnConf dbConnConf) { + String jdbcDialect = (null == dialect) ? "" : dialect.toLowerCase(); + switch (jdbcDialect) { + case (JdbcDialect.DIALECT_MSSQL): + return new SQLServerJdbcMetadata(dbConnConf); + case (JdbcDialect.DIALECT_MYSQL): + return new MySQLJdbcMetadata(dbConnConf); + default: + return new DefaultJdbcMetadata(dbConnConf); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java new file mode 100644 index 0000000..54c2a03 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadata.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.jdbc.metadata; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.SqlUtil; + +public class MySQLJdbcMetadata extends DefaultJdbcMetadata { + public MySQLJdbcMetadata(DBConnConf dbConnConf) { + super(dbConnConf); + } + + @Override + public List<String> listDatabases() throws SQLException { + List<String> ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf)) { + ret.add(con.getCatalog()); + } + return ret; + } + + @Override + public List<String> listTables(String catalog) throws SQLException { + List<String> ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf); + ResultSet res = con.getMetaData().getTables(catalog, null, null, null)) { + String table; + while (res.next()) { + table = res.getString("TABLE_NAME"); + ret.add(table); + } + } + return ret; + } + + @Override + public ResultSet listColumns(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException { + return dbmd.getColumns(catalog, null, table, null); + } + + @Override + public ResultSet getTable(final DatabaseMetaData dbmd, String catalog, String table) throws SQLException { + return dbmd.getTables(catalog, 
null, table, null); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java new file mode 100644 index 0000000..1a34b37 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/metadata/SQLServerJdbcMetadata.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.jdbc.metadata; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.SqlUtil; + +import com.google.common.base.Preconditions; + +public class SQLServerJdbcMetadata extends DefaultJdbcMetadata { + public SQLServerJdbcMetadata(DBConnConf dbConnConf) { + super(dbConnConf); + } + + @Override + public List<String> listDatabases() throws SQLException { + List<String> ret = new ArrayList<>(); + try (Connection con = SqlUtil.getConnection(dbconf)) { + + String database = con.getCatalog(); + Preconditions.checkArgument(StringUtils.isNotEmpty(database), + "SQL Server needs a specific database in " + "connection string."); + + try (ResultSet rs = con.getMetaData().getSchemas(database, "%")) { + String schema; + String catalog; + while (rs.next()) { + schema = rs.getString("TABLE_SCHEM"); + catalog = rs.getString("TABLE_CATALOG"); + // Skip system schemas + if (database.equals(catalog)) { + ret.add(schema); + } + } + } + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java new file mode 100644 index 0000000..b269329 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/JdbcExplorerTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.source.jdbc; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.metadata.DefaultJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.IJdbcMetadata; +import org.apache.kylin.source.jdbc.metadata.JdbcMetadataFactory; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import 
org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ JdbcMetadataFactory.class, SqlUtil.class }) + +public class JdbcExplorerTest extends LocalFileMetadataTestCase { + private JdbcExplorer jdbcExplorer; + private static Connection connection; + private static DatabaseMetaData dbmd; + private IJdbcMetadata jdbcMetadata; + + @BeforeClass + public static void setupClass() throws SQLException { + staticCreateTestMetadata(); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + kylinConfig.setProperty("kylin.source.jdbc.connection-url", "jdbc:vertica://fakehost:1433/database"); + kylinConfig.setProperty("kylin.source.jdbc.driver", "com.vertica.jdbc.Driver"); + kylinConfig.setProperty("kylin.source.jdbc.user", "user"); + kylinConfig.setProperty("kylin.source.jdbc.pass", ""); + kylinConfig.setProperty("kylin.source.jdbc.dialect", "vertica"); + } + + @Before + public void setup() throws SQLException { + connection = mock(Connection.class); + dbmd = mock(DatabaseMetaData.class); + jdbcMetadata = mock(DefaultJdbcMetadata.class); + + PowerMockito.stub(PowerMockito.method(SqlUtil.class, "getConnection")).toReturn(connection); + PowerMockito.mockStatic(JdbcMetadataFactory.class); + + when(JdbcMetadataFactory.getJdbcMetadata(anyString(), any(DBConnConf.class))).thenReturn(jdbcMetadata); + when(connection.getMetaData()).thenReturn(dbmd); + + jdbcExplorer = spy(JdbcExplorer.class); + } + + @Test + public void testListDatabases() throws SQLException { + List<String> databases = new ArrayList<>(); + databases.add("DB1"); + databases.add("DB2"); + when(jdbcMetadata.listDatabases()).thenReturn(databases); + + List<String> result = jdbcExplorer.listDatabases(); + + verify(jdbcMetadata, times(1)).listDatabases(); + Assert.assertEquals(databases, result); + } + + @Test + public void testListTables() throws SQLException { + List<String> tables = new ArrayList<>(); + tables.add("T1"); + tables.add("T2"); + String databaseName = 
"testDb"; + when(jdbcMetadata.listTables(databaseName)).thenReturn(tables); + + List<String> result = jdbcExplorer.listTables(databaseName); + verify(jdbcMetadata, times(1)).listTables(databaseName); + Assert.assertEquals(tables, result); + } + + @Test + public void testLoadTableMetadata() throws SQLException { + String tableName = "tb1"; + String databaseName = "testdb"; + ResultSet rs1 = mock(ResultSet.class); + when(rs1.next()).thenReturn(true).thenReturn(false); + when(rs1.getString("TABLE_TYPE")).thenReturn("TABLE"); + + ResultSet rs2 = mock(ResultSet.class); + when(rs2.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs2.getString("COLUMN_NAME")).thenReturn("COL1").thenReturn("COL2").thenReturn("COL3"); + when(rs2.getInt("DATA_TYPE")).thenReturn(Types.VARCHAR).thenReturn(Types.INTEGER).thenReturn(Types.DECIMAL); + when(rs2.getInt("COLUMN_SIZE")).thenReturn(128).thenReturn(10).thenReturn(19); + when(rs2.getInt("DECIMAL_DIGITS")).thenReturn(0).thenReturn(0).thenReturn(4); + when(rs2.getInt("ORDINAL_POSITION")).thenReturn(1).thenReturn(3).thenReturn(2); + when(rs2.getString("REMARKS")).thenReturn("comment1").thenReturn("comment2").thenReturn("comment3"); + + when(jdbcMetadata.getTable(dbmd, databaseName, tableName)).thenReturn(rs1); + when(jdbcMetadata.listColumns(dbmd, databaseName, tableName)).thenReturn(rs2); + + Pair<TableDesc, TableExtDesc> result = jdbcExplorer.loadTableMetadata(databaseName, tableName, "proj"); + TableDesc tableDesc = result.getFirst(); + ColumnDesc columnDesc = tableDesc.getColumns()[1]; + + Assert.assertEquals(databaseName.toUpperCase(), tableDesc.getDatabase()); + Assert.assertEquals(3, tableDesc.getColumnCount()); + Assert.assertEquals("TABLE", tableDesc.getTableType()); + Assert.assertEquals("COL2", columnDesc.getName()); + Assert.assertEquals("integer", columnDesc.getTypeName()); + Assert.assertEquals("comment2", columnDesc.getComment()); + Assert.assertEquals(databaseName.toUpperCase() + "." 
+ tableName.toUpperCase(), + result.getSecond().getIdentity()); + } + + @AfterClass + public static void clenup() { + staticCleanupTestMetadata(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java new file mode 100644 index 0000000..d952675 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/SqlUtilTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.jdbc; + +import java.sql.Types; + +import org.junit.Assert; +import org.junit.Test; + +public class SqlUtilTest { + + @Test + public void testJdbcTypetoKylinDataType() { + this.getClass().getClassLoader().toString(); + Assert.assertEquals("double", SqlUtil.jdbcTypetoKylinDataType(Types.FLOAT)); + Assert.assertEquals("varchar", SqlUtil.jdbcTypetoKylinDataType(Types.NVARCHAR)); + Assert.assertEquals("any", SqlUtil.jdbcTypetoKylinDataType(Types.ARRAY)); + } + + @Test + public void testIsPrecisionApplicable() { + Assert.assertFalse(SqlUtil.isPrecisionApplicable("boolean")); + Assert.assertTrue(SqlUtil.isPrecisionApplicable("varchar")); + } + + @Test + public void testIsScaleApplicable() { + Assert.assertFalse(SqlUtil.isScaleApplicable("varchar")); + Assert.assertTrue(SqlUtil.isScaleApplicable("decimal")); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java new file mode 100644 index 0000000..43d467d --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/DefaultJdbcMetadataTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.source.jdbc.metadata; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.apache.kylin.source.jdbc.SqlUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(SqlUtil.class) +public class DefaultJdbcMetadataTest { + protected DBConnConf dbConnConf; + protected Connection connection; + protected DatabaseMetaData dbmd; + protected IJdbcMetadata jdbcMetadata; + + @Before + public void setup() { + dbConnConf = new DBConnConf(); + dbConnConf.setUrl("jdbc:vertica://fakehost:1433/database"); + dbConnConf.setDriver("com.vertica.jdbc.Driver"); + dbConnConf.setUser("user"); + dbConnConf.setPass("pass"); + jdbcMetadata = new DefaultJdbcMetadata(dbConnConf); + + setupProperties(); + } + + protected void setupProperties() { + connection = mock(Connection.class); + dbmd = mock(DatabaseMetaData.class); + + PowerMockito.mockStatic(SqlUtil.class); + when(SqlUtil.getConnection(dbConnConf)).thenReturn(connection); + } + + @Test + public void 
testListDatabases() throws SQLException { + ResultSet rs = mock(ResultSet.class); + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_SCHEM")).thenReturn("schema1").thenReturn("schema2"); + when(rs.getString("TABLE_CATALOG")).thenReturn("catalog1").thenReturn("catalog2"); + + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getSchemas()).thenReturn(rs); + + List<String> dbs = jdbcMetadata.listDatabases(); + + Assert.assertEquals(2, dbs.size()); + Assert.assertEquals("schema1", dbs.get(0)); + } + + @Test + public void testListTables() throws SQLException { + ResultSet rs = mock(ResultSet.class); + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_NAME")).thenReturn("KYLIN_SALES").thenReturn("CAT_DT").thenReturn("KYLIN_CAT"); + + String schema = "testschema"; + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getTables(null, schema, null, null)).thenReturn(rs); + + List<String> tables = jdbcMetadata.listTables(schema); + + Assert.assertEquals(3, tables.size()); + Assert.assertEquals("CAT_DT", tables.get(1)); + } + + @Test + public void testGetTable() throws SQLException { + String schema = "testSchema"; + String table = "testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getTables(null, schema, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.getTable(dbmd, schema, table); + + verify(dbmd, times(1)).getTables(null, schema, table, null); + Assert.assertEquals(rs, result); + } + + @Test + public void testListColumns() throws SQLException { + String schema = "testSchema"; + String table = "testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getColumns(null, schema, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.listColumns(dbmd, schema, table); + + verify(dbmd, times(1)).getColumns(null, schema, table, null); + Assert.assertEquals(rs, result); + } +} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java new file mode 100644 index 0000000..d9c7425 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/JdbcMetadataFactoryTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.source.jdbc.metadata; + +import org.apache.kylin.source.jdbc.JdbcDialect; +import org.junit.Assert; +import org.junit.Test; + +public class JdbcMetadataFactoryTest { + + @Test + public void testGetJdbcMetadata() { + Assert.assertTrue( + JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MSSQL, null) instanceof SQLServerJdbcMetadata); + Assert.assertTrue( + JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_MYSQL, null) instanceof MySQLJdbcMetadata); + Assert.assertTrue( + JdbcMetadataFactory.getJdbcMetadata(JdbcDialect.DIALECT_VERTICA, null) instanceof DefaultJdbcMetadata); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e787d7cd/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java ---------------------------------------------------------------------- diff --git a/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java new file mode 100644 index 0000000..e461c15 --- /dev/null +++ b/source-hive/src/test/java/org/apache/kylin/source/jdbc/metadata/MySQLJdbcMetadataTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.source.jdbc.metadata; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.kylin.source.hive.DBConnConf; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MySQLJdbcMetadataTest extends DefaultJdbcMetadataTest { + + @Before + public void setup() { + dbConnConf = new DBConnConf(); + dbConnConf.setUrl("jdbc:mysql://fakehost:1433/database"); + dbConnConf.setDriver("com.mysql.jdbc.Driver"); + dbConnConf.setUser("user"); + dbConnConf.setPass("pass"); + jdbcMetadata = new MySQLJdbcMetadata(dbConnConf); + + setupProperties(); + } + + @Test + public void testListDatabases() throws SQLException { + when(connection.getCatalog()).thenReturn("catalog1"); + + List<String> dbs = jdbcMetadata.listDatabases(); + + Assert.assertEquals(1, dbs.size()); + Assert.assertEquals("catalog1", dbs.get(0)); + } + + @Test + public void testListTables() throws SQLException { + ResultSet rs = mock(ResultSet.class); + when(rs.next()).thenReturn(true).thenReturn(true).thenReturn(true).thenReturn(false); + when(rs.getString("TABLE_NAME")).thenReturn("KYLIN_SALES").thenReturn("CAT_DT").thenReturn("KYLIN_CAT"); + + String catalog = "testCatalog"; + when(connection.getMetaData()).thenReturn(dbmd); + when(dbmd.getTables(catalog, null, null, null)).thenReturn(rs); + + List<String> tables = jdbcMetadata.listTables(catalog); + + Assert.assertEquals(3, tables.size()); + Assert.assertEquals("CAT_DT", tables.get(1)); + } + + @Test + public void testGetTable() throws SQLException { + String catalog = "testSchema"; + String table = "testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getTables(catalog, 
null, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.getTable(dbmd, catalog, table); + + verify(dbmd, times(1)).getTables(catalog, null, table, null); + Assert.assertEquals(rs, result); + } + + @Test + public void testListColumns() throws SQLException { + String catalog = "testSchema"; + String table = "testTable"; + ResultSet rs = mock(ResultSet.class); + when(dbmd.getColumns(catalog, null, table, null)).thenReturn(rs); + + ResultSet result = jdbcMetadata.listColumns(dbmd, catalog, table); + + verify(dbmd, times(1)).getColumns(catalog, null, table, null); + Assert.assertEquals(rs, result); + } +}
