This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 415a43a4c9aa9cf583d7ffa943ab9abec57cc2af
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Mon Jun 15 14:52:53 2020 +0800

    KYLIN-4565 Add CLI tool to export base cuboid data
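    
    A hypothetical usage sketch (assuming the tool is launched through the
    standard $KYLIN_HOME/bin/kylin.sh class runner; the cube name, output
    path, and epoch-millisecond timestamps below are illustrative only):
    
        $KYLIN_HOME/bin/kylin.sh org.apache.kylin.tool.job.BaseCuboidExportCLI \
            -cube sample_cube -output /tmp/base_cuboid_export \
            -startTime 1590969600000 -endTime 1593561600000 -delimiter ,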
---
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   5 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   3 +
 .../kylin/engine/mr/steps/ExportBaseCuboidJob.java | 115 ++++++++++++++++
 .../engine/mr/steps/ExportBaseCuboidMapper.java    | 123 +++++++++++++++++
 .../apache/kylin/tool/job/BaseCuboidExportCLI.java | 153 +++++++++++++++++++++
 5 files changed, 398 insertions(+), 1 deletion(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index c463014..c7976df 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -139,7 +139,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_HBASE_CONF_PATH = OptionBuilder.withArgName(BatchConstants.ARG_HBASE_CONF_PATH).hasArg()
             .isRequired(true).withDescription("HBase config file path").create(BatchConstants.ARG_HBASE_CONF_PATH);
-
+    protected static final Option OPTION_DATA_EXPORT_DELIMITER = OptionBuilder
+            .withArgName(BatchConstants.ARG_DATA_EXPORT_DELIMITER).hasArg().isRequired(false)
+            .withDescription("Data export delimiter for columns").create(BatchConstants.ARG_DATA_EXPORT_DELIMITER);
+    
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
     private static final Map<String, KylinConfig> kylinConfigCache = Maps.newConcurrentMap();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d0e2936..2f153c1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,6 +81,8 @@ public interface BatchConstants {
 
     String CFG_CONVERGE_CUBOID_PARTITION_PARAM = "converge.cuboid.partition.param";
 
+    String CFG_DATA_EXPORT_DELIMITER = "data.export.delimiter";
+
     /**
      * command line ARGuments
      */
@@ -116,6 +118,7 @@ public interface BatchConstants {
     String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
     String ARG_GLOBAL_DIC_PART_REDUCE_STATS = "global_dict_part_reduce_stats";
     String ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT = "global_dict_max_distinct_count";
+    String ARG_DATA_EXPORT_DELIMITER = "dataExportDelimiter";
 
     /**
      * logger and counter
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidJob.java
new file mode 100644
index 0000000..1a5ee2d
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidJob.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class ExportBaseCuboidJob extends AbstractHadoopJob {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExportBaseCuboidJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_DATA_EXPORT_DELIMITER);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String delimiter = getOptionValue(OPTION_DATA_EXPORT_DELIMITER);
+            if (Strings.isNullOrEmpty(delimiter)) {
+                delimiter = ",";
+            }
+
+            logger.info("Starting: " + job.getJobName());
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+
+            setJobClasspath(job, cube.getConfig());
+
+            // setup mapper
+            job.setMapperClass(ExportBaseCuboidMapper.class);
+
+            // map-only job: no reducer needed
+            job.setNumReduceTasks(0);
+
+            // Input
+            FileInputFormat.setInputPaths(job, input);
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+
+            // Output
+            //// prevent creating a zero-sized default output file
+            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+            FileOutputFormat.setOutputPath(job, output);
+            job.setMapOutputKeyClass(NullWritable.class);
+            job.setMapOutputValueClass(Text.class);
+
+            // set job configuration
+            job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false");
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_DATA_EXPORT_DELIMITER, delimiter);
+            // add metadata to distributed cache
+            attachSegmentMetadataWithDict(cubeSeg, job.getConfiguration());
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in ExportBaseCuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidMapper.java
new file mode 100644
index 0000000..aa8a772
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidMapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExportBaseCuboidMapper extends KylinMapper<Text, Text, NullWritable, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExportBaseCuboidMapper.class);
+
+    private MultipleOutputs mos;
+    private String delimiter;
+
+    private RowKeyDecoder rowKeyDecoder;
+    private long baseCuboidId;
+
+    private TblColRef partCol;
+    private int partitionColumnIndex = -1;
+
+    private BufferedMeasureCodec codec;
+    private Object[] measureResults;
+
+    private int count = 0;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        mos = new MultipleOutputs(context);
+
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+        delimiter = context.getConfiguration().get(BatchConstants.CFG_DATA_EXPORT_DELIMITER);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+
+        baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+
+        rowKeyDecoder = new RowKeyDecoder(cubeSeg);
+        partCol = cube.getModel().getPartitionDesc().getPartitionDateColumnRef();
+        RowKeyDesc rowKeyDesc = cube.getDescriptor().getRowkey();
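+        // translate the partition column's bit index into its position in the
+        // decoded key value list (the two orderings run in opposite directions)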
+        partitionColumnIndex = rowKeyDesc.getRowKeyColumns().length - rowKeyDesc.getColumnBitIndex(partCol) - 1;
+
+        List<MeasureDesc> measuresDescs = cube.getDescriptor().getMeasures();
+        codec = new BufferedMeasureCodec(measuresDescs);
+        measureResults = new Object[measuresDescs.size()];
+    }
+
+    @Override
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+        long cuboidID = rowKeyDecoder.decode(key.getBytes());
+        if (cuboidID != baseCuboidId) {
+            return; // Skip data from cuboids which are not the base cuboid
+        }
+
+        List<String> keys = rowKeyDecoder.getValues();
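+        // route each row into a Hive-style "column=value" sub-directory keyed on
+        // its partition value, or into "NoPartition" if no partition column exists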
+        String baseOutputPath = partitionColumnIndex < 0 ? "NoPartition"
+                : partCol.getName() + "=" + keys.remove(partitionColumnIndex);
+
+        codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureResults);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append(StringUtils.join(keys, delimiter));
+        if (measureResults.length > 0) {
+            sb.append(delimiter);
+            sb.append(StringUtils.join(measureResults, delimiter));
+        }
+
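+        // log one sample row every 10,000 records to ease debugging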
+        if (count++ % 10000 == 0) {
+            logger.info("base cuboid value: " + sb.toString());
+        }
+        mos.write(NullWritable.get(), sb.toString(), generateFileName(baseOutputPath));
+    }
+
+    @Override
+    public void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+    }
+
+    private String generateFileName(String subDir) {
+        return subDir + "/part";
+    }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/BaseCuboidExportCLI.java b/tool/src/main/java/org/apache/kylin/tool/job/BaseCuboidExportCLI.java
new file mode 100644
index 0000000..4c87704
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/job/BaseCuboidExportCLI.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.tool.job;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.ExportBaseCuboidJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class BaseCuboidExportCLI extends AbstractApplication {
+    private static final Logger logger = LoggerFactory.getLogger(BaseCuboidExportCLI.class);
+
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(true)
+            .withDescription("Specify for which cube to export data").create("cube");
+    private static final Option OPTION_OUTPUT = OptionBuilder.withArgName("output").hasArg().isRequired(true)
+            .withDescription("Specify the output path for the exported data").create("output");
+    private static final Option OPTION_TIME_START = OptionBuilder.withArgName("startTime").hasArg().isRequired(false)
+            .withDescription("Specify a start time for filtering which segment's data should be exported")
+            .create("startTime");
+    private static final Option OPTION_TIME_END = OptionBuilder.withArgName("endTime").hasArg().isRequired(false)
+            .withDescription("Specify an end time for filtering which segment's data should be exported")
+            .create("endTime");
+    private static final Option OPTION_DELIMITER = OptionBuilder.withArgName("delimiter").hasArg().isRequired(false)
+            .withDescription("Specify the delimiter for the exported column data").create("delimiter");
+
+    private final Options options;
+
+    public BaseCuboidExportCLI() {
+        options = new Options();
+        options.addOption(OPTION_CUBE);
+        options.addOption(OPTION_OUTPUT);
+        options.addOption(OPTION_TIME_START);
+        options.addOption(OPTION_TIME_END);
+        options.addOption(OPTION_DELIMITER);
+    }
+
+    protected Options getOptions() {
+        return options;
+    }
+
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+        String outputDir = optionsHelper.getOptionValue(OPTION_OUTPUT);
+        long startTime = parseTime(optionsHelper.getOptionValue(OPTION_TIME_START));
+        long endTime = parseTime(optionsHelper.getOptionValue(OPTION_TIME_END));
+        if (endTime < 0) {
+            endTime = Long.MAX_VALUE;
+        }
+        if (startTime >= endTime) {
+            throw new RuntimeException("start time " + startTime + " should be 
less than end time " + endTime);
+        }
+        String delimiter = optionsHelper.getOptionValue(OPTION_DELIMITER);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        if (cubeInstance == null) {
+            throw new RuntimeException("cannot find cube " + cubeName);
+        }
+        Configuration hConf = HadoopUtil.getCurrentConfiguration();
+        FileSystem fs = FileSystem.get(hConf);
+        Path outputDirPath = new Path(outputDir);
+        if (!fs.exists(outputDirPath)) {
+            logger.info("Path " + outputDir + " does not exist. Will create 
one");
+            fs.mkdirs(outputDirPath);
+        } else if (!fs.isDirectory(outputDirPath)) {
+            throw new RuntimeException("Path " + outputDir + " exists. But 
it's not a directory");
+        }
+
+        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            if (!(startTime <= cubeSeg.getTSRange().start.v && endTime >= cubeSeg.getTSRange().end.v)) {
+                logger.info("Will skip segment " + cubeSeg.getName() + ", since it's out of the time range ["
+                        + startTime + "," + endTime + ")");
+                continue;
+            }
+
+            JobBuilderSupport jobSupport = new JobBuilderSupport(cubeSeg, "ADMIN");
+            String jobName = "Export_Base_Cuboid_Data_for_Cube_" + cubeName + "_Segment_" + cubeSeg.getName();
+            String segmentID = cubeSeg.getUuid();
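+            // read every cuboid directory under the segment; the mapper drops
+            // all rows that do not belong to the base cuboid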
+            String inputPath = jobSupport.getCuboidRootPath(cubeSeg) + "*";
+            String outputPath = outputDir + "/" + cubeSeg.getName();
+
+            StringBuilder cmd = new StringBuilder();
+            jobSupport.appendMapReduceParameters(cmd);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, cubeName);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, segmentID);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, jobName);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+            if (!Strings.isNullOrEmpty(delimiter)) {
+                jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_DATA_EXPORT_DELIMITER, delimiter);
+            }
+
+            String[] args = cmd.toString().trim().split("\\s+");
+            ExportBaseCuboidJob mrJob = new ExportBaseCuboidJob();
+            mrJob.setConf(hConf);
+            MRUtil.runMRJob(mrJob, args);
+        }
+    }
+
+    private long parseTime(String timeStr) {
+        long time = -1;
+        if (!Strings.isNullOrEmpty(timeStr)) {
+            time = Long.parseLong(timeStr);
+        }
+        return time;
+    }
+
+    public static void main(String[] args) {
+        BaseCuboidExportCLI cli = new BaseCuboidExportCLI();
+        try {
+            cli.execute(args);
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error start exporting cube data", e);
+            System.exit(-1);
+        }
+    }
+}
