Repository: kylin
Updated Branches:
  refs/heads/master cadc85ada -> a9fe953df
KYLIN-2165 Use hive table statistics data to get the total count Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a9fe953d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a9fe953d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a9fe953d Branch: refs/heads/master Commit: a9fe953dfafae5f0aefd080d9e6c3d6a54c89fc5 Parents: cadc85a Author: shaofengshi <shaofeng...@apache.org> Authored: Mon Nov 7 14:26:27 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Mon Nov 7 14:26:37 2016 +0800 ---------------------------------------------------------------------- build/conf/kylin.properties | 3 + build/conf/kylin_hive_conf.xml | 2 +- .../apache/kylin/common/KylinConfigBase.java | 6 ++ .../org/apache/kylin/job/JoinedFlatTable.java | 7 -- .../test_case_data/sandbox/kylin.properties | 1 + .../test_case_data/sandbox/kylin_hive_conf.xml | 9 ++- .../test_case_data/sandbox/kylin_job_conf.xml | 13 ++-- .../kylin/source/hive/BeelineHiveClient.java | 17 ++++- .../apache/kylin/source/hive/CLIHiveClient.java | 6 ++ .../kylin/source/hive/HiveCmdBuilder.java | 3 +- .../apache/kylin/source/hive/HiveMRInput.java | 76 ++++++-------------- .../apache/kylin/source/hive/IHiveClient.java | 2 + tool/pom.xml | 6 +- 13 files changed, 78 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/build/conf/kylin.properties ---------------------------------------------------------------------- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index 24e8f50..91aa5b8 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -101,6 +101,9 @@ kylin.job.yarn.app.rest.check.interval.seconds=10 # Hive database name for putting the intermediate flat tables kylin.job.hive.database.for.intermediatetable=default +# Whether redistribute the intermediate flat table before 
building +kylin.job.hive.intermediatetable.redistribute.enabled=true + # The percentage of the sampling, default 100% kylin.job.cubing.inmem.sampling.percent=100 http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/build/conf/kylin_hive_conf.xml ---------------------------------------------------------------------- diff --git a/build/conf/kylin_hive_conf.xml b/build/conf/kylin_hive_conf.xml index 4a1b3b6..c201240 100644 --- a/build/conf/kylin_hive_conf.xml +++ b/build/conf/kylin_hive_conf.xml @@ -60,13 +60,13 @@ <value>org.apache.hadoop.io.compress.SnappyCodec</value> <description></description> </property> - --> <property> <name>mapreduce.output.fileoutputformat.compress.type</name> <value>BLOCK</value> <description>The compression type to use for job outputs</description> </property> + --> <property> <name>mapreduce.job.split.metainfo.maxsize</name> <value>-1</value> http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index acc4eb1..d9d10bb 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -629,6 +629,11 @@ abstract public class KylinConfigBase implements Serializable { return this.getOptional("kylin.job.hive.database.for.intermediatetable", "default"); } + + public boolean isHiveRedistributeEnabled() { + return Boolean.parseBoolean(this.getOptional("kylin.job.hive.intermediatetable.redistribute.enabled", "true")); + } + public String getHiveDependencyFilterList() { return this.getOptional("kylin.job.dependency.filterlist", "[^,]*hive-exec[0-9.-]+[^,]*?\\.jar" + "|" + "[^,]*hive-metastore[0-9.-]+[^,]*?\\.jar" + "|" + 
"[^,]*hive-hcatalog-core[0-9.-]+[^,]*?\\.jar"); } @@ -807,6 +812,7 @@ abstract public class KylinConfigBase implements Serializable { setProperty("kylin.dict.append.cache.size", String.valueOf(cacheSize)); } + @Deprecated public String getCreateFlatHiveTableMethod() { return getOptional("kylin.hive.create.flat.table.method", "1"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 61b5b11..6c08bc9 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -218,13 +218,6 @@ public class JoinedFlatTable { return hiveDataType.toLowerCase(); } - public static String generateSelectRowCountStatement(IJoinedFlatTableDesc flatDesc, String outputDir) { - StringBuilder sql = new StringBuilder(); - sql.append("set hive.exec.compress.output=false;\n"); - sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + flatDesc.getTableName() + ";\n"); - return sql.toString(); - } - public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) { final String tableName = flatDesc.getTableName(); StringBuilder sql = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 8f3075e..43b0855 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -34,6 +34,7 @@ kylin.rest.timezone=GMT+8 # Hive client, valid value [cli, beeline] 
kylin.hive.client=cli +#kylin.hive.beeline.params=-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://localhost:10000' ### STORAGE ### http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/examples/test_case_data/sandbox/kylin_hive_conf.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin_hive_conf.xml b/examples/test_case_data/sandbox/kylin_hive_conf.xml index 593d4f8..c85731d 100644 --- a/examples/test_case_data/sandbox/kylin_hive_conf.xml +++ b/examples/test_case_data/sandbox/kylin_hive_conf.xml @@ -45,16 +45,23 @@ <value>org.apache.hadoop.io.compress.SnappyCodec</value> <description></description> </property> + + <property> + <name>hive.execution.engine</name> + <value>mr</value> + <description></description> + </property> + <!-- <property> <name>mapreduce.output.fileoutputformat.compress.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> <description></description> </property> - --> <property> <name>hive.merge.size.per.task</name> <value>32000000</value> <description>Size for the merged file</description> </property> + --> </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/examples/test_case_data/sandbox/kylin_job_conf.xml ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml index 27f4fb8..c2046e6 100644 --- a/examples/test_case_data/sandbox/kylin_job_conf.xml +++ b/examples/test_case_data/sandbox/kylin_job_conf.xml @@ -32,37 +32,38 @@ </property> <property> - <name>mapreduce.map.output.compress</name> + <name>mapreduce.map.output.compress</name> <value>true</value> <description>Compress map outputs</description> </property> <property> - <name>mapreduce.map.output.compress.codec</name> + 
<name>mapreduce.map.output.compress.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> <description>The compression codec to use for map outputs </description> </property> <property> - <name>mapreduce.output.fileoutputformat.compress</name> + <name>mapreduce.output.fileoutputformat.compress</name> <value>true</value> <description>Compress the output of a MapReduce job</description> </property> <property> - <name>mapreduce.output.fileoutputformat.compress.codec</name> + <name>mapreduce.output.fileoutputformat.compress.codec</name> <value>org.apache.hadoop.io.compress.SnappyCodec</value> <description>The compression codec to use for job outputs </description> </property> + <!-- <property> - <name>mapreduce.output.fileoutputformat.compress.type</name> + <name>mapreduce.output.fileoutputformat.compress.type</name> <value>BLOCK</value> <description>The compression type to use for job outputs</description> </property> - + --> <property> <name>mapreduce.job.max.split.locations</name> <value>2000</value> http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java index fb6ddc1..47b551b 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java @@ -29,10 +29,10 @@ import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.kylin.common.util.DBUtils; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.kylin.common.util.DBUtils; public class BeelineHiveClient implements IHiveClient { @@ -102,6 
+102,21 @@ public class BeelineHiveClient implements IHiveClient { } @Override + public long getHiveTableRows(String database, String tableName) throws Exception { + ResultSet resultSet = null; + long count = 0; + try { + resultSet = stmt.executeQuery("select count(*) from " + database + "." + tableName); + if (resultSet.next()) { + count = resultSet.getLong(1); + } + } finally { + DBUtils.closeQuietly(resultSet); + } + return count; + } + + @Override public void executeHQL(String hql) throws CommandNeedRetryException, IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java index 60cf47a..5a17f1f 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java @@ -123,6 +123,12 @@ public class CLIHiveClient implements IHiveClient { return getMetaStoreClient().getAllTables(database); } + @Override + public long getHiveTableRows(String database, String tableName) throws Exception { + Table table = getMetaStoreClient().getTable(database, tableName); + return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.ROW_COUNT); + } + private HiveMetaStoreClient getMetaStoreClient() throws Exception { if (metaStoreClient == null) { metaStoreClient = new HiveMetaStoreClient(hiveConf); http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java index bce85b8..844cf12 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java @@ -75,8 +75,7 @@ public class HiveCmdBuilder { buf.append(kylinConfig.getHiveBeelineParams()); buf.append(" -f "); buf.append(tmpHql.getAbsolutePath()); - buf.append(";rm -f "); - buf.append(tmpHql.getAbsolutePath()); + buf.append(";"); logger.info("The statements to execute in beeline: \n" + hqlBuf); if (logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 2f056c5..e0853b0 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -19,11 +19,9 @@ package org.apache.kylin.source.hive; import java.io.IOException; -import java.io.InputStream; import java.util.Map; import java.util.Set; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,7 +30,6 @@ import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.BufferedLogger; -import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -62,7 +59,6 @@ import com.google.common.collect.Sets; public class HiveMRInput implements IMRInput { - @Override public 
IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { return new BatchCubingInputSide(flatDesc); @@ -136,7 +132,9 @@ public class HiveMRInput implements IMRInput { if ("1".equals(createFlatTableMethod)) { // create flat table first, then count and redistribute jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, false, "")); - jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); + if (kylinConfig.isHiveRedistributeEnabled() == true) { + jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); + } } else if ("2".equals(createFlatTableMethod)) { // count from source table first, and then redistribute, suitable for partitioned table final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count"; @@ -158,19 +156,16 @@ public class HiveMRInput implements IMRInput { hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig(); appendHiveOverrideProperties(kylinConfig, hiveInitBuf); - String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count"; RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); step.setInitStatement(hiveInitBuf.toString()); - step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir)); - step.setRowCountOutputDir(rowCountOutputDir); + step.setIntermediateTable(flatTableDesc.getTableName()); step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc)); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE); return step; } - public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, 
String rowCountOutputDir) { final ShellExecutable step = new ShellExecutable(); @@ -187,7 +182,6 @@ public class HiveMRInput implements IMRInput { return step; } - public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); @@ -272,32 +266,9 @@ public class HiveMRInput implements IMRInput { public static class RedistributeFlatHiveTableStep extends AbstractExecutable { private final BufferedLogger stepLogger = new BufferedLogger(logger); - private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); - hiveCmdBuilder.addStatement(getInitStatement()); - hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); - hiveCmdBuilder.addStatement(getSelectRowCountStatement()); - final String cmd = hiveCmdBuilder.build(); - - stepLogger.log("Compute row count of flat hive table, cmd: "); - stepLogger.log(cmd); - - Pair<Integer, String> response = cmdExecutor.execute(cmd, stepLogger); - if (response.getFirst() != 0) { - throw new RuntimeException("Failed to compute row count of flat hive table"); - } - } - - private long readRowCountFromFile(Path file) throws IOException { - FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration()); - InputStream in = fs.open(file); - try { - String content = IOUtils.toString(in, "UTF-8"); - return Long.valueOf(content.trim()); // strip the '\n' character - - } finally { - IOUtils.closeQuietly(in); - } + private long computeRowCount(String database, String table) throws Exception { + IHiveClient hiveClient = HiveClientFactory.getHiveClient(); + return hiveClient.getHiveTableRows(database, table); } private void redistributeTable(KylinConfig config, int numReducers) throws IOException { @@ -327,13 +298,19 @@ public class HiveMRInput implements IMRInput { @Override protected 
ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = getCubeSpecificConfig(); + String intermediateTable = getIntermediateTable(); + String database, tableName; + if (intermediateTable.indexOf(".") > 0) { + database = intermediateTable.substring(0, intermediateTable.indexOf(".")); + tableName = intermediateTable.substring(intermediateTable.indexOf(".") + 1); + } else { + database = config.getHiveDatabaseForIntermediateTable(); + tableName = intermediateTable; + } try { - - computeRowCount(config.getCliCommandExecutor()); - - Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0"); - long rowCount = readRowCountFromFile(rowCountFile); + long rowCount = computeRowCount(database, tableName); + logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount); if (!config.isEmptySegmentAllowed() && rowCount == 0) { stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); @@ -366,14 +343,6 @@ public class HiveMRInput implements IMRInput { return getParam("HiveInit"); } - public void setSelectRowCountStatement(String sql) { - setParam("HiveSelectRowCount", sql); - } - - public String getSelectRowCountStatement() { - return getParam("HiveSelectRowCount"); - } - public void setRedistributeDataStatement(String sql) { setParam("HiveRedistributeData", sql); } @@ -382,12 +351,12 @@ public class HiveMRInput implements IMRInput { return getParam("HiveRedistributeData"); } - public void setRowCountOutputDir(String rowCountOutputDir) { - setParam("rowCountOutputDir", rowCountOutputDir); + public String getIntermediateTable() { + return getParam("intermediateTable"); } - public String getRowCountOutputDir() { - return getParam("rowCountOutputDir"); + public void setIntermediateTable(String intermediateTable) { + setParam("intermediateTable", intermediateTable); } } @@ 
-457,7 +426,6 @@ public class HiveMRInput implements IMRInput { } - private static void appendHiveOverrideProperties(final KylinConfig kylinConfig, StringBuilder hiveCmd) { final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride(); if (hiveConfOverride.isEmpty() == false) { http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java index f218cce..22bea46 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/IHiveClient.java @@ -33,4 +33,6 @@ public interface IHiveClient { List<String> getHiveDbNames() throws Exception; List<String> getHiveTableNames(String database) throws Exception; + + long getHiveTableRows(String database, String tableName) throws Exception; } http://git-wip-us.apache.org/repos/asf/kylin/blob/a9fe953d/tool/pom.xml ---------------------------------------------------------------------- diff --git a/tool/pom.xml b/tool/pom.xml index e3d7bfa..3d341d8 100644 --- a/tool/pom.xml +++ b/tool/pom.xml @@ -61,7 +61,11 @@ <type>test-jar</type> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-source-hive</artifactId> + </dependency> + </dependencies>