KYLIN-1351 code review
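At its core, this commit extracts sample-data deployment behind a new org.apache.kylin.source.ISampleDataDeployer interface (createSampleDatabase, createSampleTable, loadSampleData, createWrapperView), exposed via ISource.getSampleDataDeployer(), so DeployUtil no longer issues HQL through IHiveClient directly. It also moves the JDBC source classes from org.apache.kylin.source.hive into a new org.apache.kylin.source.jdbc package and renames the config keys kylin.source.jdbc.connectionUrl and kylin.source.sqoop.home to kylin.source.jdbc.connection-url and kylin.source.jdbc.sqoop-home. The sketch below condenses the new deployment flow out of the diff that follows; it relies only on the signatures visible in this patch, while the wrapper class name, method name, and csvDir parameter are illustrative and not part of the patch.

```java
// Condensed sketch of the sample-data deployment flow introduced by this commit.
// Mirrors DeployUtil.deployTables() and ISampleDataDeployer as shown in the diff;
// SampleDataFlowSketch/deploy/csvDir are hypothetical names used for illustration.
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.SourceFactory;

public class SampleDataFlowSketch {

    // Assumes the sample CSV files (one per table) are already present in csvDir.
    public static void deploy(MetadataManager metaMgr, String modelName, String csvDir) throws Exception {
        DataModelDesc model = metaMgr.getDataModelDesc(modelName);

        // The deployer is resolved from the model's root fact table source,
        // replacing the previous hard-coded IHiveClient/HQL path.
        ISampleDataDeployer deployer = SourceFactory
                .getSource(model.getRootFactTable().getTableDesc())
                .getSampleDataDeployer();

        deployer.createSampleDatabase("EDW");

        // Tables are now derived from the model instead of a static TABLE_NAMES array;
        // views are skipped, matching the new DeployUtil.deployTables().
        for (TableRef ref : model.getAllTables()) {
            if (!ref.getTableDesc().isView()) {
                String identity = String.format("%s.%s",
                        ref.getTableDesc().getDatabase(), ref.getTableName());
                deployer.createSampleTable(metaMgr.getTableDesc(identity));
                deployer.loadSampleData(identity, csvDir);
            }
        }

        // The test view is still created explicitly, now through the wrapper-view hook.
        deployer.createWrapperView("edw.test_seller_type_dim_table", "edw.test_seller_type_dim");
    }
}
```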
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5e1c9be4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5e1c9be4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5e1c9be4 Branch: refs/heads/master Commit: 5e1c9be45fefbfd3ff890e51cdf78c50b0ed8106 Parents: c7f32b8 Author: Li Yang <liy...@apache.org> Authored: Tue Jun 6 16:19:06 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Tue Jun 6 16:36:50 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/DeployUtil.java | 118 +++---- .../apache/kylin/common/KylinConfigBase.java | 4 +- .../java/org/apache/kylin/cube/CubeManager.java | 8 - .../kylin/cube/cli/DictionaryGeneratorCLI.java | 11 +- .../InMemCubeBuilderInputConverter.java | 1 + .../org/apache/kylin/job/JoinedFlatTable.java | 25 +- .../kylin/metadata/model/ISourceAware.java | 1 - .../kylin/source/ISampleDataDeployer.java | 47 +++ .../java/org/apache/kylin/source/ISource.java | 6 + .../source/datagen/ModelDataGenerator.java | 8 +- .../kylin/provision/BuildCubeWithEngine.java | 14 +- .../kylin/source/hive/BeelineHiveClient.java | 21 -- .../apache/kylin/source/hive/CLIHiveClient.java | 55 ---- .../org/apache/kylin/source/hive/CmdStep.java | 69 ----- .../apache/kylin/source/hive/DBConnConf.java | 34 ++- .../kylin/source/hive/HiveClientFactory.java | 13 - .../apache/kylin/source/hive/HiveCmdStep.java | 77 ----- .../apache/kylin/source/hive/HiveMRInput.java | 86 ++---- .../kylin/source/hive/HiveMetadataExplorer.java | 78 ++++- .../apache/kylin/source/hive/HiveSource.java | 17 ++ .../apache/kylin/source/hive/IHiveClient.java | 9 +- .../apache/kylin/source/hive/IJDBCExecutor.java | 39 --- .../apache/kylin/source/hive/JdbcExplorer.java | 289 ------------------ .../apache/kylin/source/hive/JdbcSource.java | 60 ---- .../org/apache/kylin/source/hive/JdbcTable.java | 67 ---- .../kylin/source/hive/JdbcTableReader.java | 106 ------- .../org/apache/kylin/source/hive/SqlUtil.java | 106 ------- .../org/apache/kylin/source/jdbc/CmdStep.java | 69 +++++ .../apache/kylin/source/jdbc/HiveCmdStep.java | 77 +++++ .../apache/kylin/source/jdbc/JdbcExplorer.java | 305 +++++++++++++++++++ .../kylin/source/jdbc/JdbcHiveMRInput.java | 93 ++++++ .../apache/kylin/source/jdbc/JdbcSource.java | 66 ++++ .../org/apache/kylin/source/jdbc/JdbcTable.java | 67 ++++ .../kylin/source/jdbc/JdbcTableReader.java | 107 +++++++ .../org/apache/kylin/source/jdbc/SqlUtil.java | 107 +++++++ .../apache/kylin/source/kafka/KafkaSource.java | 8 +- 36 files changed, 1161 insertions(+), 1107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index 077c056..939e839 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -26,14 +26,15 @@ import java.io.FileReader; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.HashSet; import java.util.List; +import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import 
org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.ResourceTool; -import org.apache.kylin.common.util.HiveCmdBuilder; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.CubeInstance; @@ -41,14 +42,12 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.streaming.StreamDataLoader; import org.apache.kylin.job.streaming.StreamingTableDataGenerator; import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.ISampleDataDeployer; +import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.datagen.ModelDataGenerator; -import org.apache.kylin.source.hive.HiveClientFactory; -import org.apache.kylin.source.hive.IHiveClient; import org.apache.kylin.source.kafka.TimedJsonStreamParser; import org.apache.maven.model.Model; import org.apache.maven.model.io.xpp3.MavenXpp3Reader; @@ -66,16 +65,20 @@ public class DeployUtil { execCliCommand("mkdir -p " + config().getKylinJobLogDir()); } - public static void deployMetadata() throws IOException { + public static void deployMetadata(String localMetaData) throws IOException { // install metadata to hbase ResourceTool.reset(config()); - ResourceTool.copy(KylinConfig.createInstanceFromUri(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA), config()); + ResourceTool.copy(KylinConfig.createInstanceFromUri(localMetaData), config()); // update cube desc signature. for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) { CubeDescManager.getInstance(config()).updateCubeDesc(cube.getDescriptor());//enforce signature updating } } + + public static void deployMetadata() throws IOException { + deployMetadata(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA); + } public static void overrideJobJarLocations() { File jobJar = getJobJarFile(); @@ -121,20 +124,6 @@ public class DeployUtil { // ============================================================================ - static final String TABLE_CAL_DT = "edw.test_cal_dt"; - static final String TABLE_CATEGORY_GROUPINGS = "default.test_category_groupings"; - static final String TABLE_KYLIN_FACT = "default.test_kylin_fact"; - static final String TABLE_ORDER = "default.test_order"; - static final String TABLE_ACCOUNT = "default.test_account"; - static final String TABLE_COUNTRY = "default.test_country"; - static final String VIEW_SELLER_TYPE_DIM = "edw.test_seller_type_dim"; - static final String TABLE_SELLER_TYPE_DIM_TABLE = "edw.test_seller_type_dim_table"; - static final String TABLE_SITES = "edw.test_sites"; - - static final String[] TABLE_NAMES = new String[] { // - TABLE_CAL_DT, TABLE_ORDER, TABLE_CATEGORY_GROUPINGS, TABLE_KYLIN_FACT, // - TABLE_SELLER_TYPE_DIM_TABLE, TABLE_SITES, TABLE_ACCOUNT, TABLE_COUNTRY }; - public static void prepareTestDataForNormalCubes(String modelName) throws Exception { boolean buildCubeUsingProvidedData = Boolean.parseBoolean(System.getProperty("buildCubeUsingProvidedData")); @@ -143,14 +132,13 @@ public class DeployUtil { // data is generated according to cube descriptor and saved in resource store MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - DataModelDesc model = mgr.getDataModelDesc(modelName); - 
ModelDataGenerator gen = new ModelDataGenerator(model, 10000); + ModelDataGenerator gen = new ModelDataGenerator(mgr.getDataModelDesc(modelName), 10000); gen.generate(); } else { System.out.println("build normal cubes with provided dataset"); } - deployHiveTables(); + deployTables(modelName); } public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException { @@ -204,10 +192,21 @@ public class DeployUtil { } - private static void deployHiveTables() throws Exception { + private static void deployTables(String modelName) throws Exception { MetadataManager metaMgr = MetadataManager.getInstance(config()); - + DataModelDesc model = metaMgr.getDataModelDesc(modelName); + + Set<TableRef> tables = model.getAllTables(); + Set<String> TABLE_NAMES = new HashSet<String>(); + for (TableRef tr:tables){ + if (!tr.getTableDesc().isView()){ + String tableName = tr.getTableName(); + String schema = tr.getTableDesc().getDatabase(); + String identity = String.format("%s.%s", schema, tableName); + TABLE_NAMES.add(identity); + } + } // scp data files, use the data from hbase, instead of local files File tempDir = Files.createTempDir(); String tempDirAbsPath = tempDir.getAbsolutePath(); @@ -217,6 +216,7 @@ public class DeployUtil { File localBufferFile = new File(tempDirAbsPath + "/" + tablename + ".csv"); localBufferFile.createNewFile(); + logger.info(String.format("get resource from hbase:/data/%s.csv", tablename)); InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream; FileOutputStream localFileStream = new FileOutputStream(localBufferFile); IOUtils.copy(hbaseDataStream, localFileStream); @@ -228,67 +228,25 @@ public class DeployUtil { } tempDir.deleteOnExit(); - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + ISampleDataDeployer sampleDataDeployer = SourceFactory.getSource(model.getRootFactTable().getTableDesc()) + .getSampleDataDeployer(); + // create hive tables - hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW"); + sampleDataDeployer.createSampleDatabase("EDW"); for (String tablename : TABLE_NAMES) { - hiveClient.executeHQL(generateCreateTableHql(metaMgr.getTableDesc(tablename.toUpperCase()))); + logger.info(String.format("get table desc %s", tablename)); + sampleDataDeployer.createSampleTable(metaMgr.getTableDesc(tablename)); } // load data to hive tables // LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename for (String tablename : TABLE_NAMES) { - hiveClient.executeHQL(generateLoadDataHql(tablename.toUpperCase(), tempDirAbsPath)); + sampleDataDeployer.loadSampleData(tablename, tempDirAbsPath); } - - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatements(generateCreateViewHql(VIEW_SELLER_TYPE_DIM, TABLE_SELLER_TYPE_DIM_TABLE)); - - config().getCliCommandExecutor().execute(hiveCmdBuilder.build()); - } - - private static String generateLoadDataHql(String tableName, String tableFileDir) { - return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + tableName + ".csv' OVERWRITE INTO TABLE " + tableName; - } - - private static String[] generateCreateTableHql(TableDesc tableDesc) { - - String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity(); - String dropsql2 = "DROP VIEW IF EXISTS " + tableDesc.getIdentity(); - - StringBuilder ddl = new StringBuilder(); - ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n"); - ddl.append("(" + "\n"); - - for (int i = 0; i < 
tableDesc.getColumns().length; i++) { - ColumnDesc col = tableDesc.getColumns()[i]; - if (i > 0) { - ddl.append(","); - } - ddl.append(col.getName() + " " + getHiveDataType((col.getDatatype())) + "\n"); - } - - ddl.append(")" + "\n"); - ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n"); - ddl.append("STORED AS TEXTFILE"); - - return new String[] { dropsql, dropsql2, ddl.toString() }; - } - - private static String[] generateCreateViewHql(String viewName, String tableName) { - - String dropView = "DROP VIEW IF EXISTS " + viewName + ";\n"; - String dropTable = "DROP TABLE IF EXISTS " + viewName + ";\n"; - - String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + ";\n"); - - return new String[] { dropView, dropTable, createSql }; - } - - private static String getHiveDataType(String javaDataType) { - String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType; - hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType; - - return hiveDataType.toLowerCase(); + + //TODO create the view automatically here + final String VIEW_SELLER_TYPE_DIM = "edw.test_seller_type_dim"; + final String TABLE_SELLER_TYPE_DIM_TABLE = "edw.test_seller_type_dim_table"; + sampleDataDeployer.createWrapperView(TABLE_SELLER_TYPE_DIM_TABLE, VIEW_SELLER_TYPE_DIM); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index a776a61..59907f8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -594,7 +594,7 @@ abstract public class KylinConfigBase implements Serializable { // ============================================================================ public String getJdbcConnectionUrl() { - return getOptional("kylin.source.jdbc.connectionUrl"); + return getOptional("kylin.source.jdbc.connection-url"); } public String getJdbcDriver() { @@ -614,7 +614,7 @@ abstract public class KylinConfigBase implements Serializable { } public String getSqoopHome() { - return getOptional("kylin.source.sqoop.home"); + return getOptional("kylin.source.jdbc.sqoop-home"); } // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index c348055..54902fb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -54,7 +54,6 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; -import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; @@ -283,13 +282,6 @@ 
public class CubeManager implements IRealizationProvider { SnapshotManager snapshotMgr = getSnapshotManager(); TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable)); - if (tableDesc.isView() && - tableDesc.getSourceType()!=ISourceAware.ID_JDBC) { - String tableName = tableDesc.getMaterializedName(); - tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); - tableDesc.setName(tableName); - } - IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index 9555bc3..881d87d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -32,7 +32,6 @@ import org.apache.kylin.dict.DictionaryProvider; import org.apache.kylin.dict.DistinctColumnValuesProvider; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; @@ -112,15 +111,7 @@ public class DictionaryGeneratorCLI { } else { MetadataManager metadataManager = MetadataManager.getInstance(config); TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable)); - if (tableDesc.isView() && - tableDesc.getSourceType()!=ISourceAware.ID_JDBC) { - TableDesc materializedTbl = new TableDesc(); - materializedTbl.setDatabase(config.getHiveDatabaseForIntermediateTable()); - materializedTbl.setName(tableDesc.getMaterializedName()); - inpTable = SourceFactory.createReadableTable(materializedTbl); - } else { - inpTable = SourceFactory.createReadableTable(tableDesc); - } + inpTable = SourceFactory.createReadableTable(tableDesc); } return inpTable; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java index 387feb7..6dd20d8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java @@ -41,6 +41,7 @@ import com.google.common.collect.Lists; */ public class InMemCubeBuilderInputConverter { + @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderInputConverter.class); public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java 
b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index f9ee1b1..5ea2335 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -112,20 +112,12 @@ public class JoinedFlatTable { } public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) { - return generateSelectDataStatement(flatDesc, false); - } - - public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine) { - return generateSelectDataStatement(flatDesc, singleLine, null); + return generateSelectDataStatement(flatDesc, false, null); } public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine, String[] skipAs) { - String sep = "\n"; - if (singleLine) sep=" "; - List<String> skipAsList = new ArrayList<String>(); - if (skipAs!=null){ - skipAsList = Arrays.asList(skipAs); - } + final String sep = singleLine ? " " : "\n"; + final List<String> skipAsList = (skipAs == null) ? new ArrayList<String>() : Arrays.asList(skipAs); StringBuilder sql = new StringBuilder(); sql.append("SELECT" + sep); @@ -136,9 +128,9 @@ public class JoinedFlatTable { sql.append(","); } String colTotalName = String.format("%s.%s", col.getTableRef().getTableName(), col.getName()); - if (skipAsList.contains(colTotalName)){ + if (skipAsList.contains(colTotalName)) { sql.append(col.getExpressionInSourceDB() + sep); - }else{ + } else { sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep); } } @@ -157,8 +149,7 @@ public class JoinedFlatTable { } private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { - String sep="\n"; - if (singleLine) sep=" "; + final String sep = singleLine ? " " : "\n"; Set<TableRef> dimTableCache = new HashSet<>(); DataModelDesc model = flatDesc.getDataModel(); @@ -207,9 +198,9 @@ public class JoinedFlatTable { private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) { appendWhereStatement(flatDesc, sql, false); } + private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { - String sep="\n"; - if (singleLine) sep=" "; + final String sep = singleLine ? 
" " : "\n"; boolean hasCondition = false; StringBuilder whereBuilder = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java index 50ca773..7ab1bca 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java @@ -24,7 +24,6 @@ public interface ISourceAware { public static final int ID_STREAMING = 1; public static final int ID_SPARKSQL = 5; public static final int ID_EXTERNAL = 7; - public static final int ID_JDBC = 8; int getSourceType(); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-metadata/src/main/java/org/apache/kylin/source/ISampleDataDeployer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISampleDataDeployer.java b/core-metadata/src/main/java/org/apache/kylin/source/ISampleDataDeployer.java new file mode 100644 index 0000000..37914f5 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISampleDataDeployer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source; + +import org.apache.kylin.metadata.model.TableDesc; + +/** + * Responsible for deploying sample (CSV) data to the source database. + */ +public interface ISampleDataDeployer { + + /** + * Create a new database (or schema) if not exists. + */ + void createSampleDatabase(String database) throws Exception; + + /** + * Create a new table if not exists. + */ + void createSampleTable(TableDesc table) throws Exception; + + /** + * Overwrite sample CSV data into a table. 
+ */ + void loadSampleData(String tableName, String tmpDataDir) throws Exception; + + /** + * Create a view that wraps over a table, like "create view VIEW_NAME as select * from TABLE_NAME" + */ + void createWrapperView(String origTableName, String viewName) throws Exception; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-metadata/src/main/java/org/apache/kylin/source/ISource.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java index 302c53c..42548ae 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/ISource.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/ISource.java @@ -47,4 +47,10 @@ public interface ISource { * Particularly, Kafka source use this chance to define start/end offsets within each partition. */ SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition); + + /** + * Return an object that is responsible for deploying sample (CSV) data to the source database. + * For testing purpose. + */ + ISampleDataDeployer getSampleDataDeployer(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java index 07bbab0..6eb0e71 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java @@ -53,7 +53,6 @@ import com.google.common.base.Preconditions; public class ModelDataGenerator { private static final Logger logger = LoggerFactory.getLogger(ModelDataGenerator.class); - private MetadataManager mdMgr;// final private DataModelDesc model; final private int targetRows; final private ResourceStore outputStore; @@ -61,11 +60,6 @@ public class ModelDataGenerator { boolean outprint = false; // for debug - public ModelDataGenerator(MetadataManager mdMgr, String modelName, int nRows){ - this(mdMgr.getDataModelDesc(modelName), nRows); - this.mdMgr = mdMgr; - } - public ModelDataGenerator(DataModelDesc model, int nRows) { this(model, nRows, ResourceStore.getStore(model.getConfig())); } @@ -86,7 +80,7 @@ public class ModelDataGenerator { Set<TableDesc> allTableDesc = new LinkedHashSet<>(); JoinTableDesc[] allTables = model.getJoinTables(); - for (int i = allTables.length - 1; i >= -1; i--) {//reverse order needed for FK generation + for (int i = allTables.length - 1; i >= -1; i--) { // reverse order needed for FK generation TableDesc table = (i == -1) ? 
model.getRootFactTable().getTableDesc() : allTables[i].getTableRef().getTableDesc(); allTableDesc.add(table); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index bd563a2..4ff303d 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -103,8 +103,12 @@ public class BuildCubeWithEngine { } public static void beforeClass() throws Exception { - logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); - ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); + beforeClass(HBaseMetadataTestCase.SANDBOX_TEST_DATA); + } + + public static void beforeClass(String confDir) throws Exception { + logger.info("Adding to classpath: " + new File(confDir).getAbsolutePath()); + ClassUtil.addClasspath(new File(confDir).getAbsolutePath()); String fastModeStr = System.getProperty("fastBuildMode"); if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) { @@ -121,14 +125,14 @@ public class BuildCubeWithEngine { engineType = 2; } - System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); + System.setProperty(KylinConfig.KYLIN_CONF, confDir); System.setProperty("SPARK_HOME", "/usr/local/spark"); // need manually create and put spark to this folder on Jenkins - System.setProperty("kylin.hadoop.conf.dir", HBaseMetadataTestCase.SANDBOX_TEST_DATA); + System.setProperty("kylin.hadoop.conf.dir", confDir); if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169"); } - HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); + HBaseMetadataTestCase.staticCreateTestMetadata(confDir); try { //check hdfs permission http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java index 81a2ec2..ee693c5 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java @@ -30,7 +30,6 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.kylin.common.util.DBUtils; -import org.apache.kylin.metadata.model.TableDesc; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -221,24 +220,4 @@ public class BeelineHiveClient implements IHiveClient { System.out.println(hiveTableMeta); loader.close(); } - - @Override - public String generateCreateSchemaSql(String schemaName) { - throw new UnsupportedOperationException(); - } - - @Override - public String generateLoadDataSql(String tableName, String tableFileDir) { - throw new UnsupportedOperationException(); - } - - @Override - 
public String[] generateCreateTableSql(TableDesc tableDesc) { - throw new UnsupportedOperationException(); - } - - @Override - public String[] generateCreateViewSql(String viewName, String tableName) { - throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java index f8e4c29..e8a93bd 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java @@ -33,8 +33,6 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.TableDesc; import com.google.common.collect.Lists; @@ -52,59 +50,6 @@ public class CLIHiveClient implements IHiveClient { hiveConf = new HiveConf(CLIHiveClient.class); } - private static String getHiveDataType(String javaDataType) { - String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType; - hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType; - - return hiveDataType.toLowerCase(); - } - - @Override - public String generateCreateSchemaSql(String schemaName){ - return String.format("CREATE DATABASE IF NOT EXISTS %s", schemaName); - } - - @Override - public String generateLoadDataSql(String tableName, String tableFileDir) { - return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + tableName + ".csv' OVERWRITE INTO TABLE " + tableName; - } - - @Override - public String[] generateCreateTableSql(TableDesc tableDesc) { - - String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity(); - String dropsql2 = "DROP VIEW IF EXISTS " + tableDesc.getIdentity(); - - StringBuilder ddl = new StringBuilder(); - ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n"); - ddl.append("(" + "\n"); - - for (int i = 0; i < tableDesc.getColumns().length; i++) { - ColumnDesc col = tableDesc.getColumns()[i]; - if (i > 0) { - ddl.append(","); - } - ddl.append(col.getName() + " " + getHiveDataType((col.getDatatype())) + "\n"); - } - - ddl.append(")" + "\n"); - ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n"); - ddl.append("STORED AS TEXTFILE"); - - return new String[] { dropsql, dropsql2, ddl.toString() }; - } - - @Override - public String[] generateCreateViewSql(String viewName, String tableName) { - - String dropView = "DROP VIEW IF EXISTS " + viewName; - String dropTable = "DROP TABLE IF EXISTS " + viewName; - - String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName); - - return new String[] { dropView, dropTable, createSql }; - } - /** * only used by Deploy Util */ http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java deleted file mode 100644 index a38e0d9..0000000 --- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.hive; - -import java.io.IOException; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.job.common.PatternedLogger; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class CmdStep extends AbstractExecutable { - - private static final Logger logger = LoggerFactory.getLogger(CmdStep.class); - private final PatternedLogger stepLogger = new PatternedLogger(logger); - - public void setCmd(String cmd) { - setParam("cmd", cmd); - } - - public CmdStep(){ - } - - protected void sqoopFlatHiveTable(KylinConfig config) throws IOException { - String cmd = getParam("cmd"); - logger.info(String.format("exe cmd:%s", cmd)); - Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); - getManager().addJobInfo(getId(), stepLogger.getInfo()); - if (response.getFirst() != 0) { - throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); - } - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - KylinConfig config = KylinConfig.getInstanceFromEnv(); - try { - sqoopFlatHiveTable(config); - return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); - - } catch (Exception e) { - logger.error("job:" + getId() + " execute finished with exception", e); - return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java b/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java index 0e0d58b..fd9bfa9 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java @@ -21,57 +21,65 @@ package org.apache.kylin.source.hive; import org.apache.commons.configuration.PropertiesConfiguration; public class DBConnConf { - public static final String KEY_DRIVER="driver"; - public static final String KEY_URL="url"; - public static final String KEY_USER="user"; - public static final String KEY_PASS="pass"; - + public static 
final String KEY_DRIVER = "driver"; + public static final String KEY_URL = "url"; + public static final String KEY_USER = "user"; + public static final String KEY_PASS = "pass"; + private String driver; private String url; private String user; private String pass; - - public DBConnConf(){ + + public DBConnConf() { } - - public DBConnConf(String prefix, PropertiesConfiguration pc){ + + public DBConnConf(String prefix, PropertiesConfiguration pc) { driver = pc.getString(prefix + KEY_DRIVER); url = pc.getString(prefix + KEY_URL); user = pc.getString(prefix + KEY_USER); pass = pc.getString(prefix + KEY_PASS); } - - public DBConnConf(String driver, String url, String user, String pass){ + + public DBConnConf(String driver, String url, String user, String pass) { this.driver = driver; this.url = url; this.user = user; this.pass = pass; } - - public String toString(){ + + public String toString() { return String.format("%s,%s,%s,%s", driver, url, user, pass); } + public String getDriver() { return driver; } + public void setDriver(String driver) { this.driver = driver; } + public String getUrl() { return url; } + public void setUrl(String url) { this.url = url; } + public String getUser() { return user; } + public void setUser(String user) { this.user = user; } + public String getPass() { return pass; } + public void setPass(String pass) { this.pass = pass; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java index 5f8d536..4687973 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java @@ -19,22 +19,9 @@ package org.apache.kylin.source.hive; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.ISourceAware; public class HiveClientFactory { - public static IJDBCExecutor getJDBCExector(String modelName) { - MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - DataModelDesc model = mgr.getDataModelDesc(modelName); - if (model.getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){ - return new JdbcExplorer(); - }else{ - return getHiveClient(); - } - } - public static IHiveClient getHiveClient() { if ("cli".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) { return new CLIHiveClient(); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java deleted file mode 100644 index ee6fa1e..0000000 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ -package org.apache.kylin.source.hive; - -import java.io.IOException; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HiveCmdBuilder; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.job.common.PatternedLogger; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - */ -public class HiveCmdStep extends AbstractExecutable { - - private static final Logger logger = LoggerFactory.getLogger(HiveCmdStep.class); - private final PatternedLogger stepLogger = new PatternedLogger(logger); - - protected void createFlatHiveTable(KylinConfig config) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride()); - hiveCmdBuilder.addStatement(getCmd()); - final String cmd = hiveCmdBuilder.toString(); - - stepLogger.log("cmd: "); - stepLogger.log(cmd); - - Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); - getManager().addJobInfo(getId(), stepLogger.getInfo()); - if (response.getFirst() != 0) { - throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); - } - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - KylinConfig config = KylinConfig.getInstanceFromEnv(); - try { - createFlatHiveTable(config); - return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); - - } catch (Exception e) { - logger.error("job:" + getId() + " execute finished with exception", e); - return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); - } - } - - public void setCmd(String sql) { - setParam("cmd", sql); - } - - public String getCmd() { - return getParam("cmd"); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 1a7ddd6..337bafd 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -43,7 +43,6 @@ import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.common.PatternedLogger; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.exception.ExecuteException; import 
org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@ -52,7 +51,6 @@ import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; -import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.slf4j.Logger; @@ -62,6 +60,7 @@ import com.google.common.collect.Sets; public class HiveMRInput implements IMRInput { + @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class); public static String getTableNameForHCat(TableDesc table) { @@ -124,9 +123,9 @@ public class HiveMRInput implements IMRInput { public static class BatchCubingInputSide implements IMRBatchCubingInputSide { - final IJoinedFlatTableDesc flatDesc; - final String flatTableDatabase; - final String hdfsWorkingDir; + final protected IJoinedFlatTableDesc flatDesc; + final protected String flatTableDatabase; + final protected String hdfsWorkingDir; String hiveViewIntermediateTables = ""; @@ -141,33 +140,39 @@ public class HiveMRInput implements IMRInput { public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); - JobEngineConfig conf = new JobEngineConfig(cubeConfig); - final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); - final String jobWorkingDir = getJobWorkingDir(jobFlow); - // create flat table first, then count and redistribute - if (flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){ - jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName)); - jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir)); - }else{ - jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); - } - if (cubeConfig.isHiveRedistributeEnabled() == true) { + // create flat table first + addStepPhase1_DoCreateFlatTable(jobFlow); + + // then count and redistribute + if (cubeConfig.isHiveRedistributeEnabled()) { jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName)); } - logger.info(String.format("source aware:%d", flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType())); - if (flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){ - logger.info(String.format("skip createLookupHiveViewMaterializationStep")); - }else{ - AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); - if (task != null) { - jobFlow.addTask(task); - } + + // special for hive + addStepPhase1_DoMaterializeLookupTable(jobFlow); + } + + protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) { + final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); + final String jobWorkingDir = getJobWorkingDir(jobFlow); + + jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); + } + + protected void 
addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) { + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); + final String jobWorkingDir = getJobWorkingDir(jobFlow); + + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); + if (task != null) { + jobFlow.addTask(task); } } - private String getJobWorkingDir(DefaultChainedExecutable jobFlow) { + protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) { return JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId()); } @@ -222,37 +227,6 @@ public class HiveMRInput implements IMRInput { return step; } - private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) { - KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); - String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname - //using sqoop to extract data from jdbc source and dump them to hive - String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[]{partCol}); - String hiveTable = flatDesc.getTableName(); - String connectionUrl = config.getJdbcConnectionUrl(); - String driverClass = config.getJdbcDriver(); - String jdbcUser = config.getJdbcUser(); - String jdbcPass = config.getJdbcPass(); - String sqoopHome = config.getSqoopHome(); - String cmd= String.format(String.format("%s/sqoop import " - + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" " - + "--target-dir %s/%s --split-by %s", sqoopHome, connectionUrl, driverClass, jdbcUser, - jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol)); - logger.info(String.format("sqoop cmd:%s", cmd)); - CmdStep step = new CmdStep(); - step.setCmd(cmd); - step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); - return step; - } - - private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) { - final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE"); - - HiveCmdStep step = new HiveCmdStep(); - step.setCmd(hiveInitStatements + dropTableHql + createTableHql); - return step; - } - private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, String cubeName) { //from hive to hive final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java index 0f7152b..0db4b40 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMetadataExplorer.java @@ -29,26 +29,26 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.source.ISampleDataDeployer; import org.apache.kylin.source.ISourceMetadataExplorer; -public class 
HiveMetadataExplorer implements ISourceMetadataExplorer { +public class HiveMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + @Override public List<String> listDatabases() throws Exception { - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); return hiveClient.getHiveDbNames(); } @Override public List<String> listTables(String database) throws Exception { - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); return hiveClient.getHiveTableNames(database); } @Override public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String tableName) { KylinConfig config = KylinConfig.getInstanceFromEnv(); - IHiveClient hiveClient = HiveClientFactory.getHiveClient(); MetadataManager metaMgr = MetadataManager.getInstance(config); HiveTableMeta hiveTableMeta; @@ -113,4 +113,74 @@ public class HiveMetadataExplorer implements ISourceMetadataExplorer { public List<String> getRelatedKylinResources(TableDesc table) { return Collections.emptyList(); } + + @Override + public void createSampleDatabase(String database) throws Exception { + hiveClient.executeHQL(generateCreateSchemaSql(database)); + } + + private String generateCreateSchemaSql(String schemaName){ + return String.format("CREATE DATABASE IF NOT EXISTS %s", schemaName); + } + + @Override + public void createSampleTable(TableDesc table) throws Exception { + hiveClient.executeHQL(generateCreateTableSql(table)); + } + + private String[] generateCreateTableSql(TableDesc tableDesc) { + + String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity(); + String dropsql2 = "DROP VIEW IF EXISTS " + tableDesc.getIdentity(); + + StringBuilder ddl = new StringBuilder(); + ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n"); + ddl.append("(" + "\n"); + + for (int i = 0; i < tableDesc.getColumns().length; i++) { + ColumnDesc col = tableDesc.getColumns()[i]; + if (i > 0) { + ddl.append(","); + } + ddl.append(col.getName() + " " + getHiveDataType((col.getDatatype())) + "\n"); + } + + ddl.append(")" + "\n"); + ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n"); + ddl.append("STORED AS TEXTFILE"); + + return new String[] { dropsql, dropsql2, ddl.toString() }; + } + + @Override + public void loadSampleData(String tableName, String tmpDataDir) throws Exception { + hiveClient.executeHQL(generateLoadDataSql(tableName, tmpDataDir)); + } + + private String generateLoadDataSql(String tableName, String tableFileDir) { + return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + tableName + ".csv' OVERWRITE INTO TABLE " + tableName; + } + + @Override + public void createWrapperView(String origTableName, String viewName) throws Exception { + hiveClient.executeHQL(generateCreateViewSql(viewName, origTableName)); + } + + private String[] generateCreateViewSql(String viewName, String tableName) { + + String dropView = "DROP VIEW IF EXISTS " + viewName; + String dropTable = "DROP TABLE IF EXISTS " + viewName; + + String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName); + + return new String[] { dropView, dropTable, createSql }; + } + + private static String getHiveDataType(String javaDataType) { + String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType; + hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? 
"int" : hiveDataType; + + return hiveDataType.toLowerCase(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java index 77c8582..6e45406 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSource.java @@ -18,10 +18,12 @@ package org.apache.kylin.source.hive; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.metadata.model.IBuildable; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.ISampleDataDeployer; import org.apache.kylin.source.ISource; import org.apache.kylin.source.ISourceMetadataExplorer; import org.apache.kylin.source.SourcePartition; @@ -46,6 +48,16 @@ public class HiveSource implements ISource { @Override public IReadableTable createReadableTable(TableDesc tableDesc) { + // hive view must have been materialized already + // ref HiveMRInput.createLookupHiveViewMaterializationStep() + if (tableDesc.isView()) { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + String tableName = tableDesc.getMaterializedName(); + + tableDesc = new TableDesc(); + tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); + tableDesc.setName(tableName); + } return new HiveTable(tableDesc); } @@ -57,4 +69,9 @@ public class HiveSource implements ISource { return result; } + @Override + public ISampleDataDeployer getSampleDataDeployer() { + return new HiveMetadataExplorer(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java index 8d4034c..ca5312d 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java @@ -18,10 +18,17 @@ package org.apache.kylin.source.hive; +import java.io.IOException; import java.util.List; -public interface IHiveClient extends IJDBCExecutor{ +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +public interface IHiveClient { + + void executeHQL(String hql) throws CommandNeedRetryException, IOException; + + void executeHQL(String[] hqls) throws CommandNeedRetryException, IOException; + HiveTableMeta getHiveTableMeta(String database, String tableName) throws Exception; List<String> getHiveDbNames() throws Exception; http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java b/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java deleted file mode 100644 index 521ea87..0000000 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - 
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.hive;
-
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.kylin.metadata.model.TableDesc;
-
-import java.io.IOException;
-
-public interface IJDBCExecutor {
-
-    void executeHQL(String hql) throws CommandNeedRetryException, IOException;
-
-    void executeHQL(String[] hqls) throws CommandNeedRetryException, IOException;
-
-    public String generateCreateSchemaSql(String schemaName);
-
-    public String generateLoadDataSql(String tableName, String tableFileDir);
-
-    public String[] generateCreateTableSql(TableDesc tableDesc);
-
-    public String[] generateCreateViewSql(String viewName, String tableName);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java
deleted file mode 100644
index 193a37f..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class JdbcExplorer implements ISourceMetadataExplorer, IJDBCExecutor{
-    private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class);
-
-    public static final String DIALECT_VERTICA="vertica";
-    public static final String DIALECT_ORACLE="oracle";
-    public static final String DIALECT_MYSQL="mysql";
-    public static final String DIALECT_HIVE="hive";
-
-    public static final String TABLE_TYPE_TABLE="TABLE";
-    public static final String TABLE_TYPE_VIEW="VIEW";
-
-    private KylinConfig config;
-    private DBConnConf dbconf;
-    private String dialect;
-
-    public JdbcExplorer() {
-        config = KylinConfig.getInstanceFromEnv();
-        String connectionUrl = config.getJdbcConnectionUrl();
-        String driverClass = config.getJdbcDriver();
-        String jdbcUser = config.getJdbcUser();
-        String jdbcPass = config.getJdbcPass();
-        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
-        this.dialect = config.getJdbcDialect();
-    }
-
-    private String getSqlDataType(String javaDataType) {
-        if (DIALECT_VERTICA.equals(dialect)){
-            if (javaDataType.toLowerCase().equals("double")){
-                return "float";
-            }
-        }
-
-        return javaDataType.toLowerCase();
-    }
-
-    @Override
-    public String generateCreateSchemaSql(String schemaName){
-        if (DIALECT_VERTICA.equals(dialect)){
-            return String.format("CREATE schema IF NOT EXISTS %s", schemaName);
-        }else{
-            logger.error(String.format("unsupported dialect %s.", dialect));
-            return null;
-        }
-    }
-
-    @Override
-    public String generateLoadDataSql(String tableName, String tableFileDir) {
-        if (DIALECT_VERTICA.equals(dialect)){
-            return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName);
-        }else{
-            logger.error(String.format("unsupported dialect %s.", dialect));
-            return null;
-        }
-    }
-
-    @Override
-    public String[] generateCreateTableSql(TableDesc tableDesc) {
-        logger.info(String.format("gen create table sql:%s", tableDesc));
-        String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase();
-        String dropsql = "DROP TABLE IF EXISTS " + tableIdentity;
-        String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity;
-
-        StringBuilder ddl = new StringBuilder();
-        ddl.append("CREATE TABLE " + tableIdentity + "\n");
-        ddl.append("(" + "\n");
-
-        for (int i = 0; i < tableDesc.getColumns().length; i++) {
-            ColumnDesc col = tableDesc.getColumns()[i];
-            if (i > 0) {
-                ddl.append(",");
-            }
-            ddl.append(col.getName() + " " + getSqlDataType((col.getDatatype())) + "\n");
-        }
-
-        ddl.append(")");
-
-        return new String[] { dropsql, dropsql2, ddl.toString() };
-    }
-
-    @Override
-    public String[] generateCreateViewSql(String viewName, String tableName) {
-
-        String dropView = "DROP VIEW IF EXISTS " + viewName;
-        String dropTable = "DROP TABLE IF EXISTS " + viewName;
-
-        String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
-
-        return new String[] { dropView, dropTable, createSql };
-    }
-
-    @Override
-    public void executeHQL(String sql) throws CommandNeedRetryException, IOException {
-        Connection con = SqlUtil.getConnection(dbconf);
-        logger.info(String.format(sql));
-        SqlUtil.execUpdateSQL(con, sql);
-        SqlUtil.closeResources(con, null);
-    }
-
-    @Override
-    public void executeHQL(String[] sqls) throws CommandNeedRetryException, IOException {
-        Connection con = SqlUtil.getConnection(dbconf);
-        for (String sql : sqls){
-            logger.info(String.format(sql));
-            SqlUtil.execUpdateSQL(con, sql);
-        }
-        SqlUtil.closeResources(con, null);
-    }
-
-    @Override
-    public List<String> listDatabases() throws Exception {
-        Connection con = SqlUtil.getConnection(dbconf);
-        DatabaseMetaData dbmd = con.getMetaData();
-        ResultSet rs = dbmd.getSchemas();
-        List<String> ret = new ArrayList<String>();
-        /*
-        The schema columns are:
-            - TABLE_SCHEM String => schema name
-            - TABLE_CATALOG String => catalog name (may be null)
-        */
-        while (rs.next()){
-            String schema = rs.getString(1);
-            String catalog = rs.getString(2);
-            logger.info(String.format("%s,%s", schema, catalog));
-            ret.add(schema);
-        }
-        SqlUtil.closeResources(con, null);
-        return ret;
-    }
-
-    @Override
-    public List<String> listTables(String database) throws Exception {
-        Connection con = SqlUtil.getConnection(dbconf);
-        DatabaseMetaData dbmd = con.getMetaData();
-        ResultSet rs = dbmd.getTables(null, database, null, null);
-        List<String> ret = new ArrayList<String>();
-        /*
-            - TABLE_CAT String => table catalog (may be null)
-            - TABLE_SCHEM String => table schema (may be null)
-            - TABLE_NAME String => table name
-            - TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW", "SYSTEM TABLE", "GLOBAL
-              TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM".
-            - REMARKS String => explanatory comment on the table
-            - TYPE_CAT String => the types catalog (may be null)
-            - TYPE_SCHEM String => the types schema (may be null)
-            - TYPE_NAME String => type name (may be null)
-            - SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a typed
-              table (may be null)
-            - REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created.
-              Values are "SYSTEM", "USER", "DERIVED". (may be null)
-        */
-        while (rs.next()){
-            String catalog = rs.getString(1);
-            String schema = rs.getString(2);
-            String name = rs.getString(3);
-            String type = rs.getString(4);
-            logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type));
-            ret.add(name);
-        }
-        SqlUtil.closeResources(con, null);
-        return ret;
-    }
-
-    @Override
-    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception {
-
-        TableDesc tableDesc = new TableDesc();
-        tableDesc.setDatabase(database.toUpperCase());
-        tableDesc.setName(table.toUpperCase());
-        tableDesc.setUuid(UUID.randomUUID().toString());
-        tableDesc.setLastModified(0);
-
-        Connection con = SqlUtil.getConnection(dbconf);
-        DatabaseMetaData dbmd = con.getMetaData();
-        ResultSet rs = dbmd.getTables(null, database, table, null);
-        String tableType=null;
-        while (rs.next()){
-            tableType = rs.getString(4);
-        }
-        if (tableType!=null){
-            tableDesc.setTableType(tableType);
-        }else{
-            logger.error(String.format("table %s not found in schema:%s", table, database));
-        }
-        /*
-            - 1. TABLE_CAT String => table catalog (may be null)
-            - 2. TABLE_SCHEM String => table schema (may be null)
-            - 3. TABLE_NAME String => table name
-            - 4. COLUMN_NAME String => column name
-            - 5. DATA_TYPE int => SQL type from java.sql.Types
-            - 6. TYPE_NAME String => Data source dependent type name, for a UDT the type name is fully qualified
-            - 7. COLUMN_SIZE int => column size.
-            - 8. BUFFER_LENGTH is not used.
-            - 9. DECIMAL_DIGITS int => the number of fractional digits. Null is returned for data types where DECIMAL_DIGITS is not applicable.
-            - 10.NUM_PREC_RADIX int => Radix (typically either 10 or 2)
-            - 11.NULLABLE int => is NULL allowed.
-                - columnNoNulls - might not allow NULL values
-                - columnNullable - definitely allows NULL values
-                - columnNullableUnknown - nullability unknown
-            - 12.REMARKS String => comment describing column (may be null)
-            - 13.COLUMN_DEF String => default value for the column, which should be interpreted as a string when the value is enclosed in single quotes (may be null)
-            - 14.SQL_DATA_TYPE int => unused
-            - 15.SQL_DATETIME_SUB int => unused
-            - 16.CHAR_OCTET_LENGTH int => for char types the maximum number of bytes in the column
-            - 17.ORDINAL_POSITION int => index of column in table (starting at 1)
-            - 18.IS_NULLABLE String => ISO rules are used to determine the nullability for a column.
-                - YES --- if the column can include NULLs
-                - NO --- if the column cannot include NULLs
-                - empty string --- if the nullability for the column is unknown
-        */
-        List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
-        rs = dbmd.getColumns(null, database, table, null);
-        while (rs.next()){
-            String tname = rs.getString(3);
-            String cname = rs.getString(4);
-            int type=rs.getInt(5);
-            String typeName=rs.getString(6);
-            int csize=rs.getInt(7);
-            int digits = rs.getInt(9);
-            int nullable = rs.getInt(11);
-            String comment = rs.getString(12);
-            int pos = rs.getInt(17);
-            logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos));
-
-            ColumnDesc cdesc = new ColumnDesc();
-            cdesc.setName(cname.toUpperCase());
-            // use "double" in kylin for "float"
-            cdesc.setDatatype(typeName);
-            cdesc.setId(String.valueOf(pos));
-            columns.add(cdesc);
-        }
-
-
-        tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
-
-        TableExtDesc tableExtDesc = new TableExtDesc();
-        tableExtDesc.setName(table);
-        tableExtDesc.setUuid(UUID.randomUUID().toString());
-        tableExtDesc.setLastModified(0);
-        tableExtDesc.init();
-
-        return Pair.newPair(tableDesc, tableExtDesc);
-    }
-
-    @Override
-    public List<String> getRelatedKylinResources(TableDesc table) {
-        return Collections.emptyList();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java
deleted file mode 100644
index cc9cea8..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.source.SourcePartition;
-
-//used by reflection
-public class JdbcSource implements ISource {
-
-    @Override
-    public ISourceMetadataExplorer getSourceMetadataExplorer() {
-        return new JdbcExplorer();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
-        if (engineInterface == IMRInput.class) {
-            return (I) new HiveMRInput();
-        } else {
-            throw new RuntimeException("Cannot adapt to " + engineInterface);
-        }
-    }
-
-    @Override
-    public IReadableTable createReadableTable(TableDesc tableDesc) {
-        return new JdbcTable(tableDesc);
-    }
-
-    @Override
-    public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
-        SourcePartition result = SourcePartition.getCopyOf(srcPartition);
-        result.setStartOffset(0);
-        result.setEndOffset(0);
-        return result;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java
deleted file mode 100644
index 0871938..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.io.IOException;
-
-
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.IReadableTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class JdbcTable implements IReadableTable {
-
-    private static final Logger logger = LoggerFactory.getLogger(JdbcTable.class);
-
-    final private String database;
-    final private String tableName;
-
-
-    public JdbcTable(TableDesc tableDesc) {
-        this.database = tableDesc.getDatabase();
-        this.tableName = tableDesc.getName();
-    }
-
-    @Override
-    public TableReader getReader() throws IOException {
-        return new JdbcTableReader(database, tableName);
-    }
-
-    @Override
-    public TableSignature getSignature() throws IOException {
-        String path = String.format("%s.%s", database, tableName);
-        long lastModified = System.currentTimeMillis(); // assume table is ever changing
-        int size=0;
-        return new TableSignature(path, size, lastModified);
-    }
-
-    @Override
-    public boolean exists() {
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "database=[" + database + "], table=[" + tableName + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5e1c9be4/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java
deleted file mode 100644
index 8c70d61..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source.hive;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.source.IReadableTable.TableReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of TableReader with HCatalog for Hive table.
- */
-public class JdbcTableReader implements TableReader {
-    private static final Logger logger = LoggerFactory.getLogger(JdbcTableReader.class);
-
-    private String dbName;
-    private String tableName;
-
-    private DBConnConf dbconf;
-    private String dialect;
-    private Connection jdbcCon;
-    private Statement statement;
-    private ResultSet rs;
-    private int colCount;
-
-    /**
-     * Constructor for reading whole hive table
-     * @param dbName
-     * @param tableName
-     * @throws IOException
-     */
-    public JdbcTableReader(String dbName, String tableName) throws IOException {
-        this.dbName = dbName;
-        this.tableName = tableName;
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        String connectionUrl = config.getJdbcConnectionUrl();
-        String driverClass = config.getJdbcDriver();
-        String jdbcUser = config.getJdbcUser();
-        String jdbcPass = config.getJdbcPass();
-        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
-        this.dialect = config.getJdbcDialect();
-        jdbcCon = SqlUtil.getConnection(dbconf);
-        String sql = String.format("select * from %s.%s", dbName, tableName);
-        try {
-            statement = jdbcCon.createStatement();
-            rs = statement.executeQuery(sql);
-            colCount = rs.getMetaData().getColumnCount();
-        }catch(SQLException e){
-            throw new IOException(String.format("error while exec %s", sql), e);
-        }
-
-    }
-
-    @Override
-    public boolean next() throws IOException {
-        try {
-            return rs.next();
-        } catch (SQLException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    public String[] getRow() {
-        String[] ret = new String[colCount];
-        for (int i=1; i<=colCount; i++){
-            try {
-                Object o = rs.getObject(i);
-                ret[i-1] = (o == null? null:o.toString());
-            }catch(Exception e){
-                logger.error("", e);
-            }
-        }
-        return ret;
-    }
-
-    @Override
-    public void close() throws IOException {
-        SqlUtil.closeResources(jdbcCon, statement);
-    }
-
-    public String toString() {
-        return "jdbc table reader for: " + dbName + "." + tableName;
-    }
-}
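
Review note: the JdbcExplorer deleted above (and recreated under org.apache.kylin.source.jdbc) discovers databases and tables purely through java.sql.DatabaseMetaData (getSchemas, getTables, getColumns). A minimal standalone sketch of that lookup pattern follows; the Vertica URL, credentials and the "PUBLIC" schema are placeholders standing in for the values Kylin would read from kylin.properties via KylinConfig, not anything defined in this commit.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class JdbcMetadataProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; a suitable JDBC driver must be on the classpath.
        try (Connection con = DriverManager.getConnection("jdbc:vertica://host:5433/db", "user", "pass")) {
            DatabaseMetaData dbmd = con.getMetaData();

            // Schemas play the role of Kylin "databases"; TABLE_SCHEM is column 1.
            try (ResultSet rs = dbmd.getSchemas()) {
                while (rs.next()) {
                    System.out.println("schema: " + rs.getString(1));
                }
            }

            // Tables of one schema; TABLE_NAME is column 3, TABLE_TYPE is column 4.
            try (ResultSet rs = dbmd.getTables(null, "PUBLIC", null, null)) {
                while (rs.next()) {
                    System.out.println("table: " + rs.getString(3) + " (" + rs.getString(4) + ")");
                }
            }
        }
    }
}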
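
Review note: JdbcTable and JdbcTableReader (also moved to the jdbc package) implement Kylin's IReadableTable / TableReader contract, which is how the build engine pulls rows out of a JDBC source one String[] at a time. An illustrative consumer is sketched below, assuming only the interface methods visible in this diff; the class and the dump() helper are hypothetical names, not part of the patch.

import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableReader;

public class ReadableTableDump {
    // Iterate any IReadableTable the way the engine would, e.g. a JdbcTable whose
    // reader is backed by "select * from db.table".
    static void dump(IReadableTable table) throws Exception {
        TableReader reader = table.getReader();
        try {
            while (reader.next()) {
                String[] row = reader.getRow(); // one record, columns rendered as strings
                System.out.println(String.join(",", row));
            }
        } finally {
            reader.close(); // releases the underlying JDBC connection/statement
        }
    }
}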