Repository: kylin
Updated Branches:
  refs/heads/master 819e660ee -> 554874db3


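minor refactor

- GTCubeStorageQueryBase: add an overridable skipZeroInputSegment() hook
  (defaults to false) so subclasses can skip READY segments whose input
  record count is 0 instead of only logging a warning.
- AbstractHadoopJob.addInputDirs(): return the number of input folders
  added, log that count, and skip (with a warning) wildcard paths that do
  not exist instead of failing on listStatus().
- StorageCleanupJob: remove an unused import and fix argument spacing.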


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/554874db
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/554874db
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/554874db

Branch: refs/heads/master
Commit: 554874db3af85a9cf8dd1989525511d566dec2af
Parents: 819e660
Author: Hongbin Ma <[email protected]>
Authored: Wed Jul 20 11:26:08 2016 +0800
Committer: Hongbin Ma <[email protected]>
Committed: Wed Jul 20 11:26:12 2016 +0800

----------------------------------------------------------------------
 .../gtrecord/GTCubeStorageQueryBase.java        | 11 +++++++++-
 .../engine/mr/common/AbstractHadoopJob.java     | 21 +++++++++++++++-----
 .../storage/hbase/util/StorageCleanupJob.java   |  3 +--
 3 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/554874db/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index e58e74a..65aa90a 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -112,7 +112,12 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         for (CubeSegment cubeSeg : 
cubeInstance.getSegments(SegmentStatusEnum.READY)) {
             CubeSegmentScanner scanner;
             if (cubeSeg.getInputRecords() == 0) {
-                logger.warn("cube segment {} input record is 0, " + "it may 
caused by kylin failed to the job counter " + "as the hadoop history server 
wasn't running", cubeSeg);
+                if (!skipZeroInputSegment(cubeSeg)) {
+                    logger.warn("cube segment {} input record is 0, " + "it 
may caused by kylin failed to the job counter " + "as the hadoop history server 
wasn't running", cubeSeg);
+                } else {
+                    logger.warn("cube segment {} input record is 0, skip it ", 
cubeSeg);
+                    continue;
+                }
             }
             scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, 
groupsD, metrics, filterD, context, getGTStorage());
             scanners.add(scanner);
@@ -124,6 +129,10 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, 
metrics, returnTupleInfo, context);
     }
 
+    protected boolean skipZeroInputSegment(CubeSegment cubeSegment) {
+        return false;
+    }
+
     protected abstract String getGTStorage();
 
     private void buildDimensionsAndMetrics(SQLDigest sqlDigest, 
Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/554874db/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index b483dc1..02928e0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -371,33 +371,44 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return classpath;
     }
 
-    public static void addInputDirs(String input, Job job) throws IOException {
-        addInputDirs(StringSplitter.split(input, ","), job);
+    public static int addInputDirs(String input, Job job) throws IOException {
+        int folderNum = addInputDirs(StringSplitter.split(input, ","), job);
+        logger.info("Number of added folders:" + folderNum);
+        return folderNum;
     }
 
-    public static void addInputDirs(String[] inputs, Job job) throws 
IOException {
+    public static int addInputDirs(String[] inputs, Job job) throws 
IOException {
+        int ret = 0;//return number of added folders
         for (String inp : inputs) {
             inp = inp.trim();
             if (inp.endsWith("/*")) {
                 inp = inp.substring(0, inp.length() - 2);
                 FileSystem fs = FileSystem.get(job.getConfiguration());
                 Path path = new Path(inp);
+
+                if (!fs.exists(path)) {
+                    logger.warn("Path not exist:" + path.toString());
+                    continue;
+                }
+
                 FileStatus[] fileStatuses = fs.listStatus(path);
                 boolean hasDir = false;
                 for (FileStatus stat : fileStatuses) {
                     if (stat.isDirectory() && 
!stat.getPath().getName().startsWith("_")) {
                         hasDir = true;
-                        addInputDirs(stat.getPath().toString(), job);
+                        ret += addInputDirs(new String[] { 
stat.getPath().toString() }, job);
                     }
                 }
                 if (fileStatuses.length > 0 && !hasDir) {
-                    addInputDirs(path.toString(), job);
+                    ret += addInputDirs(new String[] { path.toString() }, job);
                 }
             } else {
                 logger.debug("Add input " + inp);
                 FileInputFormat.addInputPath(job, new Path(inp));
+                ret++;
             }
         }
+        return ret;
     }
 
     public static KylinConfig loadKylinPropsAndMetadata() throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/554874db/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 249f506..4bd2c53 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -50,7 +50,6 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
@@ -210,7 +209,7 @@ public class StorageCleanupJob extends AbstractApplication {
             for (CubeSegment seg : cube.getSegments()) {
                 String jobUuid = seg.getLastBuildJobID();
                 if (jobUuid != null && jobUuid.equals("") == false) {
-                    String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(),jobUuid);
+                    String path = 
JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), 
jobUuid);
                     allHdfsPathsNeedToBeDeleted.remove(path);
                     logger.info("Skip " + path + " from deletion list, as the 
path belongs to segment " + seg + " of cube " + cube.getName());
                 }

Reply via email to