This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 34a76dfd912b995d0772b1f280a80ca04e7167fc
Author: yanghua <yanghua1...@gmail.com>
AuthorDate: Tue Jul 16 15:33:58 2019 +0800

    KYLIN-4087 Use reduceGroup operator to optimize build cube step
---
 .../kylin/engine/flink/FlinkCubingByLayer.java | 91 ++++++++++++++++++++--
 1 file changed, 86 insertions(+), 5 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
index 1551788..9e6f86f 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkCubingByLayer.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.DataSet;
@@ -180,11 +181,11 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
             totalCount = encodedBaseDataSet.count();
         }
 
-        final BaseCuboidReduceFunction baseCuboidReducerFunction = new BaseCuboidReduceFunction(cubeName, metaUrl, sConf);
+        final BaseCuboidReduceGroupFunction baseCuboidReducerFunction = new BaseCuboidReduceGroupFunction(cubeName, metaUrl, sConf);
 
-        BaseCuboidReduceFunction reducerFunction = baseCuboidReducerFunction;
+        BaseCuboidReduceGroupFunction reducerFunction = baseCuboidReducerFunction;
         if (!allNormalMeasure) {
-            reducerFunction = new CuboidReduceFunction(cubeName, metaUrl, sConf, needAggr);
+            reducerFunction = new CuboidReduceGroupFunction(cubeName, metaUrl, sConf, needAggr);
         }
 
         final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
@@ -192,14 +193,14 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         int level = 0;
 
         // aggregate to calculate base cuboid
-        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduce(baseCuboidReducerFunction);
+        allDataSets[0] = encodedBaseDataSet.groupBy(0).reduceGroup(baseCuboidReducerFunction);
 
         sinkToHDFS(allDataSets[0], metaUrl, cubeName, cubeSegment, outputPath, 0, Job.getInstance(), envConfig);
 
         CuboidFlatMapFunction flatMapFunction = new CuboidFlatMapFunction(cubeName, segmentId, metaUrl, sConf);
 
         for (level = 1; level <= totalLevels; level++) {
-            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduce(reducerFunction);
+            allDataSets[level] = allDataSets[level - 1].flatMap(flatMapFunction).groupBy(0).reduceGroup(reducerFunction);
             if (envConfig.isFlinkSanityCheckEnabled()) {
                 sanityCheck(allDataSets[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
@@ -321,6 +322,53 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
+    private static class BaseCuboidReduceGroupFunction extends RichGroupReduceFunction<Tuple2<ByteArray, Object[]>, Tuple2<ByteArray, Object[]>> {
+
+        protected String cubeName;
+        protected String metaUrl;
+        protected CubeDesc cubeDesc;
+        protected int measureNum;
+        protected MeasureAggregators aggregators;
+        protected SerializableConfiguration conf;
+
+        public BaseCuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+                cubeDesc = cubeInstance.getDescriptor();
+                aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+                measureNum = cubeDesc.getMeasures().size();
+            }
+        }
+
+        @Override
+        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            Object[] result = null;
+            ByteArray key = null;
+
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                key = item.f0;
+                if (result == null) {
+                    result = item.f1;
+                } else {
+                    Object[] temp = new Object[measureNum];
+                    aggregators.aggregate(item.f1, result, temp);
+                    result = temp;
+                }
+            }
+
+            collector.collect(new Tuple2<>(key, result));
+        }
+    }
+
     /**
      * A reduce function used to aggregate base cuboid.
      */
@@ -361,6 +409,39 @@ public class FlinkCubingByLayer extends AbstractApplication implements Serializa
 
     }
 
+    private static class CuboidReduceGroupFunction extends BaseCuboidReduceGroupFunction {
+        private boolean[] needAgg;
+
+        public CuboidReduceGroupFunction(String cubeName, String metaUrl, SerializableConfiguration conf, boolean[] needAgg) {
+            super(cubeName, metaUrl, conf);
+            this.needAgg = needAgg;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+        }
+
+        @Override
+        public void reduce(Iterable<Tuple2<ByteArray, Object[]>> iterable, Collector<Tuple2<ByteArray, Object[]>> collector) throws Exception {
+            Object[] result = null;
+            ByteArray key = null;
+
+            for (Tuple2<ByteArray, Object[]> item : iterable) {
+                key = item.f0;
+                if (result == null) {
+                    result = item.f1;
+                } else {
+                    Object[] temp = new Object[measureNum];
+                    aggregators.aggregate(item.f1, result, temp, needAgg);
+                    result = temp;
+                }
+            }
+
+            collector.collect(new Tuple2<>(key, result));
+        }
+    }
+
     /**
      * A reduce function does aggregation based on boolean flag array.
      */
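
For context on the operator switch the patch makes: Flink's reduce is invoked once per pair of records sharing a key, so every combination step materializes a fresh intermediate result, whereas reduceGroup is invoked once per key with an Iterable over the entire group, letting a single accumulator be carried across all records of that key and only the final value be emitted. Below is a minimal, self-contained sketch of the difference, independent of the Kylin classes in the patch; the class name ReduceVsReduceGroup and the sample data are illustrative only, not part of this commit.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Illustrative example; not part of the Kylin patch.
public class ReduceVsReduceGroup {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        // reduce: called once per pair of records with the same key; each
        // call must build and return a new intermediate tuple.
        DataSet<Tuple2<String, Integer>> viaReduce = input.groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                            Tuple2<String, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                });

        // reduceGroup: called once per key with the whole group, so one
        // accumulator serves all records and a single result is collected --
        // the same pattern BaseCuboidReduceGroupFunction applies above.
        DataSet<Tuple2<String, Integer>> viaGroupReduce = input.groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Integer>> records,
                            Collector<Tuple2<String, Integer>> out) {
                        String key = null;
                        int sum = 0;
                        for (Tuple2<String, Integer> r : records) {
                            key = r.f0;
                            sum += r.f1;
                        }
                        out.collect(Tuple2.of(key, sum));
                    }
                });

        viaReduce.print();
        viaGroupReduce.print();
    }
}

In the patch, the reduceGroup variants aggregate each cuboid key's measures in one pass over the grouped records and hand only the final tuple to the collector, rather than routing every pairwise intermediate result back through the framework, which is presumably the optimization KYLIN-4087 is after.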