Repository: kylin Updated Branches: refs/heads/KYLIN-1077 [created] 66ad13860
KYLIN-1566 use a separate kylin_job_conf.xml for in-mem cubing Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/209068b9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/209068b9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/209068b9 Branch: refs/heads/KYLIN-1077 Commit: 209068b943bf4a90efe4df618e1aaf5cbfe49cde Parents: 1b54a40 Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Apr 15 16:11:44 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sat Apr 16 09:05:40 2016 +0800 ---------------------------------------------------------------------- build/conf/kylin_job_conf_inmem.xml | 98 ++++++++++++++++++++ .../apache/kylin/common/KylinConfigBase.java | 19 ---- .../kylin/job/engine/JobEngineConfig.java | 44 ++++++--- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 3 +- .../kylin/engine/mr/JobBuilderSupport.java | 13 ++- .../kylin/engine/mr/steps/InMemCuboidJob.java | 11 --- .../cardinality/HiveColumnCardinalityJob.java | 2 +- 7 files changed, 140 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/build/conf/kylin_job_conf_inmem.xml ---------------------------------------------------------------------- diff --git a/build/conf/kylin_job_conf_inmem.xml b/build/conf/kylin_job_conf_inmem.xml new file mode 100644 index 0000000..55bf9ed --- /dev/null +++ b/build/conf/kylin_job_conf_inmem.xml @@ -0,0 +1,98 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<configuration> + + <property> + <name>mapreduce.job.split.metainfo.maxsize</name> + <value>-1</value> + <description>The maximum permissible size of the split metainfo file. + The JobTracker won't attempt to read split metainfo files bigger than + the configured value. No limits if set to -1. + </description> + </property> + + <property> + <name>mapred.compress.map.output</name> + <value>true</value> + <description>Compress map outputs</description> + </property> + + <property> + <name>mapred.map.output.compression.codec</name> + <value>org.apache.hadoop.io.compress.SnappyCodec</value> + <description>The compression codec to use for map outputs + </description> + </property> + + <property> + <name>mapred.output.compress</name> + <value>true</value> + <description>Compress the output of a MapReduce job</description> + </property> + + <property> + <name>mapred.output.compression.codec</name> + <value>org.apache.hadoop.io.compress.SnappyCodec</value> + <description>The compression codec to use for job outputs + </description> + </property> + + <property> + <name>mapred.output.compression.type</name> + <value>BLOCK</value> + <description>The compression type to use for job outputs</description> + </property> + + + <property> + <name>mapreduce.job.max.split.locations</name> + <value>2000</value> + <description>The maximum number of block locations to store for each split for locality calculation</description> + </property> + + <property> + <name>dfs.replication</name> + <value>2</value> + <description>Block replication</description> + </property> + + <property> + <name>mapred.task.timeout</name> + <value>3600000</value> +
<description>Set task timeout to 1 hour</description> + </property> + + <!-- Additional config for in-mem cubing: give the mapper more memory --> + <property> + <name>mapreduce.map.memory.mb</name> + <value>3072</value> + <description>Memory, in MB, to request from the scheduler for each map task</description> + </property> + + <property> + <name>mapreduce.map.java.opts</name> + <value>-Xmx2700m</value> + <description>JVM options for map tasks; the max heap is kept below mapreduce.map.memory.mb</description> + </property> + + <property> + <name>mapreduce.task.io.sort.mb</name> + <value>200</value> + <description>Buffer memory, in MB, used while sorting map output</description> + </property> + +</configuration> http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 51aa8aa..4d65c1d 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -531,25 +531,6 @@ abstract public class KylinConfigBase implements Serializable { return percent; } - public Map<String, String> getCubingInMemMRJobConfOverride() { - // in-mem cubing requires big memory, however dev env (sandbox) may not have that much - String defaultOverride = isDevEnv() ?
"" : "mapreduce.map.java.opts=-Xmx2700m; mapreduce.map.memory.mb=3072; mapreduce.task.io.sort.mb=200"; - String override = getOptional("kylin.job.cubing.inmem.mrjob_conf_override", defaultOverride); - - Map<String, String> result = Maps.newHashMap(); - for (String pair : override.split(";")) { - int cut = pair.indexOf('='); - if (cut < 0) - continue; - String k = pair.substring(0, cut).trim(); - String v = pair.substring(cut + 1).trim(); - if (k.isEmpty() || v.isEmpty()) - continue; - result.put(k, v); - } - return result; - } - public String getHbaseDefaultCompressionCodec() { return getOptional("kylin.hbase.default.compression.codec", ""); } http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java index 546c033..fb4ce68 100644 --- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java +++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java @@ -33,8 +33,10 @@ import org.slf4j.LoggerFactory; */ public class JobEngineConfig { private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class); - public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf"; - public static String HIVE_CONF_FILENAME = "kylin_hive_conf"; + public static final String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf"; + public static final String HIVE_CONF_FILENAME = "kylin_hive_conf"; + public static final String DEFAUL_JOB_CONF_SUFFIX = ""; + public static final String IN_MEM_JOB_CONF_SUFFIX = "inmem"; private static File getJobConfig(String fileName) { String path = System.getProperty(KylinConfig.KYLIN_CONF); @@ -49,10 +51,10 @@ public class JobEngineConfig { return null; } - private String 
getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException { + private String getHadoopJobConfFilePath(String suffix, boolean appendSuffix) throws IOException { String hadoopJobConfFile; - if (capaticy != null && appendSuffix) { - hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml"); + if (suffix != null && appendSuffix) { + hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + suffix.toLowerCase() + ".xml"); } else { hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml"); } @@ -69,19 +71,31 @@ public class JobEngineConfig { return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath()); } - public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException { - String path = getHadoopJobConfFilePath(capaticy, true); - if (!StringUtils.isEmpty(path)) { - logger.info("Chosen job conf is : " + path); - return path; + /** Resolves the Hadoop job conf file, trying the suffix "jobType_capacity" first, then "jobType", then the default kylin_job_conf.xml. + * @param jobType job type suffix (e.g. "inmem"); if null or empty, only the capacity is used as the suffix + * @param capacity model capacity name, appended after the job type to form the most specific suffix + * @return the job config file path, or an empty string if no config file is found + * @throws IOException if resolving the config file path fails + */ + public String getHadoopJobConfFilePath(String jobType, String capacity) throws IOException { + String suffix; + if(!StringUtils.isEmpty(jobType)) { + suffix = jobType + "_" + capacity; } else { - path = getHadoopJobConfFilePath(capaticy, false); - if (!StringUtils.isEmpty(path)) { - logger.info("Chosen job conf is : " + path); - return path; + suffix = capacity; + } + String path = getHadoopJobConfFilePath(suffix, true); + if (StringUtils.isEmpty(path)) { + path = getHadoopJobConfFilePath(jobType, true); + if (StringUtils.isEmpty(path)) { + path = getHadoopJobConfFilePath(jobType, false); + if (StringUtils.isEmpty(path)) { + path = ""; + } } } - return ""; + logger.info("Chosen job conf is : " + path); + return path; } public String getHiveConfFilePath() throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 0b1bd90..a1c9cd9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -30,6 +30,7 @@ import org.apache.kylin.engine.mr.steps.InMemCuboidJob; import org.apache.kylin.engine.mr.steps.NDCuboidJob; import org.apache.kylin.engine.mr.steps.SaveStatisticsStep; import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,7 +109,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { MapReduceExecutable cubeStep = new MapReduceExecutable(); StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel()); + appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX, ((CubeSegment) seg).getCubeDesc().getModel()); cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE); http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index c4fc6b9..841c402 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -21,7 +21,6 @@ package org.apache.kylin.engine.mr; 
import java.io.IOException; import java.util.List; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; @@ -162,9 +161,17 @@ public class JobBuilderSupport { return getRealizationRootPath(jobId) + "/secondary_index/"; } - public void appendMapReduceParameters(StringBuilder buf, DataModelDesc modelDesc) { + public void appendMapReduceParameters(StringBuilder buf, DataModelDesc dataModelDesc) { + appendMapReduceParameters(buf, JobEngineConfig.DEFAUL_JOB_CONF_SUFFIX, dataModelDesc.getCapacity().toString()); + } + + public void appendMapReduceParameters(StringBuilder buf, String jobType, DataModelDesc dataModelDesc) { + appendMapReduceParameters(buf, jobType, dataModelDesc.getCapacity().toString()); + } + + public void appendMapReduceParameters(StringBuilder buf, String jobType, String capacity) { try { - String jobConf = config.getHadoopJobConfFilePath(modelDesc.getCapacity()); + String jobConf = config.getHadoopJobConfFilePath(jobType, capacity); if (jobConf != null && jobConf.length() > 0) { buf.append(" -conf ").append(jobConf); } http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index f440b22..e7bbdf1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -101,9 +101,6 @@ public class InMemCuboidJob extends AbstractHadoopJob { job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); logger.info("Starting: " + job.getJobName()); - // some special tuning for in-mem 
MR job - overrideJobConf(job.getConfiguration(), config); - setJobClasspath(job); // add metadata to distributed cache @@ -112,8 +109,6 @@ public class InMemCuboidJob extends AbstractHadoopJob { // set job configuration job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - long timeout = 1000 * 60 * 60L; // 1 hour - job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout)); // set input IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); @@ -149,12 +144,6 @@ public class InMemCuboidJob extends AbstractHadoopJob { } } - private void overrideJobConf(Configuration jobConf, KylinConfig kylinConfig) { - for (Entry<String, String> entry : kylinConfig.getCubingInMemMRJobConfOverride().entrySet()) { - jobConf.set(entry.getKey(), entry.getValue()); - } - } - private int calculateReducerNum(CubeSegment cubeSeg) throws IOException { KylinConfig kylinConfig = cubeSeg.getConfig(); http://git-wip-us.apache.org/repos/asf/kylin/blob/209068b9/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index 9162208..3ce0ab2 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -71,7 +71,7 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob { Configuration conf = getConf(); JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); - conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null)); + 
conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null, null)); job = Job.getInstance(conf, jobName);