KYLIN-2532 make Hive flat step more extensible
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/38fe432d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/38fe432d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/38fe432d Branch: refs/heads/KYLIN-2506 Commit: 38fe432d914d0bde8a0ddda50a6867ea42b5ccbf Parents: d216e22 Author: Roger Shi <rogershijich...@hotmail.com> Authored: Wed Apr 5 13:12:00 2017 +0800 Committer: Roger Shi <rogershijich...@gmail.com> Committed: Wed Apr 5 00:20:55 2017 -0500 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfigBase.java | 8 ++++++++ .../org/apache/kylin/job/JoinedFlatTable.java | 18 +++++++++++++++++- .../org/apache/kylin/source/hive/HiveMRInput.java | 5 +++++ 3 files changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/38fe432d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 760b7da..4361242 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -448,6 +448,14 @@ abstract public class KylinConfigBase implements Serializable { return Integer.parseInt(getOptional("kylin.job.error-record-threshold", "0")); } + public boolean isAdvancedFlatTableUsed() { + return Boolean.parseBoolean(getOptional("kylin.job.use-advanced-flat-table", "false")); + } + + public String getAdvancedFlatTableClass() { + return getOptional("kylin.job.advanced-flat-table.class"); + } + // ============================================================================ // SOURCE.HIVE // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/38fe432d/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 9ed563f..7313630 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -19,12 +19,15 @@ package org.apache.kylin.job; import java.io.File; +import java.lang.reflect.Method; import java.util.HashSet; import java.util.Set; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -99,9 +102,22 @@ public class JoinedFlatTable { } public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig) { + final KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig(); StringBuilder sql = new StringBuilder(); + + if (kylinConfig.isAdvancedFlatTableUsed()) { + try { + Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass()); + Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class, JobEngineConfig.class); + return (String) method.invoke(null, flatDesc, engineConfig); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + sql.append(generateHiveSetStatements(engineConfig)); - sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";").append("\n"); + sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";\n"); + return sql.toString(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/38fe432d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index bbf3c60..67e811a 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -19,9 +19,12 @@ package org.apache.kylin.source.hive; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Map; import java.util.Set; +import org.apache.commons.lang.IncompleteArgumentException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,6 +61,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; +import javax.el.MethodNotFoundException; + public class HiveMRInput implements IMRInput { public static String getTableNameForHCat(TableDesc table, boolean isFullTable) {