Repository: kylin
Updated Branches:
  refs/heads/KYLIN-1351 [created] 5e1c9be45


KYLIN-1351 Support general RDBMS data source through JDBC


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c7f32b83
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c7f32b83
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c7f32b83

Branch: refs/heads/KYLIN-1351
Commit: c7f32b83a1c114014a30d295117dcba1b1dc7e81
Parents: c2b2cea
Author: chengyi <phillipchen...@gmail.com>
Authored: Mon Jun 5 15:49:37 2017 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Tue Jun 6 16:36:43 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  28 ++
 .../java/org/apache/kylin/cube/CubeManager.java |   4 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java  |   5 +-
 .../InMemCubeBuilderInputConverter.java         |   4 +
 .../org/apache/kylin/job/JoinedFlatTable.java   |  61 +++-
 .../kylin/metadata/model/ISourceAware.java      |   2 +
 .../source/datagen/ModelDataGenerator.java      |  13 +-
 .../kylin/source/hive/BeelineHiveClient.java    |  23 +-
 .../apache/kylin/source/hive/CLIHiveClient.java |  55 ++++
 .../org/apache/kylin/source/hive/CmdStep.java   |  69 +++++
 .../apache/kylin/source/hive/DBConnConf.java    |  78 +++++
 .../kylin/source/hive/HiveClientFactory.java    |  14 +
 .../apache/kylin/source/hive/HiveCmdStep.java   |  77 +++++
 .../apache/kylin/source/hive/HiveMRInput.java   |  53 +++-
 .../apache/kylin/source/hive/IHiveClient.java   |   8 +-
 .../apache/kylin/source/hive/IJDBCExecutor.java |  39 +++
 .../apache/kylin/source/hive/JdbcExplorer.java  | 289 +++++++++++++++++++
 .../apache/kylin/source/hive/JdbcSource.java    |  60 ++++
 .../org/apache/kylin/source/hive/JdbcTable.java |  67 +++++
 .../kylin/source/hive/JdbcTableReader.java      | 106 +++++++
 .../org/apache/kylin/source/hive/SqlUtil.java   | 106 +++++++
 21 files changed, 1133 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c83c546..a776a61 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -588,6 +588,34 @@ abstract public class KylinConfigBase implements Serializable {
     public Map<String, String> getKafkaConfigOverride() {
         return getPropertiesByPrefix("kylin.source.kafka.config-override.");
     }
+    
+    // ============================================================================
+    // SOURCE.JDBC
+    // ============================================================================
+
+    public String getJdbcConnectionUrl() {
+        return getOptional("kylin.source.jdbc.connectionUrl");
+    }
+    
+    public String getJdbcDriver() {
+        return getOptional("kylin.source.jdbc.driver");
+    }
+    
+    public String getJdbcDialect() {
+        return getOptional("kylin.source.jdbc.dialect");
+    }
+    
+    public String getJdbcUser() {
+        return getOptional("kylin.source.jdbc.user");
+    }
+    
+    public String getJdbcPass() {
+        return getOptional("kylin.source.jdbc.pass");
+    }
+    
+    public String getSqoopHome() {
+        return getOptional("kylin.source.sqoop.home");
+    }
 
     // ============================================================================
     // STORAGE.HBASE

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 32e2316..c348055 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -54,6 +54,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
@@ -282,7 +283,8 @@ public class CubeManager implements IRealizationProvider {
         SnapshotManager snapshotMgr = getSnapshotManager();
 
         TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable));
-        if (tableDesc.isView()) {
+        if (tableDesc.isView() &&
+                tableDesc.getSourceType()!=ISourceAware.ID_JDBC) {
             String tableName = tableDesc.getMaterializedName();
             tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
             tableDesc.setName(tableName);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index e7368e8..9555bc3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -32,6 +32,7 @@ import org.apache.kylin.dict.DictionaryProvider;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TableRef;
@@ -111,7 +112,8 @@ public class DictionaryGeneratorCLI {
         } else {
             MetadataManager metadataManager = MetadataManager.getInstance(config);
             TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable));
-            if (tableDesc.isView()) {
+            if (tableDesc.isView() &&
+                    tableDesc.getSourceType()!=ISourceAware.ID_JDBC) {
                 TableDesc materializedTbl = new TableDesc();
                 materializedTbl.setDatabase(config.getHiveDatabaseForIntermediateTable());
                 materializedTbl.setName(tableDesc.getMaterializedName());
@@ -120,7 +122,6 @@ public class DictionaryGeneratorCLI {
                 inpTable = SourceFactory.createReadableTable(tableDesc);
             }
         }
-
         return inpTable;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index ab44f63..387feb7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -32,6 +32,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
@@ -39,6 +41,8 @@ import com.google.common.collect.Lists;
  */
 public class InMemCubeBuilderInputConverter {
 
+    private static final Logger logger = LoggerFactory.getLogger(InMemCubeBuilderInputConverter.class);
+    
     public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
 
     private final CubeJoinedFlatTableEnrich flatDesc;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 4665465..f9ee1b1 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -19,7 +19,10 @@
 package org.apache.kylin.job;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
@@ -51,6 +54,10 @@ public class JoinedFlatTable {
     }
 
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
+        return generateCreateTableStatement(flatDesc, storageDfsDir, "SEQUENCEFILE");
+    }
+    
+    public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc, String storageDfsDir, String format) {
         StringBuilder ddl = new StringBuilder();
 
         ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + flatDesc.getTableName() + "\n");
@@ -64,7 +71,10 @@ public class JoinedFlatTable {
             ddl.append(colName(col) + " " + getHiveDataType(col.getDatatype()) + "\n");
         }
         ddl.append(")" + "\n");
-        ddl.append("STORED AS SEQUENCEFILE" + "\n");
+        if ("TEXTFILE".equals(format)){
+            ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n");
+        }
+        ddl.append("STORED AS " + format + "\n");
         ddl.append("LOCATION '" + getTableDir(flatDesc, storageDfsDir) + "';").append("\n");
         return ddl.toString();
     }
@@ -102,17 +112,38 @@ public class JoinedFlatTable {
     }
 
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
+        return generateSelectDataStatement(flatDesc, false);
+    }
+    
+    public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine) {
+        return generateSelectDataStatement(flatDesc, singleLine, null);
+    }
+    
+    public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean singleLine, String[] skipAs) {
+        String sep = "\n";
+        if (singleLine) sep=" ";
+        List<String> skipAsList = new ArrayList<String>();
+        if (skipAs!=null){
+            skipAsList = Arrays.asList(skipAs);
+        }
+        
         StringBuilder sql = new StringBuilder();
-        sql.append("SELECT" + "\n");
+        sql.append("SELECT" + sep);
+        
         for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
             TblColRef col = flatDesc.getAllColumns().get(i);
             if (i > 0) {
                 sql.append(",");
             }
-            sql.append(col.getExpressionInSourceDB() + "\n");
+            String colTotalName = String.format("%s.%s", col.getTableRef().getTableName(), col.getName());
+            if (skipAsList.contains(colTotalName)){
+                sql.append(col.getExpressionInSourceDB() + sep);
+            }else{
+                sql.append(col.getExpressionInSourceDB() + " as " + colName(col) + sep);
+            }
         }
-        appendJoinStatement(flatDesc, sql);
-        appendWhereStatement(flatDesc, sql);
+        appendJoinStatement(flatDesc, sql, singleLine);
+        appendWhereStatement(flatDesc, sql, singleLine);
         return sql.toString();
     }
 
@@ -124,13 +155,15 @@ public class JoinedFlatTable {
         appendWhereStatement(flatDesc, sql);
         return sql.toString();
     }
-
-    private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
+    
+    private static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
+        String sep="\n";
+        if (singleLine) sep=" ";
         Set<TableRef> dimTableCache = new HashSet<>();
 
         DataModelDesc model = flatDesc.getDataModel();
         TableRef rootTable = model.getRootFactTable();
-        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " \n");
+        sql.append("FROM " + rootTable.getTableIdentity() + " as " + rootTable.getAlias() + " " + sep);
 
         for (JoinTableDesc lookupDesc : model.getJoinTables()) {
             JoinDesc join = lookupDesc.getJoin();
@@ -143,7 +176,7 @@ public class JoinedFlatTable {
                     if (pk.length != fk.length) {
                         throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
                     }
-                    sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + "\n");
+                    sql.append(joinType + " JOIN " + dimTable.getTableIdentity() + " as " + dimTable.getAlias() + sep);
                     sql.append("ON ");
                     for (int i = 0; i < pk.length; i++) {
                         if (i > 0) {
@@ -151,7 +184,7 @@ public class JoinedFlatTable {
                         }
                         sql.append(fk[i].getIdentity() + " = " + pk[i].getIdentity());
                     }
-                    sql.append("\n");
+                    sql.append(sep);
 
                     dimTableCache.add(dimTable);
                 }
@@ -172,6 +205,12 @@ public class JoinedFlatTable {
     }
 
     private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
+        appendWhereStatement(flatDesc, sql, false);
+    }
+    private static void appendWhereStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, boolean singleLine) {
+        String sep="\n";
+        if (singleLine) sep=" ";
+        
         boolean hasCondition = false;
         StringBuilder whereBuilder = new StringBuilder();
         whereBuilder.append("WHERE");
@@ -192,7 +231,7 @@ public class JoinedFlatTable {
                 if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
                     whereBuilder.append(hasCondition ? " AND (" : " (");
                     whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd));
-                    whereBuilder.append(")\n");
+                    whereBuilder.append(")" + sep);
                     hasCondition = true;
                 }
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 0f98d5d..50ca773 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -24,6 +24,8 @@ public interface ISourceAware {
     public static final int ID_STREAMING = 1;
     public static final int ID_SPARKSQL = 5;
     public static final int ID_EXTERNAL = 7;
+    
+    public static final int ID_JDBC = 8;
 
     int getSourceType();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
index 3caf2f4..07bbab0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/source/datagen/ModelDataGenerator.java
@@ -46,11 +46,14 @@ import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
 public class ModelDataGenerator {
-
+    private static final Logger logger = LoggerFactory.getLogger(ModelDataGenerator.class);
+    private MetadataManager mdMgr;//
     final private DataModelDesc model;
     final private int targetRows;
     final private ResourceStore outputStore;
@@ -58,6 +61,11 @@ public class ModelDataGenerator {
 
     boolean outprint = false; // for debug
 
+    public ModelDataGenerator(MetadataManager mdMgr, String modelName, int nRows){
+        this(mdMgr.getDataModelDesc(modelName), nRows);
+        this.mdMgr = mdMgr;
+    }
+    
     public ModelDataGenerator(DataModelDesc model, int nRows) {
         this(model, nRows, ResourceStore.getStore(model.getConfig()));
     }
@@ -78,13 +86,14 @@ public class ModelDataGenerator {
         Set<TableDesc> allTableDesc = new LinkedHashSet<>();
 
         JoinTableDesc[] allTables = model.getJoinTables();
-        for (int i = allTables.length - 1; i >= -1; i--) {
+        for (int i = allTables.length - 1; i >= -1; i--) {//reverse order needed for FK generation
             TableDesc table = (i == -1) ? model.getRootFactTable().getTableDesc() : allTables[i].getTableRef().getTableDesc();
             allTableDesc.add(table);
             
             if (generated.contains(table))
                 continue;
 
+            logger.info(String.format("generating data for %s", table));
             boolean gen = generateTable(table);
 
             if (gen)

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index 468ccb1..81a2ec2 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -30,6 +30,7 @@ import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.kylin.common.util.DBUtils;
+import org.apache.kylin.metadata.model.TableDesc;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -115,7 +116,7 @@ public class BeelineHiveClient implements IHiveClient {
         }
         return count;
     }
-
+    
     @Override
     public void executeHQL(String hql) throws CommandNeedRetryException, IOException {
         throw new UnsupportedOperationException();
@@ -220,4 +221,24 @@ public class BeelineHiveClient implements IHiveClient {
         System.out.println(hiveTableMeta);
         loader.close();
     }
+
+    @Override
+    public String generateCreateSchemaSql(String schemaName) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String generateLoadDataSql(String tableName, String tableFileDir) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String[] generateCreateTableSql(TableDesc tableDesc) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String[] generateCreateViewSql(String viewName, String tableName) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
index e8a93bd..f8e4c29 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 
 import com.google.common.collect.Lists;
 
@@ -50,6 +52,59 @@ public class CLIHiveClient implements IHiveClient {
         hiveConf = new HiveConf(CLIHiveClient.class);
     }
 
+    private static String getHiveDataType(String javaDataType) {
+        String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
+        hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
+
+        return hiveDataType.toLowerCase();
+    }
+    
+    @Override
+    public String generateCreateSchemaSql(String schemaName){
+        return String.format("CREATE DATABASE IF NOT EXISTS %s", schemaName);
+    }
+    
+    @Override
+    public String generateLoadDataSql(String tableName, String tableFileDir) {
+        return "LOAD DATA LOCAL INPATH '" + tableFileDir + "/" + tableName + ".csv' OVERWRITE INTO TABLE " + tableName;
+    }
+
+    @Override
+    public String[] generateCreateTableSql(TableDesc tableDesc) {
+
+        String dropsql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
+        String dropsql2 = "DROP VIEW IF EXISTS " + tableDesc.getIdentity();
+
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
+        ddl.append("(" + "\n");
+
+        for (int i = 0; i < tableDesc.getColumns().length; i++) {
+            ColumnDesc col = tableDesc.getColumns()[i];
+            if (i > 0) {
+                ddl.append(",");
+            }
+            ddl.append(col.getName() + " " + getHiveDataType((col.getDatatype())) + "\n");
+        }
+
+        ddl.append(")" + "\n");
+        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n");
+        ddl.append("STORED AS TEXTFILE");
+
+        return new String[] { dropsql, dropsql2, ddl.toString() };
+    }
+
+    @Override
+    public String[] generateCreateViewSql(String viewName, String tableName) {
+
+        String dropView = "DROP VIEW IF EXISTS " + viewName;
+        String dropTable = "DROP TABLE IF EXISTS " + viewName;
+
+        String createSql = ("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
+
+        return new String[] { dropView, dropTable, createSql };
+    }
+    
     /**
      * only used by Deploy Util
      */

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java
new file mode 100644
index 0000000..a38e0d9
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CmdStep.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class CmdStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(CmdStep.class);
+    private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+    public void setCmd(String cmd) {
+        setParam("cmd", cmd);
+    }
+
+    public CmdStep(){
+    }
+    
+    protected void sqoopFlatHiveTable(KylinConfig config) throws IOException {
+        String cmd = getParam("cmd");
+        logger.info(String.format("exe cmd:%s", cmd));
+        Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
+        getManager().addJobInfo(getId(), stepLogger.getInfo());
+        if (response.getFirst() != 0) {
+            throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst());
+        }
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        try {
+            sqoopFlatHiveTable(config);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+
+        } catch (Exception e) {
+            logger.error("job:" + getId() + " execute finished with exception", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java b/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java
new file mode 100644
index 0000000..0e0d58b
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/DBConnConf.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+public class DBConnConf {
+    public static final String KEY_DRIVER="driver";
+    public static final String KEY_URL="url";
+    public static final String KEY_USER="user";
+    public static final String KEY_PASS="pass";
+    
+    private String driver;
+    private String url;
+    private String user;
+    private String pass;
+    
+    public DBConnConf(){    
+    }
+    
+    public DBConnConf(String prefix, PropertiesConfiguration pc){
+        driver = pc.getString(prefix + KEY_DRIVER);
+        url = pc.getString(prefix + KEY_URL);
+        user = pc.getString(prefix + KEY_USER);
+        pass = pc.getString(prefix + KEY_PASS);
+    }
+    
+    public DBConnConf(String driver, String url, String user, String pass){
+        this.driver = driver;
+        this.url = url;
+        this.user = user;
+        this.pass = pass;
+    }
+    
+    public String toString(){
+        return String.format("%s,%s,%s,%s", driver, url, user, pass);
+    }
+    public String getDriver() {
+        return driver;
+    }
+    public void setDriver(String driver) {
+        this.driver = driver;
+    }
+    public String getUrl() {
+        return url;
+    }
+    public void setUrl(String url) {
+        this.url = url;
+    }
+    public String getUser() {
+        return user;
+    }
+    public void setUser(String user) {
+        this.user = user;
+    }
+    public String getPass() {
+        return pass;
+    }
+    public void setPass(String pass) {
+        this.pass = pass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
index 8c883af..5f8d536 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
@@ -19,8 +19,22 @@
 package org.apache.kylin.source.hive;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.ISourceAware;
 
 public class HiveClientFactory {
+    
+    public static IJDBCExecutor getJDBCExector(String modelName) {
+        MetadataManager mgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        DataModelDesc model = mgr.getDataModelDesc(modelName);
+        if (model.getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){
+            return new JdbcExplorer();
+        }else{
+            return getHiveClient();
+        }
+    }
+    
     public static IHiveClient getHiveClient() {
         if ("cli".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) {
             return new CLIHiveClient();

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java
new file mode 100644
index 0000000..ee6fa1e
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdStep.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that wraps the statement stored in its "cmd" parameter with {@link HiveCmdBuilder}
+ * and runs the resulting command through the configured CLI command executor.
+ */
+public class HiveCmdStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(HiveCmdStep.class);
+    private final PatternedLogger stepLogger = new PatternedLogger(logger);
+
+    /**
+     * Builds the command from {@link #getCmd()} plus the Hive property overrides of
+     * {@code config}, executes it and stores the step logger's info on the job.
+     *
+     * @param config configuration supplying the Hive overrides and the command executor
+     * @throws IOException declared for the command execution
+     * @throws RuntimeException when the command returns a non-zero exit code
+     */
+    protected void createFlatHiveTable(KylinConfig config) throws IOException {
+        final HiveCmdBuilder builder = new HiveCmdBuilder();
+        builder.overwriteHiveProps(config.getHiveConfigOverride());
+        builder.addStatement(getCmd());
+        final String hiveCommand = builder.toString();
+
+        stepLogger.log("cmd: ");
+        stepLogger.log(hiveCommand);
+
+        Pair<Integer, String> result = config.getCliCommandExecutor().execute(hiveCommand, stepLogger);
+        getManager().addJobInfo(getId(), stepLogger.getInfo());
+
+        boolean succeeded = result.getFirst() == 0;
+        if (!succeeded) {
+            throw new RuntimeException("Failed to create flat hive table, error code " + result.getFirst());
+        }
+    }
+
+    /**
+     * Runs the command and maps the outcome to an {@link ExecuteResult}: SUCCEED when it
+     * completes, ERROR when any exception is raised (the exception is logged, not rethrown).
+     * Either way the result carries the step logger's buffered log.
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        try {
+            createFlatHiveTable(envConfig);
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
+        } catch (Exception failure) {
+            logger.error("job:" + getId() + " execute finished with exception", failure);
+            return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
+        }
+    }
+
+    /** Stores the statement to run in the step parameter "cmd". */
+    public void setCmd(String sql) {
+        setParam("cmd", sql);
+    }
+
+    /** Returns the statement stored in the step parameter "cmd". */
+    public String getCmd() {
+        return getParam("cmd");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 15d4456..1a7ddd6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -52,6 +52,7 @@ import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
@@ -61,6 +62,8 @@ import com.google.common.collect.Sets;
 
 public class HiveMRInput implements IMRInput {
 
+    private static final Logger logger = 
LoggerFactory.getLogger(HiveMRInput.class);
+    
     public static String getTableNameForHCat(TableDesc table) {
         String tableName = (table.isView()) ? table.getMaterializedName() : 
table.getName();
         return String.format("%s.%s", table.getDatabase(), 
tableName).toUpperCase();
@@ -144,13 +147,23 @@ public class HiveMRInput implements IMRInput {
             final String jobWorkingDir = getJobWorkingDir(jobFlow);
 
             // create flat table first, then count and redistribute
-            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, 
jobWorkingDir, cubeName));
+            if 
(flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){
+                jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, 
cubeName));
+                
jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, 
jobWorkingDir));
+            }else{
+                jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, 
jobWorkingDir, cubeName));
+            }
             if (cubeConfig.isHiveRedistributeEnabled() == true) {
                 
jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, 
cubeName));
             }
-            AbstractExecutable task = 
createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir);
-            if (task != null) {
-                jobFlow.addTask(task);
+            logger.info(String.format("source aware:%d", 
flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType()));
+            if 
(flatDesc.getDataModel().getRootFactTable().getTableDesc().getSourceType()==ISourceAware.ID_JDBC){
+                logger.info(String.format("skip 
createLookupHiveViewMaterializationStep"));
+            }else{
+                AbstractExecutable task = 
createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir);
+                if (task != null) {
+                    jobFlow.addTask(task);
+                }
             }
         }
 
@@ -209,7 +222,39 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
+        /**
+         * Builds the step that runs "sqoop import" to copy the flat-table query result from the
+         * JDBC source into {@code <jobWorkingDir>/<flat table name>}.
+         *
+         * The JDBC connection settings and the sqoop home directory come from the cube's config.
+         * The partition date column is passed to the select statement generator and is also
+         * used as the sqoop split column.
+         *
+         * @param jobWorkingDir directory under which the import target directory is placed
+         * @param cubeName      cube whose configuration supplies the JDBC and sqoop settings
+         * @return a {@link CmdStep} holding the sqoop command line
+         */
+        private AbstractExecutable createSqoopToFlatHiveStep(String jobWorkingDir, String cubeName) {
+            KylinConfig config = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
+            // tablename.colname
+            // NOTE(review): presumably null when the model has no partition column, which would
+            // produce "--split-by null" -- confirm against callers.
+            String partCol = flatDesc.getDataModel().getPartitionDesc().getPartitionDateColumn();
+            // NOTE(review): the template appends "AND $CONDITIONS", so the generated select is
+            // assumed to end in a WHERE clause -- confirm in JoinedFlatTable.
+            String selectSql = JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { partCol });
+            String hiveTable = flatDesc.getTableName();
+            String connectionUrl = config.getJdbcConnectionUrl();
+            String driverClass = config.getJdbcDriver();
+            String jdbcUser = config.getJdbcUser();
+            String jdbcPass = config.getJdbcPass();
+            String sqoopHome = config.getSqoopHome();
+
+            String cmdTemplate = "%s/sqoop import "
+                    + "--connect %s --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
+                    + "--target-dir %s/%s --split-by %s";
+            // Format exactly once: feeding the finished command back through String.format would
+            // interpret any '%' inside the SQL, URL or password as a format specifier.
+            String cmd = String.format(cmdTemplate, sqoopHome, connectionUrl, driverClass, jdbcUser,
+                    jdbcPass, selectSql, jobWorkingDir, hiveTable, partCol);
+            // Log a copy with the password masked so the credential does not end up in log files.
+            String maskedCmd = String.format(cmdTemplate, sqoopHome, connectionUrl, driverClass, jdbcUser,
+                    "******", selectSql, jobWorkingDir, hiveTable, partCol);
+            logger.info("sqoop cmd:" + maskedCmd);
+
+            CmdStep step = new CmdStep();
+            step.setCmd(cmd);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+            return step;
+        }
+        
+        /**
+         * Builds the Hive step that drops and re-creates the flat table, passing "TEXTFILE"
+         * and the job working directory to the create-table statement generator.
+         *
+         * NOTE(review): unlike the sqoop step, no step name is set here -- confirm that is intended.
+         *
+         * @param hiveInitStatements statements placed in front of the drop/create statements
+         * @param jobWorkingDir      working directory handed to the create-table statement
+         * @return a {@link HiveCmdStep} carrying the combined statements
+         */
+        private AbstractExecutable createFlatHiveTableFromFiles(String hiveInitStatements, String jobWorkingDir) {
+            final String recreateHql = JoinedFlatTable.generateDropTableStatement(flatDesc)
+                    + JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir, "TEXTFILE");
+
+            HiveCmdStep hiveStep = new HiveCmdStep();
+            hiveStep.setCmd(hiveInitStatements + recreateHql);
+            return hiveStep;
+        }
+        
         private AbstractExecutable createFlatHiveTableStep(String 
hiveInitStatements, String jobWorkingDir, String cubeName) {
+            //from hive to hive
             final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatDesc);
             final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
             String insertDataHqls = 
JoinedFlatTable.generateInsertDataStatement(flatDesc);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
index 22bea46..8d4034c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java
@@ -18,15 +18,9 @@
 
 package org.apache.kylin.source.hive;
 
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-
-import java.io.IOException;
 import java.util.List;
 
-public interface IHiveClient {
-    void executeHQL(String hql) throws CommandNeedRetryException, IOException;
-
-    void executeHQL(String[] hqls) throws CommandNeedRetryException, 
IOException;
+public interface IHiveClient extends IJDBCExecutor{
 
     HiveTableMeta getHiveTableMeta(String database, String tableName) throws 
Exception;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java
new file mode 100644
index 0000000..521ea87
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IJDBCExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.hive;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.kylin.metadata.model.TableDesc;
+
+import java.io.IOException;
+
+/**
+ * Executes SQL/HQL statements against a data source and generates the DDL/DML text
+ * for schemas, tables, views and data loading.
+ */
+public interface IJDBCExecutor {
+
+    /** Executes a single statement. */
+    void executeHQL(String hql) throws CommandNeedRetryException, IOException;
+
+    /** Executes the given statements in array order. */
+    void executeHQL(String[] hqls) throws CommandNeedRetryException, IOException;
+
+    /** Returns the statement that creates the schema {@code schemaName}. */
+    String generateCreateSchemaSql(String schemaName);
+
+    /** Returns the statement that loads data for {@code tableName} from files under {@code tableFileDir}. */
+    String generateLoadDataSql(String tableName, String tableFileDir);
+
+    /** Returns the statements that (re)create the table described by {@code tableDesc}. */
+    String[] generateCreateTableSql(TableDesc tableDesc);
+
+    /** Returns the statements that (re)create {@code viewName} as a view over {@code tableName}. */
+    String[] generateCreateViewSql(String viewName, String tableName);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java
new file mode 100644
index 0000000..193a37f
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcExplorer.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class JdbcExplorer implements ISourceMetadataExplorer, IJDBCExecutor{
+    private static final Logger logger = 
LoggerFactory.getLogger(JdbcExplorer.class);
+    
+    public static final String DIALECT_VERTICA="vertica";
+    public static final String DIALECT_ORACLE="oracle";
+    public static final String DIALECT_MYSQL="mysql";
+    public static final String DIALECT_HIVE="hive";
+    
+    public static final String TABLE_TYPE_TABLE="TABLE";
+    public static final String TABLE_TYPE_VIEW="VIEW";
+    
+    private KylinConfig config;
+    private DBConnConf dbconf;
+    private String dialect;
+
+    /**
+     * Reads the JDBC driver, connection URL, user, password and dialect from the
+     * environment's {@link KylinConfig}.
+     */
+    public JdbcExplorer() {
+        this.config = KylinConfig.getInstanceFromEnv();
+        this.dbconf = new DBConnConf(config.getJdbcDriver(), config.getJdbcConnectionUrl(),
+                config.getJdbcUser(), config.getJdbcPass());
+        this.dialect = config.getJdbcDialect();
+    }
+    
+    /**
+     * Maps a column data type name to the name used in generated DDL.
+     *
+     * @param javaDataType data type name as stored on the column
+     * @return the lower-cased type name; for the vertica dialect "double" becomes "float"
+     */
+    private String getSqlDataType(String javaDataType) {
+        // Locale.ROOT keeps the result independent of the JVM default locale
+        // (e.g. "INT" would become "ınt" under a Turkish locale).
+        String lowered = javaDataType.toLowerCase(java.util.Locale.ROOT);
+        if (DIALECT_VERTICA.equals(dialect) && "double".equals(lowered)) {
+            return "float";
+        }
+        return lowered;
+    }
+    
+    /**
+     * Returns a "CREATE schema IF NOT EXISTS" statement for the vertica dialect.
+     * For any other dialect an error is logged and {@code null} is returned.
+     */
+    @Override
+    public String generateCreateSchemaSql(String schemaName){
+        if (!DIALECT_VERTICA.equals(dialect)) {
+            logger.error(String.format("unsupported dialect %s.", dialect));
+            return null;
+        }
+        return String.format("CREATE schema IF NOT EXISTS %s", schemaName);
+    }
+    
+    /**
+     * Returns a vertica "copy ... from local" statement that loads
+     * {@code <tableFileDir>/<tableName>.csv} into {@code tableName}.
+     * For any other dialect an error is logged and {@code null} is returned.
+     */
+    @Override
+    public String generateLoadDataSql(String tableName, String tableFileDir) {
+        if (!DIALECT_VERTICA.equals(dialect)) {
+            logger.error(String.format("unsupported dialect %s.", dialect));
+            return null;
+        }
+        return String.format("copy %s from local '%s/%s.csv' delimiter as ',';", tableName, tableFileDir, tableName);
+    }
+
+    /**
+     * Generates the statements that re-create the table described by {@code tableDesc}.
+     *
+     * @return three statements in order: drop table, drop view, create table; the table is
+     *         addressed as upper-cased {@code DATABASE.NAME}
+     */
+    @Override
+    public String[] generateCreateTableSql(TableDesc tableDesc) {
+        logger.info(String.format("gen create table sql:%s", tableDesc));
+        String qualifiedName = (tableDesc.getDatabase().toUpperCase() + "." + tableDesc.getName()).toUpperCase();
+
+        StringBuilder createDdl = new StringBuilder();
+        createDdl.append("CREATE TABLE ").append(qualifiedName).append("\n");
+        createDdl.append("(").append("\n");
+
+        // one "name type" line per column, comma-prefixed from the second column on
+        String separator = "";
+        for (ColumnDesc column : tableDesc.getColumns()) {
+            createDdl.append(separator);
+            createDdl.append(column.getName()).append(" ").append(getSqlDataType(column.getDatatype())).append("\n");
+            separator = ",";
+        }
+        createDdl.append(")");
+
+        return new String[] {
+                "DROP TABLE IF EXISTS " + qualifiedName,
+                "DROP VIEW IF EXISTS " + qualifiedName,
+                createDdl.toString() };
+    }
+
+    /**
+     * Generates the statements that re-create {@code viewName} as "SELECT *" over {@code tableName}.
+     *
+     * @return three statements in order: drop view, drop table, create view
+     */
+    @Override
+    public String[] generateCreateViewSql(String viewName, String tableName) {
+        return new String[] {
+                "DROP VIEW IF EXISTS " + viewName,
+                "DROP TABLE IF EXISTS " + viewName,
+                "CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName };
+    }
+
+    /**
+     * Executes one update statement on a fresh connection.
+     * The connection is released even when the statement fails.
+     *
+     * @param sql statement to execute
+     */
+    @Override
+    public void executeHQL(String sql) throws CommandNeedRetryException, IOException {
+        Connection con = SqlUtil.getConnection(dbconf);
+        try {
+            // Log the SQL verbatim; it is not a format string and may contain '%' (e.g. LIKE '%x%').
+            logger.info(sql);
+            SqlUtil.execUpdateSQL(con, sql);
+        } finally {
+            SqlUtil.closeResources(con, null);
+        }
+    }
+
+    /**
+     * Executes the given update statements in order on one connection.
+     * The connection is released even when a statement fails.
+     *
+     * @param sqls statements to execute
+     */
+    @Override
+    public void executeHQL(String[] sqls) throws CommandNeedRetryException, IOException {
+        Connection con = SqlUtil.getConnection(dbconf);
+        try {
+            for (String sql : sqls) {
+                // Log the SQL verbatim; it is not a format string and may contain '%'.
+                logger.info(sql);
+                SqlUtil.execUpdateSQL(con, sql);
+            }
+        } finally {
+            SqlUtil.closeResources(con, null);
+        }
+    }
+
+    /**
+     * Lists the schema names reported by the JDBC driver's {@link DatabaseMetaData#getSchemas()}.
+     * The result set and the connection are released even when reading fails.
+     *
+     * @return schema names in the order the driver returns them
+     */
+    @Override
+    public List<String> listDatabases() throws Exception {
+        List<String> ret = new ArrayList<String>();
+        Connection con = SqlUtil.getConnection(dbconf);
+        try {
+            ResultSet rs = con.getMetaData().getSchemas();
+            try {
+                /*
+                The schema columns are:
+                    - 1. TABLE_SCHEM String => schema name
+                    - 2. TABLE_CATALOG String => catalog name (may be null)
+                */
+                while (rs.next()) {
+                    String schema = rs.getString(1);
+                    String catalog = rs.getString(2);
+                    logger.info(String.format("%s,%s", schema, catalog));
+                    ret.add(schema);
+                }
+            } finally {
+                rs.close();
+            }
+        } finally {
+            SqlUtil.closeResources(con, null);
+        }
+        return ret;
+    }
+
+    /**
+     * Lists the names of all tables (of any type) that the JDBC driver reports for a schema.
+     * The result set and the connection are released even when reading fails.
+     *
+     * @param database schema name pattern passed to {@link DatabaseMetaData#getTables}
+     * @return table names in the order the driver returns them
+     */
+    @Override
+    public List<String> listTables(String database) throws Exception {
+        List<String> ret = new ArrayList<String>();
+        Connection con = SqlUtil.getConnection(dbconf);
+        try {
+            ResultSet rs = con.getMetaData().getTables(null, database, null, null);
+            try {
+                /*
+                Result columns read here (see DatabaseMetaData.getTables for the full list):
+                    - 1. TABLE_CAT String => table catalog (may be null)
+                    - 2. TABLE_SCHEM String => table schema (may be null)
+                    - 3. TABLE_NAME String => table name
+                    - 4. TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW",
+                         "SYSTEM TABLE", "GLOBAL TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM".
+                */
+                while (rs.next()) {
+                    String catalog = rs.getString(1);
+                    String schema = rs.getString(2);
+                    String name = rs.getString(3);
+                    String type = rs.getString(4);
+                    logger.info(String.format("%s,%s,%s,%s", schema, catalog, name, type));
+                    ret.add(name);
+                }
+            } finally {
+                rs.close();
+            }
+        } finally {
+            SqlUtil.closeResources(con, null);
+        }
+        return ret;
+    }
+
+    /**
+     * Builds table and table-ext descriptors for one table from JDBC metadata.
+     *
+     * The table type comes from {@link DatabaseMetaData#getTables}; when no row is returned
+     * an error is logged and the type is left unset. Column names are upper-cased, each
+     * column id is its ORDINAL_POSITION and its data type is the driver's TYPE_NAME.
+     * The connection and both result sets are released even when reading fails.
+     *
+     * @param database schema name; stored upper-cased on the table descriptor
+     * @param table    table name; stored upper-cased on the table descriptor
+     * @return pair of the populated {@link TableDesc} and an initialised {@link TableExtDesc}
+     */
+    @Override
+    public Pair<TableDesc, TableExtDesc> loadTableMetadata(String database, String table) throws Exception {
+
+        TableDesc tableDesc = new TableDesc();
+        tableDesc.setDatabase(database.toUpperCase());
+        tableDesc.setName(table.toUpperCase());
+        tableDesc.setUuid(UUID.randomUUID().toString());
+        tableDesc.setLastModified(0);
+
+        List<ColumnDesc> columns = new ArrayList<ColumnDesc>();
+        Connection con = SqlUtil.getConnection(dbconf);
+        try {
+            DatabaseMetaData dbmd = con.getMetaData();
+
+            // getTables() result column 4 is TABLE_TYPE; the last matching row wins
+            String tableType = null;
+            ResultSet rs = dbmd.getTables(null, database, table, null);
+            try {
+                while (rs.next()) {
+                    tableType = rs.getString(4);
+                }
+            } finally {
+                rs.close();
+            }
+            if (tableType != null) {
+                tableDesc.setTableType(tableType);
+            } else {
+                logger.error(String.format("table %s not found in schema:%s", table, database));
+            }
+
+            /*
+            getColumns() result columns read below (see DatabaseMetaData.getColumns for the full list):
+                - 3. TABLE_NAME String => table name
+                - 4. COLUMN_NAME String => column name
+                - 5. DATA_TYPE int => SQL type from java.sql.Types
+                - 6. TYPE_NAME String => Data source dependent type name
+                - 7. COLUMN_SIZE int => column size
+                - 9. DECIMAL_DIGITS int => the number of fractional digits
+                - 11.NULLABLE int => is NULL allowed
+                - 12.REMARKS String => comment describing column (may be null)
+                - 17.ORDINAL_POSITION int => index of column in table (starting at 1)
+            */
+            rs = dbmd.getColumns(null, database, table, null);
+            try {
+                while (rs.next()) {
+                    String tname = rs.getString(3);
+                    String cname = rs.getString(4);
+                    int type = rs.getInt(5);
+                    String typeName = rs.getString(6);
+                    int csize = rs.getInt(7);
+                    int digits = rs.getInt(9);
+                    int nullable = rs.getInt(11);
+                    String comment = rs.getString(12);
+                    int pos = rs.getInt(17);
+                    logger.info(String.format("%s,%s,%d,%d,%d,%d,%s,%d", tname, cname, type, csize, digits, nullable, comment, pos));
+
+                    ColumnDesc cdesc = new ColumnDesc();
+                    cdesc.setName(cname.toUpperCase());
+                    // NOTE(review): the type name is stored exactly as the driver reports it;
+                    // no "float" -> "double" mapping is applied here.
+                    cdesc.setDatatype(typeName);
+                    cdesc.setId(String.valueOf(pos));
+                    columns.add(cdesc);
+                }
+            } finally {
+                rs.close();
+            }
+        } finally {
+            SqlUtil.closeResources(con, null);
+        }
+
+        tableDesc.setColumns(columns.toArray(new ColumnDesc[columns.size()]));
+
+        TableExtDesc tableExtDesc = new TableExtDesc();
+        tableExtDesc.setName(table);
+        tableExtDesc.setUuid(UUID.randomUUID().toString());
+        tableExtDesc.setLastModified(0);
+        tableExtDesc.init();
+
+        return Pair.newPair(tableDesc, tableExtDesc);
+    }
+
+    /** Always returns an empty list for any table. */
+    @Override
+    public List<String> getRelatedKylinResources(TableDesc table) {
+        final List<String> none = Collections.emptyList();
+        return none;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java
new file mode 100644
index 0000000..cc9cea8
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.source.SourcePartition;
+
+/**
+ * {@link ISource} implementation for JDBC data sources.
+ * Used by reflection.
+ */
+public class JdbcSource implements ISource {
+
+    /** Returns a new {@link JdbcExplorer} on every call. */
+    @Override
+    public ISourceMetadataExplorer getSourceMetadataExplorer() {
+        return new JdbcExplorer();
+    }
+
+    /**
+     * Adapts this source to a build engine interface.
+     *
+     * @return a new {@link HiveMRInput} when {@code engineInterface} is {@link IMRInput}
+     * @throws RuntimeException for any other interface
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface != IMRInput.class) {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+        return (I) new HiveMRInput();
+    }
+
+    /** Wraps the table descriptor in a {@link JdbcTable}. */
+    @Override
+    public IReadableTable createReadableTable(TableDesc tableDesc) {
+        return new JdbcTable(tableDesc);
+    }
+
+    /** Returns a copy of {@code srcPartition} with start and end offsets set to 0. */
+    @Override
+    public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition) {
+        SourcePartition enriched = SourcePartition.getCopyOf(srcPartition);
+        enriched.setStartOffset(0);
+        enriched.setEndOffset(0);
+        return enriched;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java
new file mode 100644
index 0000000..0871938
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTable.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link IReadableTable} identified by a database name and a table name taken from a
+ * {@link TableDesc}; rows are read through {@link JdbcTableReader}.
+ */
+public class JdbcTable implements IReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(JdbcTable.class);
+
+    final private String database;
+    final private String tableName;
+
+    /** Captures the database and table name of {@code tableDesc}. */
+    public JdbcTable(TableDesc tableDesc) {
+        this.database = tableDesc.getDatabase();
+        this.tableName = tableDesc.getName();
+    }
+
+    /** Opens a new {@link JdbcTableReader} for this database and table. */
+    @Override
+    public TableReader getReader() throws IOException {
+        return new JdbcTableReader(database, tableName);
+    }
+
+    /**
+     * Returns a signature with path {@code database.tableName}, size 0 and the current time
+     * as last-modified, i.e. the table is assumed to be ever changing.
+     */
+    @Override
+    public TableSignature getSignature() throws IOException {
+        return new TableSignature(String.format("%s.%s", database, tableName), 0, System.currentTimeMillis());
+    }
+
+    /** Always reports the table as existing; no check is made against the source. */
+    @Override
+    public boolean exists() {
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("database=[%s], table=[%s]", database, tableName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java
new file mode 100644
index 0000000..8c70d61
--- /dev/null
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/JdbcTableReader.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.source.IReadableTable.TableReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of TableReader with HCatalog for Hive table.
+ */
+public class JdbcTableReader implements TableReader {
+    private static final Logger logger = 
LoggerFactory.getLogger(JdbcTableReader.class);
+    
+    private String dbName;
+    private String tableName;
+
+    private DBConnConf dbconf;
+    private String dialect;
+    private Connection jdbcCon;
+    private Statement statement;
+    private ResultSet rs;
+    private int colCount;
+
+    /**
+     * Constructor for reading whole hive table
+     * @param dbName
+     * @param tableName
+     * @throws IOException
+     */
+    public JdbcTableReader(String dbName, String tableName) throws IOException 
{
+        this.dbName = dbName;
+        this.tableName = tableName;
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        String connectionUrl = config.getJdbcConnectionUrl();
+        String driverClass = config.getJdbcDriver();
+        String jdbcUser = config.getJdbcUser();
+        String jdbcPass = config.getJdbcPass();
+        dbconf = new DBConnConf(driverClass, connectionUrl, jdbcUser, 
jdbcPass);
+        this.dialect = config.getJdbcDialect();
+        jdbcCon = SqlUtil.getConnection(dbconf);
+        String sql = String.format("select * from %s.%s", dbName, tableName);
+        try {
+            statement = jdbcCon.createStatement();
+            rs = statement.executeQuery(sql);
+            colCount = rs.getMetaData().getColumnCount();
+        }catch(SQLException e){
+            throw new IOException(String.format("error while exec %s", sql), 
e);
+        }
+        
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        try {
+            return rs.next();
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public String[] getRow() {
+        String[] ret = new String[colCount];
+        for (int i=1; i<=colCount; i++){
+            try {
+                Object o = rs.getObject(i);
+                ret[i-1] = (o == null? null:o.toString());
+            }catch(Exception e){
+                logger.error("", e);
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void close() throws IOException {
+        SqlUtil.closeResources(jdbcCon, statement);
+    }
+
+    public String toString() {
+        return "jdbc table reader for: " + dbName + "." + tableName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c7f32b83/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java
new file mode 100644
index 0000000..ba280df
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/SqlUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.util.Random;
+
+import javax.sql.DataSource;
+
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+public class SqlUtil {
+    private static final Logger logger = 
LoggerFactory.getLogger(SqlUtil.class);
+
+    public static void closeResources(Connection con, Statement statement){
+        try{
+            if (statement!=null && !statement.isClosed()){
+                statement.close();
+            }
+        }catch(Exception e){
+            logger.error("", e);
+        }
+        
+        try{
+            if (con!=null && !con.isClosed()){
+                con.close();
+            }
+        }catch(Exception e){
+            logger.error("", e);
+        }
+    }
+    
+    
+    public static void execUpdateSQL(String sql, DataSource ds){
+        Connection con = null;
+        try{
+            con = ds.getConnection();
+            execUpdateSQL(con, sql);
+        }catch(Exception e){
+            logger.error("", e);
+        }finally{
+            closeResources(con, null);
+        }
+    }
+    
+    public static void execUpdateSQL(Connection db, String sql){
+        Statement statement=null;
+        try{
+            statement = db.createStatement();
+            statement.executeUpdate(sql);            
+        }catch(Exception e){
+            logger.error("", e);
+        }finally{
+            closeResources(null, statement);
+        }
+    }
+    
+    public static int tryTimes=10;
+    public static Connection getConnection(DBConnConf dbconf){
+        if (dbconf.getUrl()==null)
+            return null;
+        Connection con = null;
+        try {
+            Class.forName(dbconf.getDriver());
+        }catch(Exception e){
+            logger.error("", e);
+        }
+        boolean got=false;
+        int times=0;
+        Random r = new Random();
+        while(!got && times<tryTimes){
+            times++;
+            try {
+                con = DriverManager.getConnection(dbconf.getUrl(), 
dbconf.getUser(), dbconf.getPass());
+                got = true;
+            }catch(Exception e){
+                logger.warn("while use:" + dbconf, e);
+                try {
+                    int rt = r.nextInt(10);
+                    Thread.sleep(rt*1000);
+                } catch (InterruptedException e1) {
+                }
+            }
+        }
+        return con;
+    }
+}

Reply via email to