Hive job uses the MR job configuration overridden by cube properties Signed-off-by: shaofengshi <shaofeng...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ac356f01 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ac356f01 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ac356f01 Branch: refs/heads/KYLIN-1971 Commit: ac356f014d52c4b13ad72e9d6a537e50e9ace5fb Parents: c92f79a Author: lijieliang <lijieli...@cmss.chinamobile.com> Authored: Fri Oct 14 13:01:32 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Oct 17 18:25:26 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/source/hive/HiveMRInput.java | 24 +++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ac356f01/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 202e480..2ec1fbb 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -35,6 +35,7 @@ import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; @@ -60,6 +61,8 @@ import com.google.common.collect.Sets; public class HiveMRInput implements IMRInput { + private static final String MR_OVERRIDE_JOB_QUEUENAME = "mapreduce.job.queuename"; + @Override public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { return new BatchCubingInputSide(flatDesc); @@ -154,7 +157,10 @@ public class HiveMRInput implements IMRInput { StringBuilder hiveInitBuf = new StringBuilder(); hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n"); hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); - + final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); + if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { + hiveInitBuf.append("SET mapreduce.job.queuename=").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n"); + } String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count"; RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); @@ -172,6 +178,11 @@ public class HiveMRInput implements IMRInput { final ShellExecutable step = new ShellExecutable(); final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); + if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { + hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" + + kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n"); + } hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf)); hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir)); @@ -187,7 +198,7 @@ public class HiveMRInput implements IMRInput { step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig(); MetadataManager metadataManager 
= MetadataManager.getInstance(kylinConfig); final Set<TableDesc> lookupViewsTables = Sets.newHashSet(); @@ -201,6 +212,10 @@ public class HiveMRInput implements IMRInput { if (lookupViewsTables.size() == 0) { return null; } + if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { + hiveCmdBuilder.addStatement("SET mapreduce.job.queuename =" + + kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) + ";\n"); + } final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";"; hiveCmdBuilder.addStatement(useDatabaseHql); hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf)); @@ -225,7 +240,10 @@ public class HiveMRInput implements IMRInput { public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) { StringBuilder hiveInitBuf = new StringBuilder(); hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); - + final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); + if (kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME) != null) { + hiveInitBuf.append("SET mapreduce.job.queuename =").append(kylinConfig.getMRConfigOverride().get(MR_OVERRIDE_JOB_QUEUENAME)).append(";\n"); + } final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n"; final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));