KYLIN-2095 Allow cube to override Hive job configuration by properties

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07e81fd0
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07e81fd0
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07e81fd0
Branch: refs/heads/master-hbase1.x Commit: 07e81fd0b744e782d84383e327b1923cfc178d42 Parents: cc2b59f Author: shaofengshi <shaofeng...@apache.org> Authored: Mon Oct 17 22:07:57 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Oct 17 22:07:57 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 ++ .../apache/kylin/source/hive/HiveMRInput.java | 42 ++++++++++++-------- 2 files changed, 30 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 73ac788..5a06813 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -250,6 +250,10 @@ abstract public class KylinConfigBase implements Serializable { return getPropertiesByPrefix("kylin.job.mr.config.override."); } + public Map<String, String> getHiveConfigOverride() { + return getPropertiesByPrefix("kylin.hive.config.override."); + } + public String getKylinSparkJobJarPath() { final String jobJar = getOptional("kylin.job.jar.spark"); if (StringUtils.isNotEmpty(jobJar)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/07e81fd0/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 4ec8d3d..f3fceb1 100644 --- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -20,6 +20,8 @@ package org.apache.kylin.source.hive; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.Map; import java.util.Set; import org.apache.commons.io.IOUtils; @@ -61,7 +63,6 @@ import com.google.common.collect.Sets; public class HiveMRInput implements IMRInput { - private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename"; @Override public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { @@ -157,9 +158,7 @@ public class HiveMRInput implements IMRInput { hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n"); hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); - if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { - hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n"); - } + appendHiveOverrideProperties(kylinConfig, hiveInitBuf); String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count"; RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); @@ -178,10 +177,7 @@ public class HiveMRInput implements IMRInput { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); - if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { - hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" + - kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n"); - } + appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder); 
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf)); hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir)); @@ -192,6 +188,7 @@ public class HiveMRInput implements IMRInput { return step; } + public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -211,10 +208,7 @@ public class HiveMRInput implements IMRInput { if (lookupViewsTables.size() == 0) { return null; } - if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { - hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" + - kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n"); - } + appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder); final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";"; hiveCmdBuilder.addStatement(useDatabaseHql); hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf)); @@ -240,9 +234,7 @@ public class HiveMRInput implements IMRInput { StringBuilder hiveInitBuf = new StringBuilder(); hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); - if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { - hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n"); - } + appendHiveOverrideProperties(kylinConfig, hiveInitBuf); final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n"; final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc); final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); @@ -301,7 +293,7 @@ public class HiveMRInput implements IMRInput { FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration()); InputStream in = fs.open(file); try { - String content = IOUtils.toString(in); + String content = IOUtils.toString(in, Charset.defaultCharset()); return Long.valueOf(content.trim()); // strip the '\n' character } finally { @@ -490,4 +482,22 @@ public class HiveMRInput implements IMRInput { } } + + private static void appendHiveOverrideProperties(final KylinConfig kylinConfig, StringBuilder hiveCmd) { + final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride(); + if (hiveConfOverride.isEmpty() == false) { + for (String key : hiveConfOverride.keySet()) { + hiveCmd.append("SET ").append(key).append("=").append(hiveConfOverride.get(key)).append(";\n"); + } + } + } + + private static void appendHiveOverrideProperties2(final KylinConfig kylinConfig, HiveCmdBuilder hiveCmdBuilder) { + final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride(); + if (hiveConfOverride.isEmpty() == false) { + for (String key : hiveConfOverride.keySet()) { + hiveCmdBuilder.addStatement("SET " + key + "=" + hiveConfOverride.get(key) + ";\n"); + } + } + } }