KYLIN-1323 Improve performance of converting data to hfile
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/07ad8777 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/07ad8777 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/07ad8777 Branch: refs/heads/1.x-HBase1.1.3 Commit: 07ad8777eaa2788de48290a020590bab80f4d42c Parents: 63c59fe Author: sunyerui <[email protected]> Authored: Mon Feb 22 16:05:17 2016 +0800 Committer: shaofengshi <[email protected]> Committed: Thu Feb 25 18:01:14 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfig.java | 10 ++++ .../common/util/HBaseRegionSizeCalculator.java | 8 ++++ conf/kylin.properties | 4 ++ .../kylin/job/constant/BatchConstants.java | 1 + .../apache/kylin/job/cube/CubingJobBuilder.java | 3 +- .../kylin/job/hadoop/AbstractHadoopJob.java | 2 +- .../kylin/job/hadoop/cube/CubeHFileJob.java | 37 ++++++++++++++- .../hadoop/cube/RangeKeyDistributionJob.java | 13 ++++++ .../hadoop/cube/RangeKeyDistributionMapper.java | 9 +++- .../cube/RangeKeyDistributionReducer.java | 49 +++++++++++++++++--- .../kylin/job/BuildCubeWithEngineTest.java | 42 +++++++++++++++++ 11 files changed, 166 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index 7203065..9c813cd 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -131,6 +131,7 @@ public class KylinConfig { public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec"; + public static final String HBASE_HFILE_SIZE_GB = "kylin.hbase.hfile.size.gb"; public static final String HBASE_REGION_CUT_SMALL = "kylin.hbase.region.cut.small"; public static final String HBASE_REGION_CUT_MEDIUM = "kylin.hbase.region.cut.medium"; public static final String HBASE_REGION_CUT_LARGE = "kylin.hbase.region.cut.large"; @@ -723,6 +724,15 @@ public class KylinConfig { kylinConfig.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v); } + // for test + public void setHBaseHFileSizeGB(int size) { + kylinConfig.setProperty(HBASE_HFILE_SIZE_GB, String.valueOf(size)); + } + + public int getHBaseHFileSizeGB() { + return Integer.parseInt(getOptional(HBASE_HFILE_SIZE_GB, "0")); + } + public int getHBaseRegionCountMin() { return Integer.parseInt(getOptional(HBASE_REGION_COUNT_MIN, "1")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java index 093ac9e..fccd042 100644 --- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java +++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java @@ -48,6 +48,9 @@ public class HBaseRegionSizeCalculator { **/ private final Map<byte[], Long> sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + private final Map<byte[], Pair<Integer, Integer>> countMap = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable"; /** @@ -92,6 +95,7 @@ public class HBaseRegionSizeCalculator { long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte; sizeMap.put(regionId, regionSizeBytes); + countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles())); // logger.info("Region " + regionLoad.getNameAsString() // + " has size " + regionSizeBytes); @@ -124,4 +128,8 @@ public class HBaseRegionSizeCalculator { public Map<byte[], Long> getRegionSizeMap() { return Collections.unmodifiableMap(sizeMap); } + + public Map<byte[], Pair<Integer, Integer>> getRegionHFileCountMap() { + return Collections.unmodifiableMap(countMap); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/conf/kylin.properties b/conf/kylin.properties index e0727ed..bbcf948 100644 --- a/conf/kylin.properties +++ b/conf/kylin.properties @@ -83,6 +83,10 @@ kylin.hbase.region.cut.large=100 kylin.hbase.region.count.min=1 kylin.hbase.region.count.max=500 +# The hfile size of GB, smaller hfile leading to the converting hfile MR has more reducers and be faster +# set to 0 or comment this config to disable this optimization +kylin.hbase.hfile.size.gb=0 + # Enable/disable ACL check for cube query kylin.query.security.enabled=true http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java index 38f4a87..a3c4c32 100644 --- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java +++ b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java @@ -47,6 +47,7 @@ public interface BatchConstants { String REGION_NUMBER_MIN = "region.number.min"; String REGION_NUMBER_MAX = "region.number.max"; String REGION_SPLIT_SIZE = "region.split.size"; + String HFILE_SIZE_GB = "hfile.size.gb"; String CUBE_CAPACITY = "cube.capacity"; String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/"; http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java index 80c030f..14bb649 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java @@ -372,7 +372,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder { createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); - appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000"); + appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000"); appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); createHtableStep.setJobParams(cmd.toString()); @@ -388,6 +388,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder { appendMapReduceParameters(cmd, seg); appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); + appendExecCmdParameters(cmd, "partitions", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000_hfile"); appendExecCmdParameters(cmd, "input", inputPath); appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId)); appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java index 698a978..a7d107d 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java @@ -83,7 +83,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim"); protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output"); protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level"); - protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input"); + protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions"); protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename"); protected String name; http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java index 3c1e4a5..d5b83ef 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java @@ -18,17 +18,24 @@ package org.apache.kylin.job.hadoop.cube; +import java.io.IOException; + import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; @@ -51,11 +58,14 @@ public class CubeHFileJob extends AbstractHadoopJob { try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_PARTITION_FILE_PATH); options.addOption(OPTION_INPUT_PATH); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_HTABLE_NAME); parseOptions(options, args); + Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH)); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); @@ -82,8 +92,9 @@ public class CubeHFileJob extends AbstractHadoopJob { String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); HTable htable = new HTable(conf, tableName); - //Automatic config ! + // Automatic config ! HFileOutputFormat.configureIncrementalLoad(job, htable); + reconfigurePartitions(conf, partitionFilePath); // set block replication to 3 for hfiles conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); @@ -101,6 +112,30 @@ public class CubeHFileJob extends AbstractHadoopJob { } } + /** + * Check if there's partition files for hfile, if yes replace the table splits, to make the job more reducers + * @param conf the job configuration + * @param path the hfile partition file + * @throws IOException + */ + @SuppressWarnings("deprecation") + private void reconfigurePartitions(Configuration conf, Path path) throws IOException { + FileSystem fs = path.getFileSystem(conf); + if (fs.exists(path)) { + try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) { + int partitionCount = 0; + Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); + Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); + while (reader.next(key, value)) { + partitionCount++; + } + TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path); + // The reduce tasks should be one more than partition keys + job.setNumReduceTasks(partitionCount+1); + } + } + } + public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new CubeHFileJob(), args); System.exit(exitCode); http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java index 9c50122..a7832fa 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java @@ -20,7 +20,10 @@ package org.apache.kylin.job.hadoop.cube; import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -92,13 +95,23 @@ public class RangeKeyDistributionJob extends AbstractHadoopJob { String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); + int hfileSizeGB = KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB(); DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity(); int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString()); int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax(); int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin(); + job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + job.getConfiguration().set(BatchConstants.HFILE_SIZE_GB, String.valueOf(hfileSizeGB)); job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize)); job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount)); job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount)); + // The partition file for hfile is sequenece file consists of ImmutableBytesWritable and NullWritable + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), ImmutableBytesWritable.class, NullWritable.class); + + // Passed the sandbox property to mapper, to simulate large dataset + if (System.getProperty("useSandbox") != null && System.getProperty("useSandbox").equals("true")) { + job.getConfiguration().setBoolean("useSandbox", true); + } return waitForCompletion(job); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java index 33baf45..41856b4 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java @@ -38,9 +38,14 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo private Text lastKey; + private Long scaleFactorForSandbox = 1L; + @Override protected void setup(Context context) throws IOException { super.publishConfiguration(context.getConfiguration()); + if (context.getConfiguration().getBoolean("useSandbox", false)) { + scaleFactorForSandbox = 1024L; + } } @Override @@ -50,8 +55,8 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo int bytesLength = key.getLength() + value.getLength(); bytesRead += bytesLength; - if (bytesRead >= ONE_MEGA_BYTES) { - outputValue.set(bytesRead); + if ((bytesRead * scaleFactorForSandbox) >= ONE_MEGA_BYTES) { + outputValue.set(bytesRead * scaleFactorForSandbox); context.write(key, outputValue); // reset bytesRead http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java index b3ab4db..f9c46bb 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java @@ -22,7 +22,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.StringUtils; import org.apache.kylin.common.mr.KylinReducer; @@ -46,13 +50,23 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable private int minRegionCount = 1; private int maxRegionCount = 500; private int cut = 10; + private int hfileSizeGB = 1; private long bytesRead = 0; private List<Text> gbPoints = new ArrayList<Text>(); + private String output = null; @Override protected void setup(Context context) throws IOException { super.publishConfiguration(context.getConfiguration()); + if (context.getConfiguration().get(BatchConstants.OUTPUT_PATH) != null) { + output = context.getConfiguration().get(BatchConstants.OUTPUT_PATH); + } + + if (context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB) != null) { + hfileSizeGB = Integer.valueOf(context.getConfiguration().get(BatchConstants.HFILE_SIZE_GB)); + } + if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) { cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE)); } @@ -65,7 +79,11 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX)); } - logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount); + logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + + ", min region count=" + minRegionCount + ", hfile size=" + hfileSizeGB); + + // add empty key at position 0 + gbPoints.add(new Text()); } @Override @@ -89,14 +107,31 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable int gbPerRegion = gbPoints.size() / nRegion; gbPerRegion = Math.max(1, gbPerRegion); + if (hfileSizeGB <= 0) { + hfileSizeGB = gbPerRegion; + } + int hfilePerRegion = gbPerRegion / hfileSizeGB; + hfilePerRegion = Math.max(1, hfilePerRegion); System.out.println(nRegion + " regions"); System.out.println(gbPerRegion + " GB per region"); - - for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) { - Text key = gbPoints.get(i); - outputValue.set(i); - System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); - context.write(key, outputValue); + System.out.println(hfilePerRegion + " hfile per region"); + + Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile"); + try (SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer( + hfilePartitionFile.getFileSystem(context.getConfiguration()), + context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class)) { + int hfileCountInOneRegion = 0; + for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) { + hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get()); + if (++hfileCountInOneRegion >= hfilePerRegion) { + Text key = gbPoints.get(i); + outputValue.set(i); + System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get()); + context.write(key, outputValue); + + hfileCountInOneRegion = 0; + } + } } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/07ad8777/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java ---------------------------------------------------------------------- diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java index f02aa7a..78e6ab3 100644 --- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java +++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java @@ -21,9 +21,11 @@ package org.apache.kylin.job; import static org.junit.Assert.assertEquals; import java.io.File; +import java.io.IOException; import java.lang.reflect.Method; import java.text.SimpleDateFormat; import java.util.List; +import java.util.Map; import java.util.TimeZone; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -33,11 +35,16 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.lock.ZookeeperJobLock; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.ClassUtil; import org.apache.kylin.common.util.HBaseMetadataTestCase; +import org.apache.kylin.common.util.HBaseRegionSizeCalculator; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -129,9 +136,11 @@ public class BuildCubeWithEngineTest { } private void testInner() throws Exception { + KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(1); DeployUtil.prepareTestData("inner", "test_kylin_cube_with_slr_empty"); String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", }; runTestAndAssertSucceed(testCase); + KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(0); } private void testLeft() throws Exception { @@ -280,6 +289,7 @@ public class BuildCubeWithEngineTest { CubingJob job = cubingJobBuilder.buildJob(segment); jobService.addJob(job); waitForJob(job.getId()); + checkHFilesInHBase(segment); return job.getId(); } @@ -295,4 +305,36 @@ public class BuildCubeWithEngineTest { return jobId; } + + private void checkHFilesInHBase(CubeSegment segment) throws IOException { + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); + String tableName = segment.getStorageLocationIdentifier(); + HTable table = new HTable(conf, tableName); + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); + Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); + long totalSize = 0; + for (Long size : sizeMap.values()) { + totalSize += size; + } + if (totalSize == 0) { + return; + } + Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap(); + // check if there's region contains more than one hfile, which means the hfile config take effects + boolean hasMultiHFileRegions = false; + for (Pair<Integer, Integer> count : countMap.values()) { + // check if hfile count is greater than store count + if (count.getSecond() > count.getFirst()) { + hasMultiHFileRegions = true; + break; + } + } + if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) { + throw new IOException("hfile size set to 0, but found region contains more than one hfiles"); + } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) { + throw new IOException("hfile size set greater than 0, but all regions still has only one hfile"); + } + } + } +
