http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/pom.xml
----------------------------------------------------------------------
diff --git a/job/pom.xml b/job/pom.xml
deleted file mode 100644
index c62477e..0000000
--- a/job/pom.xml
+++ /dev/null
@@ -1,257 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
- 
-     http://www.apache.org/licenses/LICENSE-2.0
- 
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.kylin</groupId>
-        <artifactId>kylin</artifactId>
-        <version>1.3-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>kylin-job</artifactId>
-    <name>Kylin:Job</name>
-    <url>http://maven.apache.org</url>
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    </properties>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-common</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-            <version>${project.parent.version}</version>
-        </dependency>
-
-        <!--Kylin Jar -->
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-cube</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-invertedindex</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-configuration</groupId>
-            <artifactId>commons-configuration</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-httpclient</groupId>
-            <artifactId>commons-httpclient</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-daemon</groupId>
-            <artifactId>commons-daemon</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-email</artifactId>
-            <version>1.1</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <!-- Env & Test -->
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-annotations</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mrunit</groupId>
-            <artifactId>mrunit</artifactId>
-            <classifier>hadoop2</classifier>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-hadoop2-compat</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-server</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.maven</groupId>
-            <artifactId>maven-model</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hive.hcatalog</groupId>
-            <artifactId>hive-hcatalog-core</artifactId>
-            <version>${hive-hcatalog.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-testing-util</artifactId>
-            <version>${hbase-hadoop2.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet.jsp</groupId>
-                    <artifactId>jsp-api</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>2.3</version>
-
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <minimizeJar>false</minimizeJar>
-                            <shadedArtifactAttached>true</shadedArtifactAttached>
-                            <shadedClassifierName>job</shadedClassifierName>
-                            <filters>
-                                <filter>
-                                    <artifact>*:*</artifact>
-                                    <excludes>
-                                        <exclude>META-INF/*.SF</exclude>
-                                        <exclude>META-INF/*.DSA</exclude>
-                                        <exclude>META-INF/*.RSA</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

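A note on the <build> section of the removed pom.xml above: with shadedArtifactAttached set to true and a classifier of "job", the shade plugin attaches a second, self-contained jar next to the regular module jar. Under Maven's standard artifactId-version-classifier naming, a build of this module would have produced roughly:

    kylin-job-1.3-SNAPSHOT.jar         (plain module jar)
    kylin-job-1.3-SNAPSHOT-job.jar     (shaded jar: dependencies bundled, META-INF/*.SF, *.DSA and *.RSA signature files excluded)
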
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
deleted file mode 100644
index 87c4705..0000000
--- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.hive.IJoinedFlatTableDesc;
-
-public abstract class AbstractJobBuilder {
-
-    protected static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
-    protected JobEngineConfig engineConfig;
-    protected String submitter;
-
-    public AbstractJobBuilder(JobEngineConfig engineConfig) {
-        this.engineConfig = engineConfig;
-    }
-
-    public AbstractJobBuilder setSubmitter(String submitter) {
-        this.submitter = submitter;
-        return this;
-    }
-
-    public String getSubmitter() {
-        return submitter;
-    }
-
-    protected StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) {
-        return cmd.append(" -").append(paraName).append(" ").append(paraValue);
-    }
-
-    // returns the fully-qualified name, that is "dbname.tablename"
-    protected String getIntermediateHiveTableName(IJoinedFlatTableDesc intermediateTableDesc, String jobUuid) {
-        return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName(jobUuid);
-    }
-
-    protected String getIntermediateHiveTableLocation(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) {
-        return getJobWorkingDir(jobUUID) + "/" + intermediateTableDesc.getTableName(jobUUID);
-    }
-
-    protected AbstractExecutable createIntermediateHiveTableStep(IJoinedFlatTableDesc intermediateTableDesc, String jobId) {
-
-        final String useDatabaseHql = "USE " + engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + ";";
-        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId);
-        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId);
-        String insertDataHqls;
-        try {
-            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobId, this.engineConfig);
-        } catch (IOException e1) {
-            e1.printStackTrace();
-            throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
-        }
-
-        ShellExecutable step = new ShellExecutable();
-        StringBuffer buf = new StringBuffer();
-        buf.append("hive ");
-        buf.append(" -e \"");
-        buf.append(useDatabaseHql + "\n");
-        buf.append(dropTableHql + "\n");
-        buf.append(createTableHql + "\n");
-        buf.append(insertDataHqls + "\n");
-        buf.append("\"");
-
-        step.setCmd(buf.toString());
-        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-
-        return step;
-    }
-
-    protected String getJobWorkingDir(String uuid) {
-        return engineConfig.getHdfsWorkingDirectory() + JOB_WORKING_DIR_PREFIX + uuid;
-    }
-}

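For context on the builder removed above: createIntermediateHiveTableStep collapsed the generated HQL into a single hive -e "..." shell command before handing it to ShellExecutable. A minimal, self-contained sketch of that assembly follows; the HQL literals are hypothetical placeholders, not real JoinedFlatTable output:

    // Sketch of the command assembly in createIntermediateHiveTableStep.
    // The HQL strings are hypothetical; Kylin generated them via JoinedFlatTable.
    public class HiveCmdSketch {
        public static void main(String[] args) {
            String useDatabaseHql = "USE kylin_intermediate_db;";
            String dropTableHql = "DROP TABLE IF EXISTS kylin_intermediate_abc123;";
            String createTableHql = "CREATE EXTERNAL TABLE kylin_intermediate_abc123 (id INT) LOCATION '/kylin/kylin-abc123/kylin_intermediate_abc123';";
            String insertDataHqls = "INSERT OVERWRITE TABLE kylin_intermediate_abc123 SELECT id FROM fact_table;";

            // One hive -e invocation runs all four statements, newline-separated,
            // inside a double-quoted argument.
            StringBuilder buf = new StringBuilder();
            buf.append("hive ");
            buf.append(" -e \"");
            buf.append(useDatabaseHql).append("\n");
            buf.append(dropTableHql).append("\n");
            buf.append(createTableHql).append("\n");
            buf.append(insertDataHqls).append("\n");
            buf.append("\"");
            System.out.println(buf); // the command string passed to ShellExecutable.setCmd(...)
        }
    }
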
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
deleted file mode 100644
index 0a08709..0000000
--- a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
+++ /dev/null
@@ -1,781 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeDescManager;
-import org.apache.kylin.cube.CubeDescUpgrader;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.v1.CubeInstance;
-import org.apache.kylin.cube.model.v1.CubeSegment;
-import org.apache.kylin.cube.model.v1.CubeSegmentStatusEnum;
-import org.apache.kylin.cube.model.v1.CubeStatusEnum;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.cube.CubingJob;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * This is the utility class to migrate the Kylin metadata format from v1 to v2;
- *
- * @author shaoshi
- */
-public class CubeMetadataUpgrade {
-
-    private KylinConfig config = null;
-    private ResourceStore store;
-
-    private List<String> updatedResources = Lists.newArrayList();
-    private List<String> errorMsgs = Lists.newArrayList();
-
-    private static final Log logger = LogFactory.getLog(CubeMetadataUpgrade.class);
-
-    public CubeMetadataUpgrade(String newMetadataUrl) {
-        KylinConfig.destoryInstance();
-        System.setProperty(KylinConfig.KYLIN_CONF, newMetadataUrl);
-        KylinConfig.getInstanceFromEnv().setMetadataUrl(newMetadataUrl);
-
-        config = KylinConfig.getInstanceFromEnv();
-        store = getStore();
-    }
-
-    public void upgrade() {
-
-        upgradeTableDesc();
-        upgradeTableDesceExd();
-        upgradeCubeDesc();
-        upgradeProjectInstance();
-        upgradeCubeInstance();
-        upgradeJobInstance();
-        copyDictionaryForFK();
-        verify();
-
-    }
-
-    public void cleanup() {
-        MetadataManager.getInstance(config).reload();
-        CubeDescManager.getInstance(config);
-        CubeManager cubeManager = CubeManager.getInstance(config);
-
-        List<String> activeResourceList = Lists.newArrayList();
-        for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) {
-            for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) {
-                activeResourceList.addAll(segment.getSnapshotPaths());
-                activeResourceList.addAll(segment.getDictionaryPaths());
-            }
-        }
-
-        List<String> toDeleteResource = Lists.newArrayList();
-        List<String> activeResource = Lists.newArrayList();
-        try {
-            ArrayList<String> snapshotTables = getStore().listResources(ResourceStore.SNAPSHOT_RESOURCE_ROOT);
-
-            for (String snapshotTable : snapshotTables) {
-                ArrayList<String> snapshotNames = getStore().listResources(snapshotTable);
-                if (snapshotNames != null)
-                    for (String snapshot : snapshotNames) {
-                        if (!activeResourceList.contains(snapshot)) {
-                            toDeleteResource.add(snapshot);
-
-                        } else {
-                            activeResource.add(snapshot);
-                        }
-                    }
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        try {
-            ArrayList<String> dictTables = getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT);
-
-            for (String table : dictTables) {
-                ArrayList<String> tableColNames = getStore().listResources(table);
-                if (tableColNames != null)
-                    for (String tableCol : tableColNames) {
-                        ArrayList<String> dictionaries = getStore().listResources(tableCol);
-                        if (dictionaries != null)
-                            for (String dict : dictionaries)
-                                if (!activeResourceList.contains(dict)) {
-                                    toDeleteResource.add(dict);
-                                } else {
-                                    activeResource.add(dict);
-                                }
-                    }
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        if (toDeleteResource.size() > 0) {
-            logger.info("The following resources is never needed, will be 
dropped, number :" + toDeleteResource.size());
-
-            for (String s : toDeleteResource) {
-                logger.info(s);
-                try {
-                    getStore().deleteResource(s);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-
-    }
-
-    public void verify() {
-        MetadataManager.getInstance(config).reload();
-        CubeDescManager.clearCache();
-        CubeDescManager.getInstance(config);
-        CubeManager.getInstance(config);
-        ProjectManager.getInstance(config);
-        //cleanup();
-
-    }
-
-    private List<String> listResourceStore(String pathRoot) {
-        List<String> paths = null;
-        try {
-            paths = store.collectResourceRecursively(pathRoot, MetadataConstants.FILE_SURFIX);
-        } catch (IOException e1) {
-            e1.printStackTrace();
-            errorMsgs.add("Get IOException when scan resource store at: " + 
ResourceStore.CUBE_DESC_RESOURCE_ROOT);
-        }
-
-        return paths;
-    }
-
-    private void upgradeCubeDesc() {
-        logger.info("Reloading Cube Metadata from folder " + 
store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
-
-        List<String> paths = listResourceStore(ResourceStore.CUBE_DESC_RESOURCE_ROOT);
-        for (String path : paths) {
-
-            try {
-                CubeDescUpgrader upgrade = new CubeDescUpgrader(path);
-                CubeDesc ndesc = upgrade.upgrade();
-                ndesc.setSignature(ndesc.calculateSignature());
-
-                getStore().putResource(ndesc.getModel().getResourcePath(), ndesc.getModel(), MetadataManager.MODELDESC_SERIALIZER);
-                getStore().putResource(ndesc.getResourcePath(), ndesc, CubeDescManager.CUBE_DESC_SERIALIZER);
-                updatedResources.add(ndesc.getResourcePath());
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade CubeDesc at '" + path + "' failed: " + 
e.getLocalizedMessage());
-            }
-        }
-
-    }
-
-    private void upgradeTableDesc() {
-        List<String> paths = listResourceStore(ResourceStore.TABLE_RESOURCE_ROOT);
-        for (String path : paths) {
-            TableDesc t;
-            try {
-                t = store.getResource(path, TableDesc.class, MetadataManager.TABLE_SERIALIZER);
-                t.init();
-
-                // if it only has 1 "." in the path, delete the old resource if it exists
-                if (path.substring(path.indexOf(".")).length() == MetadataConstants.FILE_SURFIX.length()) {
-                    getStore().deleteResource(path);
-                    // the new source will be new;
-                    t.setLastModified(0);
-                    getStore().putResource(t.getResourcePath(), t, MetadataManager.TABLE_SERIALIZER);
-                    updatedResources.add(t.getResourcePath());
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade TableDesc at '" + path + "' failed: " + 
e.getLocalizedMessage());
-            }
-
-        }
-
-    }
-
-    @SuppressWarnings("unchecked")
-    private void upgradeTableDesceExd() {
-
-        List<String> paths = listResourceStore(ResourceStore.TABLE_EXD_RESOURCE_ROOT);
-        for (String path : paths) {
-            Map<String, String> attrs = Maps.newHashMap();
-
-            InputStream is = null;
-            try {
-                is = store.getResource(path).inputStream;
-                if (is == null) {
-                    continue;
-                }
-                try {
-                    attrs.putAll(JsonUtil.readValue(is, HashMap.class));
-                } finally {
-                    if (is != null)
-                        is.close();
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: 
" + e.getLocalizedMessage());
-            }
-
-            // parse table identity from file name
-            String file = path;
-            if (file.indexOf("/") > -1) {
-                file = file.substring(file.lastIndexOf("/") + 1);
-            }
-            String tableIdentity = file.substring(0, file.length() - MetadataConstants.FILE_SURFIX.length()).toUpperCase();
-
-            // for metadata upgrade, convert resource path to new pattern (<DB>.<TABLE>.json)
-            if (tableIdentity.indexOf(".") < 0) {
-                tableIdentity = appendDBName(tableIdentity);
-                try {
-                    getMetadataManager().saveTableExd(tableIdentity, attrs);
-                    // delete old resource if it exists;
-                    getStore().deleteResource(path);
-                    updatedResources.add(path);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    errorMsgs.add("Upgrade TableDescExd at '" + path + "' 
failed: " + e.getLocalizedMessage());
-                }
-
-            }
-
-        }
-
-    }
-
-    public String appendDBName(String table) {
-
-        if (table.indexOf(".") > 0)
-            return table;
-
-        Map<String, TableDesc> map = this.getMetadataManager().getAllTablesMap();
-
-        int count = 0;
-        String result = null;
-        for (TableDesc t : map.values()) {
-            if (t.getName().equalsIgnoreCase(table)) {
-                result = t.getIdentity();
-                count++;
-            }
-        }
-
-        if (count == 1)
-            return result;
-
-        if (count > 1) {
-            errorMsgs.add("There are more than 1 table named with '" + table + 
"' in different database; ");
-        }
-
-        if (count == 0) {
-            errorMsgs.add("No table definition for '" + table + "'; any 
project, cube refers it should remove the reference;");
-        }
-
-        return null;
-    }
-
-    private void upgradeProjectInstance() {
-        List<String> paths = listResourceStore(ResourceStore.PROJECT_RESOURCE_ROOT);
-        for (String path : paths) {
-            try {
-                org.apache.kylin.cube.model.v1.ProjectInstance oldPrj = store.getResource(path, org.apache.kylin.cube.model.v1.ProjectInstance.class, new JsonSerializer<org.apache.kylin.cube.model.v1.ProjectInstance>(org.apache.kylin.cube.model.v1.ProjectInstance.class));
-
-                ProjectInstance newPrj = new ProjectInstance();
-                newPrj.setUuid(oldPrj.getUuid());
-                newPrj.setName(oldPrj.getName());
-                newPrj.setOwner(oldPrj.getOwner());
-                newPrj.setDescription(oldPrj.getDescription());
-                newPrj.setLastModified(oldPrj.getLastModified());
-                newPrj.setCreateTimeUTC(RootPersistentEntity.parseTime(oldPrj.getCreateTime()));
-                newPrj.setStatus(oldPrj.getStatus());
-                List<RealizationEntry> realizationEntries = Lists.newArrayList();
-                for (String cube : oldPrj.getCubes()) {
-                    RealizationEntry entry = new RealizationEntry();
-                    entry.setType(RealizationType.CUBE);
-                    entry.setRealization(cube);
-                    realizationEntries.add(entry);
-                }
-                newPrj.setRealizationEntries(realizationEntries);
-
-                Set<String> tables = Sets.newHashSet();
-                for (String table : oldPrj.getTables()) {
-                    String tb = this.appendDBName(table);
-                    if (tb != null)
-                        tables.add(this.appendDBName(tb));
-                }
-                newPrj.setTables(tables);
-
-                store.putResource(newPrj.getResourcePath(), newPrj, ProjectManager.PROJECT_SERIALIZER);
-                updatedResources.add(path);
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade Project at '" + path + "' failed: " + 
e.getLocalizedMessage());
-            }
-        }
-
-    }
-
-    private void upgradeCubeInstance() {
-
-        ResourceStore store = getStore();
-        List<String> paths = listResourceStore(ResourceStore.CUBE_RESOURCE_ROOT);
-        for (String path : paths) {
-
-            CubeInstance cubeInstance = null;
-            try {
-                cubeInstance = store.getResource(path, CubeInstance.class, new JsonSerializer<CubeInstance>(CubeInstance.class));
-                cubeInstance.setConfig(config);
-
-                org.apache.kylin.cube.CubeInstance newInstance = new org.apache.kylin.cube.CubeInstance();
-                newInstance.setName(cubeInstance.getName());
-                newInstance.setDescName(cubeInstance.getDescName());
-                newInstance.setOwner(cubeInstance.getOwner());
-                newInstance.setUuid(cubeInstance.getUuid());
-                newInstance.setVersion(cubeInstance.getVersion());
-                newInstance.setCreateTimeUTC(RootPersistentEntity.parseTime(cubeInstance.getCreateTime()));
-                newInstance.setLastModified(cubeInstance.getLastModified());
-
-                //status
-                if (cubeInstance.getStatus() == CubeStatusEnum.BUILDING) {
-                    newInstance.setStatus(RealizationStatusEnum.BUILDING);
-                } else if (cubeInstance.getStatus() == CubeStatusEnum.DESCBROKEN) {
-                    newInstance.setStatus(RealizationStatusEnum.DESCBROKEN);
-                } else if (cubeInstance.getStatus() == CubeStatusEnum.DISABLED) {
-                    newInstance.setStatus(RealizationStatusEnum.DISABLED);
-                } else if (cubeInstance.getStatus() == CubeStatusEnum.READY) {
-                    newInstance.setStatus(RealizationStatusEnum.READY);
-                }
-
-                List<org.apache.kylin.cube.CubeSegment> newSegments = Lists.newArrayList();
-                // segment
-                for (CubeSegment segment : cubeInstance.getSegments()) {
-                    org.apache.kylin.cube.CubeSegment newSeg = new org.apache.kylin.cube.CubeSegment();
-                    newSegments.add(newSeg);
-
-                    newSeg.setUuid(segment.getUuid());
-                    newSeg.setName(segment.getName());
-                    newSeg.setStorageLocationIdentifier(segment.getStorageLocationIdentifier());
-                    newSeg.setDateRangeStart(segment.getDateRangeStart());
-                    newSeg.setDateRangeEnd(segment.getDateRangeEnd());
-
-                    if (segment.getStatus() == CubeSegmentStatusEnum.NEW) {
-                        newSeg.setStatus(SegmentStatusEnum.NEW);
-                    } else if (segment.getStatus() == CubeSegmentStatusEnum.READY) {
-                        newSeg.setStatus(SegmentStatusEnum.READY);
-                    } else if (segment.getStatus() == CubeSegmentStatusEnum.READY_PENDING) {
-                        newSeg.setStatus(SegmentStatusEnum.READY_PENDING);
-                    }
-
-                    newSeg.setSizeKB(segment.getSizeKB());
-                    newSeg.setInputRecords(segment.getSourceRecords());
-                    newSeg.setInputRecordsSize(segment.getSourceRecordsSize());
-                    newSeg.setLastBuildTime(segment.getLastBuildTime());
-                    newSeg.setLastBuildJobID(segment.getLastBuildJobID());
-                    newSeg.setCreateTimeUTC(RootPersistentEntity.parseTime(segment.getCreateTime()));
-                    newSeg.setBinarySignature(segment.getBinarySignature());
-
-                    ConcurrentHashMap<String, String> newDictionaries = new ConcurrentHashMap<String, String>();
-
-                    for (Map.Entry<String, String> e : segment.getDictionaries().entrySet()) {
-                        String key = e.getKey();
-                        String[] tableCol = StringUtils.split(key, "/");
-                        key = appendDBName(tableCol[0]) + "/" + tableCol[1];
-                        newDictionaries.put(key, e.getValue());
-                    }
-                    newSeg.setDictionaries(newDictionaries);
-
-                    ConcurrentHashMap<String, String> newSnapshots = new ConcurrentHashMap<String, String>();
-
-                    for (Map.Entry<String, String> e : segment.getSnapshots().entrySet()) {
-                        newSnapshots.put(appendDBName(e.getKey()), e.getValue());
-                    }
-                    newSeg.setSnapshots(newSnapshots);
-                }
-
-                newInstance.setSegments(newSegments);
-                store.putResource(newInstance.getResourcePath(), newInstance, CubeManager.CUBE_SERIALIZER);
-            } catch (Exception e) {
-                logger.error("Error during load cube instance " + path, e);
-            }
-        }
-    }
-
-    private void copyDictionaryForFK() {
-        CubeManager cubeManager = CubeManager.getInstance(config);
-        List<org.apache.kylin.cube.CubeInstance> cubeInstances = cubeManager.listAllCubes();
-
-        Set<String> changedCubes = Sets.newHashSet();
-        for (org.apache.kylin.cube.CubeInstance newInstance : cubeInstances) {
-
-            boolean updated = false;
-            DataModelDesc dataModelDesc = null;
-            try {
-                String modelName = this.getCubeDescManager().getCubeDesc(newInstance.getDescName()).getModelName();
-                dataModelDesc = this.getMetadataManager().getDataModelDesc(modelName);
-                Map<String, String> pkToFK = Maps.newHashMap();
-                for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
-                    if (lookupDesc.getJoin() != null) {
-                        JoinDesc join = lookupDesc.getJoin();
-                        for (int i = 0; i < join.getForeignKey().length; i++) {
-                            pkToFK.put(lookupDesc.getTable() + "/" + join.getPrimaryKey()[i], dataModelDesc.getFactTable() + "/" + join.getForeignKey()[i]);
-                        }
-                    }
-                }
-
-                List<Pair<String, String>> newDictionaries = Lists.newArrayList();
-
-                // segment
-                for (org.apache.kylin.cube.CubeSegment newSeg : newInstance.getSegments()) {
-
-                    for (Map.Entry<String, String> e : newSeg.getDictionaries().entrySet()) {
-                        String key = e.getKey();
-                        if (pkToFK.containsKey(key) && !newSeg.getDictionaries().containsKey(pkToFK.get(key))) {
-                            logger.debug("Duplicate dictionary for FK " + 
pkToFK.get(key) + " in cube " + newInstance.getName());
-                            changedCubes.add(newInstance.getName());
-                            newDictionaries.add(new Pair<String, String>(pkToFK.get(key), e.getValue()));
-
-                        }
-                    }
-                    for (Pair<String, String> dict : newDictionaries) {
-                        newSeg.getDictionaries().put(dict.getFirst(), dict.getSecond());
-                        updated = true;
-                    }
-                }
-
-                if (updated)
-                    store.putResource(newInstance.getResourcePath(), newInstance, CubeManager.CUBE_SERIALIZER);
-            } catch (Exception e) {
-                logger.error("Error during upgrade cube instance " + 
newInstance.getName(), e);
-            }
-        }
-
-        if (changedCubes.size() > 0)
-            logger.info("Updated these cubeInstances: " + changedCubes);
-    }
-
-    private MetadataManager getMetadataManager() {
-        return MetadataManager.getInstance(config);
-    }
-
-    private CubeDescManager getCubeDescManager() {
-        return CubeDescManager.getInstance(config);
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(config);
-    }
-
-    private ExecutableManager getExecutableManager() {
-        return ExecutableManager.getInstance(config);
-    }
-
-    private void upgradeJobInstance() {
-        try {
-            List<String> paths = getStore().collectResourceRecursively(ResourceStore.JOB_PATH_ROOT, "");
-            for (String path : paths) {
-                upgradeJobInstance(path);
-            }
-
-            for (String folder : new String[] { ResourceStore.JOB_PATH_ROOT, ResourceStore.JOB_OUTPUT_PATH_ROOT }) {
-                for (String res : getStore().listResources(folder)) {
-                    getStore().deleteResource(res);
-                }
-                getStore().deleteResource(folder);
-            }
-        } catch (IOException ex) {
-            errorMsgs.add("upgrade job failed" + ex.getLocalizedMessage());
-            throw new RuntimeException(ex);
-        }
-
-    }
-
-    private ExecutableState parseState(JobStatusEnum state) {
-        switch (state) {
-        case NEW:
-        case PENDING:
-            return ExecutableState.READY;
-        case RUNNING:
-            return ExecutableState.RUNNING;
-        case FINISHED:
-            return ExecutableState.SUCCEED;
-        case ERROR:
-            return ExecutableState.ERROR;
-        case DISCARDED:
-            return ExecutableState.DISCARDED;
-        default:
-            return ExecutableState.DISCARDED;
-        }
-    }
-
-    private ExecutableState parseState(JobStepStatusEnum state) {
-        switch (state) {
-        case NEW:
-        case PENDING:
-        case WAITING:
-            return ExecutableState.READY;
-        case RUNNING:
-            return ExecutableState.RUNNING;
-        case FINISHED:
-            return ExecutableState.SUCCEED;
-        case ERROR:
-            return ExecutableState.ERROR;
-        case DISCARDED:
-            return ExecutableState.DISCARDED;
-        default:
-            return ExecutableState.DISCARDED;
-        }
-
-    }
-
-    private void upgradeJobInstance(String path) throws IOException {
-        JobInstance job = getStore().getResource(path, JobInstance.class, new JsonSerializer<JobInstance>(JobInstance.class));
-        long lastModified = job.getLastModified();
-        if (System.currentTimeMillis() - lastModified > 2592000000l) {
-            // old than 30 days, skip;
-            return;
-        }
-        CubingJob cubingJob = new CubingJob();
-        cubingJob.setId(job.getId());
-        cubingJob.setName(job.getName());
-        cubingJob.setCubeName(job.getRelatedCube());
-        cubingJob.setSubmitter(job.getSubmitter());
-        for (JobInstance.JobStep step : job.getSteps()) {
-            final AbstractExecutable executable = parseToExecutable(step);
-            cubingJob.addTask(executable);
-        }
-        getExecutableManager().addJob(cubingJob);
-
-        cubingJob.setStartTime(job.getExecStartTime());
-        cubingJob.setEndTime(job.getExecEndTime());
-        cubingJob.setMapReduceWaitTime(job.getMrWaiting());
-        getExecutableManager().resetJobOutput(cubingJob.getId(), parseState(job.getStatus()), job.getStatus().toString());
-
-        for (int i = 0, size = job.getSteps().size(); i < size; ++i) {
-            final JobInstance.JobStep jobStep = job.getSteps().get(i);
-            final String outputPath = ResourceStore.JOB_OUTPUT_PATH_ROOT + "/" + job.getId() + "." + i;
-            final InputStream inputStream = getStore().getResource(outputPath).inputStream;
-
-            String output = null;
-            if (inputStream != null) {
-                @SuppressWarnings("unchecked")
-                HashMap<String, String> job_output = JsonUtil.readValue(inputStream, HashMap.class);
-
-                if (job_output != null) {
-                    output = job_output.get("output");
-                }
-                org.apache.commons.io.IOUtils.closeQuietly(inputStream);
-            }
-            updateJobStepOutput(jobStep, output, cubingJob.getTasks().get(i));
-        }
-    }
-
-    private void updateJobStepOutput(JobInstance.JobStep step, String output, AbstractExecutable task) {
-        task.setStartTime(step.getExecStartTime());
-        task.setEndTime(step.getExecEndTime());
-        if (task instanceof MapReduceExecutable) {
-            ((MapReduceExecutable) task).setMapReduceWaitTime(step.getExecWaitTime() * 1000);
-        }
-        getExecutableManager().resetJobOutput(task.getId(), parseState(step.getStatus()), output);
-    }
-
-    private AbstractExecutable parseToExecutable(JobInstance.JobStep step) {
-        AbstractExecutable result;
-        switch (step.getCmdType()) {
-        case SHELL_CMD_HADOOP: {
-            ShellExecutable executable = new ShellExecutable();
-            executable.setCmd(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_FACTDISTINCT: {
-            MapReduceExecutable executable = new MapReduceExecutable();
-            executable.setMapReduceJobClass(FactDistinctColumnsJob.class);
-            executable.setMapReduceParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_BASECUBOID: {
-            MapReduceExecutable executable = new MapReduceExecutable();
-            executable.setMapReduceJobClass(BaseCuboidJob.class);
-            executable.setMapReduceParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_NDCUBOID: {
-            MapReduceExecutable executable = new MapReduceExecutable();
-            executable.setMapReduceJobClass(NDCuboidJob.class);
-            executable.setMapReduceParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION: {
-            MapReduceExecutable executable = new MapReduceExecutable();
-            executable.setMapReduceJobClass(RangeKeyDistributionJob.class);
-            executable.setMapReduceParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_CONVERTHFILE: {
-            MapReduceExecutable executable = new MapReduceExecutable();
-            executable.setMapReduceJobClass(CubeHFileJob.class);
-            executable.setMapReduceParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_MERGECUBOID: {
-            MapReduceExecutable executable = new MapReduceExecutable();
-            executable.setMapReduceJobClass(MergeCuboidJob.class);
-            executable.setMapReduceParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_NO_MR_DICTIONARY: {
-            HadoopShellExecutable executable = new HadoopShellExecutable();
-            executable.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-            executable.setJobClass(CreateDictionaryJob.class);
-            executable.setJobParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE: {
-            HadoopShellExecutable executable = new HadoopShellExecutable();
-            executable.setJobClass(CreateHTableJob.class);
-            executable.setJobParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        case JAVA_CMD_HADOOP_NO_MR_BULKLOAD: {
-            HadoopShellExecutable executable = new HadoopShellExecutable();
-            executable.setJobClass(BulkLoadJob.class);
-            executable.setJobParams(step.getExecCmd());
-            result = executable;
-            break;
-        }
-        default:
-            throw new RuntimeException("invalid step type:" + 
step.getCmdType());
-        }
-        result.setName(step.getName());
-        return result;
-    }
-
-    public static void main(String[] args) {
-
-        if (!(args != null && (args.length == 1 || args.length == 2))) {
-            System.out.println("Usage: java CubeMetadataUpgrade 
<metadata_export_folder> <verify>; e.g, /export/kylin/meta ");
-            return;
-        }
-
-        String exportFolder = args[0];
-        boolean verify = false;
-        if (args.length == 2 && "verify".equals(args[1])) {
-            System.out.println("Only verify the metadata in folder " + 
exportFolder);
-            verify = true;
-        }
-
-        CubeMetadataUpgrade instance = null;
-        if (verify) {
-            instance = new CubeMetadataUpgrade(exportFolder);
-            instance.verify();
-            instance.copyDictionaryForFK();
-        } else {
-            File oldMetaFolder = new File(exportFolder);
-            if (!oldMetaFolder.exists()) {
-                System.out.println("Provided folder doesn't exist: '" + 
exportFolder + "'");
-                return;
-            }
-
-            if (!oldMetaFolder.isDirectory()) {
-                System.out.println("Provided folder is not a directory: '" + 
exportFolder + "'");
-                return;
-            }
-
-            String newMetadataUrl = oldMetaFolder.getAbsolutePath() + "_v2";
-            try {
-                FileUtils.deleteDirectory(new File(newMetadataUrl));
-                FileUtils.copyDirectory(oldMetaFolder, new File(newMetadataUrl));
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-
-            instance = new CubeMetadataUpgrade(newMetadataUrl);
-            instance.upgrade();
-            logger.info("=================================================================");
-            logger.info("Run CubeMetadataUpgrade completed;");
-
-        }
-
-        logger.info("=================================================================");
-        if (instance.errorMsgs.size() > 0) {
-            logger.info("Here are the error/warning messages, you may need 
check:");
-            for (String s : instance.errorMsgs) {
-                logger.warn(s);
-            }
-        } else {
-            logger.info("No error or warning messages; The migration is 
success.");
-        }
-    }
-}

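For reference, the removed CubeMetadataUpgrade above was driven from the command line; per its main method, the two modes were invoked roughly as follows (classpath setup omitted; the folder path is the sample from the tool's own usage message):

    java org.apache.kylin.job.CubeMetadataUpgrade /export/kylin/meta
        (deletes any existing /export/kylin/meta_v2, copies the folder there, then upgrades the copy)
    java org.apache.kylin.job.CubeMetadataUpgrade /export/kylin/meta verify
        (reloads the metadata managers to validate the folder and re-runs the FK dictionary copy)
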
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JobInstance.java b/job/src/main/java/org/apache/kylin/job/JobInstance.java
deleted file mode 100644
index 82d4753..0000000
--- a/job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonBackReference;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonManagedReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
-
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
-
-    public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
-    public static final String YARN_APP_ID = "yarn_application_id";
-    public static final String YARN_APP_URL = "yarn_application_tracking_url";
-    public static final String MR_JOB_ID = "mr_job_id";
-    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
-    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
-    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-
-    public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
-    }
-
-    public static String getJobIdentity(JobInstance jobInstance) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
-    }
-
-    public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
-        return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
-    }
-
-    public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
-        if (jobUuid == null || jobUuid.equals("")) {
-            throw new IllegalArgumentException("jobUuid can't be null or empty");
-        }
-        return hdfsWorkdingDir + JOB_WORKING_DIR_PREFIX + jobUuid;
-    }
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("type")
-    private CubeBuildTypeEnum type; // java implementation
-    @JsonProperty("duration")
-    private long duration;
-    @JsonProperty("related_cube")
-    private String relatedCube;
-    @JsonProperty("related_segment")
-    private String relatedSegment;
-    @JsonProperty("exec_start_time")
-    private long execStartTime;
-    @JsonProperty("exec_end_time")
-    private long execEndTime;
-    @JsonProperty("mr_waiting")
-    private long mrWaiting = 0;
-    @JsonManagedReference
-    @JsonProperty("steps")
-    private List<JobStep> steps;
-    @JsonProperty("submitter")
-    private String submitter;
-    @JsonProperty("job_status")
-    private JobStatusEnum status;
-
-    public JobStep getRunningStep() {
-        for (JobStep step : this.getSteps()) {
-            if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
-                return step;
-            }
-        }
-
-        return null;
-    }
-
-    @JsonProperty("progress")
-    public double getProgress() {
-        int completedStepCount = 0;
-        for (JobStep step : this.getSteps()) {
-            if (step.getStatus().equals(JobStepStatusEnum.FINISHED)) {
-                completedStepCount++;
-            }
-        }
-
-        return 100.0 * completedStepCount / steps.size();
-    }
-
-    public JobStatusEnum getStatus() {
-        return this.status;
-    }
-
-    public void setStatus(JobStatusEnum status) {
-        this.status = status;
-    }
-
-    //    @JsonProperty("job_status")
-    //    public JobStatusEnum getStatus() {
-    //
-    //        // JobStatusEnum finalJobStatus;
-    //        int compositResult = 0;
-    //
-    //        // if steps status are all NEW, then job status is NEW
-    //        // if steps status are all FINISHED, then job status is FINISHED
-    //        // if steps status are all PENDING, then job status is PENDING
-    //        // if steps status are FINISHED and PENDING, the job status is PENDING
-    //        // if one of steps status is RUNNING, then job status is RUNNING
-    //        // if one of steps status is ERROR, then job status is ERROR
-    //        // if one of steps status is KILLED, then job status is KILLED
-    //        // default status is RUNNING
-    //
-    //        System.out.println(this.getName());
-    //
-    //        for (JobStep step : this.getSteps()) {
-    //            //System.out.println("step: " + step.getSequenceID() + "'s 
status:" + step.getStatus());
-    //            compositResult = compositResult | step.getStatus().getCode();
-    //        }
-    //
-    //        System.out.println();
-    //
-    //        if (compositResult == JobStatusEnum.FINISHED.getCode()) {
-    //            return JobStatusEnum.FINISHED;
-    //        } else if (compositResult == JobStatusEnum.NEW.getCode()) {
-    //            return JobStatusEnum.NEW;
-    //        } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
-    //            return JobStatusEnum.PENDING;
-    //        } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
-    //            return JobStatusEnum.PENDING;
-    //        } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
-    //            return JobStatusEnum.ERROR;
-    //        } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
-    //            return JobStatusEnum.DISCARDED;
-    //        } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
-    //            return JobStatusEnum.RUNNING;
-    //        }
-    //
-    //        return JobStatusEnum.RUNNING;
-    //    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public CubeBuildTypeEnum getType() {
-        return type;
-    }
-
-    public void setType(CubeBuildTypeEnum type) {
-        this.type = type;
-    }
-
-    public long getDuration() {
-        return duration;
-    }
-
-    public void setDuration(long duration) {
-        this.duration = duration;
-    }
-
-    public String getRelatedCube() {
-        return relatedCube;
-    }
-
-    public void setRelatedCube(String relatedCube) {
-        this.relatedCube = relatedCube;
-    }
-
-    public String getRelatedSegment() {
-        return relatedSegment;
-    }
-
-    public void setRelatedSegment(String relatedSegment) {
-        this.relatedSegment = relatedSegment;
-    }
-
-    /**
-     * @return the execStartTime
-     */
-    public long getExecStartTime() {
-        return execStartTime;
-    }
-
-    /**
-     * @param execStartTime the execStartTime to set
-     */
-    public void setExecStartTime(long execStartTime) {
-        this.execStartTime = execStartTime;
-    }
-
-    /**
-     * @return the execEndTime
-     */
-    public long getExecEndTime() {
-        return execEndTime;
-    }
-
-    /**
-     * @param execEndTime the execEndTime to set
-     */
-    public void setExecEndTime(long execEndTime) {
-        this.execEndTime = execEndTime;
-    }
-
-    public long getMrWaiting() {
-        return this.mrWaiting;
-    }
-
-    public void setMrWaiting(long mrWaiting) {
-        this.mrWaiting = mrWaiting;
-    }
-
-    public List<JobStep> getSteps() {
-        if (steps == null) {
-            steps = Lists.newArrayList();
-        }
-        return steps;
-    }
-
-    public void clearSteps() {
-        getSteps().clear();
-    }
-
-    public void addSteps(Collection<JobStep> steps) {
-        this.getSteps().addAll(steps);
-    }
-
-    public void addStep(JobStep step) {
-        getSteps().add(step);
-    }
-
-    public void addStep(int index, JobStep step) {
-        getSteps().add(index, step);
-    }
-
-    public JobStep findStep(String stepName) {
-        for (JobStep step : getSteps()) {
-            if (stepName.equals(step.getName())) {
-                return step;
-            }
-        }
-        return null;
-    }
-
-    public String getSubmitter() {
-        return submitter;
-    }
-
-    public void setSubmitter(String submitter) {
-        this.submitter = submitter;
-    }
-
-    @JsonIgnoreProperties(ignoreUnknown = true)
-    public static class JobStep implements Comparable<JobStep> {
-
-        @JsonBackReference
-        private JobInstance jobInstance;
-
-        @JsonProperty("id")
-        private String id;
-
-        @JsonProperty("name")
-        private String name;
-
-        @JsonProperty("sequence_id")
-        private int sequenceID;
-
-        @JsonProperty("exec_cmd")
-        private String execCmd;
-
-        @JsonProperty("interrupt_cmd")
-        private String InterruptCmd;
-
-        @JsonProperty("exec_start_time")
-        private long execStartTime;
-        @JsonProperty("exec_end_time")
-        private long execEndTime;
-        @JsonProperty("exec_wait_time")
-        private long execWaitTime;
-
-        @JsonProperty("step_status")
-        private JobStepStatusEnum status;
-
-        @JsonProperty("cmd_type")
-        private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
-
-        @JsonProperty("info")
-        private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
-
-        @JsonProperty("run_async")
-        private boolean runAsync = false;
-
-        private ConcurrentHashMap<String, String> getInfo() {
-            return info;
-        }
-
-        public void putInfo(String key, String value) {
-            getInfo().put(key, value);
-        }
-
-        public String getInfo(String key) {
-            return getInfo().get(key);
-        }
-
-        public void clearInfo() {
-            getInfo().clear();
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public int getSequenceID() {
-            return sequenceID;
-        }
-
-        public void setSequenceID(int sequenceID) {
-            this.sequenceID = sequenceID;
-        }
-
-        public String getExecCmd() {
-            return execCmd;
-        }
-
-        public void setExecCmd(String execCmd) {
-            this.execCmd = execCmd;
-        }
-
-        public JobStepStatusEnum getStatus() {
-            return status;
-        }
-
-        public void setStatus(JobStepStatusEnum status) {
-            this.status = status;
-        }
-
-        public String getId() {
-            return id;
-        }
-
-        public void setId(String id) {
-            this.id = id;
-        }
-
-        /**
-         * @return the execStartTime
-         */
-        public long getExecStartTime() {
-            return execStartTime;
-        }
-
-        /**
-         * @param execStartTime the execStartTime to set
-         */
-        public void setExecStartTime(long execStartTime) {
-            this.execStartTime = execStartTime;
-        }
-
-        /**
-         * @return the execEndTime
-         */
-        public long getExecEndTime() {
-            return execEndTime;
-        }
-
-        /**
-         * @param execEndTime the execEndTime to set
-         */
-        public void setExecEndTime(long execEndTime) {
-            this.execEndTime = execEndTime;
-        }
-
-        public long getExecWaitTime() {
-            return execWaitTime;
-        }
-
-        public void setExecWaitTime(long execWaitTime) {
-            this.execWaitTime = execWaitTime;
-        }
-
-        public String getInterruptCmd() {
-            return InterruptCmd;
-        }
-
-        public void setInterruptCmd(String interruptCmd) {
-            InterruptCmd = interruptCmd;
-        }
-
-        public JobStepCmdTypeEnum getCmdType() {
-            return cmdType;
-        }
-
-        public void setCmdType(JobStepCmdTypeEnum cmdType) {
-            this.cmdType = cmdType;
-        }
-
-        /**
-         * @return the runAsync
-         */
-        public boolean isRunAsync() {
-            return runAsync;
-        }
-
-        /**
-         * @param runAsync the runAsync to set
-         */
-        public void setRunAsync(boolean runAsync) {
-            this.runAsync = runAsync;
-        }
-
-        /**
-         * @return the jobInstance
-         */
-        public JobInstance getJobInstance() {
-            return jobInstance;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((name == null) ? 0 : name.hashCode());
-            result = prime * result + sequenceID;
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            JobStep other = (JobStep) obj;
-            if (name == null) {
-                if (other.name != null)
-                    return false;
-            } else if (!name.equals(other.name))
-                return false;
-            if (sequenceID != other.sequenceID)
-                return false;
-            return true;
-        }
-
-        @Override
-        public int compareTo(JobStep o) {
-            if (this.sequenceID < o.sequenceID) {
-                return -1;
-            } else if (this.sequenceID > o.sequenceID) {
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-    }
-
-    @Override
-    public int compareTo(JobInstance o) {
-        return o.lastModified < this.lastModified ? -1 : o.lastModified > this.lastModified ? 1 : 0;
-    }
-
-}
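
For context on the removed API: getProgress() is a plain ratio of FINISHED steps to all steps, getRunningStep() returns the first step that is RUNNING or WAITING, and compareTo() orders jobs newest-first by lastModified. A minimal usage sketch against the classes above (illustrative only; the step names are invented and JobInstance is assumed to have a default constructor):

    JobInstance job = new JobInstance();

    JobInstance.JobStep extract = new JobInstance.JobStep();
    extract.setName("create_flat_table");               // hypothetical step name
    extract.setSequenceID(1);
    extract.setStatus(JobStepStatusEnum.FINISHED);

    JobInstance.JobStep build = new JobInstance.JobStep();
    build.setName("build_cube");                        // hypothetical step name
    build.setSequenceID(2);
    build.setStatus(JobStepStatusEnum.RUNNING);

    job.addStep(extract);
    job.addStep(build);

    double progress = job.getProgress();                // 50.0: 1 of 2 steps FINISHED
    JobInstance.JobStep current = job.getRunningStep(); // the "build_cube" step
    // Caveat: getProgress() divides by steps.size(), so a job with no steps yields NaN.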

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
deleted file mode 100644
index eb6d27b..0000000
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.job.hadoop.hive.IJoinedFlatTableDesc;
-import org.apache.kylin.job.hadoop.hive.IntermediateColumnDesc;
-import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-
-public class JoinedFlatTable {
-
-    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) {
-        StringBuilder ddl = new StringBuilder();
-
-        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + 
intermediateTableDesc.getTableName(jobUUID) + "\n");
-
-        ddl.append("(" + "\n");
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
-            if (i > 0) {
-                ddl.append(",");
-            }
-            ddl.append(colName(col.getCanonicalName()) + " " + 
SqlHiveDataTypeMapping.getHiveDataType(col.getDataType()) + "\n");
-        }
-        ddl.append(")" + "\n");
-
-        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
-        ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + storageDfsDir + "/" + 
intermediateTableDesc.getTableName(jobUUID) + "';").append("\n");
-        // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
-        // ";\n");
-        return ddl.toString();
-    }
-
-    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) {
-        StringBuilder ddl = new StringBuilder();
-        ddl.append("DROP TABLE IF EXISTS " + 
intermediateTableDesc.getTableName(jobUUID) + ";").append("\n");
-        return ddl.toString();
-    }
-
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID, JobEngineConfig engineConfig) throws IOException {
-        StringBuilder sql = new StringBuilder();
-
-        File hadoopPropertiesFile = new File(engineConfig.getHadoopJobConfFilePath(intermediateTableDesc.getCapacity()));
-
-        if (hadoopPropertiesFile.exists()) {
-            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder builder;
-            Document doc;
-            try {
-                builder = factory.newDocumentBuilder();
-                doc = builder.parse(hadoopPropertiesFile);
-                NodeList nl = doc.getElementsByTagName("property");
-                for (int i = 0; i < nl.getLength(); i++) {
-                    String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
-                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
-                    if (name.equals("tmpjars") == false) {
-                        sql.append("SET " + name + "=" + value + 
";").append("\n");
-                    }
-                }
-
-            } catch (ParserConfigurationException e) {
-                throw new IOException(e);
-            } catch (SAXException e) {
-                throw new IOException(e);
-            }
-        }
-
-        // hard coded below mr parameters to enable map-side join
-        sql.append("SET hive.exec.compress.output=true;").append("\n");
-        sql.append("SET hive.auto.convert.join.noconditionaltask = 
true;").append("\n");
-        sql.append("SET hive.auto.convert.join.noconditionaltask.size = 
300000000;").append("\n");
-        sql.append("INSERT OVERWRITE TABLE " + 
intermediateTableDesc.getTableName(jobUUID) + " " + 
generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
-
-        return sql.toString();
-    }
-
-    public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-        StringBuilder sql = new StringBuilder();
-        sql.append("SELECT" + "\n");
-        String tableAlias;
-        Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
-            if (i > 0) {
-                sql.append(",");
-            }
-            tableAlias = tableAliasMap.get(col.getTableName());
-            sql.append(tableAlias + "." + col.getColumnName() + "\n");
-        }
-        appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
-        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
-        return sql.toString();
-    }
-
-    private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
-        Map<String, String> tableAliasMap = new HashMap<String, String>();
-
-        addTableAlias(dataModelDesc.getFactTable(), tableAliasMap);
-
-        for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
-            JoinDesc join = lookupDesc.getJoin();
-            if (join != null) {
-                addTableAlias(lookupDesc.getTable(), tableAliasMap);
-            }
-        }
-        return tableAliasMap;
-    }
-
-    // The table alias used to be "FACT_TABLE" and "LOOKUP_#", but that's too 
unpredictable
-    // for those who want to write a filter. (KYLIN-900)
-    // Also yet don't support joining the same table more than once, since 
table name is the map key.
-    private static void addTableAlias(String table, Map<String, String> 
tableAliasMap) {
-        String alias;
-        int cut = table.lastIndexOf('.');
-        if (cut < 0)
-            alias = table;
-        else
-            alias = table.substring(cut + 1);
-        
-        tableAliasMap.put(table, alias);
-    }
-
-    private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        Set<String> dimTableCache = new HashSet<String>();
-
-        DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
-        String factTableName = dataModelDesc.getFactTable();
-        String factTableAlias = tableAliasMap.get(factTableName);
-        sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
-
-        for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
-            JoinDesc join = lookupDesc.getJoin();
-            if (join != null && join.getType().equals("") == false) {
-                String joinType = join.getType().toUpperCase();
-                String dimTableName = lookupDesc.getTable();
-                if (!dimTableCache.contains(dimTableName)) {
-                    TblColRef[] pk = join.getPrimaryKeyColumns();
-                    TblColRef[] fk = join.getForeignKeyColumns();
-                    if (pk.length != fk.length) {
-                        throw new RuntimeException("Invalid join condition of 
lookup table:" + lookupDesc);
-                    }
-                    sql.append(joinType + " JOIN " + dimTableName + " as " + 
tableAliasMap.get(dimTableName) + "\n");
-                    sql.append("ON ");
-                    for (int i = 0; i < pk.length; i++) {
-                        if (i > 0) {
-                            sql.append(" AND ");
-                        }
-                        sql.append(factTableAlias + "." + fk[i].getName() + " 
= " + tableAliasMap.get(dimTableName) + "." + pk[i].getName());
-                    }
-                    sql.append("\n");
-
-                    dimTableCache.add(dimTableName);
-                }
-            }
-        }
-    }
-
-    private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
-            return; // TODO: for now only cube segments support filter and partition
-        }
-        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
-
-        boolean hasCondition = false;
-        StringBuilder whereBuilder = new StringBuilder();
-        whereBuilder.append("WHERE");
-
-        CubeDesc cubeDesc = desc.getCubeDesc();
-        DataModelDesc model = cubeDesc.getModel();
-
-        if (model.getFilterCondition() != null && model.getFilterCondition().equals("") == false) {
-            whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
-            hasCondition = true;
-        }
-
-        CubeSegment cubeSegment = desc.getCubeSegment();
-
-        if (null != cubeSegment) {
-            PartitionDesc partDesc = model.getPartitionDesc();
-            long dateStart = cubeSegment.getDateRangeStart();
-            long dateEnd = cubeSegment.getDateRangeEnd();
-
-            if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
-                whereBuilder.append(hasCondition ? " AND (" : " (");
-                whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
-                whereBuilder.append(")\n");
-                hasCondition = true;
-            }
-        }
-
-        if (hasCondition) {
-            sql.append(whereBuilder.toString());
-        }
-    }
-
-    private static String colName(String canonicalColName) {
-        return canonicalColName.replace(".", "_");
-    }
-}
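
For reference, the Hive DDL assembled by generateCreateTableStatement() above has the following shape; desc and jobUUID are assumed variables here, and the <angle-bracket> tokens are placeholders filled in from the flat-table desc:

    String ddl = JoinedFlatTable.generateCreateTableStatement(desc, "/kylin", jobUUID);
    // Per the StringBuilder logic above, ddl reads roughly:
    //
    //   CREATE EXTERNAL TABLE IF NOT EXISTS <table_name>
    //   (
    //   <COL_1> <hive_type_1>
    //   ,<COL_2> <hive_type_2>
    //   )
    //   ROW FORMAT DELIMITED FIELDS TERMINATED BY '\177'
    //   STORED AS SEQUENCEFILE
    //   LOCATION '/kylin/<table_name>';
    //
    // colName() replaces "." with "_", so a canonical column name like FACT.DT
    // becomes FACT_DT; generateInsertDataStatement() then populates the table
    // via INSERT OVERWRITE after SETting the hard-coded map-join options.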

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java
deleted file mode 100644
index cc2dff9..0000000
--- a/job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job;
-
-import org.apache.kylin.common.lock.JobLock;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.Executable;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public interface Scheduler<T extends Executable> {
-
-    void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
-
-    void shutdown() throws SchedulerException;
-
-    boolean stop(T executable) throws SchedulerException;
-
-}
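
The Scheduler contract above is deliberately small: init() receives the engine config plus a job lock, shutdown() tears the scheduler down, and stop() cancels a single executable. A no-op skeleton for illustration (hypothetical class; not Kylin's actual scheduler implementation):

    import org.apache.kylin.common.lock.JobLock;
    import org.apache.kylin.job.Scheduler;
    import org.apache.kylin.job.engine.JobEngineConfig;
    import org.apache.kylin.job.exception.SchedulerException;
    import org.apache.kylin.job.execution.Executable;

    // Hypothetical skeleton; a real scheduler would acquire the jobLock,
    // spin up worker threads in init(), and interrupt the executable in stop().
    public class NoopScheduler implements Scheduler<Executable> {
        private volatile boolean started = false;

        @Override
        public void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
            started = true;
        }

        @Override
        public void shutdown() throws SchedulerException {
            started = false;
        }

        @Override
        public boolean stop(Executable executable) throws SchedulerException {
            return started; // nothing to stop in this sketch
        }
    }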

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
deleted file mode 100644
index 27b8e87..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cmd;
-
-/**
- * Created by qianzhou on 12/4/14.
- */
-public abstract class BaseCommandOutput implements ICommandOutput {
-
-    @Override
-    public void log(String message) {
-        this.appendOutput(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
deleted file mode 100644
index 7dfdb13..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- * 
- */
-public interface ICommandOutput extends Logger {
-
-    public void setStatus(JobStepStatusEnum status);
-
-    public JobStepStatusEnum getStatus();
-
-    public void appendOutput(String message);
-
-    public String getOutput();
-
-    public void setExitCode(int exitCode);
-
-    public int getExitCode();
-
-    public void reset();
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
deleted file mode 100644
index 4aaf4b8..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.exception.JobException;
-
-/**
- * @author xjiang
- * 
- */
-public interface IJobCommand {
-
-    public ICommandOutput execute() throws JobException;
-
-    public void cancel() throws JobException;
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
deleted file mode 100644
index 9ad35cb..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cmd;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmd implements IJobCommand {
-
-    private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
-
-    private final String executeCommand;
-    private final ICommandOutput output;
-    private final boolean isAsync;
-    private final CliCommandExecutor cliCommandExecutor;
-
-    private FutureTask<Integer> future;
-
-    protected ShellCmd(String executeCmd, ICommandOutput out, String host, String user, String password, boolean async) {
-        this.executeCommand = executeCmd;
-        this.output = out;
-        cliCommandExecutor = new CliCommandExecutor();
-        cliCommandExecutor.setRunAtRemote(host, user, password);
-        this.isAsync = async;
-    }
-
-    public ShellCmd(String executeCmd, String host, String user, String password, boolean async) {
-        this(executeCmd, new ShellCmdOutput(), host, user, password, async);
-    }
-
-    @Override
-    public ICommandOutput execute() throws JobException {
-
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
-        future = new FutureTask<Integer>(new Callable<Integer>() {
-            public Integer call() throws JobException, IOException {
-                executor.shutdown();
-                return executeCommand(executeCommand);
-            }
-        });
-        executor.execute(future);
-
-        int exitCode = -1;
-        if (!isAsync) {
-            try {
-                exitCode = future.get();
-                log.info("finish executing");
-            } catch (CancellationException e) {
-                log.debug("Command is cancelled");
-                exitCode = -2;
-            } catch (Exception e) {
-                throw new JobException("Error when execute job " + 
executeCommand, e);
-            } finally {
-                if (exitCode == 0) {
-                    output.setStatus(JobStepStatusEnum.FINISHED);
-                } else if (exitCode == -2) {
-                    output.setStatus(JobStepStatusEnum.DISCARDED);
-                } else {
-                    output.setStatus(JobStepStatusEnum.ERROR);
-                }
-                output.setExitCode(exitCode);
-            }
-        }
-        return output;
-    }
-
-    protected int executeCommand(String command) throws JobException, IOException {
-        output.reset();
-        output.setStatus(JobStepStatusEnum.RUNNING);
-        return cliCommandExecutor.execute(command, output).getFirst();
-    }
-
-    @Override
-    public void cancel() throws JobException {
-        future.cancel(true);
-    }
-
-    public static void main(String[] args) throws JobException {
-        ShellCmdOutput output = new ShellCmdOutput();
-        ShellCmd shellCmd = new ShellCmd(args[0], output, args[1], args[2], args[3], false);
-        shellCmd.execute();
-
-        System.out.println("============================================================================");
-        System.out.println(output.getExitCode());
-        System.out.println(output.getOutput());
-    }
-}
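
One detail worth noting in execute() above: the single-thread executor calls shutdown() from inside its own Callable, so the pool terminates itself once the command completes, and in async mode the method returns the live output object immediately while the command keeps running. A synchronous usage sketch (host and credentials are placeholders):

    // Hypothetical: run a command on a remote box and inspect the result.
    // execute() throws JobException, so call this from a method that declares it.
    ShellCmd cmd = new ShellCmd("hadoop fs -ls /", "gateway-host", "user", "password", false);
    ICommandOutput out = cmd.execute();  // blocks until the command exits
    if (out.getStatus() == JobStepStatusEnum.ERROR) {
        System.err.println("exit code " + out.getExitCode() + ": " + out.getOutput());
    }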
