This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
     new 5316e19  KYLIN-3925 Add reduce step for FilterRecommendCuboidDataJob & UpdateOldCuboidShardJob to avoid generating small hdfs files
5316e19 is described below

commit 5316e190acd85f52205b0849a0d8689004900c1b
Author: kyotoYaho <nju_y...@apache.org>
AuthorDate: Mon Apr 1 15:45:34 2019 +0800

    KYLIN-3925 Add reduce step for FilterRecommendCuboidDataJob & UpdateOldCuboidShardJob to avoid generating small hdfs files
---
 .../apache/kylin/cube/common/RowKeySplitter.java   |  5 ++
 .../kylin/engine/mr/common/BatchConstants.java     |  2 +
 .../engine/mr/common/ConvergeCuboidDataUtil.java   | 57 ++++++++++++++++++
 .../engine/mr/common/CuboidStatsReaderUtil.java    |  9 ++-
 .../kylin/engine/mr/common/MapReduceUtil.java      | 32 +++++++++++
 .../mr/steps/ConvergeCuboidDataPartitioner.java    | 67 ++++++++++++++++++++++
 ...aMapper.java => ConvergeCuboidDataReducer.java} | 48 +++++++---------
 .../mr/steps/FilterRecommendCuboidDataJob.java     | 22 +++----
 .../mr/steps/FilterRecommendCuboidDataMapper.java  | 43 ++------------
 .../engine/mr/steps/UpdateOldCuboidShardJob.java   | 20 +++----
 .../mr/steps/UpdateOldCuboidShardMapper.java       | 42 +-------------
 11 files changed, 213 insertions(+), 134 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 264c7a5..1e09442 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -90,9 +90,14 @@ public class RowKeySplitter implements java.io.Serializable {
 
     public long parseCuboid(byte[] bytes) {
+        return getCuboidId(bytes, enableSharding);
+    }
+
+    public static long getCuboidId(byte[] bytes, boolean enableSharding) {
         int offset = enableSharding ? RowConstants.ROWKEY_SHARDID_LEN : 0;
         return Bytes.toLong(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
     }
+
     /**
      * @param bytes
      * @return cuboid ID
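The new static getCuboidId() helper lets callers that already know the sharding flag read a cuboid id without constructing a RowKeySplitter. A minimal sketch of the byte layout it assumes, with a made-up key and cuboid id 255 (the 2-byte shard prefix and 8-byte cuboid id lengths come from RowConstants and are assumptions here):

    import java.nio.ByteBuffer;

    import org.apache.kylin.cube.common.RowKeySplitter;

    public class GetCuboidIdSketch {
        public static void main(String[] args) {
            // Sharded rowkey: 2-byte shard id, then 8-byte cuboid id
            byte[] rowKey = ByteBuffer.allocate(10)
                    .putShort((short) 0) // shard id
                    .putLong(255L)       // cuboid id
                    .array();
            System.out.println(RowKeySplitter.getCuboidId(rowKey, true)); // prints 255
        }
    }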
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 66da1b2..af11bb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -77,6 +77,8 @@ public interface BatchConstants {
 
     String CFG_SHARD_NUM = "shard.num";
 
+    String CFG_CONVERGE_CUBOID_PARTITION_PARAM = "converge.cuboid.partition.param";
+
     /**
      * command line ARGuments
      */
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java
new file mode 100644
index 0000000..87f2a28
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataPartitioner;
+import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataReducer;
+
+public class ConvergeCuboidDataUtil {
+
+    public static void setupReducer(Job job, CubeSegment cubeSegment, Path output) throws IOException {
+        // Output
+        //// prevent to create zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+        FileOutputFormat.setOutputPath(job, output);
+
+        // Reducer
+        job.setReducerClass(ConvergeCuboidDataReducer.class);
+        job.setPartitionerClass(ConvergeCuboidDataPartitioner.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        Pair<Integer, Integer> numReduceTasks = MapReduceUtil.getConvergeCuboidDataReduceTaskNums(cubeSegment);
+        job.setNumReduceTasks(numReduceTasks.getFirst());
+
+        int nBaseReduceTasks = numReduceTasks.getSecond();
+        boolean enableSharding = cubeSegment.isEnableSharding();
+        long baseCuboidId = cubeSegment.getCuboidScheduler().getBaseCuboidId();
+        String partiParams = enableSharding + "," + baseCuboidId + "," + nBaseReduceTasks;
+        job.getConfiguration().set(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM, partiParams);
+    }
+}
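setupReducer() passes the partitioner its three inputs through the job configuration as a single comma-joined string. A round-trip sketch of that encoding, mirroring setupReducer() on the write side and ConvergeCuboidDataPartitioner.setConf() (shown further down) on the read side, with made-up values:

    public class PartitionParamSketch {
        public static void main(String[] args) {
            // Write side, as in setupReducer(): "enableSharding,baseCuboidId,nBaseReduceTasks"
            String partiParams = true + "," + 255L + "," + 3; // -> "true,255,3"

            // Read side, as in ConvergeCuboidDataPartitioner.setConf()
            String[] params = partiParams.split(",");
            boolean enableSharding = Boolean.parseBoolean(params[0]); // true
            long baseCuboidId = Long.parseLong(params[1]);            // 255
            int nBaseReduceTasks = Integer.parseInt(params[2]);       // 3
            System.out.println(enableSharding + " " + baseCuboidId + " " + nBaseReduceTasks);
        }
    }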
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index ee615c3..2ef70f8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -135,6 +135,12 @@ public class CuboidStatsReaderUtil {
 
     public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> cuboidIds, CubeSegment cubeSegment)
             throws IOException {
+        Pair<Map<Long, Long>, Long> stats = readCuboidStatsWithSourceFromSegment(cuboidIds, cubeSegment);
+        return stats == null ? null : stats.getFirst();
+    }
+
+    public static Pair<Map<Long, Long>, Long> readCuboidStatsWithSourceFromSegment(Set<Long> cuboidIds,
+            CubeSegment cubeSegment) throws IOException {
         if (cubeSegment == null) {
             logger.warn("The cube segment can not be " + null);
             return null;
@@ -157,7 +163,6 @@ public class CuboidStatsReaderUtil {
                 cuboidsWithStats.put(cuboid, rowEstimate);
             }
         }
-        return cuboidsWithStats;
+        return new Pair<>(cuboidsWithStats, cubeStatsReader.sourceRowCount);
     }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 8fc26b4..ecde4aa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -20,9 +20,11 @@ package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -31,6 +33,8 @@ import org.apache.kylin.job.exception.JobException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 public class MapReduceUtil {
 
     private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
@@ -112,7 +116,35 @@ public class MapReduceUtil {
         for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
         }
+        return getReduceTaskNum(totalSizeInM, kylinConfig);
+    }
+
+    // @return the first indicates the total reducer number, the second indicates the reducer number for base cuboid
+    public static Pair<Integer, Integer> getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
+        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
+
+        Set<Long> overlapCuboids = Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+        overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
+        overlapCuboids.add(baseCuboidId);
+
+        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
+                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
+        Map<Long, Double> cubeSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
+                cuboidStats.getSecond());
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+
+        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
+
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
+        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
+        return new Pair<>(nBase + nOther, nBase);
+    }
+
+    private static int getReduceTaskNum(double totalSizeInM, KylinConfig kylinConfig) {
         double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
         double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
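The pair returned above sizes the base cuboid and the remaining cuboids separately, so the usually much larger base cuboid gets its own reducer pool. A toy calculation of the split (the real getReduceTaskNum() divides estimated megabytes by the configured per-reducer input size and also applies a count ratio and min/max clamps; a flat 500 MB budget with no ratio or clamping is assumed here):

    public class ReducerCountSketch {
        public static void main(String[] args) {
            double perReduceInputMB = 500.0;  // assumed reducer input budget
            double baseSizeInM = 2400.0;      // hypothetical base cuboid size
            double otherSizeInM = 900.0;      // hypothetical size of all other overlap cuboids

            int nBase = (int) Math.ceil(baseSizeInM / perReduceInputMB);   // 5
            int nOther = (int) Math.ceil(otherSizeInM / perReduceInputMB); // 2
            // -> Pair(total = 7, base = 5): the job runs 7 reducers,
            //    with partitions 0..4 reserved for the base cuboid.
            System.out.println((nBase + nOther) + ", " + nBase);
        }
    }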
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
new file mode 100644
index 0000000..605905a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import com.google.common.base.Preconditions;
+
+public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> implements Configurable {
+
+    private Random rand = new Random();
+
+    private Configuration conf;
+    private boolean enableSharding;
+    private long baseCuboidID;
+    private int numReduceBaseCuboid;
+
+    @Override
+    public int getPartition(Text key, Text value, int numReduceTasks) {
+        long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
+        // the first numReduceBaseCuboid are for base cuboid
+        if (cuboidID == baseCuboidID) {
+            return rand.nextInt(numReduceBaseCuboid);
+        } else {
+            return numReduceBaseCuboid + rand.nextInt(numReduceTasks - numReduceBaseCuboid);
+        }
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        String partiParam = conf.get(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM);
+        String[] params = partiParam.split(",");
+        Preconditions.checkArgument(params.length >= 3);
+        this.enableSharding = Boolean.parseBoolean(params[0]);
+        this.baseCuboidID = Long.parseLong(params[1]);
+        this.numReduceBaseCuboid = Integer.parseInt(params[2]);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+}
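Exercising the partitioner locally shows the contract end to end: base-cuboid rows can only land in the first nBaseReduceTasks partitions, everything else in the remainder. A sketch reusing the same made-up values as above (the key bytes mimic a sharded rowkey as in the earlier getCuboidId sketch):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.kylin.engine.mr.common.BatchConstants;
    import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataPartitioner;

    public class PartitionerSketch {
        public static void main(String[] args) {
            ConvergeCuboidDataPartitioner partitioner = new ConvergeCuboidDataPartitioner();
            Configuration conf = new Configuration();
            // made-up values: sharding on, base cuboid id 255, 3 base-cuboid reducers
            conf.set(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM, "true,255,3");
            partitioner.setConf(conf);

            // Sharded rowkey for the (hypothetical) base cuboid id 255
            byte[] baseKey = ByteBuffer.allocate(10).putShort((short) 0).putLong(255L).array();
            int partition = partitioner.getPartition(new Text(baseKey), new Text(), 7);
            System.out.println(partition); // always in [0, 3); non-base keys map to [3, 7)
        }
    }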
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
similarity index 69%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
index 2bb8349..78860bf 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@ import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
 import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
 
 import java.io.IOException;
-import java.util.Set;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,19 +34,16 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 
-import com.google.common.base.Preconditions;
-
-public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Text, Text> {
+public class ConvergeCuboidDataReducer extends KylinReducer<Text, Text, Text, Text> {
 
     private MultipleOutputs mos;
 
-    private RowKeySplitter rowKeySplitter;
+    private boolean enableSharding;
     private long baseCuboid;
-    private Set<Long> recommendCuboids;
 
     @Override
     protected void doSetup(Context context) throws IOException {
@@ -59,30 +55,28 @@ public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Tex
 
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
-        CubeManager cubeManager = CubeManager.getInstance(config);
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        CubeSegment optSegment = cube.getSegmentById(segmentID);
-        CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
-
-        rowKeySplitter = new RowKeySplitter(originalSegment);
-        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+        CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
 
-        recommendCuboids = cube.getCuboidsRecommend();
-        Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
+        this.enableSharding = oldSegment.isEnableSharding();
+        this.baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
     }
 
     @Override
-    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes());
-        if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
-            return;
+    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
+
+        String baseOutputPath = cuboidID == baseCuboid ? PathNameCuboidBase : PathNameCuboidOld;
+        int n = 0;
+        for (Text value : values) {
+            mos.write(key, value, generateFileName(baseOutputPath));
+            n++;
         }
-
-        String baseOutputPath = PathNameCuboidOld;
-        if (cuboidID == baseCuboid) {
-            baseOutputPath = PathNameCuboidBase;
+        if (n > 1) {
+            throw new RuntimeException(
+                    "multiple records share the same key in aggregated cuboid data for cuboid " + cuboidID);
         }
-        mos.write(key, value, generateFileName(baseOutputPath));
     }
 
     @Override
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
index 2fbbc73..1b8bf58 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
@@ -19,21 +19,20 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.util.Locale;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.ConvergeCuboidDataUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,26 +68,21 @@ public class FilterRecommendCuboidDataJob extends AbstractHadoopJob {
 
             // Mapper
             job.setMapperClass(FilterRecommendCuboidDataMapper.class);
-
-            // Reducer
-            job.setNumReduceTasks(0);
-
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
 
             // Input
             job.setInputFormatClass(SequenceFileInputFormat.class);
             FileInputFormat.setInputPaths(job, input);
-            // Output
-            //// prevent to create zero-sized default output
-            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
-            FileOutputFormat.setOutputPath(job, output);
+
+            // Reducer
+            ConvergeCuboidDataUtil.setupReducer(job, originalSegment, output);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
             // add metadata to distributed cache
-            attachSegmentMetadataWithDict(originalSegment, job.getConfiguration());
+            attachSegmentMetadata(originalSegment, job.getConfiguration(), false, false);
 
             this.deletePath(job.getConfiguration(), output);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
index 2bb8349..2fad4e9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
@@ -18,18 +18,10 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
-
 import java.io.IOException;
 import java.util.Set;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -43,16 +35,13 @@ import com.google.common.base.Preconditions;
 
 public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Text, Text> {
 
-    private MultipleOutputs mos;
-
-    private RowKeySplitter rowKeySplitter;
+    private boolean enableSharding;
     private long baseCuboid;
     private Set<Long> recommendCuboids;
 
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-        mos = new MultipleOutputs(context);
 
         String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
         String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
@@ -64,7 +53,7 @@ public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Tex
         CubeSegment optSegment = cube.getSegmentById(segmentID);
         CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
 
-        rowKeySplitter = new RowKeySplitter(originalSegment);
+        enableSharding = originalSegment.isEnableSharding();
         baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
 
         recommendCuboids = cube.getCuboidsRecommend();
@@ -73,35 +62,11 @@ public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Tex
 
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes());
+        long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), enableSharding);
         if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
             return;
         }
 
-        String baseOutputPath = PathNameCuboidOld;
-        if (cuboidID == baseCuboid) {
-            baseOutputPath = PathNameCuboidBase;
-        }
-        mos.write(key, value, generateFileName(baseOutputPath));
-    }
-
-    @Override
-    public void doCleanup(Context context) throws IOException, InterruptedException {
-        mos.close();
-
-        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
-        FileSystem fs = FileSystem.get(context.getConfiguration());
-        if (!fs.exists(outputDirBase)) {
-            fs.mkdirs(outputDirBase);
-            SequenceFile
-                    .createWriter(context.getConfiguration(),
-                            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
-                            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
-                    .close();
-        }
-    }
-
-    private String generateFileName(String subDir) {
-        return subDir + "/part";
+        context.write(key, value);
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
index 80c483e..4012393 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
@@ -19,21 +19,20 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.util.Locale;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.ConvergeCuboidDataUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,20 +70,15 @@ public class UpdateOldCuboidShardJob extends AbstractHadoopJob {
 
             // Mapper
             job.setMapperClass(UpdateOldCuboidShardMapper.class);
-
-            // Reducer
-            job.setNumReduceTasks(0);
-
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
 
             // Input
             job.setInputFormatClass(SequenceFileInputFormat.class);
             FileInputFormat.setInputPaths(job, input);
-            // Output
-            //// prevent to create zero-sized default output
-            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
-            FileOutputFormat.setOutputPath(job, output);
+
+            // Reducer
+            ConvergeCuboidDataUtil.setupReducer(job, originalSegment, output);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
index 3d18bd6..ac1d499 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -18,17 +18,9 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
-
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.cube.CubeInstance;
@@ -50,9 +42,6 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
 
     private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardMapper.class);
 
-    private MultipleOutputs mos;
-    private long baseCuboid;
-
     private CubeDesc cubeDesc;
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
@@ -64,7 +53,6 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-        mos = new MultipleOutputs(context);
 
         String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
         String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
@@ -76,7 +64,6 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
         CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
 
         cubeDesc = cube.getDescriptor();
-        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
 
         rowKeySplitter = new RowKeySplitter(oldSegment);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
@@ -90,11 +77,7 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
         int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        String baseOutputPath = PathNameCuboidOld;
-        if (cuboidID == baseCuboid) {
-            baseOutputPath = PathNameCuboidBase;
-        }
-        mos.write(outputKey, value, generateFileName(baseOutputPath));
+        context.write(outputKey, value);
     }
 
     private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) {
@@ -104,7 +87,8 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
         int endIdx = startIdx + Long.bitCount(cuboid.getId());
         int offset = 0;
         for (int i = startIdx; i < endIdx; i++) {
-            System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset, splitBuffers[i].length());
+            System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset,
+                    splitBuffers[i].length());
             offset += splitBuffers[i].length();
         }
 
@@ -118,24 +102,4 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
 
         return fullKeySize;
     }
-
-    @Override
-    public void doCleanup(Context context) throws IOException, InterruptedException {
-        mos.close();
-
-        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
-        FileSystem fs = FileSystem.get(context.getConfiguration());
-        if (!fs.exists(outputDirBase)) {
-            fs.mkdirs(outputDirBase);
-            SequenceFile
-                    .createWriter(context.getConfiguration(),
-                            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
-                            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
-                    .close();
-        }
-    }
-
-    private String generateFileName(String subDir) {
-        return subDir + "/part";
-    }
 }
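One detail worth spelling out in buildKey() above: a cuboid id is a bitmask of the dimensions the cuboid contains, so Long.bitCount() gives the number of rowkey columns to copy after the header. A tiny illustration with a made-up id:

    public class BitCountSketch {
        public static void main(String[] args) {
            long cuboidId = 0b1011L; // hypothetical cuboid containing dimensions 0, 1 and 3
            System.out.println(Long.bitCount(cuboidId)); // 3 split buffers to copy
        }
    }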