// http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
// ----------------------------------------------------------------------
// diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
// deleted file mode 100644
// index 3b25ee1..0000000
// --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
// +++ /dev/null
// @@ -1,312 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.job.hadoop.cube;

import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.cmd.ICommandOutput;
import org.apache.kylin.job.cmd.ShellCmd;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Command line job that looks for storage no longer referenced by Kylin metadata and
 * either lists it (default) or removes it (when the {@code delete} option parses as
 * {@code true}).
 *
 * <p>Three kinds of storage are inspected, in this order: intermediate Hive tables,
 * job working directories under the HDFS working directory, and HBase tables tagged
 * with this instance's metadata URL prefix.
 *
 * @author ysong1
 */
public class StorageCleanupJob extends AbstractHadoopJob {

    @SuppressWarnings("static-access")
    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");

    protected static final Logger log = LoggerFactory.getLogger(StorageCleanupJob.class);

    /** Name prefix of the intermediate Hive tables this job inspects. */
    private static final String INTERMEDIATE_TABLE_PREFIX = "kylin_intermediate_";

    /** Number of trailing characters of an intermediate table name that are read as the job id. */
    private static final int UUID_LENGTH = 36;

    /** When false (the default) the job only prints what it would remove. */
    boolean delete = false;

    protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());

    /**
     * Parses the {@code delete} option and runs the three cleanup passes.
     *
     * @param args command line arguments
     * @return 0 when all passes complete
     * @throws Exception any failure of option parsing or of a cleanup pass, after it has been logged
     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
     */
    @Override
    public int run(String[] args) throws Exception {
        Options options = new Options();

        log.info("----- jobs args: " + Arrays.toString(args));
        try {
            options.addOption(OPTION_DELETE);
            parseOptions(options, args);

            log.info("options: '" + getOptionsAsString() + "'");
            log.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));

            Configuration conf = HBaseConfiguration.create(getConf());

            cleanUnusedIntermediateHiveTable(conf);
            cleanUnusedHdfsFiles(conf);
            cleanUnusedHBaseTables(conf);

            return 0;
        } catch (Exception e) {
            log.error("Storage cleanup failed", e);
            throw e;
        }
    }

    /**
     * Lists or drops HBase tables that carry this instance's tag but are not the storage
     * location of any cube segment or inverted-index segment.
     *
     * @param conf configuration used to connect to HBase
     * @throws IOException if talking to HBase fails
     */
    private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
        IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());

        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
        // Close the admin connection even when listing or dropping fails.
        try {
            // get all kylin hbase tables
            String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
            HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
            List<String> allTablesNeedToBeDropped = new ArrayList<String>();
            for (HTableDescriptor desc : tableDescriptors) {
                String host = desc.getValue(IRealizationConstants.HTableTag);
                if (metadataUrlPrefix.equalsIgnoreCase(host)) {
                    // only take care of htables that belong to this instance
                    allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
                }
            }

            // remove every cube segment htable from drop list
            for (CubeInstance cube : cubeMgr.listAllCubes()) {
                for (CubeSegment seg : cube.getSegments()) {
                    String tablename = seg.getStorageLocationIdentifier();
                    allTablesNeedToBeDropped.remove(tablename);
                    log.info("Remove table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
                }
            }

            // remove every ii segment htable from drop list
            for (IIInstance ii : iiManager.listAllIIs()) {
                for (IISegment seg : ii.getSegments()) {
                    String tablename = seg.getStorageLocationIdentifier();
                    allTablesNeedToBeDropped.remove(tablename);
                    log.info("Remove table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
                }
            }

            if (delete) {
                for (String htableName : allTablesNeedToBeDropped) {
                    dropHBaseTable(hbaseAdmin, htableName);
                }
            } else {
                System.out.println("--------------- Tables To Be Dropped ---------------");
                for (String htableName : allTablesNeedToBeDropped) {
                    System.out.println(htableName);
                }
                System.out.println("----------------------------------------------------");
            }
        } finally {
            hbaseAdmin.close();
        }
    }

    /** Disables (when still enabled) and deletes one HBase table, if it exists. */
    private void dropHBaseTable(HBaseAdmin hbaseAdmin, String htableName) throws IOException {
        log.info("Deleting HBase table " + htableName);
        if (!hbaseAdmin.tableExists(htableName)) {
            log.info("HBase table " + htableName + " does not exist");
            return;
        }
        // A table left disabled by an earlier, interrupted cleanup must not be disabled again.
        if (hbaseAdmin.isTableEnabled(htableName)) {
            hbaseAdmin.disableTable(htableName);
        }
        hbaseAdmin.deleteTable(htableName);
        log.info("Deleted HBase table " + htableName);
    }

    /**
     * Lists or deletes job working directories found directly under the HDFS working
     * directory, keeping those of jobs that are not in a final state and those of the
     * last build job of any cube segment.
     *
     * @param conf configuration used to obtain the file system
     * @throws IOException if a file system operation fails
     */
    private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());

        FileSystem fs = FileSystem.get(conf);
        List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
        FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
        for (FileStatus status : fStatus) {
            String path = status.getPath().getName();
            if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
                String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
                allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
            }
        }

        List<String> allJobs = executableManager.getAllJobIds();
        for (String jobId : allJobs) {
            // only jobs in a final state may have their intermediate files removed
            final ExecutableState state = executableManager.getOutput(jobId).getState();
            if (!state.isFinalState()) {
                String path = JobInstance.getJobWorkingDir(jobId, engineConfig.getHdfsWorkingDirectory());
                allHdfsPathsNeedToBeDeleted.remove(path);
                log.info("Remove " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state);
            }
        }

        // remove every segment working dir from deletion list
        for (CubeInstance cube : cubeMgr.listAllCubes()) {
            for (CubeSegment seg : cube.getSegments()) {
                String jobUuid = seg.getLastBuildJobID();
                if (jobUuid != null && !jobUuid.equals("")) {
                    String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
                    allHdfsPathsNeedToBeDeleted.remove(path);
                    log.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
                }
            }
        }

        if (delete) {
            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
                log.info("Deleting hdfs path " + hdfsPath);
                Path p = new Path(hdfsPath);
                if (fs.exists(p)) {
                    fs.delete(p, true);
                    log.info("Deleted hdfs path " + hdfsPath);
                } else {
                    log.info("Hdfs path " + hdfsPath + " does not exist");
                }
            }
        } else {
            System.out.println("--------------- HDFS Path To Be Deleted ---------------");
            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
                System.out.println(hdfsPath);
            }
            System.out.println("-------------------------------------------------------");
        }
    }

    /**
     * Lists or drops intermediate Hive tables whose trailing 36 characters (with
     * {@code _} replaced by {@code -}) match the id of a known job that is in a final state.
     *
     * <p>If the listing command fails, the failure is logged and this pass is skipped.
     *
     * @param conf unused by this pass
     * @throws IOException if reading the command output fails
     */
    private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
        final String useDatabaseHql = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";";

        ICommandOutput output = executeHive(useDatabaseHql + "show tables " + "\'" + INTERMEDIATE_TABLE_PREFIX + "*\'" + "; ");
        if (output == null) {
            return;
        }

        List<String> allJobs = executableManager.getAllJobIds();
        List<String> workingJobList = new ArrayList<String>();
        for (String jobId : allJobs) {
            // only jobs in a final state may have their intermediate table removed
            final ExecutableState state = executableManager.getOutput(jobId).getState();
            if (!state.isFinalState()) {
                workingJobList.add(jobId);
                log.info("Exclude intermediate hive table with job id " + jobId + " with job status " + state);
            }
        }

        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
        BufferedReader reader = new BufferedReader(new StringReader(output.getOutput()));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // Names too short to end in a job id cannot belong to a known job; skipping
                // them also avoids a negative substring index.
                if (!line.startsWith(INTERMEDIATE_TABLE_PREFIX) || line.length() < UUID_LENGTH) {
                    continue;
                }
                String uuid = line.substring(line.length() - UUID_LENGTH).replace("_", "-");
                // drop only tables of known jobs that are no longer working
                if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) {
                    allHiveTablesNeedToBeDeleted.add(line);
                }
            }
        } finally {
            reader.close();
        }

        if (delete) {
            if (allHiveTablesNeedToBeDeleted.isEmpty()) {
                // nothing to drop, so do not start a hive process for an empty statement list
                log.info("No intermediate hive table needs to be dropped");
            } else {
                StringBuilder dropHql = new StringBuilder(useDatabaseHql);
                for (String delHive : allHiveTablesNeedToBeDeleted) {
                    dropHql.append("drop table if exists " + delHive + "; ");
                    log.info("Remove " + delHive + " from hive tables.");
                }
                executeHive(dropHql.toString());
            }
        } else {
            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
                System.out.println(hiveTable);
            }
            System.out.println("----------------------------------------------------");
        }
    }

    /**
     * Runs {@code hive -e "<hql>"} through {@link ShellCmd}.
     *
     * @param hql the statements to pass to hive
     * @return the command output, or {@code null} when execution failed (the failure is logged)
     */
    private ICommandOutput executeHive(String hql) {
        ShellCmd cmd = new ShellCmd("hive -e \"" + hql + "\"", null, null, null, false);
        try {
            return cmd.execute();
        } catch (JobException e) {
            log.error("Failed to execute hive command: " + hql, e);
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
        System.exit(exitCode);
    }
}
// http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
// ----------------------------------------------------------------------
// diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
// deleted file mode 100644
// index c7369c4..0000000
// --- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
// +++ /dev/null
// @@ -1,66 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.job.hadoop.dict;

import org.apache.commons.cli.Options;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;

/**
 * Command line job that hands a cube name, segment name and input path to
 * {@link DictionaryGeneratorCLI#processSegment}.
 *
 * @author ysong1
 */
public class CreateDictionaryJob extends AbstractHadoopJob {

    /**
     * Parses the cube name, segment name and input path options and processes the segment.
     *
     * @param args command line arguments
     * @return 0 when processing completes
     * @throws Exception any failure, rethrown after the usage text has been printed
     */
    @Override
    public int run(String[] args) throws Exception {
        final Options cliOptions = new Options();

        try {
            cliOptions.addOption(OPTION_CUBE_NAME);
            cliOptions.addOption(OPTION_SEGMENT_NAME);
            cliOptions.addOption(OPTION_INPUT_PATH);
            parseOptions(cliOptions, args);

            final String cube = getOptionValue(OPTION_CUBE_NAME);
            final String segment = getOptionValue(OPTION_SEGMENT_NAME);
            final String inputPath = getOptionValue(OPTION_INPUT_PATH);

            DictionaryGeneratorCLI.processSegment(KylinConfig.getInstanceFromEnv(), cube, segment, inputPath);
            return 0;
        } catch (Exception failure) {
            printUsage(cliOptions);
            throw failure;
        }
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CreateDictionaryJob(), args));
    }

}

// http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
// ----------------------------------------------------------------------
// diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
// deleted file mode 100644
// index 1d5cbfe..0000000
// --- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
// +++ /dev/null
// @@ -1,63 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.job.hadoop.dict;

import org.apache.commons.cli.Options;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.invertedindex.IIInstance;
import org.apache.kylin.invertedindex.IIManager;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;

/**
 * Command line job that asks {@link IIManager} to build the dictionary for the first
 * segment of a named inverted index.
 *
 * @author ysong1
 */
public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob {

    /**
     * Parses the inverted index name and input path options and builds the dictionary.
     *
     * @param args command line arguments
     * @return 0 when the build completes
     * @throws Exception any failure, rethrown after the usage text has been printed
     */
    @Override
    public int run(String[] args) throws Exception {
        final Options cliOptions = new Options();

        try {
            cliOptions.addOption(OPTION_II_NAME);
            cliOptions.addOption(OPTION_INPUT_PATH);
            parseOptions(cliOptions, args);

            final String indexName = getOptionValue(OPTION_II_NAME);
            final String inputPath = getOptionValue(OPTION_INPUT_PATH);

            final IIManager manager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
            final IIInstance index = manager.getII(indexName);
            manager.buildInvertedIndexDictionary(index.getFirstSegment(), inputPath);
            return 0;
        } catch (Exception failure) {
            printUsage(cliOptions);
            throw failure;
        }
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CreateInvertedIndexDictionaryJob(), args));
    }

}
// http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
// ----------------------------------------------------------------------
// diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
// deleted file mode 100644
// index 1dc634e..0000000
// --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
// +++ /dev/null
// @@ -1,100 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.job.hadoop.hbase;

import java.util.Locale;

import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command line job that opens up the permissions of a cube's column family
 * directories under the input path and then runs {@link LoadIncrementalHFiles}
 * against the target HBase table.
 *
 * @author ysong1
 */
public class BulkLoadJob extends AbstractHadoopJob {

    protected static final Logger log = LoggerFactory.getLogger(BulkLoadJob.class);

    /**
     * Parses the input path, HTable name and cube name options and runs the bulk load.
     *
     * @param args command line arguments
     * @return the exit code of {@link LoadIncrementalHFiles}
     * @throws Exception any failure, rethrown after the usage text has been printed
     */
    @Override
    public int run(String[] args) throws Exception {
        Options options = new Options();

        try {
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_HTABLE_NAME);
            options.addOption(OPTION_CUBE_NAME);
            parseOptions(options, args);

            // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
            String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(Locale.ROOT);
            // e.g
            // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
            // end with "/"
            String input = getOptionValue(OPTION_INPUT_PATH);

            Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
            FileSystem fs = FileSystem.get(conf);

            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(Locale.ROOT);
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            CubeManager cubeMgr = CubeManager.getInstance(config);
            CubeInstance cube = cubeMgr.getCube(cubeName);
            CubeDesc cubeDesc = cube.getDescriptor();
            FsPermission permission = new FsPermission((short) 0777);
            for (HBaseColumnFamilyDesc cf : cubeDesc.getHBaseMapping().getColumnFamily()) {
                Path columnFamilyPath = new Path(input, cf.getName());

                // File may have already been auto-loaded (in the case of MapR DB)
                if (fs.exists(columnFamilyPath)) {
                    fs.setPermission(columnFamilyPath, permission);
                }
            }

            String[] newArgs = new String[] { input, tableName };

            log.debug("Start to run LoadIncrementalHFiles");
            int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
            log.debug("End to run LoadIncrementalHFiles");
            return ret;
        } catch (Exception e) {
            printUsage(options);
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new BulkLoadJob(), args);
        System.exit(exitCode);
    }
}

// http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
// ----------------------------------------------------------------------
// diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
// deleted file mode 100644
// index 027c0ca..0000000
// --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
// +++ /dev/null
// @@ -1,202 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kylin.job.hadoop.hbase;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.tools.DeployCoprocessorCLI;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command line job that creates the HBase table for a cube: one column family per
 * entry of the cube's HBase mapping, pre-split by the keys read from a partition file.
 */
public class CreateHTableJob extends AbstractHadoopJob {

    protected static final Logger logger = LoggerFactory.getLogger(CreateHTableJob.class);

    /**
     * Parses the cube name, partition file path and HTable name options and creates the table.
     *
     * @param args command line arguments
     * @return 0 on success, 2 when table creation fails (the failure is logged, not rethrown)
     * @throws Exception if option parsing, metadata lookup or opening the HBase admin fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Options options = new Options();

        options.addOption(OPTION_CUBE_NAME);
        options.addOption(OPTION_PARTITION_FILE_PATH);
        options.addOption(OPTION_HTABLE_NAME);
        parseOptions(options, args);

        Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));

        // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
        String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(Locale.ROOT);
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        CubeManager cubeMgr = CubeManager.getInstance(config);
        CubeInstance cube = cubeMgr.getCube(cubeName);
        CubeDesc cubeDesc = cube.getDescriptor();

        String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(Locale.ROOT);
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
        tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());

        Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(conf);

        try {
            if (User.isHBaseSecurityEnabled(conf)) {
                // add coprocessor for bulk load
                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
            }

            // The codec is the same for every column family, so look it up once.
            // Locale.ROOT so the switch labels match regardless of the default locale.
            String hbaseDefaultCC = KylinConfig.getInstanceFromEnv().getHbaseDefaultCompressionCodec().toLowerCase(Locale.ROOT);

            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
                HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
                cf.setMaxVersions(1);
                applyCompression(cf, hbaseDefaultCC);
                cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
                cf.setInMemory(false);
                cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
                tableDesc.addFamily(cf);
            }

            byte[][] splitKeys = getSplits(conf, partitionFilePath);

            if (admin.tableExists(tableName)) {
                throw new RuntimeException("HBase table " + tableName + " exists!");
            }

            DeployCoprocessorCLI.deployCoprocessor(tableDesc);

            admin.createTable(tableDesc, splitKeys);
            logger.info("create hbase table " + tableName + " done.");

            return 0;
        } catch (Exception e) {
            printUsage(options);
            // The exception is not rethrown, so keep the stack trace on stderr for console users.
            e.printStackTrace(System.err);
            logger.error(e.getLocalizedMessage(), e);
            return 2;
        } finally {
            admin.close();
        }
    }

    /**
     * Sets the compression type of a column family from a lower-case codec name.
     * Unrecognised names leave the column family's compression untouched.
     */
    private static void applyCompression(HColumnDescriptor cf, String codecName) {
        switch (codecName) {
        case "snappy":
            logger.info("hbase will use snappy to compress data");
            cf.setCompressionType(Algorithm.SNAPPY);
            break;
        case "lzo":
            logger.info("hbase will use lzo to compress data");
            cf.setCompressionType(Algorithm.LZO);
            break;
        case "gz":
        case "gzip":
            logger.info("hbase will use gzip to compress data");
            cf.setCompressionType(Algorithm.GZ);
            break;
        case "lz4":
            logger.info("hbase will use lz4 to compress data");
            cf.setCompressionType(Algorithm.LZ4);
            break;
        default:
            logger.info("hbase will not use any compression codec to compress data");
            break;
        }
    }

    /**
     * Reads region split keys from a sequence file whose keys are {@link Text}.
     *
     * @param conf configuration used to resolve the file system and open the file
     * @param path location of the partition file
     * @return one byte array per key in file order, or {@code null} when the file does
     *         not exist or contains no keys
     * @throws Exception if the file cannot be read
     */
    @SuppressWarnings("deprecation")
    public byte[][] getSplits(Configuration conf, Path path) throws Exception {
        FileSystem fs = path.getFileSystem(conf);
        if (!fs.exists(path)) {
            System.err.println("Path " + path + " not found, no region split, HTable will be one region");
            return null;
        }

        List<byte[]> rowkeyList = new ArrayList<byte[]>();
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                rowkeyList.add(((Text) key).copyBytes());
            }
        } catch (Exception e) {
            logger.error("Failed to read split keys from " + path, e);
            throw e;
        } finally {
            IOUtils.closeStream(reader);
        }

        logger.info((rowkeyList.size() + 1) + " regions");
        logger.info(rowkeyList.size() + " splits");
        for (byte[] split : rowkeyList) {
            System.out.println(StringUtils.byteToHexString(split));
        }

        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
        return retValue.length == 0 ? null : retValue;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new CreateHTableJob(), args);
        System.exit(exitCode);
    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java deleted file mode 100644 index b2bed9f..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.hive; - -import java.util.List; -import java.util.Map; - -import org.apache.kylin.common.util.BytesSplitter; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * @author George Song (ysong1) - */ -public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { - - private String tableName; - private final CubeDesc cubeDesc; - private final CubeSegment cubeSegment; - - private int columnCount; - private int[] rowKeyColumnIndexes; // the column index on flat table - private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table - - private List<IntermediateColumnDesc> columnList = Lists.newArrayList(); - - public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) { - this.cubeDesc = cubeDesc; - this.cubeSegment = cubeSegment; - parseCubeDesc(); - } - - /** - * @return the cubeSegment - */ - public CubeSegment getCubeSegment() { - return cubeSegment; - } - - // check what columns from hive tables are required, and index them - private void parseCubeDesc() { - int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length; - long baseCuboidId = 
Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - - if (cubeSegment == null) { - this.tableName = "kylin_intermediate_" + cubeDesc.getName(); - } else { - this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName(); - } - - Map<String, Integer> dimensionIndexMap = Maps.newHashMap(); - int columnIndex = 0; - for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) { - dimensionIndexMap.put(colName(col.getCanonicalName()), columnIndex); - columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col)); - columnIndex++; - } - - // build index - List<TblColRef> cuboidColumns = baseCuboid.getColumns(); - rowKeyColumnIndexes = new int[rowkeyColCount]; - for (int i = 0; i < rowkeyColCount; i++) { - String colName = colName(cuboidColumns.get(i).getCanonicalName()); - Integer dimIdx = dimensionIndexMap.get(colName); - if (dimIdx == null) { - throw new RuntimeException("Can't find column " + colName); - } - rowKeyColumnIndexes[i] = dimIdx; - } - - List<MeasureDesc> measures = cubeDesc.getMeasures(); - int measureSize = measures.size(); - measureColumnIndexes = new int[measureSize][]; - for (int i = 0; i < measureSize; i++) { - FunctionDesc func = measures.get(i).getFunction(); - List<TblColRef> colRefs = func.getParameter().getColRefs(); - if (colRefs == null) { - measureColumnIndexes[i] = null; - } else { - measureColumnIndexes[i] = new int[colRefs.size()]; - for (int j = 0; j < colRefs.size(); j++) { - TblColRef c = colRefs.get(j); - measureColumnIndexes[i][j] = contains(columnList, c); - if (measureColumnIndexes[i][j] < 0) { - measureColumnIndexes[i][j] = columnIndex; - columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c)); - columnIndex++; - } - } - } - } - - columnCount = columnIndex; - } - - private int contains(List<IntermediateColumnDesc> columnList, TblColRef c) { - for (int i = 0; i < columnList.size(); i++) { - IntermediateColumnDesc col = 
columnList.get(i); - - if (col.isSameAs(c.getTable(), c.getName())) - return i; - } - return -1; - } - - // sanity check the input record (in bytes) matches what's expected - public void sanityCheck(BytesSplitter bytesSplitter) { - if (columnCount != bytesSplitter.getBufferSize()) { - throw new IllegalArgumentException("Expect " + columnCount + " columns, but see " + bytesSplitter.getBufferSize() + " -- " + bytesSplitter); - } - - // TODO: check data types here - } - - public CubeDesc getCubeDesc() { - return cubeDesc; - } - - public int[] getRowKeyColumnIndexes() { - return rowKeyColumnIndexes; - } - - public int[][] getMeasureColumnIndexes() { - return measureColumnIndexes; - } - - @Override - public String getTableName(String jobUUID) { - return tableName + "_" + jobUUID.replace("-", "_"); - } - - @Override - public List<IntermediateColumnDesc> getColumnList() { - return columnList; - } - - @Override - public DataModelDesc getDataModel() { - return cubeDesc.getModel(); - } - - @Override - public DataModelDesc.RealizationCapacity getCapacity() { - return cubeDesc.getModel().getCapacity(); - } - - private static String colName(String canonicalColName) { - return canonicalColName.replace(".", "_"); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/hive/IIJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/IIJoinedFlatTableDesc.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/IIJoinedFlatTableDesc.java deleted file mode 100644 index 36cb049..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/IIJoinedFlatTableDesc.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.hive; - -import java.util.List; -import java.util.Map; - -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.metadata.model.DataModelDesc; -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.collect.Lists; - -/** - * Created by Hongbin Ma(Binmahone) on 12/30/14. - */ -public class IIJoinedFlatTableDesc implements IJoinedFlatTableDesc { - - private IIDesc iiDesc; - private String tableName; - private List<IntermediateColumnDesc> columnList = Lists.newArrayList(); - private Map<String, String> tableAliasMap; - - public IIJoinedFlatTableDesc(IIDesc iiDesc) { - this.iiDesc = iiDesc; - parseIIDesc(); - } - - private void parseIIDesc() { - this.tableName = "kylin_intermediate_ii_" + iiDesc.getName(); - - int columnIndex = 0; - for (TblColRef col : iiDesc.listAllColumns()) { - columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col)); - columnIndex++; - } - } - - @Override - public String getTableName(String jobUUID) { - return tableName + "_" + jobUUID.replace("-", "_"); - } - - public List<IntermediateColumnDesc> getColumnList() { - return columnList; - } - - @Override - public DataModelDesc getDataModel() { - return iiDesc.getModel(); - } - - @Override - public DataModelDesc.RealizationCapacity getCapacity() { - return DataModelDesc.RealizationCapacity.SMALL; - } - -} 
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/hive/IJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/IJoinedFlatTableDesc.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/IJoinedFlatTableDesc.java deleted file mode 100644 index 86a5751..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/IJoinedFlatTableDesc.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.hive; - -import java.util.List; - -import org.apache.kylin.metadata.model.DataModelDesc; - -/** - * Created by Hongbin Ma(Binmahone) on 12/30/14. 
- */ -public interface IJoinedFlatTableDesc { - - public String getTableName(String jobUUID); - - public List<IntermediateColumnDesc> getColumnList(); - - public DataModelDesc getDataModel(); - - public DataModelDesc.RealizationCapacity getCapacity(); - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/hive/IntermediateColumnDesc.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/IntermediateColumnDesc.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/IntermediateColumnDesc.java deleted file mode 100644 index 17c87cc..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/IntermediateColumnDesc.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.hive; - -import org.apache.kylin.metadata.model.TblColRef; - -/** - * Created by Hongbin Ma(Binmahone) on 12/30/14. 
- */ -public class IntermediateColumnDesc { - private String id; - private TblColRef colRef; - - public IntermediateColumnDesc(String id, TblColRef colRef) { - this.id = id; - this.colRef = colRef; - } - - public String getId() { - return id; - } - - public String getColumnName() { - return colRef.getName(); - } - - public String getDataType() { - return colRef.getDatatype(); - } - - public String getTableName() { - return colRef.getTable(); - } - - public boolean isSameAs(String tableName, String columnName) { - return colRef.isSameAs(tableName, columnName); - } - - public String getCanonicalName() { - return colRef.getCanonicalName(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java deleted file mode 100644 index a1a6f22..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.hive; - -import java.util.HashMap; -import java.util.Map; - -/** - * @author George Song (ysong1) - * - */ -public class SqlHiveDataTypeMapping { - - private static final Map<String, String> sqlToHiveDataTypeMapping = new HashMap<String, String>(); - - static { - sqlToHiveDataTypeMapping.put("short", "smallint"); - sqlToHiveDataTypeMapping.put("long", "bigint"); - sqlToHiveDataTypeMapping.put("byte", "tinyint"); - sqlToHiveDataTypeMapping.put("datetime", "date"); - } - - public static String getHiveDataType(String javaDataType) { - String hiveDataType = sqlToHiveDataTypeMapping.get(javaDataType.toLowerCase()); - if (hiveDataType == null) { - hiveDataType = javaDataType; - } - return hiveDataType.toLowerCase(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java deleted file mode 100644 index 3ffc770..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.metadata.model.SegmentStatusEnum; - -/** - * @author ysong1 - * - */ -public class IIBulkLoadJob extends AbstractHadoopJob { - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_HTABLE_NAME); - options.addOption(OPTION_II_NAME); - parseOptions(options, args); - - String tableName = getOptionValue(OPTION_HTABLE_NAME); - String input = getOptionValue(OPTION_INPUT_PATH); - String iiname = getOptionValue(OPTION_II_NAME); - - FileSystem fs = FileSystem.get(getConf()); - FsPermission permission = new FsPermission((short) 0777); - fs.setPermission(new Path(input, IIDesc.HBASE_FAMILY), permission); - - int hbaseExitCode = ToolRunner.run(new LoadIncrementalHFiles(getConf()), new String[] { input, tableName }); - - IIManager mgr = 
IIManager.getInstance(KylinConfig.getInstanceFromEnv()); - IIInstance ii = mgr.getII(iiname); - IISegment seg = ii.getFirstSegment(); - seg.setStorageLocationIdentifier(tableName); - seg.setStatus(SegmentStatusEnum.READY); - mgr.updateII(ii); - - return hbaseExitCode; - - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - public static void main(String[] args) throws Exception { - IIBulkLoadJob job = new IIBulkLoadJob(); - job.setConf(HadoopUtil.getCurrentHBaseConfiguration()); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java deleted file mode 100644 index c032bbc..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - * - */ -public class IICreateHFileJob extends AbstractHadoopJob { - - protected static final Logger log = LoggerFactory.getLogger(IICreateHFileJob.class); - - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - options.addOption(OPTION_II_NAME); - options.addOption(OPTION_INPUT_PATH); - options.addOption(OPTION_OUTPUT_PATH); - options.addOption(OPTION_HTABLE_NAME); - parseOptions(options, args); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - - setJobClasspath(job); - - addInputDirs(getOptionValue(OPTION_INPUT_PATH), job); - FileOutputFormat.setOutputPath(job, output); - - job.setInputFormatClass(SequenceFileInputFormat.class); - job.setMapperClass(IICreateHFileMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - - String tableName = getOptionValue(OPTION_HTABLE_NAME); - HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName); - 
HFileOutputFormat.configureIncrementalLoad(job, htable); - - this.deletePath(job.getConfiguration(), output); - - return waitForCompletion(job); - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - public static void main(String[] args) throws Exception { - IICreateHFileJob job = new IICreateHFileJob(); - job.setConf(HadoopUtil.getCurrentHBaseConfiguration()); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java deleted file mode 100644 index 22d9de8..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileMapper.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; - -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.invertedindex.model.IIDesc; - -/** - * @author yangli9 - */ -public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> { - - long timestamp; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - super.publishConfiguration(context.getConfiguration()); - - timestamp = System.currentTimeMillis(); - } - - @Override - protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException { - - KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), // - IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, // - IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, // - timestamp, Type.Put, // - value.get(), value.getOffset(), value.getLength()); - - context.write(key, kv); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java deleted file mode 100644 index 32d065a..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.invertedindex.model.IIKeyValueCodec; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.job.tools.DeployCoprocessorCLI; -import org.apache.kylin.metadata.realization.IRealizationConstants; - -/** - * @author George Song (ysong1) - */ -public class IICreateHTableJob extends AbstractHadoopJob { - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try 
{ - options.addOption(OPTION_II_NAME); - options.addOption(OPTION_HTABLE_NAME); - parseOptions(options, args); - - String tableName = getOptionValue(OPTION_HTABLE_NAME); - String iiName = getOptionValue(OPTION_II_NAME); - - KylinConfig config = KylinConfig.getInstanceFromEnv(); - IIManager iiManager = IIManager.getInstance(config); - IIInstance ii = iiManager.getII(iiName); - int sharding = ii.getDescriptor().getSharding(); - - HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName)); - HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY); - cf.setMaxVersions(1); - //cf.setCompressionType(Algorithm.LZO); - cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); - tableDesc.addFamily(cf); - tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); - - Configuration conf = HBaseConfiguration.create(getConf()); - if (User.isHBaseSecurityEnabled(conf)) { - // add coprocessor for bulk load - tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); - } - - DeployCoprocessorCLI.deployCoprocessor(tableDesc); - - // drop the table first - HBaseAdmin admin = new HBaseAdmin(conf); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - } - - // create table - byte[][] splitKeys = getSplits(sharding); - if (splitKeys.length == 0) - splitKeys = null; - admin.createTable(tableDesc, splitKeys); - if (splitKeys != null) { - for (int i = 0; i < splitKeys.length; i++) { - System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i])); - } - } - System.out.println("create hbase table " + tableName + " done."); - admin.close(); - - return 0; - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - //one region for one shard - private byte[][] getSplits(int shard) { - byte[][] result = new byte[shard - 1][]; - for (int i = 1; i < shard; ++i) { - byte[] split = new byte[IIKeyValueCodec.SHARD_LEN]; - 
BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN); - result[i - 1] = split; - } - return result; - } - - public static void main(String[] args) throws Exception { - IICreateHTableJob job = new IICreateHTableJob(); - job.setConf(HadoopUtil.getCurrentHBaseConfiguration()); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java deleted file mode 100644 index dccc594..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsCombiner.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; -import java.util.HashSet; - -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> { - - private Text outputValue = new Text(); - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - } - - @Override - public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - - HashSet<ByteArray> set = new HashSet<ByteArray>(); - for (Text textValue : values) { - ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); - set.add(value); - } - - for (ByteArray value : set) { - outputValue.set(value.data); - context.write(key, outputValue); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java deleted file mode 100644 index d82386d..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.job.constant.BatchConstants; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.job.hadoop.hive.IIJoinedFlatTableDesc; -import org.apache.kylin.job.hadoop.hive.IntermediateColumnDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsJob extends AbstractHadoopJob { - protected static final Logger log = LoggerFactory.getLogger(IIDistinctColumnsJob.class); - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - - try { - options.addOption(OPTION_JOB_NAME); - 
options.addOption(OPTION_TABLE_NAME); - options.addOption(OPTION_II_NAME); - options.addOption(OPTION_OUTPUT_PATH); - parseOptions(options, args); - - job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); - String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase(); - String iiName = getOptionValue(OPTION_II_NAME); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - - // ---------------------------------------------------------------------------- - - log.info("Starting: " + job.getJobName() + " on table " + tableName); - - IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); - IIInstance ii = iiMgr.getII(iiName); - job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName); - job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii)); - - setJobClasspath(job); - - setupMapper(); - setupReducer(output); - - return waitForCompletion(job); - - } catch (Exception e) { - printUsage(options); - throw e; - } - - } - - private String getColumns(IIInstance ii) { - IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor()); - StringBuilder buf = new StringBuilder(); - for (IntermediateColumnDesc col : iiflat.getColumnList()) { - if (buf.length() > 0) - buf.append(","); - buf.append(col.getColumnName()); - } - return buf.toString(); - } - - private void setupMapper() throws IOException { - - String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME); - String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName); - - log.info("setting hcat input format, db name {} , table name {}", dbTableNames[0], dbTableNames[1]); - - HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]); - - job.setInputFormatClass(HCatInputFormat.class); - - job.setMapperClass(IIDistinctColumnsMapper.class); - job.setCombinerClass(IIDistinctColumnsCombiner.class); - job.setMapOutputKeyClass(ShortWritable.class); - job.setMapOutputValueClass(Text.class); - } - - private void 
setupReducer(Path output) throws IOException { - job.setReducerClass(IIDistinctColumnsReducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(Text.class); - - FileOutputFormat.setOutputPath(job, output); - job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); - - job.setNumReduceTasks(1); - - deletePath(job.getConfiguration(), output); - } - - public static void main(String[] args) throws Exception { - IIDistinctColumnsJob job = new IIDistinctColumnsJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java deleted file mode 100644 index c7d2c05..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; - -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.hive.hcatalog.data.HCatRecord; -import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; -import org.apache.hive.hcatalog.data.schema.HCatSchema; -import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; -import org.apache.kylin.common.mr.KylinMapper; -import org.apache.kylin.common.util.Bytes; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> { - - private ShortWritable outputKey = new ShortWritable(); - private Text outputValue = new Text(); - private HCatSchema schema = null; - private int columnSize = 0; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - schema = HCatInputFormat.getTableSchema(context.getConfiguration()); - columnSize = schema.getFields().size(); - } - - @Override - public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException { - - HCatFieldSchema fieldSchema = null; - for (short i = 0; i < columnSize; i++) { - outputKey.set(i); - fieldSchema = schema.get(i); - Object fieldValue = record.get(fieldSchema.getName(), schema); - if (fieldValue == null) - continue; - byte[] bytes = Bytes.toBytes(fieldValue.toString()); - outputValue.set(bytes, 0, bytes.length); - context.write(outputKey, outputValue); - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java 
b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java deleted file mode 100644 index 2c49d11..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import java.io.IOException; -import java.util.HashSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.ShortWritable; -import org.apache.hadoop.io.Text; -import org.apache.kylin.common.mr.KylinReducer; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.job.constant.BatchConstants; - -/** - * @author yangli9 - */ -public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> { - - private String[] columns; - - @Override - protected void setup(Context context) throws IOException { - super.publishConfiguration(context.getConfiguration()); - - Configuration conf = context.getConfiguration(); - this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(","); - } - - @Override - public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - String columnName = columns[key.get()]; - - HashSet<ByteArray> set = new HashSet<ByteArray>(); - for (Text textValue : values) { - ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); - set.add(value); - } - - Configuration conf = context.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - String outputPath = conf.get(BatchConstants.OUTPUT_PATH); - FSDataOutputStream out = fs.create(new Path(outputPath, columnName)); - - try { - for (ByteArray value : set) { - out.write(value.data); - out.write('\n'); - } - } finally { - out.close(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java ---------------------------------------------------------------------- diff --git 
a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java deleted file mode 100644 index e9d8a4a..0000000 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.hadoop.invertedindex; - -import org.apache.commons.cli.Options; -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.invertedindex.IIDescManager; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.model.IIDesc; -import org.apache.kylin.job.JobInstance; -import org.apache.kylin.job.JoinedFlatTable; -import org.apache.kylin.job.cmd.ICommandOutput; -import org.apache.kylin.job.cmd.ShellCmd; -import org.apache.kylin.job.engine.JobEngineConfig; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.job.hadoop.hive.IIJoinedFlatTableDesc; -import org.apache.kylin.job.hadoop.hive.IJoinedFlatTableDesc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Created by Hongbin Ma(Binmahone) on 12/30/14. - */ -public class IIFlattenHiveJob extends AbstractHadoopJob { - - protected static final Logger log = LoggerFactory.getLogger(InvertedIndexJob.class); - - @Override - public int run(String[] args) throws Exception { - Options options = new Options(); - try { - options.addOption(OPTION_II_NAME); - parseOptions(options, args); - - String iiname = getOptionValue(OPTION_II_NAME); - KylinConfig config = KylinConfig.getInstanceFromEnv(); - - IIInstance iiInstance = IIManager.getInstance(config).getII(iiname); - IIDesc iidesc = IIDescManager.getInstance(config).getIIDesc(iiInstance.getDescName()); - - String jobUUID = "00bf87b5-c7b5-4420-a12a-07f6b37b3187"; - JobEngineConfig engineConfig = new JobEngineConfig(config); - IJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iidesc); - final String useDatabaseHql = "USE " + engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + ";"; - String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobUUID); - String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, // - JobInstance.getJobWorkingDir(jobUUID, engineConfig.getHdfsWorkingDirectory()), jobUUID); - String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobUUID, engineConfig); - - StringBuffer buf = new StringBuffer(); - buf.append("hive -e \""); - buf.append(useDatabaseHql + "\n"); - buf.append(dropTableHql + "\n"); - buf.append(createTableHql + "\n"); - buf.append(insertDataHqls + "\n"); - buf.append("\""); - - System.out.println(buf.toString()); - System.out.println("========================"); - - ShellCmd cmd = new ShellCmd(buf.toString(), null, null, null, false); - ICommandOutput output = cmd.execute(); - System.out.println(output.getOutput()); - System.out.println(output.getExitCode()); - - return 0; - } catch (Exception e) { - printUsage(options); - throw e; - } - } - - public static void main(String[] args) throws Exception { - IIFlattenHiveJob job = new IIFlattenHiveJob(); - int exitCode = ToolRunner.run(job, args); - System.exit(exitCode); - } -}
