KYLIN-2165: fix integration tests (IT)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2e87fb41 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2e87fb41 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2e87fb41 Branch: refs/heads/master-hbase0.98 Commit: 2e87fb41fe36aff015950bad3a88e7ac525debd6 Parents: 93cc5ab Author: gaodayue <gaoda...@meituan.com> Authored: Fri Apr 7 17:01:24 2017 +0800 Committer: gaodayue <gaoda...@meituan.com> Committed: Fri Apr 7 19:10:38 2017 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/job/JoinedFlatTable.java | 32 +++---- .../kylin/job/engine/JobEngineConfig.java | 2 +- .../apache/kylin/job/JoinedFlatTableTest.java | 4 +- .../apache/kylin/source/hive/HiveMRInput.java | 95 ++++++++------------ 4 files changed, 58 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 9ed563f..5553d34 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -20,12 +20,12 @@ package org.apache.kylin.job; import java.io.File; import java.util.HashSet; +import java.util.Map; import java.util.Set; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; -import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.JoinDesc; @@ -46,30 +46,35 @@ public class JoinedFlatTable { return storageDfsDir + "/" + 
flatDesc.getTableName(); } - public static String generateHiveSetStatements(JobEngineConfig engineConfig) { + public static String generateHiveInitStatements( + String flatTableDatabase, String kylinHiveFile, Map<String, String> cubeOverrides) { + StringBuilder buffer = new StringBuilder(); + buffer.append("USE ").append(flatTableDatabase).append(";\n"); try { - File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath()); - - if (hadoopPropertiesFile.exists()) { + File file = new File(kylinHiveFile); + if (file.exists()) { DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); - DocumentBuilder builder; - Document doc; - builder = factory.newDocumentBuilder(); - doc = builder.parse(hadoopPropertiesFile); + DocumentBuilder builder = factory.newDocumentBuilder(); + Document doc = builder.parse(file); NodeList nl = doc.getElementsByTagName("property"); for (int i = 0; i < nl.getLength(); i++) { String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue(); String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue(); if (!name.equals("tmpjars")) { - buffer.append("SET " + name + "=" + value + ";\n"); + buffer.append("SET ").append(name).append("=").append(value).append(";\n"); } } } } catch (Exception e) { throw new RuntimeException("Failed to parse hive conf file ", e); } + + for (Map.Entry<String, String> entry : cubeOverrides.entrySet()) { + buffer.append("SET ").append(entry.getKey()).append("=").append(entry.getValue()).append(";\n"); + } + return buffer.toString(); } @@ -98,11 +103,8 @@ public class JoinedFlatTable { return ddl.toString(); } - public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig) { - StringBuilder sql = new StringBuilder(); - sql.append(generateHiveSetStatements(engineConfig)); - sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";").append("\n"); - 
return sql.toString(); + public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc) { + return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";\n"; } public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) { http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java index 0f5b7dd..8859527 100644 --- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java +++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java @@ -95,7 +95,7 @@ public class JobEngineConfig { return path; } - public String getHiveConfFilePath() throws IOException { + public String getHiveConfFilePath() { String hiveConfFile = (HIVE_CONF_FILENAME + ".xml"); File jobConfig = getJobConfig(hiveConfFile); http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java index 0faf22a..65169c9 100644 --- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java @@ -22,13 +22,11 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import 
org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.junit.After; import org.junit.Before; @@ -77,7 +75,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase { @Test public void testGenerateInsertSql() throws IOException { - String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv())); + String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc); System.out.println(sqls); int length = sqls.length(); http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 2f966ab..418fcfc 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -19,7 +19,6 @@ package org.apache.kylin.source.hive; import java.io.IOException; -import java.util.Map; import java.util.Set; import org.apache.commons.lang.StringUtils; @@ -127,51 +126,58 @@ public class HiveMRInput implements IMRInput { public static class BatchCubingInputSide implements IMRBatchCubingInputSide { - JobEngineConfig conf; final IJoinedFlatTableDesc flatDesc; + final String flatTableDatabase; + final String hdfsWorkingDir; + String hiveViewIntermediateTables = ""; public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { + KylinConfig config = KylinConfig.getInstanceFromEnv(); this.flatDesc = flatDesc; + this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable(); + this.hdfsWorkingDir = config.getHdfsWorkingDirectory(); } @Override public void 
addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final KylinConfig kylinConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); - this.conf = new JobEngineConfig(kylinConfig); + final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig(); + JobEngineConfig conf = new JobEngineConfig(cubeConfig); + + final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements( + flatTableDatabase, conf.getHiveConfFilePath(), cubeConfig.getHiveConfigOverride() + ) ; + final String jobWorkingDir = getJobWorkingDir(jobFlow); // create flat table first, then count and redistribute - jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); - if (kylinConfig.isHiveRedistributeEnabled() == true) { - jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); + jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName)); + if (cubeConfig.isHiveRedistributeEnabled() == true) { + jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName)); } - AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId()); + AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir); if (task != null) { jobFlow.addTask(task); } } - public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { - StringBuilder hiveInitBuf = new StringBuilder(); - hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n"); - hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); - final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); - 
appendHiveOverrideProperties(kylinConfig, hiveInitBuf); + private String getJobWorkingDir(DefaultChainedExecutable jobFlow) { + return JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId()); + } + private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) { RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); - step.setInitStatement(hiveInitBuf.toString()); - step.setIntermediateTable(flatTableDesc.getTableName()); - step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc)); + step.setInitStatement(hiveInitStatements); + step.setIntermediateTable(flatDesc.getTableName()); + step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc)); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); return step; } - public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { + private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig(); MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig); @@ -187,16 +193,15 @@ public class HiveMRInput implements IMRInput { if (lookupViewsTables.size() == 0) { return null; } - appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder); - final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";"; - hiveCmdBuilder.addStatement(useDatabaseHql); - hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf)); + + HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + 
hiveCmdBuilder.addStatement(hiveInitStatements); for (TableDesc lookUpTableDesc : lookupViewsTables) { if (lookUpTableDesc.isView()) { StringBuilder createIntermediateTableHql = new StringBuilder(); createIntermediateTableHql.append("DROP TABLE IF EXISTS " + lookUpTableDesc.getMaterializedName() + ";\n"); createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + lookUpTableDesc.getMaterializedName() + "\n"); - createIntermediateTableHql.append("LOCATION '" + JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" + lookUpTableDesc.getMaterializedName() + "'\n"); + createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + lookUpTableDesc.getMaterializedName() + "'\n"); createIntermediateTableHql.append("AS SELECT * FROM " + lookUpTableDesc.getIdentity() + ";\n"); hiveCmdBuilder.addStatement(createIntermediateTableHql.toString()); hiveViewIntermediateTables = hiveViewIntermediateTables + lookUpTableDesc.getMaterializedName() + ";"; @@ -209,19 +214,14 @@ public class HiveMRInput implements IMRInput { return step; } - public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { - StringBuilder hiveInitBuf = new StringBuilder(); - hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); - final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); - appendHiveOverrideProperties(kylinConfig, hiveInitBuf); - final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n"; - final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc); - final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); - String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf); + private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, 
String cubeName) { + final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); + final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc); CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); - step.setInitStatement(hiveInitBuf.toString()); - step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls); + step.setInitStatement(hiveInitStatements); + step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE); return step; @@ -229,10 +229,12 @@ public class HiveMRInput implements IMRInput { @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { + final String jobWorkingDir = getJobWorkingDir(jobFlow); + GarbageCollectionStep step = new GarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP); step.setIntermediateTableIdentity(getIntermediateTableIdentity()); - step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()))); + step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir)); step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables); jobFlow.addTask(step); } @@ -243,7 +245,7 @@ public class HiveMRInput implements IMRInput { } private String getIntermediateTableIdentity() { - return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatDesc.getTableName(); + return flatTableDatabase + "." 
+ flatDesc.getTableName(); } } @@ -413,24 +415,5 @@ public class HiveMRInput implements IMRInput { public void setHiveViewIntermediateTableIdentities(String tableIdentities) { setParam("oldHiveViewIntermediateTables", tableIdentities); } - - } - - private static void appendHiveOverrideProperties(final KylinConfig kylinConfig, StringBuilder hiveCmd) { - final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride(); - if (hiveConfOverride.isEmpty() == false) { - for (String key : hiveConfOverride.keySet()) { - hiveCmd.append("SET ").append(key).append("=").append(hiveConfOverride.get(key)).append(";\n"); - } - } - } - - private static void appendHiveOverrideProperties2(final KylinConfig kylinConfig, HiveCmdBuilder hiveCmdBuilder) { - final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride(); - if (hiveConfOverride.isEmpty() == false) { - for (String key : hiveConfOverride.keySet()) { - hiveCmdBuilder.addStatement("SET " + key + "=" + hiveConfOverride.get(key) + ";\n"); - } - } } }