http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java new file mode 100644 index 0000000..3e7cd3e --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterCheckpointStep.class); + + public UpdateCubeInfoAfterCheckpointStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + + Set<Long> recommendCuboids = cube.getCuboidsRecommend(); + try { + List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING); + Map<Long, Long> recommendCuboidsWithStats = CuboidStatsReaderUtil + .readCuboidStatsFromSegments(recommendCuboids, newSegments); + if (recommendCuboidsWithStats == null) { + throw new RuntimeException("Failed to get statistics of the recommended cuboids after optimization!"); + } + cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats, + newSegments.toArray(new CubeSegment[newSegments.size()])); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (Exception e) { + logger.error("failed to update cube after checkpoint", e); + return new ExecuteResult(e, e.getLocalizedMessage()); + } + } + +}
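For context on the checkpoint step above: readCuboidStatsFromSegments returns a Map<Long, Long> from each recommended cuboid ID to a row-count estimate aggregated across the READY_PENDING segments, and the step fails fast when that map is null. A minimal, self-contained sketch of such an aggregation follows; it assumes the per-segment stats are already in memory as plain maps and simply sums them, whereas Kylin's CuboidStatsReaderUtil reads them from stored segment statistics and differs in detail.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CuboidStatsMergeSketch {

    // Merge per-segment cuboid row counts into one map over the recommended
    // cuboids. Returns null when any recommended cuboid is missing from a
    // segment's stats, mirroring the null check in the step above.
    public static Map<Long, Long> merge(Set<Long> recommendCuboids, List<Map<Long, Long>> segmentStats) {
        Map<Long, Long> result = new HashMap<>(recommendCuboids.size());
        for (Long cuboid : recommendCuboids) {
            long total = 0L;
            for (Map<Long, Long> stats : segmentStats) {
                Long rowCount = stats.get(cuboid);
                if (rowCount == null) {
                    return null; // incomplete statistics; caller raises an error
                }
                total += rowCount;
            }
            result.put(cuboid, total);
        }
        return result;
    }
}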
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java new file mode 100644 index 0000000..965111b --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps; + +import java.io.IOException; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable { + + private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterOptimizeStep.class); + + public UpdateCubeInfoAfterOptimizeStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + + CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(segment); + long sourceCount = originalSegment.getInputRecords(); + long sourceSizeBytes = originalSegment.getInputRecordsSize(); + + CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + long cubeSizeBytes = cubingJob.findCubeSizeBytes(); + + segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams())); + segment.setLastBuildTime(System.currentTimeMillis()); + segment.setSizeKB(cubeSizeBytes / 1024); + segment.setInputRecords(sourceCount); + segment.setInputRecordsSize(sourceSizeBytes); + + try { + cubeManager.promoteNewlyOptimizeSegments(cube, segment); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); + } catch (IOException e) { + logger.error("failed to update cube after optimization", e); + return new ExecuteResult(e, e.getLocalizedMessage()); + } + } + +}
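The two classes that follow re-shard the cuboid data that is carried over during optimization: a map-only job reads the old segment's cuboid files and rewrites every row key so that its shard prefix is valid for the new segment. Stripped of Kylin's row-key machinery, the per-row rewrite is roughly the sketch below; the two-byte shard prefix matches Kylin's row-key layout, but the hash-based shard assignment here is a simplified stand-in for what RowKeyEncoder actually computes (the real key also carries a cuboid ID and dictionary-encoded columns).

import java.nio.ByteBuffer;
import java.util.Arrays;

public class ReShardSketch {

    // Rewrite a row key of the form [shard (2 bytes)][body]: keep the body,
    // recompute the shard from the body for the new segment's shard count.
    public static byte[] reShard(byte[] oldKey, int totalShards) {
        byte[] body = Arrays.copyOfRange(oldKey, 2, oldKey.length);
        short newShard = (short) Math.floorMod(Arrays.hashCode(body), totalShards);
        return ByteBuffer.allocate(oldKey.length)
                .putShort(newShard) // new shard prefix
                .put(body)          // unchanged key body
                .array();
    }
}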
http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java new file mode 100644 index 0000000..0cd7264 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.steps; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class UpdateOldCuboidShardJob extends AbstractHadoopJob { + + private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); + Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + CubeSegment optSegment = cube.getSegmentById(segmentID); + CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment); + + logger.info("Starting: " + job.getJobName()); + + setJobClasspath(job, cube.getConfig()); + + // Mapper + job.setMapperClass(UpdateOldCuboidShardMapper.class); + + // Reducer + job.setNumReduceTasks(0); + + 
job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + // Input + job.setInputFormatClass(SequenceFileInputFormat.class); + FileInputFormat.setInputPaths(job, input); + // Output + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileOutputFormat.setOutputPath(job, output); + + // set job configuration + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); + // add metadata to distributed cache + attachSegmentsMetadataWithDict(Lists.newArrayList(optSegment, originalSegment), job.getConfiguration()); + + this.deletePath(job.getConfiguration(), output); + + return waitForCompletion(job); + } catch (Exception e) { + logger.error("error in UpdateOldCuboidShardJob", e); + printUsage(options); + throw e; + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java new file mode 100644 index 0000000..cf3c29e --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps; + +import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase; +import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.SplittedBytes; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.mr.KylinMapper; +import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Text> { + + private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardMapper.class); + + private MultipleOutputs mos; + private long baseCuboid; + + private CubeDesc cubeDesc; + private RowKeySplitter rowKeySplitter; + private RowKeyEncoderProvider rowKeyEncoderProvider; + + private Text outputKey = new Text(); + private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; + private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); + + @Override + protected void setup(Context context) throws IOException { + super.bindCurrentConfiguration(context.getConfiguration()); + mos = new MultipleOutputs(context); + + String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME); + String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); + + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); + + CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + CubeSegment cubeSegment = cube.getSegmentById(segmentID); + CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment); + + cubeDesc = cube.getDescriptor(); + baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); + + rowKeySplitter = new RowKeySplitter(oldSegment, 65, 256); + rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment); + } + + @Override + public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { + long cuboidID = rowKeySplitter.split(key.getBytes()); + + Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID); + int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers()); + outputKey.set(newKeyBuf.array(), 0, fullKeySize); + + String baseOutputPath = PathNameCuboidOld; + if (cuboidID == baseCuboid) { + baseOutputPath = PathNameCuboidBase; + } + mos.write(outputKey, value, generateFileName(baseOutputPath)); + } + + private int buildKey(Cuboid cuboid, SplittedBytes[] splitBuffers) { + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid); + + int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId + int 
endIdx = startIdx + Long.bitCount(cuboid.getId()); + int offset = 0; + for (int i = startIdx; i < endIdx; i++) { + System.arraycopy(splitBuffers[i].value, 0, newKeyBodyBuf, offset, splitBuffers[i].length); + offset += splitBuffers[i].length; + } + + int fullKeySize = rowkeyEncoder.getBytesLength(); + while (newKeyBuf.array().length < fullKeySize) { + newKeyBuf.set(new byte[newKeyBuf.length() * 2]); + } + newKeyBuf.set(0, fullKeySize); + + rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf); + + return fullKeySize; + } + + @Override + public void doCleanup(Context context) throws IOException, InterruptedException { + mos.close(); + + Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase); + FileSystem fs = FileSystem.get(context.getConfiguration()); + if (!fs.exists(outputDirBase)) { + fs.mkdirs(outputDirBase); + SequenceFile + .createWriter(context.getConfiguration(), + SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")), + SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)) + .close(); + } + } + + private String generateFileName(String subDir) { + return subDir + "/part"; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java index 08ed207..81d97b4 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java @@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.IBatchCubingEngine; +import org.apache.kylin.engine.mr.BatchOptimizeJobBuilder2; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -46,6 +47,11 @@ public class SparkBatchCubingEngine implements IBatchCubingEngine { } @Override + public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) { + return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build(); + } + + @Override public Class<?> getSourceInterface() { return null; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index a370292..8f821dd 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -39,6 +39,7 @@ import org.apache.kylin.dimension.DimensionEncodingFactory; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.ISourceAware; import 
org.apache.kylin.metadata.project.ProjectInstance; @@ -50,6 +51,7 @@ import org.apache.kylin.rest.exception.NotFoundException; import org.apache.kylin.rest.request.CubeRequest; import org.apache.kylin.rest.request.JobBuildRequest; import org.apache.kylin.rest.request.JobBuildRequest2; +import org.apache.kylin.rest.request.JobOptimizeRequest; import org.apache.kylin.rest.response.GeneralResponse; import org.apache.kylin.rest.response.HBaseResponse; import org.apache.kylin.rest.service.CubeService; @@ -307,6 +309,64 @@ public class CubeController extends BasicController { } } + /** + * Send an optimize cube job + * + * @param cubeName name of the cube to optimize + * @return JobInstance of the CheckpointExecutable + */ + @RequestMapping(value = "/{cubeName}/optimize", method = { RequestMethod.PUT }) + @ResponseBody + public JobInstance optimize(@PathVariable String cubeName, @RequestBody JobOptimizeRequest jobOptimizeRequest) { + try { + String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); + CubeInstance cube = jobService.getCubeManager().getCube(cubeName); + + if (cube == null) { + throw new InternalErrorException("Cannot find cube " + cubeName); + } + logger.info("recommended cuboids: " + jobOptimizeRequest.getCuboidsRecommend()); + return jobService.submitOptimizeJob(cube, jobOptimizeRequest.getCuboidsRecommend(), submitter).getFirst(); + } catch (JobException e) { + logger.error(e.getLocalizedMessage(), e); + throw new BadRequestException(e.getLocalizedMessage()); + } catch (Exception e) { + logger.error(e.getLocalizedMessage(), e); + throw new InternalErrorException(e.getLocalizedMessage()); + } + } + + /** + * Send an optimize job for a cube segment + * + * @param cubeName name of the cube + * @param segmentID ID of the segment to be optimized + */ + @RequestMapping(value = "/{cubeName}/recover_segment_optimize/{segmentID}", method = { RequestMethod.PUT }) + @ResponseBody + public JobInstance recoverSegmentOptimize(@PathVariable String cubeName, @PathVariable String segmentID) { + try { + String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); + CubeInstance cube = jobService.getCubeManager().getCube(cubeName); + if (cube == null) { + throw new InternalErrorException("Cannot find cube " + cubeName); + } + + CubeSegment segment = cube.getSegmentById(segmentID); + if (segment == null) { + throw new InternalErrorException("Cannot find segment '" + segmentID + "'"); + } + + return jobService.submitRecoverSegmentOptimizeJob(segment, submitter); + } catch (JobException e) { + logger.error(e.getLocalizedMessage(), e); + throw new BadRequestException(e.getLocalizedMessage()); + } catch (Exception e) { + logger.error(e.getLocalizedMessage(), e); + throw new InternalErrorException(e.getLocalizedMessage()); + } + } + @RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT }, produces = { "application/json" }) @ResponseBody public CubeInstance disableCube(@PathVariable String cubeName) { http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java new file mode 100644 index 0000000..51e8e7c --- /dev/null +++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.rest.request; + +import java.util.Set; + +public class JobOptimizeRequest { + + private Set<Long> cuboidsRecommend; + + public Set<Long> getCuboidsRecommend() { + return cuboidsRecommend; + } + + public void setCuboidsRecommend(Set<Long> cuboidsRecommend) { + this.cuboidsRecommend = cuboidsRecommend; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 4820ccd..93af53b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -70,6 +70,8 @@ import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * Stateless & lightweight service facade of cube management functions. 
@@ -509,6 +511,8 @@ public class CubeService extends BasicService { CubeUpdate update = new CubeUpdate(cube); update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); + update.setCuboids(Maps.<Long, Long> newHashMap()); + update.setCuboidsRecommend(Sets.<Long> newHashSet()); CubeManager.getInstance(getConfig()).updateCube(update); } http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 432d300..a437934 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -34,11 +34,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.directory.api.util.Strings; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.common.JobInfoConverter; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; @@ -48,6 +50,7 @@ import org.apache.kylin.job.SchedulerFactory; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobTimeFilterEnum; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.CheckpointExecutable; @@ -71,6 +74,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import com.google.common.base.Function; @@ -271,6 +275,142 @@ public class JobService extends BasicService implements InitializingBean { return jobInstance; } + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public Pair<JobInstance, List<JobInstance>> submitOptimizeJob(CubeInstance cube, Set<Long> cuboidsRecommend, + String submitter) throws IOException, JobException { + + Pair<JobInstance, List<JobInstance>> result = submitOptimizeJobInternal(cube, cuboidsRecommend, submitter); + accessService.init(result.getFirst(), null); + accessService.inherit(result.getFirst(), cube); + for (JobInstance jobInstance : result.getSecond()) { + accessService.init(jobInstance, null); + accessService.inherit(jobInstance, cube); + } + + return result; + } + + private Pair<JobInstance, List<JobInstance>> submitOptimizeJobInternal(CubeInstance cube, + Set<Long> cuboidsRecommend, String submitter) throws IOException { + Message msg = MsgPicker.getMsg(); 
+ + if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { + throw new BadRequestException(String.format(msg.getBUILD_BROKEN_CUBE(), cube.getName())); + } + + checkCubeDescSignature(cube); + checkAllowOptimization(cube, cuboidsRecommend); + + CubeSegment[] optimizeSegments = null; + try { + /** Add optimize segments */ + optimizeSegments = getCubeManager().optimizeSegments(cube, cuboidsRecommend); + List<JobInstance> optimizeJobInstances = Lists.newLinkedList(); + + /** Add optimize jobs */ + List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length); + for (CubeSegment optimizeSegment : optimizeSegments) { + DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter); + getExecutableManager().addJob(optimizeJob); + + optimizeJobList.add(optimizeJob); + optimizeJobInstances.add(getSingleJobInstance(optimizeJob)); + } + + /** Add checkpoint job for batch jobs */ + CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cube, submitter).build(); + checkpointJob.addTaskListForCheck(optimizeJobList); + + getExecutableManager().addJob(checkpointJob); + + return new Pair<>(getCheckpointJobInstance(checkpointJob), optimizeJobInstances); + } catch (Exception e) { + if (optimizeSegments != null) { + logger.error("Job submission might have failed for NEW segments {}; will clean the NEW segments from the cube", + optimizeSegments); + try { + // Remove these segments + CubeUpdate cubeBuilder = new CubeUpdate(cube); + cubeBuilder.setToRemoveSegs(optimizeSegments); + getCubeManager().updateCube(cubeBuilder); + } catch (Exception ee) { + // swallow the exception + logger.error("Cleaning NEW segments failed, ignoring it", ee); + } + } + throw e; + } + } + + @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')") + public JobInstance submitRecoverSegmentOptimizeJob(CubeSegment segment, String submitter) + throws IOException, JobException { + CubeInstance cubeInstance = segment.getCubeInstance(); + + checkCubeDescSignature(cubeInstance); + + String cubeName = cubeInstance.getName(); + List<JobInstance> jobInstanceList = searchJobsByCubeName(cubeName, null, + Lists.newArrayList(JobStatusEnum.NEW, JobStatusEnum.PENDING, JobStatusEnum.ERROR), + JobTimeFilterEnum.ALL, JobSearchMode.CHECKPOINT_ONLY); + if (jobInstanceList.size() > 1) { + throw new IllegalStateException("More than one CheckpointExecutable exists for cube " + cubeName); + } else if (jobInstanceList.size() == 0) { + throw new IllegalStateException("There's no CheckpointExecutable for cube " + cubeName); + } + CheckpointExecutable checkpointExecutable = (CheckpointExecutable) getExecutableManager() + .getJob(jobInstanceList.get(0).getId()); + + AbstractExecutable toBeReplaced = null; + for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) { + if (taskForCheck instanceof CubingJob) { + CubingJob subCubingJob = (CubingJob) taskForCheck; + String segmentName = CubingExecutableUtil.getSegmentName(subCubingJob.getParams()); + if (segmentName != null && segmentName.equals(segment.getName())) { + String segmentID = CubingExecutableUtil.getSegmentId(subCubingJob.getParams()); + CubeSegment beingOptimizedSegment = cubeInstance.getSegmentById(segmentID); + if (beingOptimizedSegment != null) { // beingOptimizedSegment exists & should not be recovered + throw new IllegalStateException("Segment " + 
beingOptimizedSegment.getName() + "-" + + beingOptimizedSegment.getUuid() + + " still exists. Please delete it or discard the related optimize job first!"); + } + toBeReplaced = taskForCheck; + break; + } + } + } + if (toBeReplaced == null) { + throw new IllegalStateException("There's no CubingJob for segment " + segment.getName() + + " in CheckpointExecutable " + checkpointExecutable.getName()); + } + + /** Add CubingJob for the related segment **/ + CubeSegment optimizeSegment = getCubeManager().appendSegment(cubeInstance, segment.getDateRangeStart(), + segment.getDateRangeEnd()); + CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); + cubeBuilder.setToAddSegs(optimizeSegment); + getCubeManager().updateCube(cubeBuilder); + + DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter); + + getExecutableManager().addJob(optimizeJob); + + JobInstance optimizeJobInstance = getSingleJobInstance(optimizeJob); + accessService.init(optimizeJobInstance, null); + accessService.inherit(optimizeJobInstance, cubeInstance); + + /** Update the checkpoint job */ + checkpointExecutable.getSubTasksForCheck().set(checkpointExecutable.getSubTasksForCheck().indexOf(toBeReplaced), + optimizeJob); + + getExecutableManager().updateCheckpointJob(checkpointExecutable.getId(), + checkpointExecutable.getSubTasksForCheck()); + + return optimizeJobInstance; + } + private void checkCubeDescSignature(CubeInstance cube) { Message msg = MsgPicker.getMsg(); @@ -279,8 +419,25 @@ public class JobService extends BasicService implements InitializingBean { String.format(msg.getINCONSISTENT_CUBE_DESC_SIGNATURE(), cube.getDescriptor())); } + private void checkAllowOptimization(CubeInstance cube, Set<Long> cuboidsRecommend) { + long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); + if (!cuboidsRecommend.contains(baseCuboid)) { + throw new BadRequestException("The recommended cuboids should contain the base cuboid " + baseCuboid); + } + Set<Long> currentCuboidSet = cube.getCuboidScheduler().getAllCuboidIds(); + if (currentCuboidSet.equals(cuboidsRecommend)) { + throw new BadRequestException( + "The recommended cuboids are the same as the current cuboids. There is no need to do optimization."); + } + } + public JobInstance getJobInstance(String uuid) { - return getSingleJobInstance(getExecutableManager().getJob(uuid)); + AbstractExecutable job = getExecutableManager().getJob(uuid); + if (job instanceof CheckpointExecutable) { + return getCheckpointJobInstance(job); + } else { + return getSingleJobInstance(job); + } } public Output getOutput(String id) { @@ -362,21 +519,88 @@ public class JobService extends BasicService implements InitializingBean { getExecutableManager().discardJob(job.getId()); return job; } - CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube()); + + logger.info("Cancel job [" + job.getId() + "] triggered by " + + SecurityContextHolder.getContext().getAuthentication().getName()); + if (job.getStatus() == JobStatusEnum.FINISHED) { + throw new IllegalStateException( + "The job " + job.getId() + " has already been finished and cannot be discarded."); + } + if (job.getStatus() == JobStatusEnum.DISCARDED) { + return job; + } + + AbstractExecutable executable = getExecutableManager().getJob(job.getId()); + if (executable instanceof CubingJob) { + cancelCubingJobInner((CubingJob) executable); + } else if (executable instanceof CheckpointExecutable) { + cancelCheckpointJobInner((CheckpointExecutable) executable); + } else { + getExecutableManager().discardJob(executable.getId()); + } + return job; + } + + private void cancelCubingJobInner(CubingJob cubingJob) throws IOException { + CubeInstance cubeInstance = getCubeManager().getCube(CubingExecutableUtil.getCubeName(cubingJob.getParams())); // might not be a cube job - final String segmentIds = job.getRelatedSegment(); - for (String segmentId : StringUtils.split(segmentIds)) { - final CubeSegment segment = cubeInstance.getSegmentById(segmentId); - if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getDateRangeEnd() == 0)) { - // Remove these segments + final String segmentIds = CubingExecutableUtil.getSegmentId(cubingJob.getParams()); + if (!StringUtils.isEmpty(segmentIds)) { + List<CubeSegment> toRemoveSegments = Lists.newLinkedList(); + for (String segmentId : StringUtils.split(segmentIds)) { + final CubeSegment segment = cubeInstance.getSegmentById(segmentId); + if (segment != null && segment.getStatus() != SegmentStatusEnum.READY) { + toRemoveSegments.add(segment); + } + } + if (!toRemoveSegments.isEmpty()) { CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); - cubeBuilder.setToRemoveSegs(segment); + cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()])); getCubeManager().updateCube(cubeBuilder); } } - getExecutableManager().discardJob(job.getId()); + getExecutableManager().discardJob(cubingJob.getId()); + - return job; + private void cancelCheckpointJobInner(CheckpointExecutable checkpointExecutable) throws IOException { + List<String> segmentIdList = Lists.newLinkedList(); + List<String> jobIdList = Lists.newLinkedList(); + jobIdList.add(checkpointExecutable.getId()); + setRelatedIdList(checkpointExecutable, segmentIdList, jobIdList); + + CubeInstance cubeInstance = getCubeManager() + .getCube(CubingExecutableUtil.getCubeName(checkpointExecutable.getParams())); + if (!segmentIdList.isEmpty()) { + List<CubeSegment> toRemoveSegments = Lists.newLinkedList(); + for (String segmentId : segmentIdList) { + final CubeSegment segment = cubeInstance.getSegmentById(segmentId); + if (segment != null && segment.getStatus() != SegmentStatusEnum.READY) { + toRemoveSegments.add(segment); + } + } + + 
CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); + cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()])); + cubeBuilder.setCuboidsRecommend(Sets.<Long> newHashSet()); // Clear the recommended cuboids + getCubeManager().updateCube(cubeBuilder); + } + + for (String jobId : jobIdList) { + getExecutableManager().discardJob(jobId); + } + } + + private void setRelatedIdList(CheckpointExecutable checkpointExecutable, List<String> segmentIdList, + List<String> jobIdList) { + for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) { + jobIdList.add(taskForCheck.getId()); + if (taskForCheck instanceof CubingJob) { + segmentIdList.addAll(Lists + .newArrayList(StringUtils.split(CubingExecutableUtil.getSegmentId(taskForCheck.getParams())))); + } else if (taskForCheck instanceof CheckpointExecutable) { + setRelatedIdList((CheckpointExecutable) taskForCheck, segmentIdList, jobIdList); + } + } } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index fc52701..f2e2ddd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; @@ -66,6 +67,7 @@ public class CreateHTableJob extends AbstractHadoopJob { CubeInstance cube = null; CubeDesc cubeDesc = null; String segmentID = null; + String cuboidModeName = null; KylinConfig kylinConfig; Path partitionFilePath; @@ -77,6 +79,7 @@ public class CreateHTableJob extends AbstractHadoopJob { options.addOption(OPTION_SEGMENT_ID); options.addOption(OPTION_PARTITION_FILE_PATH); options.addOption(OPTION_STATISTICS_ENABLED); + options.addOption(OPTION_CUBOID_MODE); parseOptions(options, args); partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH)); @@ -88,13 +91,27 @@ public class CreateHTableJob extends AbstractHadoopJob { cubeDesc = cube.getDescriptor(); kylinConfig = cube.getConfig(); segmentID = getOptionValue(OPTION_SEGMENT_ID); + cuboidModeName = getOptionValue(OPTION_CUBOID_MODE); CubeSegment cubeSegment = cube.getSegmentById(segmentID); Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); byte[][] splitKeys; if (statsEnabled) { - final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap(); + Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, null, kylinConfig).getCuboidSizeMap(); + Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName); + if (buildingCuboids != null && !buildingCuboids.isEmpty()) { + Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size()); + for (Long cuboid : buildingCuboids) { + Double cuboidSize = cuboidSizeMap.get(cuboid); + if (cuboidSize == null) { + logger.warn("Size of cuboid " + cuboid + " is null and will be replaced by 0"); + cuboidSize = 0.0; + } + optimizedCuboidSizeMap.put(cuboid, 
cuboidSize); + } + cuboidSizeMap = optimizedCuboidSizeMap; + } splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent()); } else { splitKeys = getRegionSplits(conf, partitionFilePath); http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java index 31cb189..e1cf7e0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java @@ -31,13 +31,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.engine.mr.IMROutput2; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.MapReduceUtil; import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper; import org.apache.kylin.engine.mr.steps.InMemCuboidMapper; import org.apache.kylin.engine.mr.steps.MergeCuboidJob; import org.apache.kylin.engine.mr.steps.NDCuboidMapper; -import org.apache.kylin.engine.mr.steps.ReducerNumSizing; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,13 +95,15 @@ public class HBaseMROutput2Transition implements IMROutput2 { } @Override - public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception { + public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler, + int level) throws Exception { int reducerNum = 1; Class mapperClass = job.getMapperClass(); if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) { - reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), level); + reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, cuboidScheduler, + AbstractHadoopJob.getTotalMapInputMB(job), level); } else if (mapperClass == InMemCuboidMapper.class) { - reducerNum = ReducerNumSizing.getInmemCubingReduceTaskNum(segment); + reducerNum = MapReduceUtil.getInmemCubingReduceTaskNum(segment, cuboidScheduler); } Path outputPath = new Path(output); FileOutputFormat.setOutputPath(job, outputPath); @@ -149,7 +153,8 @@ public class HBaseMROutput2Transition implements IMROutput2 { @Override public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception { - int reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), -1); + int reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, segment.getCuboidScheduler(), + AbstractHadoopJob.getTotalMapInputMB(job), -1); job.setNumReduceTasks(reducerNum); Path outputPath = new Path(output); @@ -185,4 +190,26 @@ public class HBaseMROutput2Transition implements IMROutput2 { throw new IllegalStateException("No merging segment's last build job 
ID equals " + jobID); } } + + public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) { + return new IMRBatchOptimizeOutputSide2() { + HBaseMRSteps steps = new HBaseMRSteps(seg); + + @Override + public void addStepPhase1_CreateHTable(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId(), CuboidModeEnum.RECOMMEND)); + } + + @Override + public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) { + jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId())); + jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId())); + } + + @Override + public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) { + steps.addOptimizeGarbageCollectionSteps(jobFlow); + } + }; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/0c4b3ad5/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index 6f69e8c..41e80e3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; @@ -32,6 +33,7 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.storage.hbase.HBaseConnection; import com.google.common.base.Preconditions; @@ -82,7 +84,15 @@ public class HBaseMRSteps extends JobBuilderSupport { return createCreateHTableStep(jobId, true); } + public HadoopShellExecutable createCreateHTableStepWithStats(String jobId, CuboidModeEnum cuboidMode) { + return createCreateHTableStep(jobId, true, cuboidMode); + } + private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) { + return createCreateHTableStep(jobId, withStats, CuboidModeEnum.CURRENT); + } + + private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats, CuboidModeEnum cuboidMode) { HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); StringBuilder cmd = new StringBuilder(); @@ -90,6 +100,7 @@ public class HBaseMRSteps extends JobBuilderSupport { appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid()); appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000"); appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats)); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString()); createHtableStep.setJobParams(cmd.toString()); createHtableStep.setJobClass(CreateHTableJob.class); @@ -167,6 +178,35 @@ 
public class HBaseMRSteps extends JobBuilderSupport { return result; } + public MergeGCStep createOptimizeGCStep() { + MergeGCStep result = new MergeGCStep(); + result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION); + result.setOldHTables(getOptimizeHTables()); + return result; + } + + public List<CubeSegment> getOptimizeSegments() { + CubeInstance cube = (CubeInstance) seg.getRealization(); + List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING)); + List<CubeSegment> oldSegments = Lists.newArrayListWithExpectedSize(newSegments.size()); + for (CubeSegment segment : newSegments) { + oldSegments.add(cube.getOriginalSegmentToOptimize(segment)); + } + return oldSegments; + } + + public List<String> getOptimizeHTables() { + return getOldHTables(getOptimizeSegments()); + } + + public List<String> getOldHTables(final List<CubeSegment> oldSegments) { + final List<String> oldHTables = Lists.newArrayListWithExpectedSize(oldSegments.size()); + for (CubeSegment segment : oldSegments) { + oldHTables.add(segment.getStorageLocationIdentifier()); + } + return oldHTables; + } + public List<String> getMergingHTables() { final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg); Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge, target segment " + seg); @@ -187,6 +227,18 @@ public class HBaseMRSteps extends JobBuilderSupport { return mergingHDFSPaths; } + public List<String> getOptimizeHDFSPaths() { + return getOldHDFSPaths(getOptimizeSegments()); + } + + public List<String> getOldHDFSPaths(final List<CubeSegment> oldSegments) { + final List<String> oldHDFSPaths = Lists.newArrayListWithExpectedSize(oldSegments.size()); + for (CubeSegment oldSegment : oldSegments) { + oldHDFSPaths.add(getJobWorkingDir(oldSegment.getLastBuildJobID())); + } + return oldHDFSPaths; + } + public String getHFilePath(String jobId) { return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/"); } @@ -195,6 +247,22 @@ public class HBaseMRSteps extends JobBuilderSupport { return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats"); } + public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) { + String jobId = jobFlow.getId(); + + jobFlow.addTask(createOptimizeGCStep()); + + List<String> toDeletePaths = new ArrayList<>(); + toDeletePaths.addAll(getOptimizeHDFSPaths()); + + HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep(); + step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS); + step.setDeletePaths(toDeletePaths); + step.setJobId(jobId); + + jobFlow.addTask(step); + } + public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) { String jobId = jobFlow.getId();
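Taken together, the REST surface added in this commit is small: a client PUTs a JobOptimizeRequest carrying the recommended cuboid IDs to the optimize endpoint in CubeController, and a failed segment job can be resubmitted through recover_segment_optimize. A sketch of building that request follows; the cuboid IDs and the /kylin/api/cubes mount point are illustrative assumptions, not values from this commit.

import java.util.Set;

import org.apache.kylin.rest.request.JobOptimizeRequest;

import com.google.common.collect.Sets;

public class OptimizeRequestExample {
    public static void main(String[] args) {
        // Cuboid IDs are bitmasks over the cube's rowkey columns; the set must
        // include the base cuboid, which checkAllowOptimization enforces.
        Set<Long> recommend = Sets.newHashSet(255L, 171L, 85L); // hypothetical IDs

        JobOptimizeRequest request = new JobOptimizeRequest();
        request.setCuboidsRecommend(recommend);

        // Serialized as JSON, this becomes the body of:
        //   PUT /kylin/api/cubes/{cubeName}/optimize
        // e.g. {"cuboidsRecommend":[255,171,85]}
    }
}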