Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2788 97ce9192b -> d3285691d


KYLIN-2788 reorg CubeHFileJob


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d3285691
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d3285691
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d3285691

Branch: refs/heads/KYLIN-2788
Commit: d3285691de8d58a90e4f4e59c7e3a8e5a7539421
Parents: 97ce919
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sun Aug 20 22:41:39 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Sun Aug 20 22:41:39 2017 +0800

----------------------------------------------------------------------
 .../test_case_data/localmeta/kylin.properties   |  1 -
 .../kylin/storage/hbase/steps/CubeHFileJob.java | 53 +++-----------------
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  1 -
 3 files changed, 8 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d3285691/examples/test_case_data/localmeta/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index c7dda3f..a5ca53d 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -137,7 +137,6 @@ kylin.security.saml.context-path=/kylin
 kylin.test.bcc.new.key=some-value
 kylin.engine.mr.config-override.test1=test1
 kylin.engine.mr.config-override.test2=test2
-kylin.job.lock=org.apache.kylin.job.lock.MockJobLockDup
 kylin.job.lock=org.apache.kylin.job.lock.MockJobLock
 
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3285691/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 1a624c4..6cbabd1 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -18,24 +18,16 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
-
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -59,14 +51,11 @@ public class CubeHFileJob extends AbstractHadoopJob {
         try {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_PARTITION_FILE_PATH);
             options.addOption(OPTION_INPUT_PATH);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_HTABLE_NAME);
             parseOptions(options, args);
 
-            Path partitionFilePath = new 
Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
-
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
 
@@ -80,7 +69,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             
HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration());
 
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-            FileOutputFormat.setOutputPath(job, output);
+            HFileOutputFormat2.setOutputPath(job, output);
 
             job.setInputFormatClass(SequenceFileInputFormat.class);
             job.setMapperClass(CubeHFileMapper.class);
@@ -91,15 +80,14 @@ public class CubeHFileJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             attachCubeMetadata(cube, job.getConfiguration());
 
-            Configuration hbaseConf = HBaseConfiguration.create(getConf());
-            HTable htable = new HTable(hbaseConf, 
getOptionValue(OPTION_HTABLE_NAME).toUpperCase());
+            Connection conn = 
HBaseConnection.get(KylinConfig.getInstanceFromEnv().getMetadataUrl());
+            HTable htable = 
(HTable)conn.getTable(TableName.valueOf(getOptionValue(OPTION_HTABLE_NAME)));
 
             // Automatic config !
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
-            reconfigurePartitions(hbaseConf, partitionFilePath);
+            HFileOutputFormat2.configureIncrementalLoad(job, 
htable.getTableDescriptor(), htable.getRegionLocator());
 
             // set block replication to 3 for hfiles
-            hbaseConf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
 
             this.deletePath(job.getConfiguration(), output);
 
@@ -110,31 +98,6 @@ public class CubeHFileJob extends AbstractHadoopJob {
         }
     }
 
-    /**
-     * Check if there's partition files for hfile, if yes replace the table 
splits, to make the job more reducers
-     * @param conf the job configuration
-     * @param path the hfile partition file
-     * @throws IOException
-     */
-    @SuppressWarnings("deprecation")
-    private void reconfigurePartitions(Configuration conf, Path path) throws 
IOException {
-        FileSystem fs = path.getFileSystem(conf);
-        if (fs.exists(path)) {
-            try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
path, conf)) {
-                int partitionCount = 0;
-                Writable key = (Writable) 
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                Writable value = (Writable) 
ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                while (reader.next(key, value)) {
-                    partitionCount++;
-                }
-                TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), 
path);
-                // The reduce tasks should be one more than partition keys
-                job.setNumReduceTasks(partitionCount + 1);
-            }
-        } else {
-            logger.info("File '" + path.toString() + " doesn't exist, will not 
reconfigure hfile Partitions");
-        }
-    }
 
     public static void main(String[] args) throws Exception {
         int exitCode = ToolRunner.run(new CubeHFileJob(), args);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d3285691/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 6f69e8c..1f454e0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -132,7 +132,6 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, 
seg.getRealization().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, 
getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, 
getHFilePath(jobId));
         appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, 
seg.getStorageLocationIdentifier());

Reply via email to