This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 76c9c960be542c919301c72b34c7ae5ce6f1ec1c
Author: Yichen Zhou <zhouy...@gmail.com>
AuthorDate: Wed Aug 8 09:53:29 2018 +0800

    KYLIN-3446 Connect to HBase out of Spark

    Signed-off-by: shaofengshi <shaofeng...@apache.org>
---
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  4 ++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |  3 +-
 .../kylin/engine/mr/common/BatchConstants.java     |  2 +-
 .../kylin/storage/hbase/steps/CreateHTableJob.java | 68 ++++++++++++++++++----
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   | 28 ++++++---
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  3 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 29 +++++----
 7 files changed, 97 insertions(+), 40 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 649b4c3..c6abf16 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -311,6 +311,10 @@ public class JobBuilderSupport {
         return getOptimizationRootPath(jobId) + "/cuboid/";
     }
 
+    public String getHBaseConfFilePath(String jobId) {
+        return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+    }
+
     // ============================================================================
     // static methods also shared by other job flow participant
     // ----------------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 329dd56..2976080 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -129,7 +129,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
     protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
             .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
-
+    public static final Option OPTION_HBASE_CONF_PATH = OptionBuilder.withArgName(BatchConstants.ARG_HBASE_CONF_PATH).hasArg()
+            .isRequired(true).withDescription("HBase config file path").create(BatchConstants.ARG_HBASE_CONF_PATH);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index a4a52ad..6fe55e2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -104,7 +104,7 @@ public interface BatchConstants {
     String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
     String ARG_META_URL = "metadataUrl";
-
+    String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     /**
      * logger and counter
      */
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 68aa172..5e17a4c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -26,12 +26,17 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
@@ -46,6 +51,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
+import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +69,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
     CubeDesc cubeDesc = null;
     String segmentID = null;
     String cuboidModeName = null;
+    String hbaseConfPath = null;
     KylinConfig kylinConfig;
     Path partitionFilePath;
 
@@ -74,6 +81,7 @@
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(OPTION_CUBOID_MODE);
+        options.addOption(OPTION_HBASE_CONF_PATH);
         parseOptions(options, args);
 
         partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -85,11 +93,12 @@
         kylinConfig = cube.getConfig();
         segmentID = getOptionValue(OPTION_SEGMENT_ID);
         cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+        hbaseConfPath = getOptionValue(OPTION_HBASE_CONF_PATH);
         CubeSegment cubeSegment = cube.getSegmentById(segmentID);
 
         byte[][] splitKeys;
         Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
-
+
         // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
         Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
         if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
@@ -104,14 +113,35 @@
             }
             cuboidSizeMap = optimizedCuboidSizeMap;
         }
-
+
         splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent());
 
         CubeHTableUtil.createHTable(cubeSegment, splitKeys);
+        exportHBaseConfiguration(cubeSegment.getStorageLocationIdentifier());
         return 0;
     }
 
+    private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
+
+        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
+        HadoopUtil.healSickConfig(hbaseConf);
+        Job job = Job.getInstance(hbaseConf, hbaseTableName);
+        HTable table = new HTable(hbaseConf, hbaseTableName);
+        HFileOutputFormat2.configureIncrementalLoadMap(job, table);
+
+        logger.info("Saving HBase configuration to " + hbaseConfPath);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        FSDataOutputStream out = null;
+        try {
+            out = fs.create(new Path(hbaseConfPath));
+            job.getConfiguration().writeXml(out);
+        } catch (IOException e) {
+            throw new ExecuteException("Write hbase configuration failed", e);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
     //one region for one shard
     private static byte[][] getSplitsByRegionCount(int regionCount) {
@@ -124,7 +154,9 @@
         return result;
     }
 
-    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap, final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder) throws IOException {
+    public static byte[][] getRegionSplitsFromCuboidStatistics(final Map<Long, Double> cubeSizeMap,
+            final KylinConfig kylinConfig, final CubeSegment cubeSegment, final Path hfileSplitsOutputFolder)
+            throws IOException {
 
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
@@ -157,7 +189,8 @@
         }
 
         if (nRegion != original) {
-            logger.info("Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+            logger.info(
+                    "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
         }
     }
 
@@ -188,10 +221,13 @@
             }
 
             if (shardNum > nRegion) {
-                logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d", cuboidId, estimatedSize, shardNum, nRegion));
+                logger.info(
+                        String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions, reduce to %d",
+                                cuboidId, estimatedSize, shardNum, nRegion));
                 shardNum = nRegion;
             } else {
-                logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId, estimatedSize, shardNum));
+                logger.info(String.format("Cuboid %d 's estimated size %.2f MB will generate %d regions", cuboidId,
+                        estimatedSize, shardNum));
             }
 
             cuboidShards.put(cuboidId, (short) shardNum);
@@ -204,7 +240,8 @@
         }
 
         for (int i = 0; i < nRegion; ++i) {
-            logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+            logger.info(String.format("Region %d's estimated size is %.2f MB, accounting for %.2f percent", i,
+                    regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
         }
 
         CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
@@ -222,7 +259,8 @@
                 if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
                     // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
                     regionSplit.add(cuboidId);
-                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId + " (" + cuboidCount + ") cuboids");
+                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
+                            + " (" + cuboidCount + ") cuboids");
                     size = 0;
                     cuboidCount = 0;
                     regionIndex++;
@@ -240,7 +278,8 @@
         }
     }
 
-    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion, final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
+    protected static void saveHFileSplits(final List<HashMap<Long, Double>> innerRegionSplits, int mbPerRegion,
+            final Path outputFolder, final KylinConfig kylinConfig) throws IOException {
 
         if (outputFolder == null) {
             logger.warn("outputFolder for hfile split file is null, skip inner region split");
@@ -300,7 +339,8 @@ public class CreateHTableJob extends AbstractHadoopJob {
                     logger.info(String.format("Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
                     byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
                     BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
-                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+                    System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
+                            RowConstants.ROWKEY_CUBOIDID_LEN);
                     splits.add(split);
                     accumulatedSize = 0;
                     j++;
@@ -310,11 +350,15 @@
             }
         }
 
-        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf, SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class), SequenceFile.Writer.valueClass(NullWritable.class));
+        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+                SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
+                SequenceFile.Writer.valueClass(NullWritable.class));
 
         for (int i = 0; i < splits.size(); i++) {
             //when we compare the rowkey, we compare the row firstly.
-            hfilePartitionWriter.append(new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()), NullWritable.get());
+            hfilePartitionWriter.append(
+                    new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
+                    NullWritable.get());
         }
         hfilePartitionWriter.close();
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index 4fda139..e48090d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -59,8 +59,10 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION,
+                getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_HBASE_CONF_PATH, getHBaseConfFilePath(jobId));
 
         createHtableStep.setJobParams(cmd.toString());
         createHtableStep.setJobClass(CreateHTableJob.class);
@@ -69,7 +71,8 @@
     }
 
     // TODO make it abstract
-    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
+    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments,
+            String jobID, Class<? extends AbstractHadoopJob> clazz) {
         final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
@@ -86,7 +89,8 @@
         appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
         appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
 
         mergeCuboidDataStep.setMapReduceParams(cmd.toString());
         mergeCuboidDataStep.setMapReduceJobClass(clazz);
@@ -148,8 +152,10 @@
     }
 
     public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHTables.add(merging.getStorageLocationIdentifier());
@@ -158,8 +164,10 @@
     }
 
     public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization())
+                .getMergingSegments((CubeSegment) seg);
+        Preconditions.checkState(mergingSegments.size() > 1,
+                "there should be more than 2 segments to merge, target segment " + seg);
         final List<String> mergingHDFSPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingHDFSPaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
@@ -180,11 +188,13 @@
     }
 
     public String getHFilePath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
     }
 
     public String getRowkeyDistributionOutputPath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(
+                getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
     }
 
     public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index 622a0e8..be230f0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -22,6 +22,7 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2;
 import org.apache.kylin.engine.spark.SparkExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,7 +49,7 @@
         sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
         sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
                 getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
-
+        sparkExecutable.setParam(AbstractHadoopJob.OPTION_HBASE_CONF_PATH.getOpt(), getHBaseConfFilePath(jobId));
         sparkExecutable.setJobId(jobId);
 
         StringBuilder jars = new StringBuilder();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index fd8459f..c87a739 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -17,7 +17,6 @@
  */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,12 +29,11 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -58,7 +56,6 @@ import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
 import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -102,6 +99,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
+        options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
     }
 
     @Override
@@ -117,11 +115,12 @@
         final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
         final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
+        final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
 
         Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
                 KeyValueCreator.class, KeyValue.class, RowKeyWritable.class };
 
-        SparkConf conf = new SparkConf().setAppName("Covnerting Hfile for:" + cubeName + " segment " + segmentId);
+        SparkConf conf = new SparkConf().setAppName("Converting HFile for:" + cubeName + " segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
@@ -171,17 +170,15 @@
         }
         logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() + 1) + " hfiles");
 
-        Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
-        HadoopUtil.healSickConfig(hbaseConf);
-        Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
-        HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
-        try {
-            HFileOutputFormat2.configureIncrementalLoadMap(job, table);
-        } catch (IOException ioe) {
-            // this can be ignored.
-            logger.debug(ioe.getMessage(), ioe);
-        }
+
+        //HBase conf
+        logger.info("Loading HBase configuration from:" + hbaseConfFile);
+        FSDataInputStream confInput = fs.open(new Path(hbaseConfFile));
+
+        Configuration hbaseJobConf = new Configuration();
+        hbaseJobConf.addResource(confInput);
+        hbaseJobConf.set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
+        Job job = new Job(hbaseJobConf, cubeSegment.getStorageLocationIdentifier());
 
         FileOutputFormat.setOutputPath(job, new Path(outputPath));