KYLIN-2532 make Hive flat step more extensible

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/38fe432d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/38fe432d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/38fe432d

Branch: refs/heads/KYLIN-2506
Commit: 38fe432d914d0bde8a0ddda50a6867ea42b5ccbf
Parents: d216e22
Author: Roger Shi <rogershijich...@hotmail.com>
Authored: Wed Apr 5 13:12:00 2017 +0800
Committer: Roger Shi <rogershijich...@gmail.com>
Committed: Wed Apr 5 00:20:55 2017 -0500

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfigBase.java  |  8 ++++++++
 .../org/apache/kylin/job/JoinedFlatTable.java     | 18 +++++++++++++++++-
 .../org/apache/kylin/source/hive/HiveMRInput.java |  5 +++++
 3 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/38fe432d/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 760b7da..4361242 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -448,6 +448,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.job.error-record-threshold", "0"));
     }
 
+    public boolean isAdvancedFlatTableUsed() {
+        return Boolean.parseBoolean(getOptional("kylin.job.use-advanced-flat-table", "false"));
+    }
+
+    public String getAdvancedFlatTableClass() {
+        return getOptional("kylin.job.advanced-flat-table.class");
+    }
+
     // ============================================================================
     // SOURCE.HIVE
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/38fe432d/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 9ed563f..7313630 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -19,12 +19,15 @@
 package org.apache.kylin.job;
 
 import java.io.File;
+import java.lang.reflect.Method;
 import java.util.HashSet;
 import java.util.Set;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -99,9 +102,22 @@ public class JoinedFlatTable {
     }
 
     public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig) {
+        final KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
         StringBuilder sql = new StringBuilder();
+
+        if (kylinConfig.isAdvancedFlatTableUsed()) {
+            try {
+                Class advancedFlatTable = Class.forName(kylinConfig.getAdvancedFlatTableClass());
+                Method method = advancedFlatTable.getMethod("generateInsertDataStatement", IJoinedFlatTableDesc.class, JobEngineConfig.class);
+                return (String) method.invoke(null, flatDesc, engineConfig);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
         sql.append(generateHiveSetStatements(engineConfig));
-        sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";").append("\n");
+        sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";\n");
+
         return sql.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/38fe432d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index bbf3c60..67e811a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,9 +19,12 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang.IncompleteArgumentException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,6 +61,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
+import javax.el.MethodNotFoundException;
+
 public class HiveMRInput implements IMRInput {
 
     public static String getTableNameForHCat(TableDesc table, boolean isFullTable) {

Reply via email to