Repository: kylin Updated Branches: refs/heads/KYLIN-1351 [created] 5e1c9be45
KYLIN-1351 Support general RDBMS data source through JDBC Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c7f32b83 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c7f32b83 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c7f32b83 Branch: refs/heads/KYLIN-1351 Commit: c7f32b83a1c114014a30d295117dcba1b1dc7e81 Parents: c2b2cea Author: chengyi <phillipchen...@gmail.com> Authored: Mon Jun 5 15:49:37 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Tue Jun 6 16:36:43 2017 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 28 ++ .../java/org/apache/kylin/cube/CubeManager.java | 4 +- .../kylin/cube/cli/DictionaryGeneratorCLI.java | 5 +- .../InMemCubeBuilderInputConverter.java | 4 + .../org/apache/kylin/job/JoinedFlatTable.java | 61 +++- .../kylin/metadata/model/ISourceAware.java | 2 + .../source/datagen/ModelDataGenerator.java | 13 +- .../kylin/source/hive/BeelineHiveClient.java | 23 +- .../apache/kylin/source/hive/CLIHiveClient.java | 55 ++++ .../org/apache/kylin/source/hive/CmdStep.java | 69 +++++ .../apache/kylin/source/hive/DBConnConf.java | 78 +++++ .../kylin/source/hive/HiveClientFactory.java | 14 + .../apache/kylin/source/hive/HiveCmdStep.java | 77 +++++ .../apache/kylin/source/hive/HiveMRInput.java | 53 +++- .../apache/kylin/source/hive/IHiveClient.java | 8 +- .../apache/kylin/source/hive/IJDBCExecutor.java | 39 +++ .../apache/kylin/source/hive/JdbcExplorer.java | 289 +++++++++++++++++++ .../apache/kylin/source/hive/JdbcSource.java | 60 ++++ .../org/apache/kylin/source/hive/JdbcTable.java | 67 +++++ .../kylin/source/hive/JdbcTableReader.java | 106 +++++++ .../org/apache/kylin/source/hive/SqlUtil.java | 106 +++++++ 21 files changed, 1133 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index c83c546..a776a61 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -588,6 +588,34 @@ abstract public class KylinConfigBase implements Serializable { public Map<String, String> getKafkaConfigOverride() { return getPropertiesByPrefix("kylin.source.kafka.config-override."); } + + // ============================================================================ + // SOURCE.JDBC + // ============================================================================ + + public String getJdbcConnectionUrl() { + return getOptional("kylin.source.jdbc.connectionUrl"); + } + + public String getJdbcDriver() { + return getOptional("kylin.source.jdbc.driver"); + } + + public String getJdbcDialect() { + return getOptional("kylin.source.jdbc.dialect"); + } + + public String getJdbcUser() { + return getOptional("kylin.source.jdbc.user"); + } + + public String getJdbcPass() { + return getOptional("kylin.source.jdbc.pass"); + } + + public String getSqoopHome() { + return getOptional("kylin.source.sqoop.home"); + } // ============================================================================ // STORAGE.HBASE http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 32e2316..c348055 100644 --- 
a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -54,6 +54,7 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.cachesync.Broadcaster; import org.apache.kylin.metadata.cachesync.Broadcaster.Event; import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; @@ -282,7 +283,8 @@ public class CubeManager implements IRealizationProvider { SnapshotManager snapshotMgr = getSnapshotManager(); TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable)); - if (tableDesc.isView()) { + if (tableDesc.isView() && + tableDesc.getSourceType()!=ISourceAware.ID_JDBC) { String tableName = tableDesc.getMaterializedName(); tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); tableDesc.setName(tableName); http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index e7368e8..9555bc3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -32,6 +32,7 @@ import org.apache.kylin.dict.DictionaryProvider; import org.apache.kylin.dict.DistinctColumnValuesProvider; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.JoinDesc; import 
org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TableRef; @@ -111,7 +112,8 @@ public class DictionaryGeneratorCLI { } else { MetadataManager metadataManager = MetadataManager.getInstance(config); TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable)); - if (tableDesc.isView()) { + if (tableDesc.isView() && + tableDesc.getSourceType()!=ISourceAware.ID_JDBC) { TableDesc materializedTbl = new TableDesc(); materializedTbl.setDatabase(config.getHiveDatabaseForIntermediateTable()); materializedTbl.setName(tableDesc.getMaterializedName()); @@ -120,7 +122,6 @@ public class DictionaryGeneratorCLI { inpTable = SourceFactory.createReadableTable(tableDesc); } } - return inpTable; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java index ab44f63..387feb7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java @@ -32,6 +32,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; @@ -39,6 +41,8 @@ import com.google.common.collect.Lists; */ public class InMemCubeBuilderInputConverter { + private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderInputConverter.class); + public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); 
private final CubeJoinedFlatTableEnrich flatDesc; http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 4665465..f9ee1b1 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -19,7 +19,10 @@ package org.apache.kylin.job; import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.kylin.common.KylinConfig; @@ -51,6 +54,10 @@ public class JoinedFlatTable { } public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir) { + return generateCreateTableStatement(flatDesc, storageDfsDir, "SEQUENCEFILE"); + } + + public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, String format) { StringBuilder ddl = new StringBuilder(); ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n"); @@ -64,7 +71,10 @@ public class JoinedFlatTable { ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n"); } ddl.append(")" + "\n"); - ddl.append("STORED AS SEQUENCEFILE" + "\n"); + if ("TEXTFILE".equals(format)){ + ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n"); + } + ddl.append("STORED AS " + format + "\n"); ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n"); return ddl.toString(); } @@ -102,17 +112,38 @@ public class JoinedFlatTable { } public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) { + return generateSelectDataStatement(flatDesc, false); + } + + public static String 
generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine) { + return generateSelectDataStatement(flatDesc, singleLine, null); + } + + public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine, String[] skipAs) { + String sep = "\n"; + if (singleLine) sep=" "; + List<String> skipAsList = new ArrayList<String>(); + if (skipAs!=null){ + skipAsList = Arrays.asList(skipAs); + } + StringBuilder sql = new StringBuilder(); - sql.append("SELECT" + "\n"); + sql.append("SELECT" + sep); + for (int i = 0; i < flatDesc.getAllColumns().size(); i++) { TblColRef col = flatDesc.getAllColumns().get(i); if (i > 0) { sql.append(","); } - sql.append(col.getExpressionInSourceDB() + "\n"); + String colTotalName = String.format("%s.%s", col.getTableRef().getTableName(), col.getName()); + if (skipAsList.contains(colTotalName)){ + sql.append(col.getExpressionInSourceDB() + sep); + }else{ + sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep); + } } - appendJoinStatement(flatDesc, sql); - appendWhereStatement(flatDesc, sql); + appendJoinStatement(flatDesc, sql, singleLine); + appendWhereStatement(flatDesc, sql, singleLine); return sql.toString(); } @@ -124,13 +155,15 @@ public class JoinedFlatTable { appendWhereStatement(flatDesc, sql); return sql.toString(); } - - private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) { + + private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { + String sep="\n"; + if (singleLine) sep=" "; Set<TableRef> dimTableCache = new HashSet<>(); DataModelDesc model = flatDesc.getDataModel(); TableRef rootTable = model.getRootFactTable(); - sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " \n"); + sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep); for (JoinTableDesc lookupDesc : model.getJoinTables()) { JoinDesc join = 
lookupDesc.getJoin(); @@ -143,7 +176,7 @@ public class JoinedFlatTable { if (pk.length != fk.length) { throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc); } - sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + "\n"); + sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep); sql.append("ON "); for (int i = 0; i < pk.length; i++) { if (i > 0) { @@ -151,7 +184,7 @@ public class JoinedFlatTable { } sql.append(fk[i].getIdentity() + " = " + pk[i].getIdentity()); } - sql.append("\n"); + sql.append(sep); dimTableCache.add(dimTable); } @@ -172,6 +205,12 @@ public class JoinedFlatTable { } private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) { + appendWhereStatement(flatDesc, sql, false); + } + private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) { + String sep="\n"; + if (singleLine) sep=" "; + boolean hasCondition = false; StringBuilder whereBuilder = new StringBuilder(); whereBuilder.append("WHERE"); @@ -192,7 +231,7 @@ public class JoinedFlatTable { if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) { whereBuilder.append(hasCondition ? 
" AND (" : " ("); whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd)); - whereBuilder.append(")\n"); + whereBuilder.append(")" + sep); hasCondition = true; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java index 0f98d5d..50ca773 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java @@ -24,6 +24,8 @@ public interface ISourceAware { public static final int ID_STREAMING = 1; public static final int ID_SPARKSQL = 5; public static final int ID_EXTERNAL = 7; + + public static final int ID_JDBC = 8; int getSourceType(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java index 3caf2f4..07bbab0 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java @@ -46,11 +46,14 @@ import org.apache.kylin.metadata.model.JoinDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; public class 
ModelDataGenerator { - + private static final Logger logger = LoggerFactory.getLogger(ModelDataGenerator.class); + private MetadataManager mdMgr;// final private DataModelDesc model; final private int targetRows; final private ResourceStore outputStore; @@ -58,6 +61,11 @@ public class ModelDataGenerator { boolean outprint = false; // for debug + public ModelDataGenerator(MetadataManager mdMgr, String modelName, int nRows){ + this(mdMgr.getDataModelDesc(modelName), nRows); + this.mdMgr = mdMgr; + } + public ModelDataGenerator(DataModelDesc model, int nRows) { this(model, nRows, ResourceStore.getStore(model.getConfig())); } @@ -78,13 +86,14 @@ public class ModelDataGenerator { Set<TableDesc> allTableDesc = new LinkedHashSet<>(); JoinTableDesc[] allTables = model.getJoinTables(); - for (int i = allTables.length - 1; i >= -1; i--) { + for (int i = allTables.length - 1; i >= -1; i--) {//reverse order needed for FK generation TableDesc table = (i == -1) ? model.getRootFactTable().getTableDesc() : allTables[i].getTableRef().getTableDesc(); allTableDesc.add(table); if (generated.contains(table)) continue; + logger.info(String.format("generating data for %s", table)); boolean gen = generateTable(table); if (gen) http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java index 468ccb1..81a2ec2 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java @@ -30,6 +30,7 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.kylin.common.util.DBUtils; +import 
org.apache.kylin.metadata.model.TableDesc; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -115,7 +116,7 @@ public class BeelineHiveClient implements IHiveClient { } return count; } - + @Override public void executeHQL(String hql) throws CommandNeedRetryException, IOException { throw new UnsupportedOperationException(); @@ -220,4 +221,24 @@ public class BeelineHiveClient implements IHiveClient { System.out.println(hiveTableMeta); loader.close(); } + + @Override + public String generateCreateSchemaSql(String schemaName) { + throw new UnsupportedOperationException(); + } + + @Override + public String generateLoadDataSql(String tableName, String tableFileDir) { + throw new UnsupportedOperationException(); + } + + @Override + public String[] generateCreateTableSql(TableDesc tableDesc) { + throw new UnsupportedOperationException(); + } + + @Override + public String[] generateCreateViewSql(String viewName, String tableName) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java index e8a93bd..f8e4c29 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; import com.google.common.collect.Lists; @@ -50,6 +52,59 @@ public 
class CLIHiveClient implements IHiveClient { hiveConf = new HiveConf(CLIHiveClient.class); } + private static String getHiveDataType(String javaDataType) { + String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType; + hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType; + + return hiveDataType.toLowerCase(); + } + + @Override + public String generateCreateSchemaSql(String schemaName){ + return String.format("CREATE DATABASE IF NOT EXISTS %s", schemaName); + } + + @Override + public String generateLoadDataSql(String tableName, String tableFileDir) { + return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + tableName + ".csv' OVERWRITE INTO TABLE " + tableName; + } + + @Override + public String[] generateCreateTableSql(TableDesc tableDesc) { + + String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity(); + String dropsql2 = "DROP VIEW IF EXISTS " + tableDesc.getIdentity(); + + StringBuilder ddl = new StringBuilder(); + ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n"); + ddl.append("(" + "\n"); + + for (int i = 0; i < tableDesc.getColumns().length; i++) { + ColumnDesc col = tableDesc.getColumns()[i]; + if (i > 0) { + ddl.append(","); + } + ddl.append(col.getName() + " " + getHiveDataType((col.getDatatype())) + "\n"); + } + + ddl.append(")" + "\n"); + ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n"); + ddl.append("STORED AS TEXTFILE"); + + return new String[] { dropsql, dropsql2, ddl.toString() }; + } + + @Override + public String[] generateCreateViewSql(String viewName, String tableName) { + + String dropView = "DROP VIEW IF EXISTS " + viewName; + String dropTable = "DROP TABLE IF EXISTS " + viewName; + + String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName); + + return new String[] { dropView, dropTable, createSql }; + } + /** * only used by Deploy Util */ 
http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java new file mode 100644 index 0000000..a38e0d9 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.hive; + +import java.io.IOException; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class CmdStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(CmdStep.class); + private final PatternedLogger stepLogger = new PatternedLogger(logger); + + public void setCmd(String cmd) { + setParam("cmd", cmd); + } + + public CmdStep(){ + } + + protected void sqoopFlatHiveTable(KylinConfig config) throws IOException { + String cmd = getParam("cmd"); + logger.info(String.format("exe cmd:%s", cmd)); + Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); + } + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + try { + sqoopFlatHiveTable(config); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java ---------------------------------------------------------------------- diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java b/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java new file mode 100644 index 0000000..0e0d58b --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.source.hive; + +import org.apache.commons.configuration.PropertiesConfiguration; + +public class DBConnConf { + public static final String KEY_DRIVER="driver"; + public static final String KEY_URL="url"; + public static final String KEY_USER="user"; + public static final String KEY_PASS="pass"; + + private String driver; + private String url; + private String user; + private String pass; + + public DBConnConf(){ + } + + public DBConnConf(String prefix, PropertiesConfiguration pc){ + driver = pc.getString(prefix + KEY_DRIVER); + url = pc.getString(prefix + KEY_URL); + user = pc.getString(prefix + KEY_USER); + pass = pc.getString(prefix + KEY_PASS); + } + + public DBConnConf(String driver, String url, String user, String pass){ + this.driver = driver; + this.url = url; + this.user = user; + this.pass = pass; + } + + public String toString(){ + return String.format("%s,%s,%s,%s", driver, url, user, pass); + } + public String getDriver() { + return driver; + } + public void setDriver(String driver) { + this.driver = driver; + } + public String getUrl() { + return url; + } + public void setUrl(String url) { + this.url = url; + } + public String getUser() { + return user; + } + public void setUser(String user) { + this.user = user; + } + public String getPass() { + return pass; + } + public void setPass(String pass) { + this.pass = pass; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java index 8c883af..5f8d536 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java @@ -19,8 +19,22 @@ package 
org.apache.kylin.source.hive; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.ISourceAware; public class HiveClientFactory { + + public static IJDBCExecutor getJDBCExector(String modelName) { + MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + DataModelDesc model = mgr.getDataModelDesc(modelName); + if (model.getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){ + return new JdbcExplorer(); + }else{ + return getHiveClient(); + } + } + public static IHiveClient getHiveClient() { if ("cli".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) { return new CLIHiveClient(); http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java new file mode 100644 index 0000000..ee6fa1e --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ +package org.apache.kylin.source.hive; + +import java.io.IOException; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveCmdBuilder; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.job.common.PatternedLogger; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class HiveCmdStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(HiveCmdStep.class); + private final PatternedLogger stepLogger = new PatternedLogger(logger); + + protected void createFlatHiveTable(KylinConfig config) throws IOException { + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride()); + hiveCmdBuilder.addStatement(getCmd()); + final String cmd = hiveCmdBuilder.toString(); + + stepLogger.log("cmd: "); + stepLogger.log(cmd); + + Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); + } + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = 
KylinConfig.getInstanceFromEnv(); + try { + createFlatHiveTable(config); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + + public void setCmd(String sql) { + setParam("cmd", sql); + } + + public String getCmd() { + return getParam("cmd"); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 15d4456..1a7ddd6 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -52,6 +52,7 @@ import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISegment; +import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.slf4j.Logger; @@ -61,6 +62,8 @@ import com.google.common.collect.Sets; public class HiveMRInput implements IMRInput { + private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class); + public static String getTableNameForHCat(TableDesc table) { String tableName = (table.isView()) ? 
table.getMaterializedName() : table.getName(); return String.format("%s.%s", table.getDatabase(), tableName).toUpperCase(); @@ -144,13 +147,23 @@ public class HiveMRInput implements IMRInput { final String jobWorkingDir = getJobWorkingDir(jobFlow); // create flat table first, then count and redistribute - jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); + if (flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){ + jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, cubeName)); + jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, jobWorkingDir)); + }else{ + jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); + } if (cubeConfig.isHiveRedistributeEnabled() == true) { jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName)); } - AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); - if (task != null) { - jobFlow.addTask(task); + logger.info(String.format("source aware:%d", flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType())); + if (flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){ + logger.info(String.format("skip createLookupHiveViewMaterializationStep")); + }else{ + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); + if (task != null) { + jobFlow.addTask(task); + } } } @@ -209,7 +222,39 @@ public class HiveMRInput implements IMRInput { return step; } + private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) { + KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); + String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();//tablename.colname + //using sqoop to extract data from jdbc source and dump them to hive + String 
selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[]{partCol}); + String hiveTable = flatDesc.getTableName(); + String connectionUrl = config.getJdbcConnectionUrl(); + String driverClass = config.getJdbcDriver(); + String jdbcUser = config.getJdbcUser(); + String jdbcPass = config.getJdbcPass(); + String sqoopHome = config.getSqoopHome(); + String cmd= String.format(String.format("%s/sqoop import " + + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" " + + "--target-dir %s/%s --split-by %s", sqoopHome, connectionUrl, driverClass, jdbcUser, + jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol)); + logger.info(String.format("sqoop cmd:%s", cmd)); + CmdStep step = new CmdStep(); + step.setCmd(cmd); + step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); + return step; + } + + private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) { + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE"); + + HiveCmdStep step = new HiveCmdStep(); + step.setCmd(hiveInitStatements + dropTableHql + createTableHql); + return step; + } + private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, String cubeName) { + //from hive to hive final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc); http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.source.hive;

import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.kylin.metadata.model.TableDesc;

import java.io.IOException;

/**
 * Abstraction over a SQL-executing data source. Implemented both by the Hive
 * clients (IHiveClient extends this interface) and by {@code JdbcExplorer}
 * for general RDBMS sources.
 *
 * NOTE(review): the "HQL" method names are inherited from the Hive-only
 * interface; JDBC implementations execute plain SQL through them.
 */
public interface IJDBCExecutor {

    /** Executes a single statement against the underlying source. */
    void executeHQL(String hql) throws CommandNeedRetryException, IOException;

    /** Executes the statements in order against the underlying source. */
    void executeHQL(String[] hqls) throws CommandNeedRetryException, IOException;

    /**
     * Returns DDL creating the given schema. The JDBC implementation returns
     * null for dialects it does not support — callers should check.
     */
    public String generateCreateSchemaSql(String schemaName);

    /**
     * Returns a statement loading data files from {@code tableFileDir} into
     * {@code tableName} (dialect-specific; may be null if unsupported).
     */
    public String generateLoadDataSql(String tableName, String tableFileDir);

    /**
     * Returns the statements (drops + CREATE TABLE) that recreate the table
     * described by {@code tableDesc}.
     */
    public String[] generateCreateTableSql(TableDesc tableDesc);

    /**
     * Returns the statements (drops + CREATE VIEW) that recreate
     * {@code viewName} as SELECT * over {@code tableName}.
     */
    public String[] generateCreateViewSql(String viewName, String tableName);
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.source.hive;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Metadata explorer for general RDBMS sources reached over JDBC. Schemas,
 * tables and columns are discovered through {@link DatabaseMetaData}; DDL
 * generation is dialect-aware (currently only vertica has special handling).
 */
public class JdbcExplorer implements ISourceMetadataExplorer, IJDBCExecutor {
    private static final Logger logger = LoggerFactory.getLogger(JdbcExplorer.class);

    public static final String DIALECT_VERTICA = "vertica";
    public static final String DIALECT_ORACLE = "oracle";
    public static final String DIALECT_MYSQL = "mysql";
    public static final String DIALECT_HIVE = "hive";

    public static final String TABLE_TYPE_TABLE = "TABLE";
    public static final String TABLE_TYPE_VIEW = "VIEW";

    private KylinConfig config;
    private DBConnConf dbconf;
    private String dialect;

    public JdbcExplorer() {
        config = KylinConfig.getInstanceFromEnv();
        String connectionUrl = config.getJdbcConnectionUrl();
        String driverClass = config.getJdbcDriver();
        String jdbcUser = config.getJdbcUser();
        String jdbcPass = config.getJdbcPass();
        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
        this.dialect = config.getJdbcDialect();
    }

    /** Maps a kylin/java data type name to the dialect's SQL type name. */
    private String getSqlDataType(String javaDataType) {
        // vertica has no "double" type; its floating type is "float"
        if (DIALECT_VERTICA.equals(dialect) && javaDataType.equalsIgnoreCase("double")) {
            return "float";
        }
        return javaDataType.toLowerCase();
    }

    @Override
    public String generateCreateSchemaSql(String schemaName) {
        if (DIALECT_VERTICA.equals(dialect)) {
            return String.format("CREATE schema IF NOT EXISTS %s", schemaName);
        } else {
            // returns null for unsupported dialects; callers must check
            logger.error("unsupported dialect {}.", dialect);
            return null;
        }
    }

    @Override
    public String generateLoadDataSql(String tableName, String tableFileDir) {
        if (DIALECT_VERTICA.equals(dialect)) {
            return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName);
        } else {
            // returns null for unsupported dialects; callers must check
            logger.error("unsupported dialect {}.", dialect);
            return null;
        }
    }

    @Override
    public String[] generateCreateTableSql(TableDesc tableDesc) {
        logger.info("gen create table sql:{}", tableDesc);
        String tableIdentity = String.format("%s.%s", tableDesc.getDatabase().toUpperCase(), tableDesc.getName()).toUpperCase();
        // drop both a table and a view of the same name, whichever exists
        String dropsql = "DROP TABLE IF EXISTS " + tableIdentity;
        String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity;

        StringBuilder ddl = new StringBuilder();
        ddl.append("CREATE TABLE " + tableIdentity + "\n");
        ddl.append("(" + "\n");

        for (int i = 0; i < tableDesc.getColumns().length; i++) {
            ColumnDesc col = tableDesc.getColumns()[i];
            if (i > 0) {
                ddl.append(",");
            }
            ddl.append(col.getName() + " " + getSqlDataType((col.getDatatype())) + "\n");
        }

        ddl.append(")");

        return new String[] { dropsql, dropsql2, ddl.toString() };
    }

    @Override
    public String[] generateCreateViewSql(String viewName, String tableName) {
        // drop both a view and a table of the same name, whichever exists
        String dropView = "DROP VIEW IF EXISTS " + viewName;
        String dropTable = "DROP TABLE IF EXISTS " + viewName;

        String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);

        return new String[] { dropView, dropTable, createSql };
    }

    @Override
    public void executeHQL(String sql) throws CommandNeedRetryException, IOException {
        Connection con = SqlUtil.getConnection(dbconf);
        try {
            // BUGFIX: was logger.info(String.format(sql)) — treating arbitrary
            // SQL as a format string throws if it contains '%'
            logger.info(sql);
            SqlUtil.execUpdateSQL(con, sql);
        } finally {
            // BUGFIX: release the connection even when execution throws
            SqlUtil.closeResources(con, null);
        }
    }

    @Override
    public void executeHQL(String[] sqls) throws CommandNeedRetryException, IOException {
        Connection con = SqlUtil.getConnection(dbconf);
        try {
            for (String sql : sqls) {
                logger.info(sql);
                SqlUtil.execUpdateSQL(con, sql);
            }
        } finally {
            SqlUtil.closeResources(con, null);
        }
    }

    @Override
    public List<String> listDatabases() throws Exception {
        Connection con = SqlUtil.getConnection(dbconf);
        try {
            DatabaseMetaData dbmd = con.getMetaData();
            // getSchemas() columns: 1=TABLE_SCHEM, 2=TABLE_CATALOG (may be null)
            ResultSet rs = dbmd.getSchemas();
            List<String> ret = new ArrayList<String>();
            while (rs.next()) {
                String schema = rs.getString(1);
                String catalog = rs.getString(2);
                logger.info("{},{}", schema, catalog);
                ret.add(schema);
            }
            rs.close();
            return ret;
        } finally {
            SqlUtil.closeResources(con, null);
        }
    }

    @Override
    public List<String> listTables(String database) throws Exception {
        Connection con = SqlUtil.getConnection(dbconf);
        try {
            DatabaseMetaData dbmd = con.getMetaData();
            // getTables() columns: 1=TABLE_CAT, 2=TABLE_SCHEM, 3=TABLE_NAME, 4=TABLE_TYPE, ...
            ResultSet rs = dbmd.getTables(null, database, null, null);
            List<String> ret = new ArrayList<String>();
            while (rs.next()) {
                String catalog = rs.getString(1);
                String schema = rs.getString(2);
                String name = rs.getString(3);
                String type = rs.getString(4);
                logger.info("{},{},{},{}", schema, catalog, name, type);
                ret.add(name);
            }
            rs.close();
            return ret;
        } finally {
            SqlUtil.closeResources(con, null);
        }
    }

    /**
     * Loads the table and column metadata of one table via JDBC metadata.
     *
     * @param database schema name as known to the source
     * @param table    table name as known to the source
     * @return the kylin table descriptor pair (names upper-cased)
     */
    @Override
    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception {

        TableDesc tableDesc = new TableDesc();
        tableDesc.setDatabase(database.toUpperCase());
        tableDesc.setName(table.toUpperCase());
        tableDesc.setUuid(UUID.randomUUID().toString());
        tableDesc.setLastModified(0);

        Connection con = SqlUtil.getConnection(dbconf);
        try {
            DatabaseMetaData dbmd = con.getMetaData();

            // table type is column 4 of getTables(): "TABLE", "VIEW", ...
            ResultSet rs = dbmd.getTables(null, database, table, null);
            String tableType = null;
            while (rs.next()) {
                tableType = rs.getString(4);
            }
            // BUGFIX: close before reassigning rs below
            rs.close();
            if (tableType != null) {
                tableDesc.setTableType(tableType);
            } else {
                logger.error("table {} not found in schema:{}", table, database);
            }

            // getColumns() columns used here:
            // 3=TABLE_NAME, 4=COLUMN_NAME, 5=DATA_TYPE, 6=TYPE_NAME,
            // 7=COLUMN_SIZE, 9=DECIMAL_DIGITS, 11=NULLABLE, 12=REMARKS, 17=ORDINAL_POSITION
            List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
            rs = dbmd.getColumns(null, database, table, null);
            while (rs.next()) {
                String tname = rs.getString(3);
                String cname = rs.getString(4);
                int type = rs.getInt(5);
                String typeName = rs.getString(6);
                int csize = rs.getInt(7);
                int digits = rs.getInt(9);
                int nullable = rs.getInt(11);
                String comment = rs.getString(12);
                int pos = rs.getInt(17);
                logger.info("{},{},{},{},{},{},{},{}", tname, cname, type, csize, digits, nullable, comment, pos);

                ColumnDesc cdesc = new ColumnDesc();
                cdesc.setName(cname.toUpperCase());
                // NOTE(review): the source's TYPE_NAME is used verbatim; no
                // float->double mapping happens here despite the old comment
                cdesc.setDatatype(typeName);
                cdesc.setId(String.valueOf(pos));
                columns.add(cdesc);
            }
            rs.close();

            tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
        } finally {
            // BUGFIX: release the connection on all paths
            SqlUtil.closeResources(con, null);
        }

        TableExtDesc tableExtDesc = new TableExtDesc();
        tableExtDesc.setName(table);
        tableExtDesc.setUuid(UUID.randomUUID().toString());
        tableExtDesc.setLastModified(0);
        tableExtDesc.init();

        return Pair.newPair(tableDesc, tableExtDesc);
    }

    @Override
    public List<String> getRelatedKylinResources(TableDesc table) {
        return Collections.emptyList();
    }

}
+*/ + +package org.apache.kylin.source.hive; + +import org.apache.kylin.engine.mr.IMRInput; +import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.IReadableTable; +import org.apache.kylin.source.ISource; +import org.apache.kylin.source.ISourceMetadataExplorer; +import org.apache.kylin.source.SourcePartition; + +//used by reflection +public class JdbcSource implements ISource { + + @Override + public ISourceMetadataExplorer getSourceMetadataExplorer() { + return new JdbcExplorer(); + } + + @SuppressWarnings("unchecked") + @Override + public <I> I adaptToBuildEngine(Class<I> engineInterface) { + if (engineInterface == IMRInput.class) { + return (I) new HiveMRInput(); + } else { + throw new RuntimeException("Cannot adapt to " + engineInterface); + } + } + + @Override + public IReadableTable createReadableTable(TableDesc tableDesc) { + return new JdbcTable(tableDesc); + } + + @Override + public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) { + SourcePartition result = SourcePartition.getCopyOf(srcPartition); + result.setStartOffset(0); + result.setEndOffset(0); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java new file mode 100644 index 0000000..0871938 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.source.hive;

import java.io.IOException;


import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.IReadableTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * IReadableTable view over a table in a JDBC source; rows are streamed
 * through {@link JdbcTableReader}.
 */
public class JdbcTable implements IReadableTable {

    private static final Logger logger = LoggerFactory.getLogger(JdbcTable.class);

    final private String database;
    final private String tableName;


    public JdbcTable(TableDesc tableDesc) {
        this.database = tableDesc.getDatabase();
        this.tableName = tableDesc.getName();
    }

    @Override
    public TableReader getReader() throws IOException {
        return new JdbcTableReader(database, tableName);
    }

    @Override
    public TableSignature getSignature() throws IOException {
        // size is unknown over JDBC; the table is assumed ever-changing,
        // so the current time is used as the modification stamp
        String path = String.format("%s.%s", database, tableName);
        return new TableSignature(path, 0, System.currentTimeMillis());
    }

    @Override
    public boolean exists() {
        return true;
    }

    @Override
    public String toString() {
        return "database=[" + database + "], table=[" + tableName + "]";
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.source.hive;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.source.IReadableTable.TableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * TableReader that streams all rows of a JDBC-source table via
 * "select * from db.table". (Fixed: the previous javadoc was a copy-paste
 * about HCatalog/Hive.)
 */
public class JdbcTableReader implements TableReader {
    private static final Logger logger = LoggerFactory.getLogger(JdbcTableReader.class);

    private String dbName;
    private String tableName;

    private DBConnConf dbconf;
    private Connection jdbcCon;
    private Statement statement;
    private ResultSet rs;
    private int colCount;

    /**
     * Opens a full-table cursor over the given table.
     *
     * @param dbName    schema name
     * @param tableName table name
     * @throws IOException if the connection or the query fails
     */
    public JdbcTableReader(String dbName, String tableName) throws IOException {
        this.dbName = dbName;
        this.tableName = tableName;
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        String connectionUrl = config.getJdbcConnectionUrl();
        String driverClass = config.getJdbcDriver();
        String jdbcUser = config.getJdbcUser();
        String jdbcPass = config.getJdbcPass();
        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, jdbcPass);
        jdbcCon = SqlUtil.getConnection(dbconf);
        String sql = String.format("select * from %s.%s", dbName, tableName);
        try {
            statement = jdbcCon.createStatement();
            rs = statement.executeQuery(sql);
            colCount = rs.getMetaData().getColumnCount();
        } catch (SQLException e) {
            // BUGFIX: release the connection/statement when the query fails,
            // otherwise they leak since close() is never reached
            SqlUtil.closeResources(jdbcCon, statement);
            throw new IOException(String.format("error while exec %s", sql), e);
        }
    }

    @Override
    public boolean next() throws IOException {
        try {
            return rs.next();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    /**
     * Returns the current row as strings (null elements for SQL NULLs).
     * Per-column read errors are logged and leave the element null,
     * matching the original best-effort behavior.
     */
    @Override
    public String[] getRow() {
        String[] ret = new String[colCount];
        for (int i = 1; i <= colCount; i++) {
            try {
                Object o = rs.getObject(i);
                ret[i - 1] = (o == null ? null : o.toString());
            } catch (Exception e) {
                logger.error("error reading column " + i + " of " + dbName + "." + tableName, e);
            }
        }
        return ret;
    }

    @Override
    public void close() throws IOException {
        SqlUtil.closeResources(jdbcCon, statement);
    }

    public String toString() {
        return "jdbc table reader for: " + dbName + "." + tableName;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.source.hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Random;

import javax.sql.DataSource;

import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

/**
 * Small JDBC helpers: quiet resource closing, update execution, and
 * connection acquisition with retry.
 */
public class SqlUtil {
    private static final Logger logger = LoggerFactory.getLogger(SqlUtil.class);

    private SqlUtil() {
        // static utility class, no instances
    }

    /** Closes the statement and then the connection, swallowing and logging any error. */
    public static void closeResources(Connection con, Statement statement) {
        try {
            if (statement != null && !statement.isClosed()) {
                statement.close();
            }
        } catch (Exception e) {
            logger.error("failed to close statement", e);
        }

        try {
            if (con != null && !con.isClosed()) {
                con.close();
            }
        } catch (Exception e) {
            logger.error("failed to close connection", e);
        }
    }

    /** Runs an update statement on a fresh connection from the data source. */
    public static void execUpdateSQL(String sql, DataSource ds) {
        Connection con = null;
        try {
            con = ds.getConnection();
            execUpdateSQL(con, sql);
        } catch (Exception e) {
            logger.error("error executing sql: " + sql, e);
        } finally {
            closeResources(con, null);
        }
    }

    /**
     * Runs an update statement on the given connection.
     * NOTE(review): failures are logged and swallowed, so callers cannot tell
     * whether the statement took effect — consider propagating SQLException.
     */
    public static void execUpdateSQL(Connection db, String sql) {
        Statement statement = null;
        try {
            statement = db.createStatement();
            statement.executeUpdate(sql);
        } catch (Exception e) {
            logger.error("error executing sql: " + sql, e);
        } finally {
            closeResources(null, statement);
        }
    }

    // max connection attempts; public (non-final) name kept for compatibility
    public static int tryTimes = 10;

    /**
     * Obtains a connection, retrying up to {@code tryTimes} times with a
     * random backoff of 0-9 seconds between attempts.
     *
     * @return the connection, or null if the URL is null or all attempts fail
     */
    public static Connection getConnection(DBConnConf dbconf) {
        if (dbconf.getUrl() == null)
            return null;
        Connection con = null;
        try {
            Class.forName(dbconf.getDriver());
        } catch (Exception e) {
            logger.error("failed to load jdbc driver " + dbconf.getDriver(), e);
        }
        boolean got = false;
        int times = 0;
        Random r = new Random();
        while (!got && times < tryTimes) {
            times++;
            try {
                con = DriverManager.getConnection(dbconf.getUrl(), dbconf.getUser(), dbconf.getPass());
                got = true;
            } catch (Exception e) {
                logger.warn("while use:" + dbconf, e);
                try {
                    int rt = r.nextInt(10);
                    Thread.sleep(rt * 1000);
                } catch (InterruptedException e1) {
                    // BUGFIX: restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                }
            }
        }
        return con;
    }
}