Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2788 97ce9192b -> d3285691d
KYLIN-2788 reorg CubeHFileJob Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3285691 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3285691 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3285691 Branch: refs/heads/KYLIN-2788 Commit: d3285691de8d58a90e4f4e59c7e3a8e5a7539421 Parents: 97ce919 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Aug 20 22:41:39 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Aug 20 22:41:39 2017 +0800 ---------------------------------------------------------------------- .../test_case_data/localmeta/kylin.properties | 1 - .../kylin/storage/hbase/steps/CubeHFileJob.java | 53 +++----------------- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 1 - 3 files changed, 8 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d3285691/examples/test_case_data/localmeta/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties index c7dda3f..a5ca53d 100644 --- a/examples/test_case_data/localmeta/kylin.properties +++ b/examples/test_case_data/localmeta/kylin.properties @@ -137,7 +137,6 @@ kylin.security.saml.context-path=/kylin kylin.test.bcc.new.key=some-value kylin.engine.mr.config-override.test1=test1 kylin.engine.mr.config-override.test2=test2 -kylin.job.lock=org.apache.kylin.job.lock.MockJobLockDup kylin.job.lock=org.apache.kylin.job.lock.MockJobLock http://git-wip-us.apache.org/repos/asf/kylin/blob/d3285691/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 1a624c4..6cbabd1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -18,24 +18,16 @@ package org.apache.kylin.storage.hbase.steps; -import java.io.IOException; - import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; @@ -59,14 +51,11 @@ public class CubeHFileJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_PARTITION_FILE_PATH); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_HTABLE_NAME); parseOptions(options, args); - Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH)); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); String cubeName = 
getOptionValue(OPTION_CUBE_NAME).toUpperCase(); @@ -80,7 +69,7 @@ public class CubeHFileJob extends AbstractHadoopJob { HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration()); addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); - FileOutputFormat.setOutputPath(job, output); + HFileOutputFormat2.setOutputPath(job, output); job.setInputFormatClass(SequenceFileInputFormat.class); job.setMapperClass(CubeHFileMapper.class); @@ -91,15 +80,14 @@ public class CubeHFileJob extends AbstractHadoopJob { // add metadata to distributed cache attachCubeMetadata(cube, job.getConfiguration()); - Configuration hbaseConf = HBaseConfiguration.create(getConf()); - HTable htable = new HTable(hbaseConf, getOptionValue(OPTION_HTABLE_NAME).toUpperCase()); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getMetadataUrl()); + HTable htable = (HTable)conn.getTable(TableName.valueOf(getOptionValue(OPTION_HTABLE_NAME))); // Automatic config ! - HFileOutputFormat.configureIncrementalLoad(job, htable); - reconfigurePartitions(hbaseConf, partitionFilePath); + HFileOutputFormat2.configureIncrementalLoad(job, htable.getTableDescriptor(), htable.getRegionLocator()); // set block replication to 3 for hfiles - hbaseConf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); + job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); this.deletePath(job.getConfiguration(), output); @@ -110,31 +98,6 @@ public class CubeHFileJob extends AbstractHadoopJob { } } - /** - * Check if there's partition files for hfile, if yes replace the table splits, to make the job more reducers - * @param conf the job configuration - * @param path the hfile partition file - * @throws IOException - */ - @SuppressWarnings("deprecation") - private void reconfigurePartitions(Configuration conf, Path path) throws IOException { - FileSystem fs = path.getFileSystem(conf); - if (fs.exists(path)) { - try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) { - int 
partitionCount = 0; - Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); - Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); - while (reader.next(key, value)) { - partitionCount++; - } - TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path); - // The reduce tasks should be one more than partition keys - job.setNumReduceTasks(partitionCount + 1); - } - } else { - logger.info("File '" + path.toString() + " doesn't exist, will not reconfigure hfile Partitions"); - } - } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new CubeHFileJob(), args); http://git-wip-us.apache.org/repos/asf/kylin/blob/d3285691/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index 6f69e8c..1f454e0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -132,7 +132,6 @@ public class HBaseMRSteps extends JobBuilderSupport { appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); - appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile"); appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getHFilePath(jobId)); appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());