Repository: kylin Updated Branches: refs/heads/master 3ba180a4e -> 9284b47c3
KYLIN-1077 Support Hive View as Lookup Table Signed-off-by: wangxianbin1987 <wangxianbin1...@gmail.com> Signed-off-by: shaofengshi <shaofeng...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c80ccd01 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c80ccd01 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c80ccd01 Branch: refs/heads/master Commit: c80ccd010a9717b88c8d0e604e13e1e72e7783ba Parents: 3ba180a Author: wangxianbin1987 <wangxianbin1...@gmail.com> Authored: Sun Apr 17 10:05:44 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Apr 29 12:04:55 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 12 +- .../apache/kylin/dict/DictionaryManager.java | 13 +- .../kylin/job/constant/ExecutableConstants.java | 1 + .../apache/kylin/metadata/model/TableDesc.java | 33 +++++ .../kylin/engine/mr/BatchCubingJobBuilder2.java | 2 +- .../apache/kylin/source/hive/HiveMRInput.java | 119 ++++++++++++++++--- .../source/hive/HiveSourceTableLoader.java | 6 + 7 files changed, 164 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 87a866a..cbb36d9 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -47,9 +47,7 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; -import org.apache.kylin.metadata.project.RealizationEntry; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.IRealizationConstants; import org.apache.kylin.metadata.realization.IRealizationProvider; @@ -164,7 +162,7 @@ public class CubeManager implements IRealizationProvider { return null; DictionaryManager dictMgr = getDictionaryManager(); - DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(),true, col, factTableValueProvider); + DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, factTableValueProvider); if (dictInfo != null) { Dictionary dict = dictInfo.getDictionaryObject(); @@ -206,7 +204,13 @@ public class CubeManager implements IRealizationProvider { MetadataManager metaMgr = getMetadataManager(); SnapshotManager snapshotMgr = getSnapshotManager(); - TableDesc tableDesc = metaMgr.getTableDesc(lookupTable); + TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable)); + if (tableDesc.isSourceTableHiveView()) { + tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); + String tableName = tableDesc.getHiveViewIntermediateTableName(); + tableDesc.setName(tableName); + } + ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 015c79f..12e347a 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -28,6 +28,7 @@ import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.IRealizationSegment; import org.apache.kylin.source.ReadableTable; import org.apache.kylin.source.ReadableTable.TableSignature; import org.apache.kylin.source.SourceFactory; @@ -273,8 +274,16 @@ public class DictionaryManager { if (model.isFactTable(srcTable)) { inpTable = factTableValueProvider.getDistinctValuesFor(srcCol); } else { - TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(srcTable); - inpTable = SourceFactory.createReadableTable(tableDesc); + MetadataManager metadataManager = MetadataManager.getInstance(config); + TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable)); + if (tableDesc.isSourceTableHiveView()) { + tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); + String tableName = tableDesc.getHiveViewIntermediateTableName(); + tableDesc.setName(tableName); + inpTable = SourceFactory.createReadableTable(tableDesc); + } else { + inpTable = SourceFactory.createReadableTable(tableDesc); + } } TableSignature inputSig = inpTable.getSignature(); http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index f619a68..d47d550 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -36,6 +36,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; + public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data"; public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube"; http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index 9d016d3..ec0baf4 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -42,11 +42,24 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { private ColumnDesc[] columns; @JsonProperty("source_type") private int sourceType = ISourceAware.ID_HIVE; + @JsonProperty("source_table_type") + private boolean sourceTableHiveViewFlag = false; + @JsonProperty("hive_view__table_name_prefix") + private String hiveViewIntermediateTableNamePrefix = "kylin_intermediate_"; private DatabaseDesc database = new DatabaseDesc(); private String identity = null; + public TableDesc() { + } + + public TableDesc(TableDesc other) { + this.name = other.getName(); + this.columns = other.getColumns(); + this.database.setName(other.getDatabase()); + } + public ColumnDesc findColumnByName(String name) { //ignore the db name and table name if exists int lastIndexOfDot = name.lastIndexOf("."); @@ -181,6 +194,18 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { return getIdentity().equals(other.getIdentity()); } + public void setSourceTableHiveViewFlag(boolean sourceTableHiveViewFlag) { + this.sourceTableHiveViewFlag = sourceTableHiveViewFlag; + } + + public boolean isSourceTableHiveView(){ + return sourceTableHiveViewFlag; + } + + public String getHiveViewIntermediateTableName() { + return hiveViewIntermediateTableNamePrefix + "_" + database.getName() + "_" + name; + } + @Override public String toString() { return "TableDesc [database=" + getDatabase() + " name=" + name + "]"; @@ -201,4 +226,12 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { public void setSourceType(int sourceType) { this.sourceType = sourceType; } + + public String getHiveViewIntermediateTableNamePrefix() { + return hiveViewIntermediateTableNamePrefix; + } + + public void setHiveViewIntermediateTableNamePrefix(String hiveViewIntermediateTableNamePrefix) { + this.hiveViewIntermediateTableNamePrefix = hiveViewIntermediateTableNamePrefix; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index a1c9cd9..853eca0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -53,7 +53,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { final String jobId = result.getId(); final String cuboidRootPath = getCuboidRootPath(jobId); - // Phase 1: Create Flat Table + // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables inputSide.addStepPhase1_CreateFlatTable(result); // Phase 2: Build Dictionary http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 7088168..d90ed60 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -27,6 +27,8 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; @@ -93,6 +95,7 @@ public class HiveMRInput implements IMRInput { final JobEngineConfig conf; final IRealizationSegment seg; final IJoinedFlatTableDesc flatHiveTableDesc; + String hiveViewIntermediateTables = ""; public BatchCubingInputSide(IRealizationSegment seg) { this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); @@ -103,6 +106,10 @@ public class HiveMRInput implements IMRInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId())); + AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId()); + if(task != null) { + jobFlow.addTask(task); + } } public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) { @@ -131,12 +138,53 @@ public class HiveMRInput implements IMRInput { return step; } + + public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { + boolean findHiveViewLookUpTable = false; + ShellExecutable step = new ShellExecutable();; + step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); + HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeManager cubeMgr = CubeManager.getInstance(kylinConfig); + String cubeName = seg.getRealization().getName(); + CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor(); + + final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";"; + hiveCmdBuilder.addStatement(useDatabaseHql); + for(TableDesc lookUpTableDesc : cubeDesc.getLookupTableDescs()) { + if (lookUpTableDesc.isSourceTableHiveView()) { + findHiveViewLookUpTable = true; + lookUpTableDesc.setHiveViewIntermediateTableNamePrefix("kylin_intermediate_" + jobId); + StringBuilder createIntermediateTableHql = new StringBuilder(); + createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + + lookUpTableDesc.getHiveViewIntermediateTableName() + "\n"); + createIntermediateTableHql.append("LOCATION '" + JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" + + lookUpTableDesc.getHiveViewIntermediateTableName() + "'\n"); + createIntermediateTableHql.append("AS SELECT * FROM " + lookUpTableDesc.getIdentity() + ";\n"); + hiveCmdBuilder.addStatement(createIntermediateTableHql.toString()); + hiveViewIntermediateTables = hiveViewIntermediateTables + lookUpTableDesc.getHiveViewIntermediateTableName() + ";"; + } + if (findHiveViewLookUpTable) { + hiveViewIntermediateTables= hiveViewIntermediateTables.substring(0, hiveViewIntermediateTables.length()-1); + } + } + + if(findHiveViewLookUpTable) { + step.setCmd(hiveCmdBuilder.build()); + return step; + } else { + return null; + } + } + @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { GarbageCollectionStep step = new GarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION); step.setIntermediateTableIdentity(getIntermediateTableIdentity()); step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()))); + step.setHiveViewIntermediateTableIdentitys(hiveViewIntermediateTables); jobFlow.addTask(step); } @@ -155,29 +203,62 @@ public class HiveMRInput implements IMRInput { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = context.getConfig(); StringBuffer output = new StringBuffer(); + try { + output.append(cleanUpIntermediateFlatTable(config)); + output.append(cleanUpHiveViewIntermediateTable(config)); + } catch (IOException e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage()); + } + + return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); + } + private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException { + StringBuffer output = new StringBuffer(); final String hiveTable = this.getIntermediateTableIdentity(); if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";"); - try { - config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); - output.append("Hive table " + hiveTable + " is dropped. \n"); - - Path externalDataPath = new Path(getExternalDataPath()); - FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); - if (fs.exists(externalDataPath)) { - fs.delete(externalDataPath, true); - output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n"); - } - } catch (IOException e) { - logger.error("job:" + getId() + " execute finished with exception", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage()); - } + + config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); + output.append("Hive table " + hiveTable + " is dropped. \n"); + + rmdirOnHDFS(getExternalDataPath()); + output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); + } + return output.toString(); + } + + private void mkdirOnHDFS(String path) throws IOException { + Path externalDataPath = new Path(path); + FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); + if (!fs.exists(externalDataPath)) { + fs.mkdirs(externalDataPath); } + } - return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); + private void rmdirOnHDFS(String path) throws IOException { + Path externalDataPath = new Path(path); + FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); + if (fs.exists(externalDataPath)) { + fs.delete(externalDataPath, true); + } + } + + private String cleanUpHiveViewIntermediateTable(KylinConfig config) throws IOException { + StringBuffer output = new StringBuffer(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); + if (!getHiveViewIntermediateTableIdentitys().isEmpty()) { + for(String hiveTableName : getHiveViewIntermediateTableIdentitys().split(";")) { + hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTableName + ";"); + } + } + config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); + output.append("hive view intermediate tables: " + getHiveViewIntermediateTableIdentitys() + " is dropped. \n"); + return output.toString(); } public void setIntermediateTableIdentity(String tableIdentity) { @@ -195,6 +276,14 @@ public class HiveMRInput implements IMRInput { private String getExternalDataPath() { return getParam("externalDataPath"); } + + public void setHiveViewIntermediateTableIdentitys(String tableIdentitys) { + setParam("oldHiveViewIntermediateTables", tableIdentitys); + } + + private String getHiveViewIntermediateTableIdentitys() { + return getParam("oldHiveViewIntermediateTables"); + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/c80ccd01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java index 2aef4e6..6860f91 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.kylin.common.KylinConfig; @@ -116,6 +117,11 @@ public class HiveSourceTableLoader { tableDesc.setUuid(UUID.randomUUID().toString()); tableDesc.setLastModified(0); } + if(table.getTableType().equals(TableType.VIRTUAL_VIEW.toString())) { + tableDesc.setSourceTableHiveViewFlag(true); + } else { + tableDesc.setSourceTableHiveViewFlag(false); + } int columnNumber = fields.size(); List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);