Repository: kylin Updated Branches: refs/heads/1.5.4-rc1 639e4b015 -> 7c5c6c1c6
KYLIN-2004 Make the creating intermediate hive table steps configurable Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7c5c6c1c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7c5c6c1c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7c5c6c1c Branch: refs/heads/1.5.4-rc1 Commit: 7c5c6c1c681154d62adbf18285b80f3ebed7204b Parents: 639e4b0 Author: shaofengshi <shaofeng...@apache.org> Authored: Fri Sep 9 19:04:10 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Fri Sep 9 19:04:23 2016 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/KylinConfigBase.java | 4 +- .../org/apache/kylin/job/JoinedFlatTable.java | 48 ++++-- .../kylin/job/constant/ExecutableConstants.java | 1 + .../kylin/job/execution/AbstractExecutable.java | 2 +- .../apache/kylin/job/JoinedFlatTableTest.java | 2 +- .../kylin/metadata/model/DataModelDesc.java | 8 +- ...t_kylin_cube_without_slr_left_join_desc.json | 3 +- .../kylin/rest/controller/CubeController.java | 2 +- .../source/hive/CreateFlatHiveTableStep.java | 32 +++- .../apache/kylin/source/hive/HiveMRInput.java | 169 ++++++++++++++++++- 10 files changed, 234 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 2ac9d48..de9051c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -805,7 +805,7 @@ abstract public class KylinConfigBase implements Serializable { setProperty("kylin.dict.append.cache.size", String.valueOf(cacheSize)); } - public boolean getTableJoinTypeCheck() { - return Boolean.valueOf(this.getOptional("kylin.table.join.strong.check", "true")); + public String getCreateFlatHiveTableMethod() { + return getOptional("kylin.hive.create.flat.table.method", "1"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index b39265d..699d084 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -107,14 +107,14 @@ public class JoinedFlatTable { return ddl.toString(); } - public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) { + public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig, boolean redistribute) { StringBuilder sql = new StringBuilder(); sql.append(generateHiveSetStatements(engineConfig)); - sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n"); + sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc, redistribute) + ";").append("\n"); return sql.toString(); } - public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) { + public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean redistribute) { StringBuilder sql = new StringBuilder(); sql.append("SELECT" + "\n"); String tableAlias; @@ -129,7 +129,15 @@ public class JoinedFlatTable { } appendJoinStatement(flatDesc, sql, tableAliasMap); appendWhereStatement(flatDesc, sql, tableAliasMap); - appendDistributeStatement(flatDesc, sql, tableAliasMap); + if (redistribute == true) { + String redistributeCol = null; + TblColRef distDcol = flatDesc.getDistributedBy(); + if (distDcol != null) { + String tblAlias = tableAliasMap.get(distDcol.getTable()); + redistributeCol = tblAlias + "." + distDcol.getName(); + } + appendDistributeStatement(sql, redistributeCol); + } return sql.toString(); } @@ -228,14 +236,11 @@ public class JoinedFlatTable { return result; } - private static void appendDistributeStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql, Map<String, String> tableAliasMap) { - TblColRef distDcol = flatDesc.getDistributedBy(); - - if (distDcol != null) { - String tblAlias = tableAliasMap.get(distDcol.getTable()); - sql.append(" DISTRIBUTE BY ").append(tblAlias).append(".").append(distDcol.getName()); + private static void appendDistributeStatement(StringBuilder sql, String redistributeCol) { + if (redistributeCol != null) { + sql.append(" DISTRIBUTE BY ").append(redistributeCol).append(";\n"); } else { - sql.append(" DISTRIBUTE BY RAND()"); + sql.append(" DISTRIBUTE BY RAND()").append(";\n"); } } @@ -280,4 +285,25 @@ public class JoinedFlatTable { return hiveDataType.toLowerCase(); } + public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) { + StringBuilder sql = new StringBuilder(); + sql.append("set hive.exec.compress.output=false;\n"); + sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n"); + return sql.toString(); + } + + public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc intermediateTableDesc) { + final String tableName = intermediateTableDesc.getTableName(); + StringBuilder sql = new StringBuilder(); + sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName); + + String redistributeCol = null; + TblColRef distDcol = intermediateTableDesc.getDistributedBy(); + if (distDcol != null) { + redistributeCol = colName(distDcol.getCanonicalName()); + } + appendDistributeStatement(sql, redistributeCol); + return sql.toString(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 6084e7b..893c034 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -56,5 +56,6 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_II = "Build Inverted Index"; public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile"; public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info"; + public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table"; public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Env: ${env_name}</li>" + "<li>Project: ${project_name}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>"; } http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 4dedad1..09f9b54 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -49,7 +49,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected static final String START_TIME = "startTime"; protected static final String END_TIME = "endTime"; - private static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); + protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); protected int retry = 0; private String name; http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java index 0faf22a..1fe47f8 100644 --- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java @@ -77,7 +77,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase { @Test public void testGenerateInsertSql() throws IOException { - String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv())); + String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()), true); System.out.println(sqls); int length = sqls.length(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java index 7f5edfe..d04830b 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java @@ -314,13 +314,7 @@ public class DataModelDesc extends RootPersistentEntity { } for (int i = 0; i < fkCols.length; i++) { if (!fkCols[i].getDatatype().equals(pkCols[i].getDatatype())) { - final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - final String msg = "Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype(); - if (kylinConfig.getTableJoinTypeCheck() == true) { - throw new IllegalStateException(msg); - } else { - logger.warn(msg); - } + logger.warn("Primary key " + lookup.getTable() + "." + pkCols[i].getName() + "." + pkCols[i].getDatatype() + " are not consistent with Foreign key " + this.getFactTable() + "." + fkCols[i].getName() + "." + fkCols[i].getDatatype()); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json index ca1b35c..0470dc6 100644 --- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json +++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json @@ -289,7 +289,8 @@ "engine_type" : 2, "storage_type" : 2, "override_kylin_properties": { - "kylin.job.cubing.inmem.sampling.hll.precision": "16" + "kylin.job.cubing.inmem.sampling.hll.precision": "16", + "kylin.hive.create.flat.table.method": "2" }, "partition_date_start": 0 } http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 7081d02..5397df7 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -152,7 +152,7 @@ public class CubeController extends BasicController { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY); IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); - String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc); + String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc, false); GeneralResponse repsonse = new GeneralResponse(); repsonse.setProperty("sql", sql); http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index cd32f9c..bcb9a38 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -76,8 +76,11 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement(getInitStatement()); - hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n"); - hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge + boolean useRedistribute = getUseRedistribute(); + if (useRedistribute == true) { + hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n"); + hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge + } hiveCmdBuilder.addStatement(getCreateTableStatement()); final String cmd = hiveCmdBuilder.toString(); @@ -101,13 +104,20 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = getCubeSpecificConfig(); try { - long rowCount = readRowCountFromFile(); - if (!config.isEmptySegmentAllowed() && rowCount == 0) { - stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); - return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + + boolean useRedistribute = getUseRedistribute(); + + int numReducers = 0; + if (useRedistribute == true) { + long rowCount = readRowCountFromFile(); + if (!config.isEmptySegmentAllowed() && rowCount == 0) { + stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + + numReducers = determineNumReducer(config, rowCount); } - int numReducers = determineNumReducer(config, rowCount); createFlatHiveTable(config, numReducers); return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); @@ -125,6 +135,14 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { return getParam("HiveInit"); } + public void setUseRedistribute(boolean useRedistribute) { + setParam("useRedistribute", String.valueOf(useRedistribute)); + } + + public boolean getUseRedistribute() { + return Boolean.valueOf(getParam("useRedistribute")); + } + public void setCreateTableStatement(String sql) { setParam("HiveRedistributeData", sql); } http://git-wip-us.apache.org/repos/asf/kylin/blob/7c5c6c1c/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index e3d7879..3ea9af5 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -19,8 +19,10 @@ package org.apache.kylin.source.hive; import java.io.IOException; +import java.io.InputStream; import java.util.Set; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -28,6 +30,11 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.BufferedLogger; +import org.apache.kylin.common.util.CliCommandExecutor; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; @@ -110,16 +117,46 @@ public class HiveMRInput implements IMRInput { public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); - final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count"; + final KylinConfig kylinConfig = CubeManager.getInstance(conf.getConfig()).getCube(cubeName).getConfig(); + + String createFlatTableMethod = kylinConfig.getCreateFlatHiveTableMethod(); + if ("1".equals(createFlatTableMethod)) { + // create flat table first, then count and redistribute + jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, false, "")); + jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); + } else if ("2".equals(createFlatTableMethod)) { + // count from source table first, and then redistribute, suitable for partitioned table + final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count"; + jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir)); + jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir)); + } else { + throw new IllegalArgumentException("Unknown value for kylin.hive.create.flat.table.method: " + createFlatTableMethod); + } - jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir)); - jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, rowCountOutputDir)); AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId()); if (task != null) { jobFlow.addTask(task); } } + public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { + StringBuilder hiveInitBuf = new StringBuilder(); + hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n"); + hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); + + String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count"; + + RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); + step.setInitStatement(hiveInitBuf.toString()); + step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir)); + step.setRowCountOutputDir(rowCountOutputDir); + step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc)); + CubingExecutableUtil.setCubeName(cubeName, step.getParams()); + step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); + return step; + } + + public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) { final ShellExecutable step = new ShellExecutable(); @@ -174,17 +211,17 @@ public class HiveMRInput implements IMRInput { return step; } - public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, String rowCountOutputDir) { + public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) { StringBuilder hiveInitBuf = new StringBuilder(); hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n"; final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId)); - String insertDataHqls; - insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf); + String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute); CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); + step.setUseRedistribute(redistribute); step.setInitStatement(hiveInitBuf.toString()); step.setRowCountOutputDir(rowCountOutputDir); step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls); @@ -213,6 +250,126 @@ public class HiveMRInput implements IMRInput { } } + public static class RedistributeFlatHiveTableStep extends AbstractExecutable { + private final BufferedLogger stepLogger = new BufferedLogger(logger); + + private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException { + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement(getInitStatement()); + hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); + hiveCmdBuilder.addStatement(getSelectRowCountStatement()); + final String cmd = hiveCmdBuilder.build(); + + stepLogger.log("Compute row count of flat hive table, cmd: "); + stepLogger.log(cmd); + + Pair<Integer, String> response = cmdExecutor.execute(cmd, stepLogger); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to compute row count of flat hive table"); + } + } + + private long readRowCountFromFile(Path file) throws IOException { + FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration()); + InputStream in = fs.open(file); + try { + String content = IOUtils.toString(in); + return Long.valueOf(content.trim()); // strip the '\n' character + + } finally { + IOUtils.closeQuietly(in); + } + } + + private int determineNumReducer(KylinConfig config) throws IOException { + computeRowCount(config.getCliCommandExecutor()); + + Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); + long rowCount = readRowCountFromFile(rowCountFile); + int mapperInputRows = config.getHadoopJobMapperInputRows(); + + int numReducers = Math.round(rowCount / ((float) mapperInputRows)); + numReducers = Math.max(1, numReducers); + + stepLogger.log("total input rows = " + rowCount); + stepLogger.log("expected input rows per mapper = " + mapperInputRows); + stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers); + + return numReducers; + } + + private void redistributeTable(KylinConfig config, int numReducers) throws IOException { + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement(getInitStatement()); + hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n"); + hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); + hiveCmdBuilder.addStatement(getRedistributeDataStatement()); + final String cmd = hiveCmdBuilder.toString(); + + stepLogger.log("Redistribute table, cmd: "); + stepLogger.log(cmd); + + Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + if (response.getFirst() != 0) { + throw new RuntimeException("Failed to redistribute flat hive table"); + } + } + + private KylinConfig getCubeSpecificConfig() { + String cubeName = CubingExecutableUtil.getCubeName(getParams()); + CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = manager.getCube(cubeName); + return cube.getConfig(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + KylinConfig config = getCubeSpecificConfig(); + + try { + int numReducers = determineNumReducer(config); + redistributeTable(config, numReducers); + return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog()); + + } catch (Exception e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); + } + } + + public void setInitStatement(String sql) { + setParam("HiveInit", sql); + } + + public String getInitStatement() { + return getParam("HiveInit"); + } + + public void setSelectRowCountStatement(String sql) { + setParam("HiveSelectRowCount", sql); + } + + public String getSelectRowCountStatement() { + return getParam("HiveSelectRowCount"); + } + + public void setRedistributeDataStatement(String sql) { + setParam("HiveRedistributeData", sql); + } + + public String getRedistributeDataStatement() { + return getParam("HiveRedistributeData"); + } + + public void setRowCountOutputDir(String rowCountOutputDir) { + setParam("rowCountOutputDir", rowCountOutputDir); + } + + public String getRowCountOutputDir() { + return getParam("rowCountOutputDir"); + } + } + public static class GarbageCollectionStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);