Repository: kylin
Updated Branches:
  refs/heads/master b8acf14f5 -> 76a53da74


KYLIN-2165 cleanup old codes


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/76a53da7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/76a53da7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/76a53da7

Branch: refs/heads/master
Commit: 76a53da74536f4b3414bd7f5ef3950b9b489b3e8
Parents: b8acf14
Author: shaofengshi <[email protected]>
Authored: Mon Nov 7 18:20:28 2016 +0800
Committer: shaofengshi <[email protected]>
Committed: Mon Nov 7 18:21:08 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/JoinedFlatTable.java   | 10 +--
 .../apache/kylin/job/JoinedFlatTableTest.java   |  2 +-
 .../kylin/rest/controller/CubeController.java   |  2 +-
 .../source/hive/CreateFlatHiveTableStep.java    | 72 +-------------------
 .../apache/kylin/source/hive/HiveMRInput.java   | 41 ++---------
 .../kylin/source/hive/HiveCmdBuilderTest.java   |  6 +-
 6 files changed, 18 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6c08bc9..b26f50d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -98,14 +98,14 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig, boolean redistribute) {
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig) {
         StringBuilder sql = new StringBuilder();
         sql.append(generateHiveSetStatements(engineConfig));
-        sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc, redistribute) + ";").append("\n");
+        sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";").append("\n");
         return sql.toString();
     }
 
-    public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc, boolean redistribute) {
+    public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder sql = new StringBuilder();
         sql.append("SELECT" + "\n");
         for (int i = 0; i < flatDesc.getAllColumns().size(); i++) {
@@ -117,10 +117,6 @@ public class JoinedFlatTable {
         }
         appendJoinStatement(flatDesc, sql);
         appendWhereStatement(flatDesc, sql);
-        if (redistribute == true) {
-            TblColRef distCol = flatDesc.getDistributedBy();
-            appendDistributeStatement(sql, distCol);
-        }
         return sql.toString();
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 1fe47f8..0faf22a 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -77,7 +77,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenerateInsertSql() throws IOException {
-        String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()), true);
+        String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
         System.out.println(sqls);
 
         int length = sqls.length();

http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 79739c2..5ca7cb5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -145,7 +145,7 @@ public class CubeController extends BasicController {
     public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) {
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
         IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
-        String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc, false);
+        String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc);
 
         GeneralResponse repsonse = new GeneralResponse();
         repsonse.setProperty("sql", sql);

http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index bcb9a38..025fd94 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -18,18 +18,12 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BufferedLogger;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -45,42 +39,9 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class);
     private final BufferedLogger stepLogger = new BufferedLogger(logger);
 
-    private long readRowCountFromFile() throws IOException {
-        Path rowCountFile = new Path(getRowCountOutputDir(), "000000_0");
-
-        FileSystem fs = FileSystem.get(rowCountFile.toUri(), HadoopUtil.getCurrentConfiguration());
-        InputStream in = fs.open(rowCountFile);
-        try {
-            String content = IOUtils.toString(in, Charset.defaultCharset());
-            return Long.valueOf(content.trim()); // strip the '\n' character
-
-        } finally {
-            IOUtils.closeQuietly(in);
-        }
-    }
-
-    private int determineNumReducer(KylinConfig config, long rowCount) throws IOException {
-        int mapperInputRows = config.getHadoopJobMapperInputRows();
-
-        int numReducers = Math.round(rowCount / ((float) mapperInputRows));
-        numReducers = Math.max(numReducers, config.getHadoopJobMinReducerNumber());
-        numReducers = Math.min(numReducers, config.getHadoopJobMaxReducerNumber());
-
-        stepLogger.log("total input rows = " + rowCount);
-        stepLogger.log("expected input rows per mapper = " + mapperInputRows);
-        stepLogger.log("reducers for RedistributeFlatHiveTableStep = " + numReducers);
-
-        return numReducers;
-    }
-
-    private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException {
+    private void createFlatHiveTable(KylinConfig config) throws IOException {
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
         hiveCmdBuilder.addStatement(getInitStatement());
-        boolean useRedistribute = getUseRedistribute();
-        if (useRedistribute == true) {
-            hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
-            hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); //disable merge
-        }
         hiveCmdBuilder.addStatement(getCreateTableStatement());
         final String cmd = hiveCmdBuilder.toString();
 
@@ -104,21 +65,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         KylinConfig config = getCubeSpecificConfig();
         try {
-
-            boolean useRedistribute = getUseRedistribute();
-
-            int numReducers = 0;
-            if (useRedistribute == true) {
-                long rowCount = readRowCountFromFile();
-                if (!config.isEmptySegmentAllowed() && rowCount == 0) {
-                    stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\"");
-                    return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog());
-                }
-
-                numReducers = determineNumReducer(config, rowCount);
-            }
-
-            createFlatHiveTable(config, numReducers);
+            createFlatHiveTable(config);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
 
         } catch (Exception e) {
@@ -135,14 +82,6 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         return getParam("HiveInit");
     }
 
-    public void setUseRedistribute(boolean useRedistribute) {
-        setParam("useRedistribute", String.valueOf(useRedistribute));
-    }
-
-    public boolean getUseRedistribute() {
-        return Boolean.valueOf(getParam("useRedistribute"));
-    }
-
     public void setCreateTableStatement(String sql) {
         setParam("HiveRedistributeData", sql);
     }
@@ -151,11 +90,4 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         return getParam("HiveRedistributeData");
     }
 
-    public void setRowCountOutputDir(String rowCountOutputDir) {
-        setParam("rowCountOutputDir", rowCountOutputDir);
-    }
-
-    public String getRowCountOutputDir() {
-        return getParam("rowCountOutputDir");
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index e0853b0..67ceffc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -128,22 +128,11 @@ public class HiveMRInput implements IMRInput {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final KylinConfig kylinConfig = CubeManager.getInstance(conf.getConfig()).getCube(cubeName).getConfig();
 
-            String createFlatTableMethod = kylinConfig.getCreateFlatHiveTableMethod();
-            if ("1".equals(createFlatTableMethod)) {
-                // create flat table first, then count and redistribute
-                jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, false, ""));
-                if (kylinConfig.isHiveRedistributeEnabled() == true) {
-                    jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
-                }
-            } else if ("2".equals(createFlatTableMethod)) {
-                // count from source table first, and then redistribute, suitable for partitioned table
-                final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
-                jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
-                jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir));
-            } else {
-                throw new IllegalArgumentException("Unknown value for kylin.hive.create.flat.table.method: " + createFlatTableMethod);
+            // create flat table first, then count and redistribute
+            jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
+            if (kylinConfig.isHiveRedistributeEnabled() == true) {
+                jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
             }
-
             AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
             if (task != null) {
                 jobFlow.addTask(task);
@@ -166,22 +155,6 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-        public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
-            final ShellExecutable step = new ShellExecutable();
-
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
-            appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
-            hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
-            hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
-            hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
-
-            step.setCmd(hiveCmdBuilder.build());
-            step.setName(ExecutableConstants.STEP_NAME_COUNT_HIVE_TABLE);
-
-            return step;
-        }
-
         public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
             ShellExecutable step = new ShellExecutable();
             step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
@@ -223,7 +196,7 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName, boolean redistribute, String rowCountOutputDir) {
+        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
             StringBuilder hiveInitBuf = new StringBuilder();
             hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
             final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
@@ -231,12 +204,10 @@ public class HiveMRInput implements IMRInput {
             final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
             final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
             final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
-            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
 
             CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
-            step.setUseRedistribute(redistribute);
             step.setInitStatement(hiveInitBuf.toString());
-            step.setRowCountOutputDir(rowCountOutputDir);
             step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);

http://git-wip-us.apache.org/repos/asf/kylin/blob/76a53da7/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
----------------------------------------------------------------------
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
index 6aba1ef..da0c082 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveCmdBuilderTest.java
@@ -68,9 +68,13 @@ public class HiveCmdBuilderTest {
         hiveCmdBuilder.addStatement("SHOW\n TABLES;");
 
         String cmd = hiveCmdBuilder.build();
-        assertTrue(cmd.startsWith("beeline -u jdbc_url -f") && cmd.contains(";rm -f"));
+        assertTrue(cmd.startsWith("beeline -u jdbc_url -f"));
 
         String hqlFile = cmd.substring(cmd.lastIndexOf("-f ") + 3).trim();
+        if (hqlFile.endsWith(";")) {
+            hqlFile = hqlFile.substring(0, hqlFile.length() - 1);
+        }
+
         String hqlStatement = FileUtils.readFileToString(new File(hqlFile), Charset.defaultCharset());
         assertEquals("USE default;" + lineSeparator + "DROP TABLE test;" + lineSeparator + "SHOW\n TABLES;" + lineSeparator, hqlStatement);
 

Reply via email to