This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 34a76dfd912b995d0772b1f280a80ca04e7167fc
Author: yanghua <yanghua1...@gmail.com>
AuthorDate: Tue Jul 16 15:33:58 2019 +0800

    KYLIN-4087 Use reduceGroup operator to optimize build cube step
---
 .../kylin/engine/flink/FlinkCubingByLayer.java     | 91 ++++++++++++++++++++--
 1 file changed, 86 insertions(+), 5 deletions(-)
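
Editor's note for context: in Flink's DataSet API, reduce combines the records of each group pairwise, materializing a fresh intermediate record on every call, while reduceGroup hands the entire group to a single function invocation, so one accumulator can serve the whole group and exactly one record is emitted per key. The sketch below is an illustrative, self-contained example of that difference; it is not part of this patch, and every class and variable name in it is invented.

// Illustrative sketch only, not Kylin code: contrasts Flink's reduce and
// reduceGroup on a toy keyed DataSet. All names here are made up.
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ReduceVsReduceGroup {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        // reduce: called once per pair of records within a group; each call
        // allocates a new intermediate tuple for the partial result.
        DataSet<Tuple2<String, Integer>> viaReduce = input.groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                          Tuple2<String, Integer> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                });

        // reduceGroup: called once per group with all of its records; a single
        // accumulator serves the whole loop and one result is emitted per key.
        DataSet<Tuple2<String, Integer>> viaReduceGroup = input.groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Integer>> values,
                                       Collector<Tuple2<String, Integer>> out) {
                        String key = null;
                        int sum = 0;
                        for (Tuple2<String, Integer> v : values) {
                            key = v.f0;
                            sum += v.f1;
                        }
                        out.collect(new Tuple2<>(key, sum));
                    }
                });

        viaReduce.print();       // both paths print (a,3) and (b,3)
        viaReduceGroup.print();
    }
}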

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 1551788..9e6f86f 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.DataSet;
@@ -180,11 +181,11 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializable
             totalCount = encodedBaseDataSet.count();
         }
 
-        final BaseCuboidReduceFunction baseCuboidReducerFunction = new BaseCuboidReduceFunction(cubeName, metaUrl, sConf);
+        final BaseCuboidReduceGroupFunction baseCuboidReducerFunction = new BaseCuboidReduceGroupFunction(cubeName, metaUrl, sConf);
 
-        BaseCuboidReduceFunction reducerFunction = baseCuboidReducerFunction;
+        BaseCuboidReduceGroupFunction reducerFunction = baseCuboidReducerFunction;
         if (!allNormalMeasure) {
-            reducerFunction = new CuboidReduceFunction(cubeName, metaUrl, sConf, needAggr);
+            reducerFunction = new CuboidReduceGroupFunction(cubeName, metaUrl, sConf, needAggr);
         }
 
         final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
@@ -192,14 +193,14 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializable
         int level = 0;
 
         // aggregate to calculate base cuboid
-        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction);
+        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduceGroup(baseCuboidReducerFunction);
 
         sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
 
         CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
 
         for (level = 1; level <= totalLevels; level++) {
-            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction);
+            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduceGroup(reducerFunction);
             if (envConfig.isFlinkSanityCheckEnabled()) {
                 sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
@@ -321,6 +322,53 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializable
         }
     }
 
+    private static class BaseCuboidReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+        protected String cubeName;
+        protected String metaUrl;
+        protected CubeDesc cubeDesc;
+        protected int measureNum;
+        protected MeasureAggregators aggregators;
+        protected SerializableConfiguration conf;
+
+        public BaseCuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                cubeDesc = cubeInstance.getDescriptor();
+                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+                measureNum = cubeDesc.getMeasures().size();
+            }
+        }
+
+        @Override
+        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            Object[] result = null;
+            ByteArray key = null;
+
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                key = item.f0;
+                if (result == null) {
+                    result = item.f1;
+                } else {
+                    Object[] temp = new Object[measureNum];
+                    aggregators.aggregate(item.f1, result, temp);
+                    result = temp;
+                }
+            }
+
+            collector.collect(new Tuple2<>(key, result));
+        }
+    }
+
     /**
      * A reduce function used to aggregate base cuboid.
      */
@@ -361,6 +409,39 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializable
 
     }
 
+    private static class CuboidReduceGroupFunction extends BaseCuboidReduceGroupFunction {
+        private boolean[] needAgg;
+
+        public CuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
+            super(cubeName, metaUrl, conf);
+            this.needAgg = needAgg;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+        }
+
+        @Override
+        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            Object[] result = null;
+            ByteArray key = null;
+
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                key = item.f0;
+                if (result == null) {
+                    result = item.f1;
+                } else {
+                    Object[] temp = new Object[measureNum];
+                    aggregators.aggregate(item.f1, result, temp, needAgg);
+                    result = temp;
+                }
+            }
+
+            collector.collect(new Tuple2<>(key, result));
+        }
+    }
+
     /**
      * A reduce function does aggregation based on boolean flag array.
      */
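
Editor's note on the two new group-reduce classes: they follow the same pattern as the sketch after the diffstat. Each call receives one whole cuboid group, keeps a single result array, aggregates every further record into a temporary buffer via MeasureAggregators.aggregate, and emits exactly one tuple per key, replacing the per-pair calls the old reduce path made. Two caveats seem worth noting. First, result = item.f1 retains a reference to an input record across loop iterations; that is safe under the DataSet API's default configuration, where object reuse is disabled, but would break if enableObjectReuse() were set on the execution config. Second, unlike reduce, a plain GroupReduceFunction gets no automatic pre-shuffle combine unless it also implements GroupCombineFunction, so the gain here presumably comes from fewer allocations per group rather than from reduced shuffle volume.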
