Repository: kylin Updated Branches: refs/heads/master 819e660ee -> 554874db3
minor refactor Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/554874db Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/554874db Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/554874db Branch: refs/heads/master Commit: 554874db3af85a9cf8dd1989525511d566dec2af Parents: 819e660 Author: Hongbin Ma <[email protected]> Authored: Wed Jul 20 11:26:08 2016 +0800 Committer: Hongbin Ma <[email protected]> Committed: Wed Jul 20 11:26:12 2016 +0800 ---------------------------------------------------------------------- .../gtrecord/GTCubeStorageQueryBase.java | 11 +++++++++- .../engine/mr/common/AbstractHadoopJob.java | 21 +++++++++++++++----- .../storage/hbase/util/StorageCleanupJob.java | 3 +-- 3 files changed, 27 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/554874db/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index e58e74a..65aa90a 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -112,7 +112,12 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) { CubeSegmentScanner scanner; if (cubeSeg.getInputRecords() == 0) { - logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg); + if (!skipZeroInputSegment(cubeSeg)) { + logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg); + } else { + logger.warn("cube segment {} input record is 0, skip it ", cubeSeg); + continue; + } } scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage()); scanners.add(scanner); @@ -124,6 +129,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context); } + protected boolean skipZeroInputSegment(CubeSegment cubeSegment) { + return false; + } + protected abstract String getGTStorage(); private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) { http://git-wip-us.apache.org/repos/asf/kylin/blob/554874db/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index b483dc1..02928e0 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -371,33 +371,44 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { return classpath; } - public static void addInputDirs(String input, Job job) throws IOException { - addInputDirs(StringSplitter.split(input, ","), job); + public static int addInputDirs(String input, Job job) throws IOException { + int folderNum = addInputDirs(StringSplitter.split(input, ","), job); + logger.info("Number of added folders:" + folderNum); + return folderNum; } - public static void addInputDirs(String[] inputs, Job job) throws IOException { + public static int addInputDirs(String[] inputs, Job job) throws IOException { + int ret = 0;//return number of added folders for (String inp : inputs) { inp = inp.trim(); if (inp.endsWith("/*")) { inp = inp.substring(0, inp.length() - 2); FileSystem fs = FileSystem.get(job.getConfiguration()); Path path = new Path(inp); + + if (!fs.exists(path)) { + logger.warn("Path not exist:" + path.toString()); + continue; + } + FileStatus[] fileStatuses = fs.listStatus(path); boolean hasDir = false; for (FileStatus stat : fileStatuses) { if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) { hasDir = true; - addInputDirs(stat.getPath().toString(), job); + ret += addInputDirs(new String[] { stat.getPath().toString() }, job); } } if (fileStatuses.length > 0 && !hasDir) { - addInputDirs(path.toString(), job); + ret += addInputDirs(new String[] { path.toString() }, job); } } else { logger.debug("Add input " + inp); FileInputFormat.addInputPath(job, new Path(inp)); + ret++; } } + return ret; } public static KylinConfig loadKylinPropsAndMetadata() throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/554874db/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index 249f506..4bd2c53 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -50,7 +50,6 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.JobBuilderSupport; -import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.manager.ExecutableManager; @@ -210,7 +209,7 @@ public class StorageCleanupJob extends AbstractApplication { for (CubeSegment seg : cube.getSegments()) { String jobUuid = seg.getLastBuildJobID(); if (jobUuid != null && jobUuid.equals("") == false) { - String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(),jobUuid); + String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobUuid); allHdfsPathsNeedToBeDeleted.remove(path); logger.info("Skip " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName()); }
