KYLIN-2004 check whether source data is empty

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/56136ede
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/56136ede
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/56136ede

Branch: refs/heads/KYLIN-1726
Commit: 56136ede7c8b9abac5ddd7b7785b3f63c59b74db
Parents: 233a699
Author: shaofengshi <shaofeng...@apache.org>
Authored: Sat Sep 10 17:52:32 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Sat Sep 10 17:59:59 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/source/hive/HiveMRInput.java   | 37 ++++++++++----------
 1 file changed, 19 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/56136ede/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 3ea9af5..520d7cc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -281,23 +281,6 @@ public class HiveMRInput implements IMRInput {
             }
         }
 
-        private int determineNumReducer(KylinConfig config) throws IOException 
{
-            computeRowCount(config.getCliCommandExecutor());
-
-            Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
-            long rowCount = readRowCountFromFile(rowCountFile);
-            int mapperInputRows = config.getHadoopJobMapperInputRows();
-
-            int numReducers = Math.round(rowCount / ((float) mapperInputRows));
-            numReducers = Math.max(1, numReducers);
-
-            stepLogger.log("total input rows = " + rowCount);
-            stepLogger.log("expected input rows per mapper = " + 
mapperInputRows);
-            stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " 
+ numReducers);
-
-            return numReducers;
-        }
-
         private void redistributeTable(KylinConfig config, int numReducers) 
throws IOException {
             final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
             hiveCmdBuilder.addStatement(getInitStatement());
@@ -327,7 +310,25 @@ public class HiveMRInput implements IMRInput {
             KylinConfig config = getCubeSpecificConfig();
 
             try {
-                int numReducers = determineNumReducer(config);
+
+                computeRowCount(config.getCliCommandExecutor());
+
+                Path rowCountFile = new Path(getRowCountOutputDir(), 
"000000_0");
+                long rowCount = readRowCountFromFile(rowCountFile);
+                if (!config.isEmptySegmentAllowed() && rowCount == 0) {
+                    stepLogger.log("Detect upstream hive table is empty, " + 
"fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
+                    return new ExecuteResult(ExecuteResult.State.ERROR, 
stepLogger.getBufferedLog());
+                }
+
+                int mapperInputRows = config.getHadoopJobMapperInputRows();
+
+                int numReducers = Math.round(rowCount / ((float) 
mapperInputRows));
+                numReducers = Math.max(1, numReducers);
+
+                stepLogger.log("total input rows = " + rowCount);
+                stepLogger.log("expected input rows per mapper = " + 
mapperInputRows);
+                stepLogger.log("num reducers for RedistributeFlatHiveTableStep 
= " + numReducers);
+
                 redistributeTable(config, numReducers);
                 return new ExecuteResult(ExecuteResult.State.SUCCEED, 
stepLogger.getBufferedLog());
 

Reply via email to