Repository: kylin
Updated Branches:
  refs/heads/master 97950611f -> b044ed770
KYLIN-1706 should use cube specific KylinConfig

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b044ed77
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b044ed77
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b044ed77

Branch: refs/heads/master
Commit: b044ed7703575e036e9b8c1d0516d022c19a57fc
Parents: 9795061
Author: gaodayue <gaoda...@meituan.com>
Authored: Wed Jun 8 19:11:37 2016 +0800
Committer: gaodayue <gaoda...@meituan.com>
Committed: Thu Jun 9 20:41:48 2016 +0800

----------------------------------------------------------------------
 .../engine/mr/common/AbstractHadoopJob.java     |  6 ++----
 .../mr/invertedindex/InvertedIndexJob.java      |  2 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java | 16 ++++++----------
 .../engine/mr/steps/FactDistinctColumnsJob.java |  8 +++-----
 .../kylin/engine/mr/steps/InMemCuboidJob.java   |  6 ++----
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  9 ++++-----
 .../mr/steps/RowKeyDistributionCheckerJob.java  |  3 ++-
 .../cardinality/HiveColumnCardinalityJob.java   |  7 ++++---
 .../storage/hbase/ii/IICreateHFileJob.java      |  9 ++++++++-
 .../kylin/storage/hbase/steps/CubeHFileJob.java |  2 +-
 .../hbase/steps/RangeKeyDistributionJob.java    | 20 +++++++++++---------
 11 files changed, 44 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 737ef0e..5472928 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -154,9 +154,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return retVal;
     }
 
-    protected void setJobClasspath(Job job) {
-        KylinConfig kylinConf = KylinConfig.getInstanceFromEnv();
-
+    protected void setJobClasspath(Job job, KylinConfig kylinConf) {
         String jarPath = kylinConf.getKylinJobJarPath();
         File jarFile = new File(jarPath);
         if (jarFile.exists()) {
@@ -239,7 +237,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
 
         setJobTmpJarsAndFiles(job, kylinDependency.toString());
-
+
         overrideJobConfig(job.getConfiguration(), kylinConf.getMRConfigOverride());
     }


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
index 9ea2411..f7adf6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java
@@ -66,7 +66,7 @@ public class InvertedIndexJob extends AbstractHadoopJob {
             IIInstance ii = getII(iiname);
             short sharding = ii.getDescriptor().getSharding();
 
-            setJobClasspath(job);
+            setJobClasspath(job, ii.getConfig());
 
             setupMapper(ii.getFirstSegment());
             setupReducer(output, sharding);
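The hunks above change setJobClasspath() to take a KylinConfig argument instead of always reading KylinConfig.getInstanceFromEnv(), and the remaining files pass the cube- or II-specific config at each call site. For reference, a minimal sketch (not part of this commit) of the caller pattern the change standardizes on; ExampleCubeJob is hypothetical and option parsing is elided, while CubeManager, CubeInstance, setJobClasspath and the OPTION_* constants are the ones appearing in the diffs:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;

    // Hypothetical job subclass, shown only to illustrate the new call shape.
    public class ExampleCubeJob extends AbstractHadoopJob {
        @Override
        public int run(String[] args) throws Exception {
            // option parsing elided; OPTION_* constants come from AbstractHadoopJob
            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubeInstance cube = cubeMgr.getCube(cubeName);

            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));

            // pass the cube-level config so the job jar path and MR overrides
            // are resolved from the cube's settings, not only the global env
            setJobClasspath(job, cube.getConfig());

            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
            return waitForCompletion(job);
        }
    }
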
http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index a445f71..85ae9c7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -100,10 +100,9 @@ public class CuboidJob extends AbstractHadoopJob {
         String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
         String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeMgr = CubeManager.getInstance(config);
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubeInstance cube = cubeMgr.getCube(cubeName);
-
+
         if (checkSkip(cubingJobId)) {
             logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeName + "[" + segmentName + "]");
             return 0;
@@ -112,7 +111,7 @@ public class CuboidJob extends AbstractHadoopJob {
         job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
         logger.info("Starting: " + job.getJobName());
 
-        setJobClasspath(job);
+        setJobClasspath(job, cube.getConfig());
 
         // Mapper
         configureMapperInputFormat(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
@@ -136,7 +135,7 @@ public class CuboidJob extends AbstractHadoopJob {
         // add metadata to distributed cache
         attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-        setReduceTaskNum(job, config, cubeName, nCuboidLevel);
+        setReduceTaskNum(job, cube.getDescriptor(), nCuboidLevel);
 
         this.deletePath(job.getConfiguration(), output);
 
@@ -169,12 +168,9 @@ public class CuboidJob extends AbstractHadoopJob {
         }
     }
 
-    protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+    protected void setReduceTaskNum(Job job, CubeDesc cubeDesc, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
         Configuration jobConf = job.getConfiguration();
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-        kylinConfig = cubeDesc.getConfig();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
 
         double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
         double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 2dabb7a..90253ba 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -73,11 +73,9 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         // ----------------------------------------------------------------------------
         // add metadata to distributed cache
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubeInstance cube = cubeMgr.getCube(cubeName);
-        CubeDesc cubeDesc = cube.getDescriptor();
-        List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cubeDesc);
+        List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
         job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
         job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
@@ -86,7 +84,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
         logger.info("Starting: " + job.getJobName());
 
-        setJobClasspath(job);
+        setJobClasspath(job, cube.getConfig());
 
         setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
         setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 1 : columnsNeedDict.size());


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 258ec95..4b2ff37 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -86,10 +86,8 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
         String output = getOptionValue(OPTION_OUTPUT_PATH);
 
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeMgr = CubeManager.getInstance(config);
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubeInstance cube = cubeMgr.getCube(cubeName);
-        config = cube.getConfig();
         CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
         String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
@@ -101,7 +99,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
         job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
         logger.info("Starting: " + job.getJobName());
 
-        setJobClasspath(job);
+        setJobClasspath(job, cube.getConfig());
 
         // add metadata to distributed cache
         attachKylinPropsAndMetadata(cube, job.getConfiguration());


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index a391f07..be1a1c1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -46,17 +46,16 @@ public class MergeCuboidJob extends CuboidJob {
         String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
         String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
 
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeMgr = CubeManager.getInstance(config);
+
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubeInstance cube = cubeMgr.getCube(cubeName);
-        config = cube.getConfig();
 
         // start job
         String jobName = getOptionValue(OPTION_JOB_NAME);
         logger.info("Starting: " + jobName);
         job = Job.getInstance(getConf(), jobName);
 
-        setJobClasspath(job);
+        setJobClasspath(job, cube.getConfig());
 
         // set inputs
         addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
@@ -83,7 +82,7 @@ public class MergeCuboidJob extends CuboidJob {
         // add metadata to distributed cache
         attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-        setReduceTaskNum(job, config, cubeName, 0);
+        setReduceTaskNum(job, cube.getDescriptor(), 0);
 
         this.deletePath(job.getConfiguration(), output);


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
index 41712d9..c5a0ff2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 
 /**
@@ -58,7 +59,7 @@ public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
         String jobName = getOptionValue(OPTION_JOB_NAME);
         job = Job.getInstance(getConf(), jobName);
 
-        setJobClasspath(job);
+        setJobClasspath(job, KylinConfig.getInstanceFromEnv());
 
         addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
index c613ef6..d03350e 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java
@@ -70,12 +70,13 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
         logger.info("Starting: " + jobName);
 
         Configuration conf = getConf();
-        JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
         conf.addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null)));
 
         job = Job.getInstance(conf, jobName);
 
-        setJobClasspath(job);
+        setJobClasspath(job, kylinConfig);
 
         String table = getOptionValue(OPTION_TABLE);
         job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table);
@@ -103,7 +104,7 @@
 
         logger.info("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
 
-        TableDesc tableDesc = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(table);
+        TableDesc tableDesc = MetadataManager.getInstance(kylinConfig).getTableDesc(table);
         attachKylinPropsAndMetadata(tableDesc, job.getConfiguration());
 
         int result = waitForCompletion(job);


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java
index 4781f2b..30dca8e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +58,11 @@ public class IICreateHFileJob extends AbstractHadoopJob {
 
         job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
 
-        setJobClasspath(job);
+        String iiName = getOptionValue(OPTION_II_NAME);
+        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
+        IIInstance ii = mgr.getII(iiName);
+
+        setJobClasspath(job, ii.getConfig());
 
         addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
         FileOutputFormat.setOutputPath(job, output);


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index b92481e..19f37ec 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -74,7 +74,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
         CubeInstance cube = cubeMgr.getCube(cubeName);
         job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
 
-        setJobClasspath(job);
+        setJobClasspath(job, cube.getConfig());
 
         addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
         FileOutputFormat.setOutputPath(job, output);


http://git-wip-us.apache.org/repos/asf/kylin/blob/b044ed77/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
index 9a34da3..ffcbb12 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionJob.java
@@ -69,7 +69,12 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
         String jobName = getOptionValue(OPTION_JOB_NAME);
         job = Job.getInstance(getConf(), jobName);
 
-        setJobClasspath(job);
+        String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cube = cubeMgr.getCube(cubeName);
+        KylinConfig kylinConfig = cube.getConfig();
+
+        setJobClasspath(job, kylinConfig);
 
         addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
 
@@ -92,20 +97,17 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob {
 
         this.deletePath(job.getConfiguration(), output);
 
-        String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        CubeInstance cube = cubeMgr.getCube(cubeName);
-        KylinConfig config = cube.getConfig();
-        float hfileSizeGB = config.getHBaseHFileSizeGB();
-        float regionSplitSize = config.getKylinHBaseRegionCut();
+
+        float hfileSizeGB = kylinConfig.getHBaseHFileSizeGB();
+        float regionSplitSize = kylinConfig.getKylinHBaseRegionCut();
         int compactionThreshold = Integer.valueOf(HBaseConnection.getCurrentHBaseConfiguration().get("hbase.hstore.compactionThreshold", "3"));
         if (hfileSizeGB > 0 && hfileSizeGB * compactionThreshold < regionSplitSize) {
             hfileSizeGB = regionSplitSize / compactionThreshold;
             logger.info("Adjust hfile size' to " + hfileSizeGB);
         }
-        int maxRegionCount = config.getHBaseRegionCountMax();
-        int minRegionCount = config.getHBaseRegionCountMin();
+        int maxRegionCount = kylinConfig.getHBaseRegionCountMax();
+        int minRegionCount = kylinConfig.getHBaseRegionCountMin();
         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
         job.getConfiguration().set(BatchConstants.CFG_HFILE_SIZE_GB, String.valueOf(hfileSizeGB));
         job.getConfiguration().set(BatchConstants.CFG_REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
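The last hunk keeps the existing HFile-size guard but reads the thresholds from the cube-specific config. A standalone sketch of that guard follows; the sample values are hypothetical, and the formula mirrors the retained code:

    // Sketch of the guard kept in RangeKeyDistributionJob: when
    // hfileSizeGB * compactionThreshold is below the region cut, the HFile
    // size is raised to regionSplitSize / compactionThreshold.
    public class HFileSizeGuardSketch {
        static float adjustHFileSizeGB(float hfileSizeGB, float regionSplitSize, int compactionThreshold) {
            if (hfileSizeGB > 0 && hfileSizeGB * compactionThreshold < regionSplitSize) {
                hfileSizeGB = regionSplitSize / compactionThreshold;
            }
            return hfileSizeGB;
        }

        public static void main(String[] args) {
            // e.g. 2 GB HFiles, 10 GB region cut, default compactionThreshold "3":
            // 2 * 3 = 6 < 10, so the HFile size becomes 10 / 3 ~= 3.33 GB
            System.out.println(adjustHFileSizeGB(2f, 10f, 3));
        }
    }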