KYLIN-1855 revert to original behavior: all joins appear in the flat table
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ad143782 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ad143782 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ad143782 Branch: refs/heads/yang21-cdh5.7 Commit: ad143782b729f3a4d115c1d16ba5950a52f10c01 Parents: 5207686 Author: Li Yang <liy...@apache.org> Authored: Fri Nov 4 16:00:23 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Fri Nov 4 16:00:45 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/job/JoinedFlatTable.java | 34 +++++++------------- .../apache/kylin/source/hive/HiveMRInput.java | 27 +--------------- 2 files changed, 13 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ad143782/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 55d9665..aa3f217 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -39,7 +39,6 @@ import org.w3c.dom.Document; import org.w3c.dom.NodeList; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; /** * @@ -47,8 +46,8 @@ import com.google.common.collect.Sets; public class JoinedFlatTable { - public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) { - return storageDfsDir + "/" + intermediateTableDesc.getTableName(); + public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) { + return storageDfsDir + "/" + flatDesc.getTableName(); } public static String generateHiveSetStatements(JobEngineConfig 
engineConfig) { @@ -101,16 +100,16 @@ public class JoinedFlatTable { return ddl.toString(); } - public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) { + public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) { StringBuilder ddl = new StringBuilder(); - ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n"); + ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n"); return ddl.toString(); } - public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig, boolean redistribute) { + public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig, boolean redistribute) { StringBuilder sql = new StringBuilder(); sql.append(generateHiveSetStatements(engineConfig)); - sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc, redistribute) + ";").append("\n"); + sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc, redistribute) + ";").append("\n"); return sql.toString(); } @@ -222,19 +221,10 @@ public class JoinedFlatTable { } private static List<JoinDesc> getUsedJoinsSet(IJoinedFlatTableDesc flatDesc) { - Set<String> usedTableIdentities = Sets.newHashSet(); - for (TblColRef col : flatDesc.getAllColumns()) { - usedTableIdentities.add(col.getTable()); - } - List<JoinDesc> result = Lists.newArrayList(); for (LookupDesc lookup : flatDesc.getDataModel().getLookups()) { - String table = lookup.getTableDesc().getIdentity(); - if (usedTableIdentities.contains(table)) { - result.add(lookup.getJoin()); - } + result.add(lookup.getJoin()); } - return result; } @@ -287,20 +277,20 @@ public class JoinedFlatTable { return hiveDataType.toLowerCase(); } - public static String generateSelectRowCountStatement(IJoinedFlatTableDesc 
intermediateTableDesc, String outputDir) { + public static String generateSelectRowCountStatement(IJoinedFlatTableDesc flatDesc, String outputDir) { StringBuilder sql = new StringBuilder(); sql.append("set hive.exec.compress.output=false;\n"); - sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n"); + sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + flatDesc.getTableName() + ";\n"); return sql.toString(); } - public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc intermediateTableDesc) { - final String tableName = intermediateTableDesc.getTableName(); + public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) { + final String tableName = flatDesc.getTableName(); StringBuilder sql = new StringBuilder(); sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName); String redistributeCol = null; - TblColRef distDcol = intermediateTableDesc.getDistributedBy(); + TblColRef distDcol = flatDesc.getDistributedBy(); if (distDcol != null) { redistributeCol = colName(distDcol.getCanonicalName()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/ad143782/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 363a8d9..95828a0 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -273,7 +273,7 @@ public class HiveMRInput implements IMRInput { FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration()); InputStream in = fs.open(file); try { - String content = IOUtils.toString(in); + String content = 
IOUtils.toString(in, "UTF-8"); return Long.valueOf(content.trim()); // strip the '\n' character } finally { @@ -407,14 +407,6 @@ public class HiveMRInput implements IMRInput { return output.toString(); } - private void mkdirOnHDFS(String path) throws IOException { - Path externalDataPath = new Path(path); - FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); - if (!fs.exists(externalDataPath)) { - fs.mkdirs(externalDataPath); - } - } - private void rmdirOnHDFS(String path) throws IOException { Path externalDataPath = new Path(path); FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); @@ -423,20 +415,6 @@ public class HiveMRInput implements IMRInput { } } - private String cleanUpHiveViewIntermediateTable(KylinConfig config) throws IOException { - StringBuffer output = new StringBuffer(); - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); - if (getHiveViewIntermediateTableIdentities() != null && !getHiveViewIntermediateTableIdentities().isEmpty()) { - for (String hiveTableName : getHiveViewIntermediateTableIdentities().split(";")) { - hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTableName + ";"); - } - } - config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); - output.append("hive view intermediate tables: " + getHiveViewIntermediateTableIdentities() + " is dropped. \n"); - return output.toString(); - } - public void setIntermediateTableIdentity(String tableIdentity) { setParam("oldHiveTable", tableIdentity); } @@ -457,9 +435,6 @@ public class HiveMRInput implements IMRInput { setParam("oldHiveViewIntermediateTables", tableIdentities); } - private String getHiveViewIntermediateTableIdentities() { - return getParam("oldHiveViewIntermediateTables"); - } } }