Repository: kylin
Updated Branches:
  refs/heads/master 3ba180a4e -> 9284b47c3


KYLIN-1077 Support Hive View as Lookup Table

Signed-off-by: wangxianbin1987 <wangxianbin1...@gmail.com>
Signed-off-by: shaofengshi <shaofeng...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c80ccd01
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c80ccd01
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c80ccd01

Branch: refs/heads/master
Commit: c80ccd010a9717b88c8d0e604e13e1e72e7783ba
Parents: 3ba180a
Author: wangxianbin1987 <wangxianbin1...@gmail.com>
Authored: Sun Apr 17 10:05:44 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Fri Apr 29 12:04:55 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java |  12 +-
 .../apache/kylin/dict/DictionaryManager.java    |  13 +-
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../apache/kylin/metadata/model/TableDesc.java  |  33 +++++
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |   2 +-
 .../apache/kylin/source/hive/HiveMRInput.java   | 119 ++++++++++++++++---
 .../source/hive/HiveSourceTableLoader.java      |   6 +
 7 files changed, 164 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 87a866a..cbb36d9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -47,9 +47,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -164,7 +162,7 @@ public class CubeManager implements IRealizationProvider {
             return null;
 
         DictionaryManager dictMgr = getDictionaryManager();
-        DictionaryInfo dictInfo = 
dictMgr.buildDictionary(cubeDesc.getModel(),true, col, factTableValueProvider);
+        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), 
true, col, factTableValueProvider);
 
         if (dictInfo != null) {
             Dictionary dict = dictInfo.getDictionaryObject();
@@ -206,7 +204,13 @@ public class CubeManager implements IRealizationProvider {
         MetadataManager metaMgr = getMetadataManager();
         SnapshotManager snapshotMgr = getSnapshotManager();
 
-        TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
+        TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable));
+        if (tableDesc.isSourceTableHiveView()) {
+            
tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
+            String tableName = tableDesc.getHiveViewIntermediateTableName();
+            tableDesc.setName(tableName);
+        }
+
         ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
         SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, 
tableDesc);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 015c79f..12e347a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -28,6 +28,7 @@ import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.apache.kylin.source.ReadableTable;
 import org.apache.kylin.source.ReadableTable.TableSignature;
 import org.apache.kylin.source.SourceFactory;
@@ -273,8 +274,16 @@ public class DictionaryManager {
         if (model.isFactTable(srcTable)) {
             inpTable = factTableValueProvider.getDistinctValuesFor(srcCol);
         } else {
-            TableDesc tableDesc = 
MetadataManager.getInstance(config).getTableDesc(srcTable);
-            inpTable = SourceFactory.createReadableTable(tableDesc);
+            MetadataManager metadataManager = 
MetadataManager.getInstance(config);
+            TableDesc tableDesc = new 
TableDesc(metadataManager.getTableDesc(srcTable));
+            if (tableDesc.isSourceTableHiveView()) {
+                
tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable());
+                String tableName = 
tableDesc.getHiveViewIntermediateTableName();
+                tableDesc.setName(tableName);
+                inpTable = SourceFactory.createReadableTable(tableDesc);
+            } else {
+                inpTable = SourceFactory.createReadableTable(tableDesc);
+            }
         }
 
         TableSignature inputSig = inpTable.getSignature();

http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f619a68..d47d550 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -36,6 +36,7 @@ public final class ExecutableConstants {
 
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension 
Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create 
Intermediate Flat Hive Table";
+    public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = 
"Materialize Hive View in Lookup Tables";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact 
Table Distinct Columns";
     public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base 
Cuboid Data";
     public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";

http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 9d016d3..ec0baf4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -42,11 +42,24 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     private ColumnDesc[] columns;
     @JsonProperty("source_type")
     private int sourceType = ISourceAware.ID_HIVE;
+    @JsonProperty("source_table_type")
+    private boolean sourceTableHiveViewFlag =  false;
+    @JsonProperty("hive_view__table_name_prefix")
+    private String hiveViewIntermediateTableNamePrefix = "kylin_intermediate_";
 
     private DatabaseDesc database = new DatabaseDesc();
 
     private String identity = null;
 
+    public TableDesc() {
+    }
+
+    public TableDesc(TableDesc other) {
+        this.name = other.getName();
+        this.columns = other.getColumns();
+        this.database.setName(other.getDatabase());
+    }
+
     public ColumnDesc findColumnByName(String name) {
         //ignore the db name and table name if exists
         int lastIndexOfDot = name.lastIndexOf(".");
@@ -181,6 +194,18 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         return getIdentity().equals(other.getIdentity());
     }
 
+    public void setSourceTableHiveViewFlag(boolean sourceTableHiveViewFlag) {
+        this.sourceTableHiveViewFlag = sourceTableHiveViewFlag;
+    }
+
+    public boolean isSourceTableHiveView(){
+        return sourceTableHiveViewFlag;
+    }
+
+    public String getHiveViewIntermediateTableName() {
+        return hiveViewIntermediateTableNamePrefix + "_" + database.getName() 
+ "_" + name;
+    }
+
     @Override
     public String toString() {
         return "TableDesc [database=" + getDatabase() + " name=" + name + "]";
@@ -201,4 +226,12 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     public void setSourceType(int sourceType) {
         this.sourceType = sourceType;
     }
+
+    public String getHiveViewIntermediateTableNamePrefix() {
+        return hiveViewIntermediateTableNamePrefix;
+    }
+
+    public void setHiveViewIntermediateTableNamePrefix(String 
hiveViewIntermediateTableNamePrefix) {
+        this.hiveViewIntermediateTableNamePrefix = 
hiveViewIntermediateTableNamePrefix;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index a1c9cd9..853eca0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -53,7 +53,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         final String jobId = result.getId();
         final String cuboidRootPath = getCuboidRootPath(jobId);
 
-        // Phase 1: Create Flat Table
+        // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
         // Phase 2: Build Dictionary

http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 7088168..d90ed60 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
@@ -93,6 +95,7 @@ public class HiveMRInput implements IMRInput {
         final JobEngineConfig conf;
         final IRealizationSegment seg;
         final IJoinedFlatTableDesc flatHiveTableDesc;
+        String hiveViewIntermediateTables = "";
 
         public BatchCubingInputSide(IRealizationSegment seg) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -103,6 +106,10 @@ public class HiveMRInput implements IMRInput {
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
             jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, 
jobFlow.getId()));
+            AbstractExecutable task = 
createLookupHiveViewMaterializationStep(jobFlow.getId());
+            if(task != null) {
+                jobFlow.addTask(task);
+            }
         }
 
         public static AbstractExecutable 
createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc 
flatTableDesc, String jobId) {
@@ -131,12 +138,53 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
+
+        public ShellExecutable createLookupHiveViewMaterializationStep(String 
jobId) {
+            boolean findHiveViewLookUpTable = false;
+            ShellExecutable step = new ShellExecutable();;
+            
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
+            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+            String cubeName = seg.getRealization().getName();
+            CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
+
+            final String useDatabaseHql = "USE " + 
conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
+            hiveCmdBuilder.addStatement(useDatabaseHql);
+            for(TableDesc lookUpTableDesc : cubeDesc.getLookupTableDescs()) {
+                if (lookUpTableDesc.isSourceTableHiveView()) {
+                    findHiveViewLookUpTable = true;
+                    
lookUpTableDesc.setHiveViewIntermediateTableNamePrefix("kylin_intermediate_" + 
jobId);
+                    StringBuilder createIntermediateTableHql = new 
StringBuilder();
+                    createIntermediateTableHql.append("CREATE TABLE IF NOT 
EXISTS " +
+                            lookUpTableDesc.getHiveViewIntermediateTableName() 
+ "\n");
+                    createIntermediateTableHql.append("LOCATION '" + 
JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" +
+                            lookUpTableDesc.getHiveViewIntermediateTableName() 
+ "'\n");
+                    createIntermediateTableHql.append("AS SELECT * FROM " + 
lookUpTableDesc.getIdentity() + ";\n");
+                    
hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
+                    hiveViewIntermediateTables = hiveViewIntermediateTables + 
lookUpTableDesc.getHiveViewIntermediateTableName() + ";";
+                }
+                if (findHiveViewLookUpTable) {
+                    hiveViewIntermediateTables= 
hiveViewIntermediateTables.substring(0, hiveViewIntermediateTables.length()-1);
+                }
+            }
+
+            if(findHiveViewLookUpTable) {
+                step.setCmd(hiveCmdBuilder.build());
+                return step;
+            } else {
+                return null;
+            }
+        }
+
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
             step.setIntermediateTableIdentity(getIntermediateTableIdentity());
             
step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, 
JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            
step.setHiveViewIntermediateTableIdentitys(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }
 
@@ -155,29 +203,62 @@ public class HiveMRInput implements IMRInput {
         protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
             KylinConfig config = context.getConfig();
             StringBuffer output = new StringBuffer();
+            try {
+                output.append(cleanUpIntermediateFlatTable(config));
+                output.append(cleanUpHiveViewIntermediateTable(config));
+            } catch (IOException e) {
+                logger.error("job:" + getId() + " execute finished with 
exception", e);
+                return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getMessage());
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, 
output.toString());
+        }
 
+        private String cleanUpIntermediateFlatTable(KylinConfig config) throws 
IOException {
+            StringBuffer output = new StringBuffer();
             final String hiveTable = this.getIntermediateTableIdentity();
             if (config.isHiveKeepFlatTable() == false && 
StringUtils.isNotEmpty(hiveTable)) {
                 final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
                 hiveCmdBuilder.addStatement("USE " + 
config.getHiveDatabaseForIntermediateTable() + ";");
                 hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + 
hiveTable + ";");
-                try {
-                    
config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-                    output.append("Hive table " + hiveTable + " is dropped. 
\n");
-
-                    Path externalDataPath = new Path(getExternalDataPath());
-                    FileSystem fs = FileSystem.get(externalDataPath.toUri(), 
HadoopUtil.getCurrentConfiguration());
-                    if (fs.exists(externalDataPath)) {
-                        fs.delete(externalDataPath, true);
-                        output.append("Hive table " + hiveTable + " external 
data path " + externalDataPath + " is deleted. \n");
-                    }
-                } catch (IOException e) {
-                    logger.error("job:" + getId() + " execute finished with 
exception", e);
-                    return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getMessage());
-                }
+
+                config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+                output.append("Hive table " + hiveTable + " is dropped. \n");
+
+                rmdirOnHDFS(getExternalDataPath());
+                output.append("Hive table " + hiveTable + " external data path 
" + getExternalDataPath() + " is deleted. \n");
+            }
+            return output.toString();
+        }
+
+        private void mkdirOnHDFS(String path) throws IOException {
+            Path externalDataPath = new Path(path);
+            FileSystem fs = FileSystem.get(externalDataPath.toUri(), 
HadoopUtil.getCurrentConfiguration());
+            if (!fs.exists(externalDataPath)) {
+                fs.mkdirs(externalDataPath);
             }
+        }
 
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, 
output.toString());
+        private void rmdirOnHDFS(String path) throws IOException {
+            Path externalDataPath = new Path(path);
+            FileSystem fs = FileSystem.get(externalDataPath.toUri(), 
HadoopUtil.getCurrentConfiguration());
+            if (fs.exists(externalDataPath)) {
+                fs.delete(externalDataPath, true);
+            }
+        }
+
+        private String cleanUpHiveViewIntermediateTable(KylinConfig config) 
throws IOException {
+            StringBuffer output = new StringBuffer();
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement("USE " + 
config.getHiveDatabaseForIntermediateTable() + ";");
+            if (!getHiveViewIntermediateTableIdentitys().isEmpty()) {
+                for(String hiveTableName : 
getHiveViewIntermediateTableIdentitys().split(";")) {
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + 
hiveTableName + ";");
+                }
+            }
+            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+            output.append("hive view intermediate tables: " + 
getHiveViewIntermediateTableIdentitys() + " is dropped. \n");
+            return output.toString();
         }
 
         public void setIntermediateTableIdentity(String tableIdentity) {
@@ -195,6 +276,14 @@ public class HiveMRInput implements IMRInput {
         private String getExternalDataPath() {
             return getParam("externalDataPath");
         }
+
+        public void setHiveViewIntermediateTableIdentitys(String 
tableIdentitys) {
+            setParam("oldHiveViewIntermediateTables", tableIdentitys);
+        }
+
+        private String getHiveViewIntermediateTableIdentitys() {
+            return getParam("oldHiveViewIntermediateTables");
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 2aef4e6..6860f91 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.kylin.common.KylinConfig;
@@ -116,6 +117,11 @@ public class HiveSourceTableLoader {
                 tableDesc.setUuid(UUID.randomUUID().toString());
                 tableDesc.setLastModified(0);
             }
+            if(table.getTableType().equals(TableType.VIRTUAL_VIEW.toString())) 
{
+                tableDesc.setSourceTableHiveViewFlag(true);
+            } else {
+                tableDesc.setSourceTableHiveViewFlag(false);
+            }
 
             int columnNumber = fields.size();
             List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);

Reply via email to