KYLIN-2004: Check whether the source data is empty

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/56136ede
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/56136ede
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/56136ede
Branch: refs/heads/KYLIN-1726 Commit: 56136ede7c8b9abac5ddd7b7785b3f63c59b74db Parents: 233a699 Author: shaofengshi <shaofeng...@apache.org> Authored: Sat Sep 10 17:52:32 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sat Sep 10 17:59:59 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/source/hive/HiveMRInput.java | 37 ++++++++++---------- 1 file changed, 19 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/56136ede/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 3ea9af5..520d7cc 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -281,23 +281,6 @@ public class HiveMRInput implements IMRInput { } } - private int determineNumReducer(KylinConfig config) throws IOException { - computeRowCount(config.getCliCommandExecutor()); - - Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); - long rowCount = readRowCountFromFile(rowCountFile); - int mapperInputRows = config.getHadoopJobMapperInputRows(); - - int numReducers = Math.round(rowCount / ((float) mapperInputRows)); - numReducers = Math.max(1, numReducers); - - stepLogger.log("total input rows = " + rowCount); - stepLogger.log("expected input rows per mapper = " + mapperInputRows); - stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers); - - return numReducers; - } - private void redistributeTable(KylinConfig config, int numReducers) throws IOException { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); 
hiveCmdBuilder.addStatement(getInitStatement()); @@ -327,7 +310,25 @@ public class HiveMRInput implements IMRInput { KylinConfig config = getCubeSpecificConfig(); try { - int numReducers = determineNumReducer(config); + + computeRowCount(config.getCliCommandExecutor()); + + Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); + long rowCount = readRowCountFromFile(rowCountFile); + if (!config.isEmptySegmentAllowed() && rowCount == 0) { + stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + + int mapperInputRows = config.getHadoopJobMapperInputRows(); + + int numReducers = Math.round(rowCount / ((float) mapperInputRows)); + numReducers = Math.max(1, numReducers); + + stepLogger.log("total input rows = " + rowCount); + stepLogger.log("expected input rows per mapper = " + mapperInputRows); + stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers); + redistributeTable(config, numReducers); return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());