This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5316e19  KYLIN-3925 Add reduce step for FilterRecommendCuboidDataJob & 
UpdateOldCuboidShardJob to avoid generating small hdfs files
5316e19 is described below

commit 5316e190acd85f52205b0849a0d8689004900c1b
Author: kyotoYaho <nju_y...@apache.org>
AuthorDate: Mon Apr 1 15:45:34 2019 +0800

    KYLIN-3925 Add reduce step for FilterRecommendCuboidDataJob & 
UpdateOldCuboidShardJob to avoid generating small hdfs files
---
 .../apache/kylin/cube/common/RowKeySplitter.java   |  5 ++
 .../kylin/engine/mr/common/BatchConstants.java     |  2 +
 .../engine/mr/common/ConvergeCuboidDataUtil.java   | 57 ++++++++++++++++++
 .../engine/mr/common/CuboidStatsReaderUtil.java    |  9 ++-
 .../kylin/engine/mr/common/MapReduceUtil.java      | 32 +++++++++++
 .../mr/steps/ConvergeCuboidDataPartitioner.java    | 67 ++++++++++++++++++++++
 ...aMapper.java => ConvergeCuboidDataReducer.java} | 48 +++++++---------
 .../mr/steps/FilterRecommendCuboidDataJob.java     | 22 +++----
 .../mr/steps/FilterRecommendCuboidDataMapper.java  | 43 ++------------
 .../engine/mr/steps/UpdateOldCuboidShardJob.java   | 20 +++----
 .../mr/steps/UpdateOldCuboidShardMapper.java       | 42 +-------------
 11 files changed, 213 insertions(+), 134 deletions(-)

diff --git 
a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java 
b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index 264c7a5..1e09442 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -90,9 +90,14 @@ public class RowKeySplitter implements java.io.Serializable {
 
 
     public long parseCuboid(byte[] bytes) {
+        return getCuboidId(bytes, enableSharding);
+    }
+
+    public static long getCuboidId(byte[] bytes, boolean enableSharding) {
         int offset = enableSharding ? RowConstants.ROWKEY_SHARDID_LEN : 0;
         return Bytes.toLong(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
     }
+
     /**
      * @param bytes
      * @return cuboid ID
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 66da1b2..af11bb6 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -77,6 +77,8 @@ public interface BatchConstants {
 
     String CFG_SHARD_NUM = "shard.num";
 
+    String CFG_CONVERGE_CUBOID_PARTITION_PARAM = 
"converge.cuboid.partition.param";
+
     /**
      * command line ARGuments
      */
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java
new file mode 100644
index 0000000..87f2a28
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/ConvergeCuboidDataUtil.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataPartitioner;
+import org.apache.kylin.engine.mr.steps.ConvergeCuboidDataReducer;
+
+public class ConvergeCuboidDataUtil {
+
+    public static void setupReducer(Job job, CubeSegment cubeSegment, Path 
output) throws IOException {
+        // Output
+        //// prevent to create zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
+        FileOutputFormat.setOutputPath(job, output);
+
+        // Reducer
+        job.setReducerClass(ConvergeCuboidDataReducer.class);
+        job.setPartitionerClass(ConvergeCuboidDataPartitioner.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        Pair<Integer, Integer> numReduceTasks = 
MapReduceUtil.getConvergeCuboidDataReduceTaskNums(cubeSegment);
+        job.setNumReduceTasks(numReduceTasks.getFirst());
+
+        int nBaseReduceTasks = numReduceTasks.getSecond();
+        boolean enableSharding = cubeSegment.isEnableSharding();
+        long baseCuboidId = cubeSegment.getCuboidScheduler().getBaseCuboidId();
+        String partiParams = enableSharding + "," + baseCuboidId + "," + 
nBaseReduceTasks;
+        
job.getConfiguration().set(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM, 
partiParams);
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index ee615c3..2ef70f8 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -135,6 +135,12 @@ public class CuboidStatsReaderUtil {
 
     public static Map<Long, Long> readCuboidStatsFromSegment(Set<Long> 
cuboidIds, CubeSegment cubeSegment)
             throws IOException {
+        Pair<Map<Long, Long>, Long> stats = 
readCuboidStatsWithSourceFromSegment(cuboidIds, cubeSegment);
+        return stats == null ? null : stats.getFirst();
+    }
+
+    public static Pair<Map<Long, Long>, Long> 
readCuboidStatsWithSourceFromSegment(Set<Long> cuboidIds,
+            CubeSegment cubeSegment) throws IOException {
         if (cubeSegment == null) {
             logger.warn("The cube segment can not be " + null);
             return null;
@@ -157,7 +163,6 @@ public class CuboidStatsReaderUtil {
                 cuboidsWithStats.put(cuboid, rowEstimate);
             }
         }
-        return cuboidsWithStats;
+        return new Pair<>(cuboidsWithStats, cubeStatsReader.sourceRowCount);
     }
-
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index 8fc26b4..ecde4aa 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -20,9 +20,11 @@ package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -31,6 +33,8 @@ import org.apache.kylin.job.exception.JobException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 public class MapReduceUtil {
 
     private static final Logger logger = 
LoggerFactory.getLogger(MapReduceUtil.class);
@@ -112,7 +116,35 @@ public class MapReduceUtil {
         for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
         }
+        return getReduceTaskNum(totalSizeInM, kylinConfig);
+    }
+
+    // @return the first indicates the total reducer number, the second 
indicates the reducer number for base cuboid
+    public static Pair<Integer, Integer> 
getConvergeCuboidDataReduceTaskNums(CubeSegment cubeSeg) throws IOException {
+        long baseCuboidId = cubeSeg.getCuboidScheduler().getBaseCuboidId();
+
+        Set<Long> overlapCuboids = 
Sets.newHashSet(cubeSeg.getCuboidScheduler().getAllCuboidIds());
+        
overlapCuboids.retainAll(cubeSeg.getCubeInstance().getCuboidsRecommend());
+        overlapCuboids.add(baseCuboidId);
+
+        Pair<Map<Long, Long>, Long> cuboidStats = CuboidStatsReaderUtil
+                .readCuboidStatsWithSourceFromSegment(overlapCuboids, cubeSeg);
+        Map<Long, Double> cubeSizeMap = 
CubeStatsReader.getCuboidSizeMapFromRowCount(cubeSeg, cuboidStats.getFirst(),
+                cuboidStats.getSecond());
+        double totalSizeInM = 0;
+        for (Double cuboidSize : cubeSizeMap.values()) {
+            totalSizeInM += cuboidSize;
+        }
+
+        double baseSizeInM = cubeSizeMap.get(baseCuboidId);
+
+        KylinConfig kylinConfig = cubeSeg.getConfig();
+        int nBase = getReduceTaskNum(baseSizeInM, kylinConfig);
+        int nOther = getReduceTaskNum(totalSizeInM - baseSizeInM, kylinConfig);
+        return new Pair<>(nBase + nOther, nBase);
+    }
 
+    private static int getReduceTaskNum(double totalSizeInM, KylinConfig 
kylinConfig) {
         double perReduceInputMB = 
kylinConfig.getDefaultHadoopJobReducerInputMB();
         double reduceCountRatio = 
kylinConfig.getDefaultHadoopJobReducerCountRatio();
 
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
new file mode 100644
index 0000000..605905a
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataPartitioner.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import com.google.common.base.Preconditions;
+
+public class ConvergeCuboidDataPartitioner extends Partitioner<Text, Text> 
implements Configurable {
+
+    private Random rand = new Random();
+
+    private Configuration conf;
+    private boolean enableSharding;
+    private long baseCuboidID;
+    private int numReduceBaseCuboid;
+
+    @Override
+    public int getPartition(Text key, Text value, int numReduceTasks) {
+        long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), 
enableSharding);
+        // the first numReduceBaseCuboid are for base cuboid
+        if (cuboidID == baseCuboidID) {
+            return rand.nextInt(numReduceBaseCuboid);
+        } else {
+            return numReduceBaseCuboid + rand.nextInt(numReduceTasks - 
numReduceBaseCuboid);
+        }
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        String partiParam = 
conf.get(BatchConstants.CFG_CONVERGE_CUBOID_PARTITION_PARAM);
+        String[] params = partiParam.split(",");
+        Preconditions.checkArgument(params.length >= 3);
+        this.enableSharding = Boolean.parseBoolean(params[0]);
+        this.baseCuboidID = Long.parseLong(params[1]);
+        this.numReduceBaseCuboid = Integer.parseInt(params[2]);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
similarity index 69%
copy from 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
copy to 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
index 2bb8349..78860bf 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ConvergeCuboidDataReducer.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@ import static 
org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
 import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
 
 import java.io.IOException;
-import java.util.Set;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,19 +34,16 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 
-import com.google.common.base.Preconditions;
-
-public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, 
Text, Text> {
+public class ConvergeCuboidDataReducer extends KylinReducer<Text, Text, Text, 
Text> {
 
     private MultipleOutputs mos;
 
-    private RowKeySplitter rowKeySplitter;
+    private boolean enableSharding;
     private long baseCuboid;
-    private Set<Long> recommendCuboids;
 
     @Override
     protected void doSetup(Context context) throws IOException {
@@ -59,30 +55,28 @@ public class FilterRecommendCuboidDataMapper extends 
KylinMapper<Text, Text, Tex
 
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
-        CubeManager cubeManager = CubeManager.getInstance(config);
-        CubeInstance cube = cubeManager.getCube(cubeName);
-        CubeSegment optSegment = cube.getSegmentById(segmentID);
-        CubeSegment originalSegment = 
cube.getOriginalSegmentToOptimize(optSegment);
-
-        rowKeySplitter = new RowKeySplitter(originalSegment);
-        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+        CubeSegment oldSegment = 
cube.getOriginalSegmentToOptimize(cubeSegment);
 
-        recommendCuboids = cube.getCuboidsRecommend();
-        Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map 
could not be null");
+        this.enableSharding = oldSegment.isEnableSharding();
+        this.baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
     }
 
     @Override
-    public void doMap(Text key, Text value, Context context) throws 
IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes());
-        if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
-            return;
+    public void doReduce(Text key, Iterable<Text> values, Context context) 
throws IOException, InterruptedException {
+        long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), 
enableSharding);
+
+        String baseOutputPath = cuboidID == baseCuboid ? PathNameCuboidBase : 
PathNameCuboidOld;
+        int n = 0;
+        for (Text value : values) {
+            mos.write(key, value, generateFileName(baseOutputPath));
+            n++;
         }
-
-        String baseOutputPath = PathNameCuboidOld;
-        if (cuboidID == baseCuboid) {
-            baseOutputPath = PathNameCuboidBase;
+        if (n > 1) {
+            throw new RuntimeException(
+                    "multiple records share the same key in aggregated cuboid 
data for cuboid " + cuboidID);
         }
-        mos.write(key, value, generateFileName(baseOutputPath));
     }
 
     @Override
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
index 2fbbc73..1b8bf58 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
@@ -19,21 +19,20 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.util.Locale;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.ConvergeCuboidDataUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,26 +68,21 @@ public class FilterRecommendCuboidDataJob extends 
AbstractHadoopJob {
 
             // Mapper
             job.setMapperClass(FilterRecommendCuboidDataMapper.class);
-
-            // Reducer
-            job.setNumReduceTasks(0);
-
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
 
             // Input
             job.setInputFormatClass(SequenceFileInputFormat.class);
             FileInputFormat.setInputPaths(job, input);
-            // Output
-            //// prevent to create zero-sized default output
-            LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
-            FileOutputFormat.setOutputPath(job, output);
+
+            // Reducer
+            ConvergeCuboidDataUtil.setupReducer(job, originalSegment, output);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, 
segmentID);
             // add metadata to distributed cache
-            attachSegmentMetadataWithDict(originalSegment, 
job.getConfiguration());
+            attachSegmentMetadata(originalSegment, job.getConfiguration(), 
false, false);
 
             this.deletePath(job.getConfiguration(), output);
 
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
index 2bb8349..2fad4e9 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
@@ -18,18 +18,10 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
-
 import java.io.IOException;
 import java.util.Set;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -43,16 +35,13 @@ import com.google.common.base.Preconditions;
 
 public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, 
Text, Text> {
 
-    private MultipleOutputs mos;
-
-    private RowKeySplitter rowKeySplitter;
+    private boolean enableSharding;
     private long baseCuboid;
     private Set<Long> recommendCuboids;
 
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-        mos = new MultipleOutputs(context);
 
         String cubeName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
         String segmentID = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
@@ -64,7 +53,7 @@ public class FilterRecommendCuboidDataMapper extends 
KylinMapper<Text, Text, Tex
         CubeSegment optSegment = cube.getSegmentById(segmentID);
         CubeSegment originalSegment = 
cube.getOriginalSegmentToOptimize(optSegment);
 
-        rowKeySplitter = new RowKeySplitter(originalSegment);
+        enableSharding = originalSegment.isEnableSharding();
         baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
 
         recommendCuboids = cube.getCuboidsRecommend();
@@ -73,35 +62,11 @@ public class FilterRecommendCuboidDataMapper extends 
KylinMapper<Text, Text, Tex
 
     @Override
     public void doMap(Text key, Text value, Context context) throws 
IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes());
+        long cuboidID = RowKeySplitter.getCuboidId(key.getBytes(), 
enableSharding);
         if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
             return;
         }
 
-        String baseOutputPath = PathNameCuboidOld;
-        if (cuboidID == baseCuboid) {
-            baseOutputPath = PathNameCuboidBase;
-        }
-        mos.write(key, value, generateFileName(baseOutputPath));
-    }
-
-    @Override
-    public void doCleanup(Context context) throws IOException, 
InterruptedException {
-        mos.close();
-
-        Path outputDirBase = new 
Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), 
PathNameCuboidBase);
-        FileSystem fs = FileSystem.get(context.getConfiguration());
-        if (!fs.exists(outputDirBase)) {
-            fs.mkdirs(outputDirBase);
-            SequenceFile
-                    .createWriter(context.getConfiguration(),
-                            SequenceFile.Writer.file(new Path(outputDirBase, 
"part-m-00000")),
-                            SequenceFile.Writer.keyClass(Text.class), 
SequenceFile.Writer.valueClass(Text.class))
-                    .close();
-        }
-    }
-
-    private String generateFileName(String subDir) {
-        return subDir + "/part";
+        context.write(key, value);
     }
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
index 80c483e..4012393 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
@@ -19,21 +19,20 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.util.Locale;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.ConvergeCuboidDataUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,20 +70,15 @@ public class UpdateOldCuboidShardJob extends 
AbstractHadoopJob {
 
             // Mapper
             job.setMapperClass(UpdateOldCuboidShardMapper.class);
-
-            // Reducer
-            job.setNumReduceTasks(0);
-
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Text.class);
 
             // Input
             job.setInputFormatClass(SequenceFileInputFormat.class);
             FileInputFormat.setInputPaths(job, input);
-            // Output
-            //// prevent to create zero-sized default output
-            LazyOutputFormat.setOutputFormatClass(job, 
SequenceFileOutputFormat.class);
-            FileOutputFormat.setOutputPath(job, output);
+
+            // Reducer
+            ConvergeCuboidDataUtil.setupReducer(job, originalSegment, output);
 
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
index 3d18bd6..ac1d499 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -18,17 +18,9 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
-import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
-
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.cube.CubeInstance;
@@ -50,9 +42,6 @@ public class UpdateOldCuboidShardMapper extends 
KylinMapper<Text, Text, Text, Te
 
     private static final Logger logger = 
LoggerFactory.getLogger(UpdateOldCuboidShardMapper.class);
 
-    private MultipleOutputs mos;
-    private long baseCuboid;
-
     private CubeDesc cubeDesc;
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
@@ -64,7 +53,6 @@ public class UpdateOldCuboidShardMapper extends 
KylinMapper<Text, Text, Text, Te
     @Override
     protected void doSetup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-        mos = new MultipleOutputs(context);
 
         String cubeName = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
         String segmentID = 
context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
@@ -76,7 +64,6 @@ public class UpdateOldCuboidShardMapper extends 
KylinMapper<Text, Text, Text, Te
         CubeSegment oldSegment = 
cube.getOriginalSegmentToOptimize(cubeSegment);
 
         cubeDesc = cube.getDescriptor();
-        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
 
         rowKeySplitter = new RowKeySplitter(oldSegment);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
@@ -90,11 +77,7 @@ public class UpdateOldCuboidShardMapper extends 
KylinMapper<Text, Text, Text, Te
         int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        String baseOutputPath = PathNameCuboidOld;
-        if (cuboidID == baseCuboid) {
-            baseOutputPath = PathNameCuboidBase;
-        }
-        mos.write(outputKey, value, generateFileName(baseOutputPath));
+        context.write(outputKey, value);
     }
 
     private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) {
@@ -104,7 +87,8 @@ public class UpdateOldCuboidShardMapper extends 
KylinMapper<Text, Text, Text, Te
         int endIdx = startIdx + Long.bitCount(cuboid.getId());
         int offset = 0;
         for (int i = startIdx; i < endIdx; i++) {
-            System.arraycopy(splitBuffers[i].array(), 
splitBuffers[i].offset(), newKeyBodyBuf, offset, splitBuffers[i].length());
+            System.arraycopy(splitBuffers[i].array(), 
splitBuffers[i].offset(), newKeyBodyBuf, offset,
+                    splitBuffers[i].length());
             offset += splitBuffers[i].length();
         }
 
@@ -118,24 +102,4 @@ public class UpdateOldCuboidShardMapper extends 
KylinMapper<Text, Text, Text, Te
 
         return fullKeySize;
     }
-
-    @Override
-    public void doCleanup(Context context) throws IOException, 
InterruptedException {
-        mos.close();
-
-        Path outputDirBase = new 
Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), 
PathNameCuboidBase);
-        FileSystem fs = FileSystem.get(context.getConfiguration());
-        if (!fs.exists(outputDirBase)) {
-            fs.mkdirs(outputDirBase);
-            SequenceFile
-                    .createWriter(context.getConfiguration(),
-                            SequenceFile.Writer.file(new Path(outputDirBase, 
"part-m-00000")),
-                            SequenceFile.Writer.keyClass(Text.class), 
SequenceFile.Writer.valueClass(Text.class))
-                    .close();
-        }
-    }
-
-    private String generateFileName(String subDir) {
-        return subDir + "/part";
-    }
 }

Reply via email to