This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/engine-flink by this push:
     new 4f04060  KYLIN-3851 Flink cubing step : merge dictionary
4f04060 is described below

commit 4f04060b11e20298b84978d40582c952ed41b03d
Author: yanghua <yanghua1...@gmail.com>
AuthorDate: Wed Mar 20 20:39:29 2019 +0800

    KYLIN-3851 Flink cubing step : merge dictionary
---
 .../engine/flink/FlinkBatchMergeJobBuilder2.java   |  46 ++--
 .../kylin/engine/flink/FlinkMergingDictionary.java | 297 +++++++++++++++++++++
 .../engine/mr/steps/UpdateDictionaryStep.java      |   2 +-
 3 files changed, 320 insertions(+), 25 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
index 155ef47..8a5635f 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkBatchMergeJobBuilder2.java
@@ -23,11 +23,7 @@ import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +62,7 @@ public class FlinkBatchMergeJobBuilder2 extends JobBuilderSupport {
 
         // Phase 1: Merge Dictionary
         inputSide.addStepPhase1_MergeDictionary(result);
-        result.addTask(createMergeDictionaryMRStep(cubeSegment, jobId, mergingSegmentIds));
+        result.addTask(createMergeDictionaryFlinkStep(cubeSegment, jobId, mergingSegmentIds));
         result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
         outputSide.addStepPhase1_MergeDictionary(result);
 
@@ -83,25 +79,27 @@ public class FlinkBatchMergeJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    public MapReduceExecutable createMergeDictionaryMRStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
-        MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
-        mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX);
-
-        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
-        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
-        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
-        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
-        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeDictionaryStep.setMapReduceParams(cmd.toString());
-        mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);
-
-        return mergeDictionaryStep;
+    public FlinkExecutable createMergeDictionaryFlinkStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+        final FlinkExecutable flinkExecutable = new FlinkExecutable();
+        flinkExecutable.setClassName(FlinkMergingDictionary.class.getName());
+
+        flinkExecutable.setParam(FlinkMergingDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        flinkExecutable.setParam(FlinkMergingDictionary.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        flinkExecutable.setParam(FlinkMergingDictionary.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobID));
+        flinkExecutable.setParam(FlinkMergingDictionary.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+        flinkExecutable.setParam(FlinkMergingDictionary.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+        flinkExecutable.setParam(FlinkMergingDictionary.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+
+        flinkExecutable.setJobId(jobID);
+        flinkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        flinkExecutable.setFlinkConfigName(ExecutableConstants.FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY);
+
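+        // collect any additional user-configured jars so the Flink job can load them at runtime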
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getFlinkAdditionalJars());
+        flinkExecutable.setJars(jars.toString());
+
+        return flinkExecutable;
     }
 
     public FlinkExecutable createMergeCuboidDataFlinkStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
new file mode 100644
index 0000000..3839535
--- /dev/null
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkMergingDictionary.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.flink;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Flink cubing merge step: merge dictionary.
+ */
+public class FlinkMergingDictionary extends AbstractApplication implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(FlinkMergingDictionary.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
+            .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
+    public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
+            .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
+    public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
+            .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+
+    private Options options;
+
+    public FlinkMergingDictionary() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_MERGE_SEGMENT_IDS);
+        options.addOption(OPTION_OUTPUT_PATH_DICT);
+        options.addOption(OPTION_OUTPUT_PATH_STAT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+        final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
+        final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+        final Job job = Job.getInstance();
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(job.getConfiguration());
+        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+
+        logger.info("Dictionary output path: {}", dictOutputPath);
+        logger.info("Statistics output path: {}", statOutputPath);
+
+        final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+        final int columnLength = tblColRefs.length;
+
+        List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+
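+        // build one task index per dictionary column, plus one extra index (== columnLength) for the statistics merge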
+        for (int i = 0; i <= columnLength; i++) {
+            indexs.add(i);
+        }
+
+        DataSource<Integer> indexDS = env.fromCollection(indexs);
+
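+        // each index is mapped in parallel, so every column's dictionary merge and the statistics merge run as independent Flink subtasks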
+        DataSet<Tuple2<Text, Text>> colToDictPathDS = indexDS.map(new MergeDictAndStatsFunction(cubeName,
+                metaUrl, segmentId, StringUtil.splitByComma(segmentIds), statOutputPath, tblColRefs, sConf))
+                .setParallelism(columnLength + 1);
+
+        FlinkUtil.setHadoopConfForCuboid(job, null, null);
+        HadoopOutputFormat<Text, Text> hadoopOF =
+                new HadoopOutputFormat<>(new SequenceFileOutputFormat<>(), job);
+        SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));
+
+        colToDictPathDS.output(hadoopOF).setParallelism(1);
+
+        env.execute("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
+    }
+
+    public static class MergeDictAndStatsFunction extends RichMapFunction<Integer, Tuple2<Text, Text>> {
+
+        private String cubeName;
+        private String metaUrl;
+        private String segmentId;
+        private String[] segmentIds;
+        private String statOutputPath;
+        private TblColRef[] tblColRefs;
+        private SerializableConfiguration conf;
+        private transient DictionaryManager dictMgr;
+        private KylinConfig kylinConfig;
+        private List<CubeSegment> mergingSegments;
+
+        public MergeDictAndStatsFunction(String cubeName, String metaUrl, String segmentId, String[] segmentIds,
+                String statOutputPath, TblColRef[] tblColRefs, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.segmentId = segmentId;
+            this.segmentIds = segmentIds;
+            this.statOutputPath = statOutputPath;
+            this.tblColRefs = tblColRefs;
+            this.conf = conf;
+        }
+
+        @Override
+        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
+            kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                    .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                dictMgr = DictionaryManager.getInstance(kylinConfig);
+                mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+            }
+        }
+
+        @Override
+        public Tuple2<Text, Text> map(Integer index) throws Exception {
+            if (index < tblColRefs.length) {
+                // merge dictionary
+                TblColRef col = tblColRefs[index];
+                List<DictionaryInfo> dictInfos = Lists.newArrayList();
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                        .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                    for (CubeSegment segment : mergingSegments) {
+                        if (segment.getDictResPath(col) != null) {
+                            DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                            if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                                dictInfos.add(dictInfo);
+                            }
+                        }
+                    }
+
+                    DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+                    String tblCol = col.getTableAlias() + ":" + col.getName();
+                    String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+
+                    return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+                }
+            } else {
+                // merge statistics
+                try (KylinConfig.SetAndUnsetThreadLocalConfig autoUnset = KylinConfig
+                        .setAndUnsetThreadLocalConfig(kylinConfig)) {
+                    CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                    CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+                    ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+                    Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+                    Configuration conf = null;
+                    int averageSamplingPercentage = 0;
+
+                    for (CubeSegment cubeSegment : mergingSegments) {
+                        String filePath = cubeSegment.getStatisticsResourcePath();
+
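+                        // copy the segment statistics out of the resource store into a local temp file so they can be read back as a SequenceFile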
+                        File tempFile = File.createTempFile(segmentId, ".seq");
+
+                        try(InputStream is = rs.getResource(filePath).content();
+                            FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+
+                            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                        }
+
+                        FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+
+                        conf = HadoopUtil.getCurrentConfiguration();
+
+                        try(SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf)) {
+                            //noinspection deprecation
+                            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                            while (reader.next(key, value)) {
+                                if (key.get() == 0L) {
+                                    // sampling percentage
+                                    averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                                } else if (key.get() > 0) {
+                                    HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                                    ByteArray byteArray = new ByteArray(value.getBytes());
+                                    hll.readRegisters(byteArray.asBuffer());
+
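+                                    // merge the HLL sketch for this cuboid with sketches already read from other segments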
+                                    if (cuboidHLLMap.get(key.get()) != null) {
+                                        cuboidHLLMap.get(key.get()).merge(hll);
+                                    } else {
+                                        cuboidHLLMap.put(key.get(), hll);
+                                    }
+                                }
+                            }
+                        }
+                    }
+
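+                    // average the sampling percentage across all merged segments before writing the combined statistics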
+                    averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+                    CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+                    Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+                    FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+                    FSDataInputStream fis = fs.open(statisticsFilePath);
+
+                    try {
+                        // put the statistics to metadata store
+                        String statisticsFileName = newSegment.getStatisticsResourcePath();
+                        rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+                    } finally {
+                        IOUtils.closeStream(fis);
+                    }
+
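+                    // the statistics task has no dictionary mapping to report, so it emits an empty placeholder tuple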
+                    return new Tuple2<>(new Text(""), new Text(""));
+                }
+            }
+        }
+
+        private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] segmentIds) {
+            List<CubeSegment> result = Lists.newArrayListWithCapacity(segmentIds.length);
+            for (String id : segmentIds) {
+                result.add(cube.getSegmentById(id));
+            }
+            return result;
+        }
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
index a68f215..43b537a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -89,7 +89,7 @@ public class UpdateDictionaryStep extends AbstractExecutable {
             FileStatus[] fileStatuss = fs.listStatus(new Path(dictInfoPath), new PathFilter() {
                 @Override
                 public boolean accept(Path path) {
-                    return path.getName().startsWith("part");
+                    return path.getName().startsWith("part") || path.getName().startsWith("tmp");
                 }
             });
 
