http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java deleted file mode 100644 index 698a978..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java +++ /dev/null @@ -1,552 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop; - -/** - * @author George Song (ysong1) - * - */ - -import static org.apache.hadoop.util.StringUtils.*; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.Tool; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.RawResource; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.CliCommandExecutor; -import org.apache.kylin.common.util.StringSplitter; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.job.JobInstance; -import org.apache.kylin.job.cmd.ShellCmdOutput; -import org.apache.kylin.job.exception.JobException; -import org.apache.kylin.job.tools.OptionsHelper; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@SuppressWarnings("static-access") -public abstract class AbstractHadoopJob extends Configured implements Tool { - protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class); - - protected static final Option OPTION_JOB_NAME = 
OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname"); - protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename"); - protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname"); - protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname"); - protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename"); - protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input"); - protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat"); - protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim"); - protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output"); - protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level"); - protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input"); - protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename"); - - protected String name; - protected boolean isAsync = false; - protected OptionsHelper optionsHelper = new OptionsHelper(); - - protected Job job; - - protected void parseOptions(Options options, String[] args) throws ParseException { - optionsHelper.parseOptions(options, args); - } - - public void printUsage(Options options) { - optionsHelper.printUsage(getClass().getSimpleName(), options); - } - - public Option[] getOptions() { - return optionsHelper.getOptions(); - } - - public String getOptionsAsString() { - return optionsHelper.getOptionsAsString(); - } - - protected String getOptionValue(Option option) { - return optionsHelper.getOptionValue(option); - } - - protected boolean hasOption(Option option) { - return optionsHelper.hasOption(option); - } - - protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException { - int retVal = 0; - long start = System.nanoTime(); - if (isAsync) { - job.submit(); - } else { - job.waitForCompletion(true); - retVal = job.isSuccessful() ? 0 : 1; - logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures. 
Time taken ") + formatTime((System.nanoTime() - start) / 1000000L)); - } - return retVal; - } - - private static final String KYLIN_HIVE_DEPENDENCY_JARS = "[^,]*hive-exec.jar|[^,]*hive-metastore.jar|[^,]*hive-hcatalog-core[0-9.-]*jar"; - - String filterKylinHiveDependency(String kylinHiveDependency) { - if (StringUtils.isBlank(kylinHiveDependency)) - return ""; - - StringBuilder jarList = new StringBuilder(); - - Pattern hivePattern = Pattern.compile(KYLIN_HIVE_DEPENDENCY_JARS); - Matcher matcher = hivePattern.matcher(kylinHiveDependency); - - while (matcher.find()) { - if (jarList.length() > 0) - jarList.append(","); - jarList.append(matcher.group()); - } - - return jarList.toString(); - } - - private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath"; - - protected void setJobClasspath(Job job) { - String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath(); - File jarFile = new File(jarPath); - if (jarFile.exists()) { - job.setJar(jarPath); - logger.info("append job jar: " + jarPath); - } else { - job.setJarByClass(this.getClass()); - } - - String kylinHiveDependency = System.getProperty("kylin.hive.dependency"); - String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency"); - logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH); - - Configuration jobConf = job.getConfiguration(); - String classpath = jobConf.get(MAP_REDUCE_CLASSPATH); - if (classpath == null || classpath.length() == 0) { - logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value."); - classpath = getDefaultMapRedClasspath(); - classpath = classpath.replace(":", ","); // yarn classpath is comma separated - logger.info("The default mapred classpath is: " + classpath); - } - - if (kylinHBaseDependency != null) { - // yarn classpath is comma separated - kylinHBaseDependency = kylinHBaseDependency.replace(":", ","); - classpath = classpath + "," + kylinHBaseDependency; - } - - jobConf.set(MAP_REDUCE_CLASSPATH, classpath); - logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); - - /* - * set extra dependencies as tmpjars & tmpfiles if configured - */ - StringBuilder kylinDependency = new StringBuilder(); - - // for hive dependencies - if (kylinHiveDependency != null) { - // yarn classpath is comma separated - kylinHiveDependency = kylinHiveDependency.replace(":", ","); - - logger.info("Hive Dependencies Before Filtered: " + kylinHiveDependency); - String filteredHive = filterKylinHiveDependency(kylinHiveDependency); - logger.info("Hive Dependencies After Filtered: " + filteredHive); - - if (kylinDependency.length() > 0) - kylinDependency.append(","); - kylinDependency.append(filteredHive); - } - - // for KylinJobMRLibDir - String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir(); - if (!StringUtils.isBlank(mrLibDir)) { - File dirFileMRLIB = new File(mrLibDir); - if (dirFileMRLIB.exists()) { - if (kylinDependency.length() > 0) - kylinDependency.append(","); - kylinDependency.append(mrLibDir); - } else { - logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!"); - } - } - - setJobTmpJarsAndFiles(job, kylinDependency.toString()); - } - - private void setJobTmpJarsAndFiles(Job job, String kylinDependency) { - if (StringUtils.isBlank(kylinDependency)) - return; - - String[] fNameList = kylinDependency.split(","); - - try { - Configuration jobConf = 
job.getConfiguration(); - FileSystem fs = FileSystem.getLocal(jobConf); - - StringBuilder jarList = new StringBuilder(); - StringBuilder fileList = new StringBuilder(); - - for (String fileName : fNameList) { - Path p = new Path(fileName); - if (fs.getFileStatus(p).isDirectory()) { - appendTmpDir(job, fileName); - continue; - } - - StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList; - if (list.length() > 0) - list.append(","); - list.append(fs.getFileStatus(p).getPath().toString()); - } - - appendTmpFiles(fileList.toString(), jobConf); - appendTmpJars(jarList.toString(), jobConf); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void appendTmpDir(Job job, String tmpDir) { - if (StringUtils.isBlank(tmpDir)) - return; - - try { - Configuration jobConf = job.getConfiguration(); - FileSystem fs = FileSystem.getLocal(jobConf); - FileStatus[] fList = fs.listStatus(new Path(tmpDir)); - - StringBuilder jarList = new StringBuilder(); - StringBuilder fileList = new StringBuilder(); - - for (FileStatus file : fList) { - Path p = file.getPath(); - if (fs.getFileStatus(p).isDirectory()) { - appendTmpDir(job, p.toString()); - continue; - } - - StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList; - if (list.length() > 0) - list.append(","); - list.append(fs.getFileStatus(p).getPath().toString()); - } - - appendTmpFiles(fileList.toString(), jobConf); - appendTmpJars(jarList.toString(), jobConf); - - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void appendTmpJars(String jarList, Configuration conf) { - if (StringUtils.isBlank(jarList)) - return; - - String tmpJars = conf.get("tmpjars", null); - if (tmpJars == null) { - tmpJars = jarList; - } else { - tmpJars += "," + jarList; - } - conf.set("tmpjars", tmpJars); - logger.info("Job 'tmpjars' updated -- " + tmpJars); - } - - private void appendTmpFiles(String fileList, Configuration conf) { - if (StringUtils.isBlank(fileList)) - return; - - String tmpFiles = conf.get("tmpfiles", null); - if (tmpFiles == null) { - tmpFiles = fileList; - } else { - tmpFiles += "," + fileList; - } - conf.set("tmpfiles", tmpFiles); - logger.info("Job 'tmpfiles' updated -- " + tmpFiles); - } - - private String getDefaultMapRedClasspath() { - - String classpath = ""; - try { - CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor(); - ShellCmdOutput output = new ShellCmdOutput(); - executor.execute("mapred classpath", output); - - classpath = output.getOutput().trim(); - } catch (IOException e) { - logger.error("Failed to run: 'mapred classpath'.", e); - } - - return classpath; - } - - public void addInputDirs(String input, Job job) throws IOException { - for (String inp : StringSplitter.split(input, ",")) { - inp = inp.trim(); - if (inp.endsWith("/*")) { - inp = inp.substring(0, inp.length() - 2); - FileSystem fs = FileSystem.get(job.getConfiguration()); - Path path = new Path(inp); - FileStatus[] fileStatuses = fs.listStatus(path); - boolean hasDir = false; - for (FileStatus stat : fileStatuses) { - if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) { - hasDir = true; - addInputDirs(stat.getPath().toString(), job); - } - } - if (fileStatuses.length > 0 && !hasDir) { - addInputDirs(path.toString(), job); - } - } else { - logger.debug("Add input " + inp); - FileInputFormat.addInputPath(job, new Path(inp)); - } - } - } - - protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException { - 
File tmp = File.createTempFile("kylin_job_meta", ""); - FileUtils.forceDelete(tmp); - - File metaDir = new File(tmp, "meta"); - metaDir.mkdirs(); - - // write kylin.properties - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - File kylinPropsFile = new File(metaDir, "kylin.properties"); - kylinConfig.writeProperties(kylinPropsFile); - - // write cube / model_desc / cube_desc / dict / table - ArrayList<String> dumpList = new ArrayList<String>(); - dumpList.add(cube.getResourcePath()); - dumpList.add(cube.getDescriptor().getModel().getResourcePath()); - dumpList.add(cube.getDescriptor().getResourcePath()); - for (String tableName : cube.getDescriptor().getModel().getAllTables()) { - TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName); - dumpList.add(table.getResourcePath()); - } - - for (CubeSegment segment : cube.getSegments()) { - dumpList.addAll(segment.getDictionaryPaths()); - } - - dumpResources(kylinConfig, metaDir, dumpList); - addToHadoopDistCache(conf, metaDir); - } - - protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException { - File tmp = File.createTempFile("kylin_job_meta", ""); - FileUtils.forceDelete(tmp); - - File metaDir = new File(tmp, "meta"); - metaDir.mkdirs(); - - // write kylin.properties - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - File kylinPropsFile = new File(metaDir, "kylin.properties"); - kylinConfig.writeProperties(kylinPropsFile); - - // write II / model_desc / II_desc / dict / table - ArrayList<String> dumpList = new ArrayList<String>(); - dumpList.add(ii.getResourcePath()); - dumpList.add(ii.getDescriptor().getModel().getResourcePath()); - dumpList.add(ii.getDescriptor().getResourcePath()); - - for (String tableName : ii.getDescriptor().getModel().getAllTables()) { - TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName); - dumpList.add(table.getResourcePath()); - } - - for (IISegment segment : ii.getSegments()) { - dumpList.addAll(segment.getDictionaryPaths()); - } - - dumpResources(kylinConfig, metaDir, dumpList); - addToHadoopDistCache(conf, metaDir); - } - - private void addToHadoopDistCache(Configuration conf, File metaDir) { - // hadoop distributed cache - String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()); - if (hdfsMetaDir.startsWith("/")) // note Path on windows is like "d:/../..." 
- hdfsMetaDir = "file://" + hdfsMetaDir; - else - hdfsMetaDir = "file:///" + hdfsMetaDir; - logger.info("HDFS meta dir is: " + hdfsMetaDir); - - appendTmpFiles(hdfsMetaDir, conf); - } - - private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException { - ResourceStore from = ResourceStore.getStore(kylinConfig); - KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()); - ResourceStore to = ResourceStore.getStore(localConfig); - for (String path : dumpList) { - RawResource res = from.getResource(path); - if (res == null) - throw new IllegalStateException("No resource found at -- " + path); - to.putResource(path, res.inputStream, res.timestamp); - res.inputStream.close(); - } - } - - protected void deletePath(Configuration conf, Path path) throws IOException { - FileSystem fs = FileSystem.get(path.toUri(), conf); - if (fs.exists(path)) { - fs.delete(path, true); - } - } - - protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException { - if (job == null) { - throw new JobException("Job is null"); - } - - long mapInputBytes = 0; - InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration()); - for (InputSplit split : input.getSplits(job)) { - mapInputBytes += split.getLength(); - } - if (mapInputBytes == 0) { - throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!"); - } - double totalMapInputMB = (double) mapInputBytes / 1024 / 1024; - return totalMapInputMB; - } - - protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException { - if (job == null) { - throw new JobException("Job is null"); - } - InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration()); - return input.getSplits(job).size(); - } - - public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException { - File metaDir = new File("meta"); - System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath()); - logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath()); - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - kylinConfig.setMetadataUrl(metaDir.getAbsolutePath()); - return kylinConfig; - } - - protected void cleanupTempConfFile(Configuration conf) { - String tempMetaFileString = conf.get("tmpfiles"); - logger.info("tempMetaFileString is : " + tempMetaFileString); - if (tempMetaFileString != null) { - if (tempMetaFileString.startsWith("file://")) { - tempMetaFileString = tempMetaFileString.substring("file://".length()); - File tempMetaFile = new File(tempMetaFileString); - if (tempMetaFile.exists()) { - try { - FileUtils.forceDelete(tempMetaFile.getParentFile()); - - } catch (IOException e) { - logger.warn("error when deleting " + tempMetaFile, e); - } - } else { - logger.info("" + tempMetaFileString + " does not exist"); - } - } else { - logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString); - } - } - } - - public void kill() throws JobException { - if (job != null) { - try { - job.killJob(); - } catch (IOException e) { - throw new JobException(e); - } - } - } - - public Map<String, String> getInfo() throws JobException { - if (job != null) { - Map<String, String> status = new HashMap<String, String>(); - if (null != job.getJobID()) { - status.put(JobInstance.MR_JOB_ID, job.getJobID().toString()); - } - if (null != 
job.getTrackingURL()) { - status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString()); - } - - return status; - } else { - throw new JobException("Job is null"); - } - } - - public Counters getCounters() throws JobException { - if (job != null) { - try { - return job.getCounters(); - } catch (IOException e) { - throw new JobException(e); - } - } else { - throw new JobException("Job is null"); - } - } - - public void setAsync(boolean isAsync) { - this.isAsync = isAsync; - } - - public Job getJob() { - return this.job; - } - -}
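For context, a concrete job is expected to wire the pieces above together in run(): register options, parse them, build the Job, fix up the classpath, then hand off to waitForCompletion(). A minimal sketch follows; the class name is hypothetical and the option choices are illustrative, mirroring the CubeHFileJob driver later in this commit.

    import org.apache.commons.cli.Options;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.kylin.job.hadoop.AbstractHadoopJob;

    // Hypothetical driver showing the AbstractHadoopJob lifecycle.
    public class ExampleHadoopJob extends AbstractHadoopJob {

        @Override
        public int run(String[] args) throws Exception {
            Options options = new Options();
            options.addOption(OPTION_JOB_NAME);
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_OUTPUT_PATH);
            parseOptions(options, args);

            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
            setJobClasspath(job); // kylin job jar plus filtered hive/hbase dependencies
            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);

            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
            FileOutputFormat.setOutputPath(job, output);
            deletePath(job.getConfiguration(), output); // clear stale output first

            // (mapper/reducer/input format would be configured here,
            // as in the concrete jobs below)
            return waitForCompletion(job); // 0 on success, 1 on failure (sync mode)
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new ExampleHadoopJob(), args));
        }
    }
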
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java deleted file mode 100644 index 787181c..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cardinality; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.kv.RowConstants; - -/** - * @author Jack - * - */ -public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWritable, BytesWritable> { - - private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>(); - public static final String DEFAULT_DELIM = ","; - - private int counter = 0; - - private HCatSchema schema = null; - private int columnSize = 0; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - schema = HCatInputFormat.getTableSchema(context.getConfiguration()); - columnSize = schema.getFields().size(); - } - - @Override - public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException { - - HCatFieldSchema field; - Object fieldValue; - for (int m = 0; m < columnSize; m++) { - field = schema.get(m); - fieldValue = value.get(field.getName(), schema); - if (fieldValue == null) - fieldValue = "NULL"; - - if (counter < 5 && m < 10) { - System.out.println("Get row " + counter + " column '" + field.getName() + "' value: " + fieldValue); - } - - if (fieldValue != null) - getHllc(m).add(Bytes.toBytes(fieldValue.toString())); - } - - counter++; - } - - private HyperLogLogPlusCounter getHllc(Integer key) { - if (!hllcMap.containsKey(key)) { - hllcMap.put(key, new HyperLogLogPlusCounter()); - } - return hllcMap.get(key); - } - - @Override - protected void cleanup(Context 
context) throws IOException, InterruptedException { - Iterator<Integer> it = hllcMap.keySet().iterator(); - while (it.hasNext()) { - int key = it.next(); - HyperLogLogPlusCounter hllc = hllcMap.get(key); - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit())); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java deleted file mode 100644 index ab4285a..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.cardinality; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.cube.kv.RowConstants; - -/** - * @author Jack - * - */ -public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> { - - public static final int ONE = 1; - private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>(); - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - } - - @Override - public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException { - int skey = key.get(); - for (BytesWritable v : values) { - ByteBuffer buffer = ByteBuffer.wrap(v.getBytes()); - HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(); - hll.readRegisters(buffer); - getHllc(skey).merge(hll); - hll.clear(); - } - } - - private HyperLogLogPlusCounter getHllc(Integer key) { - if (!hllcMap.containsKey(key)) { - hllcMap.put(key, new HyperLogLogPlusCounter()); - } - return hllcMap.get(key); - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - List<Integer> keys = new ArrayList<Integer>(); - Iterator<Integer> it = hllcMap.keySet().iterator(); - while (it.hasNext()) { - keys.add(it.next()); - } - Collections.sort(keys); - it = keys.iterator(); - while (it.hasNext()) { - int key = it.next(); - HyperLogLogPlusCounter hllc = hllcMap.get(key); - ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - buf.clear(); - hllc.writeRegisters(buf); - buf.flip(); - context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate())); - // context.write(new Text("ErrorRate_" + key), new - // LongWritable((long)hllc.getErrorRate())); - } - - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java deleted file mode 100644 index f27d074..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cardinality; - -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; - -/** - * This hadoop job will scan all rows of the hive table and then calculate the cardinality on each column. - * @author shaoshi - * - */ -public class HiveColumnCardinalityJob extends AbstractHadoopJob { - public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job"; - - @SuppressWarnings("static-access") - protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); - - public static final String OUTPUT_PATH = BatchConstants.CFG_KYLIN_HDFS_TEMP_DIR + "cardinality"; - - public HiveColumnCardinalityJob() { - } - - @Override - public int run(String[] args) throws Exception { - - Options options = new Options(); - - try { - options.addOption(OPTION_TABLE); - options.addOption(OPTION_OUTPUT_PATH); - - parseOptions(options, args); - - // start job - String jobName = JOB_TITLE + getOptionsAsString(); - System.out.println("Starting: " + jobName); - Configuration conf = getConf(); - - JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); - conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null)); - - job = Job.getInstance(conf, jobName); - - setJobClasspath(job); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - FileOutputFormat.setOutputPath(job, output); - job.getConfiguration().set("dfs.block.size", "67108864"); - - // Mapper - String table = getOptionValue(OPTION_TABLE); - String[] dbTableNames = HadoopUtil.parseHiveTableName(table); - HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); - - job.setInputFormatClass(HCatInputFormat.class); - job.setMapperClass(ColumnCardinalityMapper.class); - job.setMapOutputKeyClass(IntWritable.class); - job.setMapOutputValueClass(BytesWritable.class); - - // Reducer - only one - job.setReducerClass(ColumnCardinalityReducer.class); - job.setOutputFormatClass(TextOutputFormat.class); - job.setOutputKeyClass(IntWritable.class); - job.setOutputValueClass(LongWritable.class); - job.setNumReduceTasks(1); - - this.deletePath(job.getConfiguration(), output); - - System.out.println("Going to submit HiveColumnCardinalityJob for table '" + table + "'"); - int result = waitForCompletion(job); - - 
return result; - } catch (Exception e) { - printUsage(options); - throw e; - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java deleted file mode 100644 index 7bd3814..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cardinality; - -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.MetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * This job saves the cardinality result into the Kylin table metadata store. 
- * - * @author shaoshi - */ -public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob { - public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job"; - - @SuppressWarnings("static-access") - protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table"); - - private static final Logger logger = LoggerFactory.getLogger(HiveColumnCardinalityUpdateJob.class); - private String table; - - public HiveColumnCardinalityUpdateJob() { - - } - - @Override - public int run(String[] args) throws Exception { - - Options options = new Options(); - - try { - options.addOption(OPTION_TABLE); - options.addOption(OPTION_OUTPUT_PATH); - - parseOptions(options, args); - - this.table = getOptionValue(OPTION_TABLE).toUpperCase(); - // start job - String jobName = JOB_TITLE + getOptionsAsString(); - logger.info("Starting: " + jobName); - Configuration conf = getConf(); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - - updateKylinTableExd(table.toUpperCase(), output.toString(), conf); - return 0; - } catch (Exception e) { - printUsage(options); - throw e; - } - - } - - public void updateKylinTableExd(String tableName, String outPath, Configuration config) throws IOException { - List<String> columns = null; - try { - columns = readLines(new Path(outPath), config); - } catch (Exception e) { - logger.error("Failed to resolve cardinality for " + tableName + " from " + outPath, e); - return; - } - - StringBuffer cardi = new StringBuffer(); - Iterator<String> it = columns.iterator(); - while (it.hasNext()) { - String string = it.next(); - String[] ss = StringUtils.split(string, "\t"); - - if (ss.length != 2) { - logger.info("The hadoop cardinality value is not valid " + string); - continue; - } - cardi.append(ss[1]); - cardi.append(","); - } - String scardi = cardi.toString(); - if (scardi.length() > 0) { - scardi = scardi.substring(0, scardi.length() - 1); - MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); - Map<String, String> tableExd = metaMgr.getTableDescExd(tableName); - tableExd.put(MetadataConstants.TABLE_EXD_CARDINALITY, scardi); - metaMgr.saveTableExd(tableName.toUpperCase(), tableExd); - } else { - throw new IllegalArgumentException("No cardinality data is collected for table " + tableName); - } - } - - private static List<String> readLines(Path location, Configuration conf) throws Exception { - FileSystem fileSystem = FileSystem.get(location.toUri(), conf); - CompressionCodecFactory factory = new CompressionCodecFactory(conf); - FileStatus[] items = fileSystem.listStatus(location); - if (items == null) - return new ArrayList<String>(); - List<String> results = new ArrayList<String>(); - for (FileStatus item : items) { - - // ignoring files like _SUCCESS - if (item.getPath().getName().startsWith("_")) { - continue; - } - - CompressionCodec codec = factory.getCodec(item.getPath()); - InputStream stream = null; - - // check if we have a compression codec we need to use - if (codec != null) { - stream = codec.createInputStream(fileSystem.open(item.getPath())); - } else { - stream = fileSystem.open(item.getPath()); - } - - StringWriter writer = new StringWriter(); - IOUtils.copy(stream, writer, "UTF-8"); - String raw = writer.toString(); - for (String str : raw.split("\n")) { - results.add(str); - } - } - return results; - } - -} 
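The cardinality mapper and reducer above communicate HyperLogLog state as raw register bytes keyed by column index; the reducer merges the per-split counters and emits one distinct-count estimate per column. A small sketch of that round trip, using only the HyperLogLogPlusCounter calls exercised in the deleted classes (buffer size constant as in that code):

    import java.nio.ByteBuffer;
    import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
    import org.apache.kylin.common.util.Bytes;
    import org.apache.kylin.cube.kv.RowConstants;

    // Mapper side: accumulate values, then serialize the registers.
    HyperLogLogPlusCounter mapperHllc = new HyperLogLogPlusCounter();
    mapperHllc.add(Bytes.toBytes("some column value"));

    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
    mapperHllc.writeRegisters(buf);
    buf.flip(); // emitted as new BytesWritable(buf.array(), buf.limit())

    // Reducer side: rebuild the incoming counter and merge it into the total.
    HyperLogLogPlusCounter incoming = new HyperLogLogPlusCounter();
    incoming.readRegisters(ByteBuffer.wrap(buf.array()));

    HyperLogLogPlusCounter total = new HyperLogLogPlusCounter();
    total.merge(incoming);
    long estimate = total.getCountEstimate(); // approximate distinct count
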
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java deleted file mode 100644 index b600213..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import org.apache.hadoop.util.ToolRunner; - -/** - * @author honma - * - */ - -public class BaseCuboidJob extends CuboidJob { - public BaseCuboidJob() { - this.setMapperClass(BaseCuboidMapper.class); - } - - public static void main(String[] args) throws Exception { - CuboidJob job = new BaseCuboidJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java deleted file mode 100644 index d06963b..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesSplitter; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.SplittedBytes; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc; -import org.apache.kylin.measure.MeasureCodec; -import org.apache.kylin.measure.MeasureIngester; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.ParameterDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * @author George Song (ysong1) - */ -public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> { - - private static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapper.class); - - public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); - public static final byte[] ONE = Bytes.toBytes("1"); - - private String cubeName; - private String segmentName; - private Cuboid baseCuboid; - private CubeInstance cube; - private CubeDesc cubeDesc; - private CubeSegment cubeSegment; - private List<byte[]> nullBytes; - - private CubeJoinedFlatTableDesc intermediateTableDesc; - private String intermediateTableRowDelimiter; - private byte byteRowDelimiter; - - private int counter; - private int errorRecordCounter; - private Text outputKey = new Text(); - private Text outputValue = new Text(); - protected MeasureIngester<?>[] aggrIngesters; - protected Map<TblColRef, Dictionary<String>> dictionaryMap; - private Object[] measures; - private byte[][] keyBytesBuf; - private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - - private BytesSplitter bytesSplitter; - private AbstractRowKeyEncoder rowKeyEncoder; - private MeasureCodec measureCodec; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME); - intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER)); - if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) { - throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length); - } - - byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0]; - - KylinConfig config = 
AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration()); - - cube = CubeManager.getInstance(config).getCube(cubeName); - cubeDesc = cube.getDescriptor(); - cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - - intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment); - - bytesSplitter = new BytesSplitter(200, 16384); - rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); - - measureCodec = new MeasureCodec(cubeDesc.getMeasures()); - measures = new Object[cubeDesc.getMeasures().size()]; - - int colCount = cubeDesc.getRowkey().getRowKeyColumns().length; - keyBytesBuf = new byte[colCount][]; - - aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); - dictionaryMap = cubeSegment.buildDictionaryMap(); - - initNullBytes(); - } - - private void initNullBytes() { - nullBytes = Lists.newArrayList(); - nullBytes.add(HIVE_NULL); - String[] nullStrings = cubeDesc.getNullStrings(); - if (nullStrings != null) { - for (String s : nullStrings) { - nullBytes.add(Bytes.toBytes(s)); - } - } - } - - private boolean isNull(byte[] v) { - for (byte[] nullByte : nullBytes) { - if (Bytes.equals(v, nullByte)) - return true; - } - return false; - } - - private byte[] buildKey(SplittedBytes[] splitBuffers) { - int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); - for (int i = 0; i < baseCuboid.getColumns().size(); i++) { - int index = rowKeyColumnIndexes[i]; - keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length); - if (isNull(keyBytesBuf[i])) { - keyBytesBuf[i] = null; - } - } - return rowKeyEncoder.encode(keyBytesBuf); - } - - private void buildValue(SplittedBytes[] splitBuffers) { - - for (int i = 0; i < measures.length; i++) { - measures[i] = buildValueOf(i, splitBuffers); - } - - valueBuf.clear(); - measureCodec.encode(measures, valueBuf); - } - - private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) { - MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure); - FunctionDesc function = measure.getFunction(); - int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; - - int paramCount = function.getParameterCount(); - String[] inputToMeasure = new String[paramCount]; - - // pick up parameter values - ParameterDesc param = function.getParameter(); - int colParamIdx = 0; // index among parameters of column type - for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { - String value; - if (function.isCount()) { - value = "1"; - } else if (param.isColumnType()) { - value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers); - } else { - value = param.getValue(); - } - inputToMeasure[i] = value; - } - - return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); - } - - private String getCell(int i, SplittedBytes[] splitBuffers) { - byte[] bytes = Arrays.copyOf(splitBuffers[i].value, splitBuffers[i].length); - if (isNull(bytes)) - return null; - else - return Bytes.toString(bytes); - } - - @Override - public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException { - counter++; - if (counter % BatchConstants.COUNTER_MAX == 0) { - logger.info("Handled " + counter + " records!"); - } - - try { - bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter); - 
intermediateTableDesc.sanityCheck(bytesSplitter); - - byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers()); - outputKey.set(rowKey, 0, rowKey.length); - - buildValue(bytesSplitter.getSplitBuffers()); - outputValue.set(valueBuf.array(), 0, valueBuf.position()); - - context.write(outputKey, outputValue); - } catch (Exception ex) { - handleErrorRecord(bytesSplitter, ex); - } - } - - private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException { - - ex.printStackTrace(System.err); - System.err.println("Insane record: " + bytesSplitter); - - errorRecordCounter++; - if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) { - if (ex instanceof IOException) - throw (IOException) ex; - else if (ex instanceof RuntimeException) - throw (RuntimeException) ex; - else - throw new RuntimeException("", ex); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java deleted file mode 100644 index 3c1e4a5..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.cube; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author George Song (ysong1) - */ -public class CubeHFileJob extends AbstractHadoopJob { - - protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class); - - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_HTABLE_NAME); - parseOptions(options, args); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); - - CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - - CubeInstance cube = cubeMgr.getCube(cubeName); - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - - setJobClasspath(job); - - addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); - FileOutputFormat.setOutputPath(job, output); - - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setMapperClass(CubeHFileMapper.class); - job.setReducerClass(KeyValueSortReducer.class); - - // set job configuration - job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - Configuration conf = HBaseConfiguration.create(getConf()); - // add metadata to distributed cache - attachKylinPropsAndMetadata(cube, job.getConfiguration()); - - String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); - HTable htable = new HTable(conf, tableName); - - //Automatic config ! 
- HFileOutputFormat.configureIncrementalLoad(job, htable); - - // set block replication to 3 for hfiles - conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); - - this.deletePath(job.getConfiguration(), output); - - return waitForCompletion(job); - } catch (Exception e) { - logger.error("error in CubeHFileJob", e); - printUsage(options); - throw e; - } finally { - if (job != null) - cleanupTempConfFile(job.getConfiguration()); - } - } - - public static void main(String[] args) throws Exception { - int exitCode = ToolRunner.run(new CubeHFileJob(), args); - System.exit(exitCode); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java deleted file mode 100644 index f12d229..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.measure.MeasureCodec; -import org.apache.kylin.metadata.model.MeasureDesc; - -import com.google.common.collect.Lists; - -/** - * @author George Song (ysong1) - * - */ -public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWritable, KeyValue> { - - ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); - - String cubeName; - CubeDesc cubeDesc; - - MeasureCodec inputCodec; - Object[] inputMeasures; - List<KeyValueCreator> keyValueCreators; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME); - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration()); - - CubeManager cubeMgr = CubeManager.getInstance(config); - cubeDesc = cubeMgr.getCube(cubeName).getDescriptor(); - - inputCodec = new MeasureCodec(cubeDesc.getMeasures()); - inputMeasures = new Object[cubeDesc.getMeasures().size()]; - keyValueCreators = Lists.newArrayList(); - - for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) { - for (HBaseColumnDesc colDesc : cfDesc.getColumns()) { - keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc)); - } - } - } - - @Override - public void map(Text key, Text value, Context context) throws IOException, InterruptedException { - outputKey.set(key.getBytes(), 0, key.getLength()); - KeyValue outputValue; - - int n = keyValueCreators.size(); - if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for - // simple full copy - - outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength()); - context.write(outputKey, outputValue); - - } else { // normal (complex) case that distributes measures to multiple - // HBase columns - - inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), inputMeasures); - - for (int i = 0; i < n; i++) { - outputValue = keyValueCreators.get(i).create(key, inputMeasures); - context.write(outputKey, outputValue); - } - } - } - - class KeyValueCreator { - byte[] cfBytes; - byte[] qBytes; - long timestamp; - - int[] refIndex; - MeasureDesc[] refMeasures; - - MeasureCodec codec; - Object[] colValues; - ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - - boolean isFullCopy; - - public KeyValueCreator(CubeDesc cubeDesc, HBaseColumnDesc colDesc) { - - cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName()); - qBytes = Bytes.toBytes(colDesc.getQualifier()); - timestamp = 0; // use 0 for timestamp - - List<MeasureDesc> measures = cubeDesc.getMeasures(); - String[] measureNames = getMeasureNames(cubeDesc); 
- String[] refs = colDesc.getMeasureRefs(); - - refIndex = new int[refs.length]; - refMeasures = new MeasureDesc[refs.length]; - for (int i = 0; i < refs.length; i++) { - refIndex[i] = indexOf(measureNames, refs[i]); - refMeasures[i] = measures.get(refIndex[i]); - } - - codec = new MeasureCodec(refMeasures); - colValues = new Object[refs.length]; - - isFullCopy = true; - for (int i = 0; i < measures.size(); i++) { - if (refIndex.length <= i || refIndex[i] != i) - isFullCopy = false; - } - } - - public KeyValue create(Text key, Object[] measureValues) { - for (int i = 0; i < colValues.length; i++) { - colValues[i] = measureValues[refIndex[i]]; - } - - valueBuf.clear(); - codec.encode(colValues, valueBuf); - - return create(key, valueBuf.array(), 0, valueBuf.position()); - } - - public KeyValue create(Text key, byte[] value, int voffset, int vlen) { - return new KeyValue(key.getBytes(), 0, key.getLength(), // - cfBytes, 0, cfBytes.length, // - qBytes, 0, qBytes.length, // - timestamp, Type.Put, // - value, voffset, vlen); - } - - private int indexOf(String[] measureNames, String ref) { - for (int i = 0; i < measureNames.length; i++) - if (measureNames[i].equalsIgnoreCase(ref)) - return i; - - throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames)); - } - - private String[] getMeasureNames(CubeDesc cubeDesc) { - List<MeasureDesc> measures = cubeDesc.getMeasures(); - String[] result = new String[measures.size()]; - for (int i = 0; i < measures.size(); i++) - result[i] = measures.get(i).getName(); - return result; - } - - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java deleted file mode 100644 index e4875e9..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.cuboid.CuboidCLI; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.exception.JobException; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ysong1 - */ -public class CuboidJob extends AbstractHadoopJob { - - protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class); - private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"; - - @SuppressWarnings("rawtypes") - private Class<? extends Mapper> mapperClass; - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_CUBE_NAME); - options.addOption(OPTION_SEGMENT_NAME); - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_NCUBOID_LEVEL); - options.addOption(OPTION_INPUT_FORMAT); - parseOptions(options, args); - - Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); - int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL)); - String segmentName = getOptionValue(OPTION_SEGMENT_NAME); - - KylinConfig config = KylinConfig.getInstanceFromEnv(); - CubeManager cubeMgr = CubeManager.getInstance(config); - CubeInstance cube = cubeMgr.getCube(cubeName); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - logger.info("Starting: " + job.getJobName()); - FileInputFormat.setInputPaths(job, input); - - setJobClasspath(job); - - // Mapper - if (this.mapperClass == null) { - throw new Exception("Mapper class is not set!"); - } - - boolean isInputTextFormat = false; - if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) { - isInputTextFormat = true; - } - - if (isInputTextFormat) { - job.setInputFormatClass(TextInputFormat.class); - - } else { - job.setInputFormatClass(SequenceFileInputFormat.class); - } - job.setMapperClass(this.mapperClass); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - job.setCombinerClass(CuboidReducer.class); // for base cuboid shuffle skew, some rowkey aggregates far more records than others - - // Reducer - job.setReducerClass(CuboidReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - - FileOutputFormat.setOutputPath(job, output); - - // set job configuration - 
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); - job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); - job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel); - - // add metadata to distributed cache - attachKylinPropsAndMetadata(cube, job.getConfiguration()); - - setReduceTaskNum(job, config, cubeName, nCuboidLevel); - - this.deletePath(job.getConfiguration(), output); - - return waitForCompletion(job); - } catch (Exception e) { - logger.error("error in CuboidJob", e); - printUsage(options); - throw e; - } finally { - if (job != null) - cleanupTempConfFile(job.getConfiguration()); - } - } - - protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException { - Configuration jobConf = job.getConfiguration(); - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - - CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor(); - - double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); - double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); - - // total map input MB - double totalMapInputMB = this.getTotalMapInputMB(); - - // output / input ratio - int preLevelCuboids, thisLevelCuboids; - if (level == 0) { // base cuboid - preLevelCuboids = thisLevelCuboids = 1; - } else { // n-cuboid - int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc); - preLevelCuboids = allLevelCount[level - 1]; - thisLevelCuboids = allLevelCount[level]; - } - - // total reduce input MB - double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids; - - // number of reduce tasks - int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio); - - // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance - if (cubeDesc.hasMemoryHungryMeasures()) { - numReduceTasks = numReduceTasks * 4; - } - - // at least 1 reducer - numReduceTasks = Math.max(1, numReduceTasks); - // no more than 5000 reducer by default - numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); - - jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks); - - logger.info("Having total map input MB " + Math.round(totalMapInputMB)); - logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids); - logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio); - logger.info("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks); - } - - /** - * @param mapperClass - * the mapperClass to set - */ - @SuppressWarnings("rawtypes") - public void setMapperClass(Class<? extends Mapper> mapperClass) { - this.mapperClass = mapperClass; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java deleted file mode 100644 index 3859d0e..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.measure.MeasureAggregators; -import org.apache.kylin.measure.MeasureCodec; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author George Song (ysong1) - * - */ -public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { - - private static final Logger logger = LoggerFactory.getLogger(CuboidReducer.class); - - private String cubeName; - private CubeDesc cubeDesc; - private List<MeasureDesc> measuresDescs; - - private MeasureCodec codec; - private MeasureAggregators aggs; - - private int counter; - private int cuboidLevel; - private boolean[] needAggr; - private Object[] input; - private Object[] result; - - private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE); - private Text outputValue = new Text(); - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); - // only used in Build job, not in Merge job - cuboidLevel = context.getConfiguration().getInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 0); - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration()); - - cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor(); - measuresDescs = cubeDesc.getMeasures(); - - codec = new MeasureCodec(measuresDescs); - aggs = new MeasureAggregators(measuresDescs); - - input = new Object[measuresDescs.size()]; - result = new Object[measuresDescs.size()]; - needAggr = new boolean[measuresDescs.size()]; - - if (cuboidLevel > 0) { - for (int i = 0; i < measuresDescs.size(); i++) { - needAggr[i] = !measuresDescs.get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid(); - } - } - } - - @Override - public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - - aggs.reset(); - - for (Text value : values) { - codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input); - if (cuboidLevel > 0) { - aggs.aggregate(input, needAggr); - } else { - aggs.aggregate(input); - } - } - aggs.collectStates(result); - - valueBuf.clear(); - codec.encode(result, valueBuf); - - outputValue.set(valueBuf.array(), 0, 
valueBuf.position()); - context.write(key, outputValue); - - counter++; - if (counter % BatchConstants.COUNTER_MAX == 0) { - logger.info("Handled " + counter + " records!"); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java deleted file mode 100644 index 9792463..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.cube; - -import java.io.IOException; -import java.util.HashSet; - -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; - -/** - * @author yangli9 - */ -public class FactDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> { - - private Text outputValue = new Text(); - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - } - - @Override - public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - - HashSet<ByteArray> set = new HashSet<ByteArray>(); - for (Text textValue : values) { - ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); - set.add(value); - } - - for (ByteArray value : set) { - outputValue.set(value.data); - context.write(key, outputValue); - } - } - -}
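For reference, the deleted CubeHFileMapper above takes a shortcut when a single HBase column is a "full copy" of the cube's measure list: the serialized measure bytes can then be written into the KeyValue verbatim, with no MeasureCodec decode/encode round trip per row. The detection loop in KeyValueCreator reduces to the following standalone sketch; the class and method names here are illustrative only, not part of Kylin.

// Standalone replay of the "full copy" test in CubeHFileMapper.KeyValueCreator.
// refIndex[i] is the position, in the cube's measure list, of the i-th measure
// stored in one HBase column. The column is a full copy only when it covers
// every measure in the cube's own order, so the value bytes can be reused as-is.
public class FullCopyCheckSketch {

    static boolean isFullCopy(int[] refIndex, int measureCount) {
        for (int i = 0; i < measureCount; i++) {
            if (refIndex.length <= i || refIndex[i] != i)
                return false; // a measure is missing or reordered: must re-encode
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isFullCopy(new int[] { 0, 1, 2 }, 3)); // true  - identity mapping
        System.out.println(isFullCopy(new int[] { 0, 2 }, 3));    // false - measure 1 missing
        System.out.println(isFullCopy(new int[] { 1, 0, 2 }, 3)); // false - reordered
    }
}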

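Similarly, the reducer sizing in the deleted CuboidJob.setReduceTaskNum boils down to a few lines of arithmetic: reduce input is estimated as map input scaled by the ratio of this level's cuboid count to the previous level's, divided into roughly perReduceInputMB-sized shares, multiplied by a count ratio, and clamped to [1, max]. A minimal sketch with made-up numbers follows — none of the values below are Kylin defaults; the real ones come from KylinConfig and from CuboidCLI.calculateAllLevelCount(cubeDesc).

// Standalone replay of the arithmetic in CuboidJob.setReduceTaskNum.
// All numbers below are hypothetical, chosen only to make the math concrete.
public class ReducerCountSketch {

    public static void main(String[] args) {
        double totalMapInputMB = 10240;  // hypothetical: 10 GB of map input
        double perReduceInputMB = 500;   // hypothetical per-reducer input target
        double reduceCountRatio = 1.0;   // hypothetical count ratio
        int preLevelCuboids = 10;        // hypothetical cuboid count at level N-1
        int thisLevelCuboids = 30;       // hypothetical cuboid count at level N
        boolean memoryHungry = false;    // true when the cube has e.g. DISTINCT_COUNT
        int maxReducers = 5000;          // hypothetical upper bound from config

        // reduce input grows by the cuboid-count ratio between the two levels
        double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;

        int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
        if (memoryHungry)
            numReduceTasks *= 4;         // more, smaller reducers for memory-hungry measures

        numReduceTasks = Math.max(1, numReduceTasks);           // at least one reducer
        numReduceTasks = Math.min(maxReducers, numReduceTasks); // capped by config

        System.out.println("numReduceTasks = " + numReduceTasks);
    }
}

With these numbers, 10240 MB of map input at a 30:10 cuboid ratio gives 30720 MB of estimated reduce input, i.e. round(30720 / 500 * 1.0) = 61 reducers.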