This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to tag ebay-3.1.0-release-20200701
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 415a43a4c9aa9cf583d7ffa943ab9abec57cc2af
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Mon Jun 15 14:52:53 2020 +0800

    KYLIN-4565 Add cli tool to export base cuboid data
---
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   5 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   3 +
 .../kylin/engine/mr/steps/ExportBaseCuboidJob.java | 115 ++++++++++++++++
 .../engine/mr/steps/ExportBaseCuboidMapper.java    | 123 +++++++++++++++++
 .../apache/kylin/tool/job/BaseCuboidExportCLI.java | 153 +++++++++++++++++++++
 5 files changed, 398 insertions(+), 1 deletion(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index c463014..c7976df 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -139,7 +139,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
     public static final Option OPTION_HBASE_CONF_PATH = OptionBuilder.withArgName(BatchConstants.ARG_HBASE_CONF_PATH).hasArg()
             .isRequired(true).withDescription("HBase config file path").create(BatchConstants.ARG_HBASE_CONF_PATH);
-
+    protected static final Option OPTION_DATA_EXPORT_DELIMITER = OptionBuilder
+            .withArgName(BatchConstants.ARG_DATA_EXPORT_DELIMITER).hasArg().isRequired(false)
+            .withDescription("Data export delimiter for columns").create(BatchConstants.ARG_DATA_EXPORT_DELIMITER);
+
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
     private static final Map<String, KylinConfig> kylinConfigCache = Maps.newConcurrentMap();
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d0e2936..2f153c1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,6 +81,8 @@ public interface BatchConstants {
     String CFG_CONVERGE_CUBOID_PARTITION_PARAM = "converge.cuboid.partition.param";
 
+    String CFG_DATA_EXPORT_DELIMITER = "data.export.delimiter";
+
     /**
      * command line ARGuments
      */
@@ -116,6 +118,7 @@ public interface BatchConstants {
     String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
     String ARG_GLOBAL_DIC_PART_REDUCE_STATS = "global_dict_part_reduce_stats";
     String ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT = "global_dict_max_distinct_count";
+    String ARG_DATA_EXPORT_DELIMITER = "dataExportDelimiter";
 
     /**
      * logger and counter
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidJob.java
new file mode 100644
index 0000000..1a5ee2d
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidJob.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class ExportBaseCuboidJob extends AbstractHadoopJob {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExportBaseCuboidJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_DATA_EXPORT_DELIMITER);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String delimiter = getOptionValue(OPTION_DATA_EXPORT_DELIMITER);
+            if (Strings.isNullOrEmpty(delimiter)) {
+                delimiter = ",";
+            }
+
+            logger.info("Starting: " + job.getJobName());
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+
+            setJobClasspath(job, cube.getConfig());
+
+            // setup mapper
+            job.setMapperClass(ExportBaseCuboidMapper.class);
+
+            // setup reducer
+            job.setNumReduceTasks(0);
+
+            // Input
+            FileInputFormat.setInputPaths(job, input);
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+
+            // Output
+            // prevent creating a zero-sized default output file
+            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+            FileOutputFormat.setOutputPath(job, output);
+            job.setMapOutputKeyClass(NullWritable.class);
+            job.setMapOutputValueClass(Text.class);
+
+            // set job configuration
+            job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false");
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_DATA_EXPORT_DELIMITER, delimiter);
+            // add metadata to distributed cache
+            attachSegmentMetadataWithDict(cubeSeg, job.getConfiguration());
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            logger.error("error in ExportBaseCuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidMapper.java
new file mode 100644
index 0000000..aa8a772
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExportBaseCuboidMapper.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.RowKeyDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExportBaseCuboidMapper extends KylinMapper<Text, Text, NullWritable, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExportBaseCuboidMapper.class);
+
+    private MultipleOutputs mos;
+    private String delimiter;
+
+    private RowKeyDecoder rowKeyDecoder;
+    private long baseCuboidId;
+
+    private TblColRef partCol;
+    private int partitionColumnIndex = -1;
+
+    private BufferedMeasureCodec codec;
+    private Object[] measureResults;
+
+    private int count = 0;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        mos = new MultipleOutputs(context);
+
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+        delimiter = context.getConfiguration().get(BatchConstants.CFG_DATA_EXPORT_DELIMITER);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+
+        baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+
+        rowKeyDecoder = new RowKeyDecoder(cubeSeg);
+        partCol = cube.getModel().getPartitionDesc().getPartitionDateColumnRef();
+        RowKeyDesc rowKeyDesc = cube.getDescriptor().getRowkey();
+        partitionColumnIndex = rowKeyDesc.getRowKeyColumns().length - rowKeyDesc.getColumnBitIndex(partCol) - 1;
+
+        List<MeasureDesc> measuresDescs = cube.getDescriptor().getMeasures();
+        codec = new BufferedMeasureCodec(measuresDescs);
+        measureResults = new Object[measuresDescs.size()];
+    }
+
+    @Override
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+        long cuboidID = rowKeyDecoder.decode(key.getBytes());
+        if (cuboidID != baseCuboidId) {
+            return; // Skip data from cuboids which are not the base cuboid
+        }
+
+        List<String> keys = rowKeyDecoder.getValues();
+        String baseOutputPath = partitionColumnIndex < 0 ? "NoPartition"
+                : partCol.getName() + "=" + keys.remove(partitionColumnIndex);
+
+        codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureResults);
+
+        StringBuilder sb = new StringBuilder();
+        sb.append(StringUtils.join(keys, delimiter));
+        if (measureResults.length > 0) {
+            sb.append(delimiter);
+            sb.append(StringUtils.join(measureResults, delimiter));
+        }
+
+        if (count++ % 10000 == 0) {
+            logger.info("base cuboid value: " + sb.toString());
+        }
+        mos.write(NullWritable.get(), sb.toString(), generateFileName(baseOutputPath));
+    }
+
+    @Override
+    public void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+    }
+
+    private String generateFileName(String subDir) {
+        return subDir + "/part";
+    }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/BaseCuboidExportCLI.java b/tool/src/main/java/org/apache/kylin/tool/job/BaseCuboidExportCLI.java
new file mode 100644
index 0000000..4c87704
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/job/BaseCuboidExportCLI.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.tool.job;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.ExportBaseCuboidJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+public class BaseCuboidExportCLI extends AbstractApplication {
+    private static final Logger logger = LoggerFactory.getLogger(BaseCuboidExportCLI.class);
+
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(true)
+            .withDescription("Specify for which cube to export data").create("cube");
+    private static final Option OPTION_OUTPUT = OptionBuilder.withArgName("output").hasArg().isRequired(true)
+            .withDescription("Specify the output path for the exported data").create("output");
+    private static final Option OPTION_TIME_START = OptionBuilder.withArgName("startTime").hasArg().isRequired(false)
+            .withDescription("Specify a start time for filtering which segment's data should be exported")
+            .create("startTime");
+    private static final Option OPTION_TIME_END = OptionBuilder.withArgName("endTime").hasArg().isRequired(false)
+            .withDescription("Specify an end time for filtering which segment's data should be exported")
+            .create("endTime");
+    private static final Option OPTION_DELIMITER = OptionBuilder.withArgName("delimiter").hasArg().isRequired(false)
+            .withDescription("Specify the delimiter for the exported column data").create("delimiter");
+
+    private final Options options;
+
+    public BaseCuboidExportCLI() {
+        options = new Options();
+        options.addOption(OPTION_CUBE);
+        options.addOption(OPTION_OUTPUT);
+        options.addOption(OPTION_TIME_START);
+        options.addOption(OPTION_TIME_END);
+        options.addOption(OPTION_DELIMITER);
+    }
+
+    protected Options getOptions() {
+        return options;
+    }
+
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+        String outputDir = optionsHelper.getOptionValue(OPTION_OUTPUT);
+        long startTime = parseTime(optionsHelper.getOptionValue(OPTION_TIME_START));
+        long endTime = parseTime(optionsHelper.getOptionValue(OPTION_TIME_END));
+        if (endTime < 0) {
+            endTime = Long.MAX_VALUE;
+        }
+        if (startTime >= endTime) {
+            throw new RuntimeException("start time " + startTime + " should be less than end time " + endTime);
+        }
+        String delimiter = optionsHelper.getOptionValue(OPTION_DELIMITER);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        if (cubeInstance == null) {
+            throw new RuntimeException("cannot find cube " + cubeName);
+        }
+        Configuration hConf = HadoopUtil.getCurrentConfiguration();
+        FileSystem fs = FileSystem.get(hConf);
+        Path outputDirPath = new Path(outputDir);
+        if (!fs.exists(outputDirPath)) {
+            logger.info("Path " + outputDir + " does not exist. Will create one");
+            fs.mkdirs(outputDirPath);
+        } else if (!fs.isDirectory(outputDirPath)) {
+            throw new RuntimeException("Path " + outputDir + " exists. But it's not a directory");
+        }
+
+        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            if (!(startTime <= cubeSeg.getTSRange().start.v && endTime >= cubeSeg.getTSRange().end.v)) {
+                logger.info("Will skip segment " + cubeSeg.getName() + ", since it's out of the time range ["
+                        + startTime + "," + endTime + ")");
+                continue;
+            }
+
+            JobBuilderSupport jobSupport = new JobBuilderSupport(cubeSeg, "ADMIN");
+            String jobName = "Export_Base_Cuboid_Data_for_Cube_" + cubeName + "_Segment_" + cubeSeg.getName();
+            String segmentID = cubeSeg.getUuid();
+            String inputPath = jobSupport.getCuboidRootPath(cubeSeg) + "*";
+            String outputPath = outputDir + "/" + cubeSeg.getName();
+
+            StringBuilder cmd = new StringBuilder();
+            jobSupport.appendMapReduceParameters(cmd);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, cubeName);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, segmentID);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, jobName);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+            jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+            if (!Strings.isNullOrEmpty(delimiter)) {
+                jobSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_DATA_EXPORT_DELIMITER, delimiter);
+            }
+
+            String[] args = cmd.toString().trim().split("\\s+");
+            ExportBaseCuboidJob mrJob = new ExportBaseCuboidJob();
+            mrJob.setConf(hConf);
+            MRUtil.runMRJob(mrJob, args);
+        }
+    }
+
+    private long parseTime(String timeStr) {
+        long time = -1;
+        if (!Strings.isNullOrEmpty(timeStr)) {
+            time = Long.parseLong(timeStr);
+        }
+        return time;
+    }
+
+    public static void main(String[] args) {
+        BaseCuboidExportCLI cli = new BaseCuboidExportCLI();
+        try {
+            cli.execute(args);
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error while exporting cube data", e);
+            System.exit(-1);
+        }
+    }
+}
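
For reference, a minimal sketch of how the new tool might be driven from Java. It is not part of the patch: the option names (-cube, -output, -startTime, -endTime, -delimiter) and the execute(String[]) entry point come from BaseCuboidExportCLI above, while the wrapper class, cube name, paths, and timestamps are hypothetical placeholders.

    package org.apache.kylin.tool.job;

    // Hypothetical usage sketch, not part of this commit: builds an argument
    // list with the option names defined in BaseCuboidExportCLI and runs the
    // tool the same way its main() method does. All values are placeholders.
    public class BaseCuboidExportExample {
        public static void main(String[] args) throws Exception {
            String[] cliArgs = new String[] {
                    "-cube", "sample_cube",               // required: cube whose base cuboid data is exported
                    "-output", "/tmp/base_cuboid_export", // required: HDFS directory; one subdirectory per segment
                    "-startTime", "0",                    // optional: only READY segments inside [startTime, endTime) are exported
                    "-endTime", "1593561600000",          // optional: epoch millis; when omitted, defaults to Long.MAX_VALUE
                    "-delimiter", "|"                     // optional: column delimiter; the MR job defaults to ","
            };
            // AbstractApplication parses the options and dispatches to execute(OptionsHelper)
            new BaseCuboidExportCLI().execute(cliArgs);
        }
    }

Note that the CLI forwards the delimiter through a whitespace-split command string, so whitespace delimiters such as "\t" would not survive the split; a non-whitespace character like "|" is used here for that reason.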