http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java 
b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
deleted file mode 100644
index 8f77558..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-
-public class MergeDictionaryStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
-    public MergeDictionaryStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
-        KylinConfig conf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(conf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
-
-        Collections.sort(mergingSegments);
-
-        try {
-            checkLookupSnapshotsMustIncremental(mergingSegments);
-
-            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
-            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
-            mgr.updateCube(cube);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to merge dictionary or lookup snapshots", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
-        }
-    }
-
-    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        List<CubeSegment> result = 
Lists.newArrayListWithCapacity(mergingSegmentIds.size());
-        for (String id : mergingSegmentIds) {
-            result.add(cube.getSegmentById(id));
-        }
-        return result;
-    }
-
-    private void checkLookupSnapshotsMustIncremental(List<CubeSegment> 
mergingSegments) {
-
-        // FIXME check each newer snapshot has only NEW rows but no MODIFIED 
rows
-    }
-
-    /**
-     * For the new segment, we need to create dictionaries for it, too. For
-     * those dictionaries on fact table, create it by merging underlying
-     * dictionaries For those dictionaries on lookup table, just copy it from
-     * any one of the merging segments, it's guaranteed to be 
consistent(checked
-     * in CubeSegmentValidator)
-     *
-     * @param cube
-     * @param newSeg
-     * @throws IOException
-     */
-    private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, 
CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
-        HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>();
-        HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
-        CubeDesc cubeDesc = cube.getDescriptor();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    String dictTable = (String) 
dictMgr.decideSourceData(cubeDesc.getModel(), 
cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                        colsNeedMeringDict.add(col);
-                    } else {
-                        colsNeedCopyDict.add(col);
-                    }
-                }
-            }
-        }
-
-        for (TblColRef col : colsNeedMeringDict) {
-            logger.info("Merging fact table dictionary on : " + col);
-            List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
-            for (CubeSegment segment : mergingSegments) {
-                logger.info("Including fact table dictionary of segment : " + 
segment);
-                if (segment.getDictResPath(col) != null) {
-                    DictionaryInfo dictInfo = 
dictMgr.getDictionaryInfo(segment.getDictResPath(col));
-                    dictInfos.add(dictInfo);
-                }
-            }
-            mergeDictionaries(dictMgr, newSeg, dictInfos, col);
-        }
-
-        for (TblColRef col : colsNeedCopyDict) {
-            String path = mergingSegments.get(0).getDictResPath(col);
-            newSeg.putDictResPath(col, path);
-        }
-    }
-
-    private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, 
CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws 
IOException {
-        DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
-        if (dictInfo != null)
-            cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-        return dictInfo;
-    }
-
-    /**
-     * make snapshots for the new segment by copying from one of the underlying
-     * merging segments. it's guaranteed to be consistent(checked in
-     * CubeSegmentValidator)
-     *
-     * @param cube
-     * @param newSeg
-     */
-    private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment 
newSeg, List<CubeSegment> mergingSegments) {
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (Map.Entry<String, String> entry : 
lastSeg.getSnapshots().entrySet()) {
-            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = 
Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java 
b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
deleted file mode 100644
index 0c28d85..0000000
--- 
a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-
-import java.io.IOException;
-
-/**
- * Created by qianzhou on 1/4/15.
- */
-public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
-
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String CUBE_NAME = "cubeName";
-    private static final String CONVERT_TO_HFILE_STEP_ID = 
"convertToHFileStepId";
-    private static final String BASE_CUBOID_STEP_ID = "baseCuboidStepId";
-    private static final String CREATE_FLAT_TABLE_STEP_ID = 
"createFlatTableStepId";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    public UpdateCubeInfoAfterBuildStep() {
-        super();
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setConvertToHFileStepId(String id) {
-        setParam(CONVERT_TO_HFILE_STEP_ID, id);
-    }
-
-    private String getConvertToHfileStepId() {
-        return getParam(CONVERT_TO_HFILE_STEP_ID);
-    }
-
-    public void setBaseCuboidStepId(String id) {
-        setParam(BASE_CUBOID_STEP_ID, id);
-    }
-
-    private String getBaseCuboidStepId() {
-        return getParam(BASE_CUBOID_STEP_ID);
-    }
-
-    public void setCreateFlatTableStepId(String id) {
-        setParam(CREATE_FLAT_TABLE_STEP_ID, id);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
-        final CubeManager cubeManager = 
CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
-
-        Output baseCuboidOutput = 
executableManager.getOutput(getBaseCuboidStepId());
-        String sourceRecordsCount = 
baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_COUNT);
-        long sourceCount = 0l;
-        if (StringUtils.isNotEmpty(sourceRecordsCount)) {
-            sourceCount = Long.parseLong(sourceRecordsCount);
-        } else {
-            logger.warn("Can not get cube source record count.");
-        }
-
-        long sourceSize = 0l;
-        String sourceRecordsSize = 
baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_SIZE);
-        if (StringUtils.isNotEmpty(sourceRecordsSize)) {
-            sourceSize = Long.parseLong(sourceRecordsSize);
-        } else {
-            logger.warn("Can not get cube source record size.");
-        }
-
-        long size = 0;
-        boolean segmentReady = true;
-        if (!StringUtils.isBlank(getConvertToHfileStepId())) {
-            String cubeSizeString = 
executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN);
-            if (StringUtils.isNotEmpty(cubeSizeString)) {
-                size = Long.parseLong(cubeSizeString) / 1024;
-            } else {
-                logger.warn("Can't get cube segment size.");
-            }
-        } else {
-            // for the increment & merge case, the increment segment is only 
built to be merged, won't serve query by itself
-            segmentReady = false;
-        }
-
-        segment.setLastBuildJobID(getCubingJobId());
-        segment.setLastBuildTime(System.currentTimeMillis());
-        segment.setSizeKB(size);
-        segment.setInputRecords(sourceCount);
-        segment.setInputRecordsSize(sourceSize);
-
-        try {
-            if (segmentReady) {
-                cubeManager.promoteNewlyBuiltSegments(cube, segment);
-            } else {
-                cubeManager.updateCube(cube);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java 
b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
deleted file mode 100644
index 660d38d..0000000
--- 
a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Lists;
-
-/**
- * Created by qianzhou on 1/7/15.
- */
-public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CONVERT_TO_HFILE_STEP_ID = 
"convertToHFileStepId";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    private final CubeManager cubeManager = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public UpdateCubeInfoAfterMergeStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-
-        CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
-        if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no 
segment with id:" + getSegmentId());
-        }
-
-        long cubeSize = 0l;
-        String cubeSizeString = 
executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN);
-        if (StringUtils.isNotEmpty(cubeSizeString)) {
-            cubeSize = Long.parseLong(cubeSizeString) / 1024;
-        } else {
-            logger.warn("Can not get cube segment size.");
-        }
-
-        // collect source statistics
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no 
merging segments");
-        }
-        long sourceCount = 0L;
-        long sourceSize = 0L;
-        for (String id : mergingSegmentIds) {
-            CubeSegment segment = cube.getSegmentById(id);
-            sourceCount += segment.getInputRecords();
-            sourceSize += segment.getInputRecordsSize();
-        }
-
-        // update segment info
-        mergedSegment.setSizeKB(cubeSize);
-        mergedSegment.setInputRecords(sourceCount);
-        mergedSegment.setInputRecordsSize(sourceSize);
-        mergedSegment.setLastBuildJobID(getCubingJobId());
-        mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
-        try {
-            cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (IOException e) {
-            logger.error("fail to update cube after merge", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, 
e.getLocalizedMessage());
-        }
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = 
Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setConvertToHFileStepId(String id) {
-        setParam(CONVERT_TO_HFILE_STEP_ID, id);
-    }
-
-    private String getConvertToHfileStepId() {
-        return getParam(CONVERT_TO_HFILE_STEP_ID);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java 
b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
deleted file mode 100644
index 482f7a0..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class ExecutableDao {
-
-    private static final Serializer<ExecutablePO> JOB_SERIALIZER = new 
JsonSerializer<ExecutablePO>(ExecutablePO.class);
-    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER 
= new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
-    private static final Logger logger = 
LoggerFactory.getLogger(ExecutableDao.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = 
new ConcurrentHashMap<KylinConfig, ExecutableDao>();
-
-    private ResourceStore store;
-
-    public static ExecutableDao getInstance(KylinConfig config) {
-        ExecutableDao r = CACHE.get(config);
-        if (r == null) {
-            r = new ExecutableDao(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one singleton exist");
-            }
-
-        }
-        return r;
-    }
-
-    private ExecutableDao(KylinConfig config) {
-        logger.info("Using metadata url: " + config);
-        this.store = MetadataManager.getInstance(config).getStore();
-    }
-
-    private String pathOfJob(ExecutablePO job) {
-        return pathOfJob(job.getUuid());
-    }
-
-    private String pathOfJob(String uuid) {
-        return ResourceStore.EXECUTE_PATH_ROOT + "/" + uuid;
-    }
-
-    private String pathOfJobOutput(String uuid) {
-        return ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + uuid;
-    }
-
-    private ExecutablePO readJobResource(String path) throws IOException {
-        return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
-    }
-
-    private void writeJobResource(String path, ExecutablePO job) throws 
IOException {
-        store.putResource(path, job, JOB_SERIALIZER);
-    }
-
-    private ExecutableOutputPO readJobOutputResource(String path) throws 
IOException {
-        return store.getResource(path, ExecutableOutputPO.class, 
JOB_OUTPUT_SERIALIZER);
-    }
-
-    private long writeJobOutputResource(String path, ExecutableOutputPO 
output) throws IOException {
-        return store.putResource(path, output, JOB_OUTPUT_SERIALIZER);
-    }
-
-    public List<ExecutableOutputPO> getJobOutputs() throws PersistentException 
{
-        try {
-            ArrayList<String> resources = 
store.listResources(ResourceStore.EXECUTE_OUTPUT_ROOT);
-            if (resources == null || resources.isEmpty()) {
-                return Collections.emptyList();
-            }
-            Collections.sort(resources);
-            String rangeStart = resources.get(0);
-            String rangeEnd = resources.get(resources.size() - 1);
-            return store.getAllResources(rangeStart, rangeEnd, 
ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public List<ExecutablePO> getJobs() throws PersistentException {
-        try {
-            final List<String> jobIds = 
store.listResources(ResourceStore.EXECUTE_PATH_ROOT);
-            if (jobIds == null || jobIds.isEmpty()) {
-                return Collections.emptyList();
-            }
-            Collections.sort(jobIds);
-            String rangeStart = jobIds.get(0);
-            String rangeEnd = jobIds.get(jobIds.size() - 1);
-            return store.getAllResources(rangeStart, rangeEnd, 
ExecutablePO.class, JOB_SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public List<String> getJobIds() throws PersistentException {
-        try {
-            ArrayList<String> resources = 
store.listResources(ResourceStore.EXECUTE_PATH_ROOT);
-            if (resources == null) {
-                return Collections.emptyList();
-            }
-            ArrayList<String> result = 
Lists.newArrayListWithExpectedSize(resources.size());
-            for (String path : resources) {
-                result.add(path.substring(path.lastIndexOf("/") + 1));
-            }
-            return result;
-        } catch (IOException e) {
-            logger.error("error get all Jobs:", e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutablePO getJob(String uuid) throws PersistentException {
-        try {
-            return readJobResource(pathOfJob(uuid));
-        } catch (IOException e) {
-            logger.error("error get job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutablePO addJob(ExecutablePO job) throws PersistentException {
-        try {
-            if (getJob(job.getUuid()) != null) {
-                throw new IllegalArgumentException("job id:" + job.getUuid() + 
" already exists");
-            }
-            writeJobResource(pathOfJob(job), job);
-            return job;
-        } catch (IOException e) {
-            logger.error("error save job:" + job.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void deleteJob(String uuid) throws PersistentException {
-        try {
-            store.deleteResource(pathOfJob(uuid));
-        } catch (IOException e) {
-            logger.error("error delete job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public ExecutableOutputPO getJobOutput(String uuid) throws 
PersistentException {
-        try {
-            ExecutableOutputPO result = 
readJobOutputResource(pathOfJobOutput(uuid));
-            if (result == null) {
-                result = new ExecutableOutputPO();
-                result.setUuid(uuid);
-                return result;
-            }
-            return result;
-        } catch (IOException e) {
-            logger.error("error get job output id:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void addJobOutput(ExecutableOutputPO output) throws 
PersistentException {
-        try {
-            output.setLastModified(0);
-            writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
-        } catch (IOException e) {
-            logger.error("error update job output id:" + output.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void updateJobOutput(ExecutableOutputPO output) throws 
PersistentException {
-        try {
-            final long ts = 
writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
-            output.setLastModified(ts);
-        } catch (IOException e) {
-            logger.error("error update job output id:" + output.getUuid(), e);
-            throw new PersistentException(e);
-        }
-    }
-
-    public void deleteJobOutput(String uuid) throws PersistentException {
-        try {
-            store.deleteResource(pathOfJobOutput(uuid));
-        } catch (IOException e) {
-            logger.error("error delete job:" + uuid, e);
-            throw new PersistentException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java 
b/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
deleted file mode 100644
index f086558..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.util.Map;
-
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
-public class ExecutableOutputPO extends RootPersistentEntity {
-
-    @JsonProperty("content")
-    private String content;
-
-    @JsonProperty("status")
-    private String status = "READY";
-
-    @JsonProperty("info")
-    private Map<String, String> info = Maps.newHashMap();
-
-    public String getContent() {
-        return content;
-    }
-
-    public void setContent(String content) {
-        this.content = content;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    public Map<String, String> getInfo() {
-        return info;
-    }
-
-    public void setInfo(Map<String, String> info) {
-        this.info = info;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java 
b/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
deleted file mode 100644
index 97b8d0b..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
-public class ExecutablePO extends RootPersistentEntity {
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("tasks")
-    private List<ExecutablePO> tasks;
-
-    @JsonProperty("type")
-    private String type;
-
-    @JsonProperty("params")
-    private Map<String, String> params = Maps.newHashMap();
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public List<ExecutablePO> getTasks() {
-        return tasks;
-    }
-
-    public void setTasks(List<ExecutablePO> tasks) {
-        this.tasks = tasks;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public Map<String, String> getParams() {
-        return params;
-    }
-
-    public void setParams(Map<String, String> params) {
-        this.params = params;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java 
b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
deleted file mode 100644
index 8edc8a0..0000000
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.engine;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- */
-public class JobEngineConfig {
-    private static final Logger logger = 
LoggerFactory.getLogger(JobEngineConfig.class);
-    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
-
-    private static File getJobConfig(String fileName) {
-        String path = System.getProperty(KylinConfig.KYLIN_CONF);
-        if (StringUtils.isNotEmpty(path)) {
-            return new File(path, fileName);
-        }
-
-        path = KylinConfig.getKylinHome();
-        if (StringUtils.isNotEmpty(path)) {
-            return new File(path + File.separator + "conf", fileName);
-        }
-        return null;
-    }
-
-    private String getHadoopJobConfFilePath(RealizationCapacity capaticy, 
boolean appendSuffix) throws IOException {
-        String hadoopJobConfFile;
-        if (capaticy != null && appendSuffix) {
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + 
capaticy.toString().toLowerCase() + ".xml");
-        } else {
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
-        }
-
-        File jobConfig = getJobConfig(hadoopJobConfFile);
-        if (jobConfig == null || !jobConfig.exists()) {
-            logger.warn("fail to locate " + hadoopJobConfFile + ", trying to 
locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
-            jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml");
-            if (jobConfig == null || !jobConfig.exists()) {
-                logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + 
".xml");
-                throw new RuntimeException("fail to locate " + 
HADOOP_JOB_CONF_FILENAME + ".xml");
-            }
-        }
-        return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
-    }
-
-    public String getHadoopJobConfFilePath(RealizationCapacity capaticy) 
throws IOException {
-        String path = getHadoopJobConfFilePath(capaticy, true);
-        if (!StringUtils.isEmpty(path)) {
-            logger.info("Chosen job conf is : " + path);
-            return path;
-        } else {
-            path = getHadoopJobConfFilePath(capaticy, false);
-            if (!StringUtils.isEmpty(path)) {
-                logger.info("Chosen job conf is : " + path);
-                return path;
-            }
-        }
-        return "";
-    }
-
-    private void inputStreamToFile(InputStream ins, File file) throws 
IOException {
-        OutputStream os = new FileOutputStream(file);
-        int bytesRead = 0;
-        byte[] buffer = new byte[8192];
-        while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
-            os.write(buffer, 0, bytesRead);
-        }
-        os.close();
-        ins.close();
-    }
-
-    // there should be no setters
-    private final KylinConfig config;
-
-    public JobEngineConfig(KylinConfig config) {
-        this.config = config;
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public String getHdfsWorkingDirectory() {
-        return config.getHdfsWorkingDirectory();
-    }
-
-    /**
-     * @return the maxConcurrentJobLimit
-     */
-    public int getMaxConcurrentJobLimit() {
-        return config.getMaxConcurrentJobLimit();
-    }
-
-    /**
-     * @return the timeZone
-     */
-    public String getTimeZone() {
-        return config.getTimeZone();
-    }
-
-    /**
-     * @return the adminDls
-     */
-    public String getAdminDls() {
-        return config.getAdminDls();
-    }
-
-    /**
-     * @return the jobStepTimeout
-     */
-    public long getJobStepTimeout() {
-        return config.getJobStepTimeout();
-    }
-
-    /**
-     * @return the asyncJobCheckInterval
-     */
-    public int getAsyncJobCheckInterval() {
-        return config.getYarnStatusCheckIntervalSeconds();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#hashCode()
-     */
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((config == null) ? 0 : config.hashCode());
-        return result;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        JobEngineConfig other = (JobEngineConfig) obj;
-        if (config == null) {
-            if (other.config != null)
-                return false;
-        } else if (!config.equals(other.config))
-            return false;
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java 
b/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
deleted file mode 100644
index 7956fc0..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.exception;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public class ExecuteException extends Exception {
-
-    private static final long serialVersionUID = 5677121412192984281L;
-
-    public ExecuteException() {
-    }
-
-    public ExecuteException(String message) {
-        super(message);
-    }
-
-    public ExecuteException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ExecuteException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
 
b/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
deleted file mode 100644
index d57d1b3..0000000
--- 
a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.exception;
-
-/**
- * Created by qianzhou on 12/26/14.
- */
-public class IllegalStateTranferException extends RuntimeException {
-
-    private static final long serialVersionUID = 8466551519300132702L;
-
-    public IllegalStateTranferException() {
-    }
-
-    public IllegalStateTranferException(String message) {
-        super(message);
-    }
-
-    public IllegalStateTranferException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public IllegalStateTranferException(Throwable cause) {
-        super(cause);
-    }
-
-    public IllegalStateTranferException(String message, Throwable cause, 
boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/JobException.java 
b/job/src/main/java/org/apache/kylin/job/exception/JobException.java
deleted file mode 100644
index 9b6cef6..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/JobException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.exception;
-
-/**
- * @author xduo
- * 
- */
-public class JobException extends Exception {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * 
-     */
-    public JobException() {
-        super();
-    }
-
-    /**
-     * @param message
-     * @param cause
-     */
-    public JobException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    /**
-     * @param message
-     */
-    public JobException(String message) {
-        super(message);
-    }
-
-    /**
-     * @param cause
-     */
-    public JobException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/LockException.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/exception/LockException.java 
b/job/src/main/java/org/apache/kylin/job/exception/LockException.java
deleted file mode 100644
index 67568fc..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/LockException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.exception;
-
-/**
- * Created by qianzhou on 12/17/14.
- */
-public class LockException extends Exception {
-    private static final long serialVersionUID = 2072745879281754945L;
-
-    public LockException() {
-    }
-
-    public LockException(String message) {
-        super(message);
-    }
-
-    public LockException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public LockException(Throwable cause) {
-        super(cause);
-    }
-
-    public LockException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java 
b/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
deleted file mode 100644
index 60401b5..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.exception;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public class PersistentException extends Exception {
-    private static final long serialVersionUID = -4239863858506718998L;
-
-    public PersistentException() {
-    }
-
-    public PersistentException(String message) {
-        super(message);
-    }
-
-    public PersistentException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public PersistentException(Throwable cause) {
-        super(cause);
-    }
-
-    public PersistentException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java 
b/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
deleted file mode 100644
index 190db6e..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.exception;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public class SchedulerException extends Exception {
-    private static final long serialVersionUID = 349041244824274861L;
-
-    public SchedulerException() {
-    }
-
-    public SchedulerException(String message) {
-        super(message);
-    }
-
-    public SchedulerException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public SchedulerException(Throwable cause) {
-        super(cause);
-    }
-
-    public SchedulerException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java 
b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
deleted file mode 100644
index f7e4332..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LogTitlePrinter;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.impl.threadpool.DefaultContext;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public abstract class AbstractExecutable implements Executable, Idempotent {
-
-    protected static final String SUBMITTER = "submitter";
-    protected static final String NOTIFY_LIST = "notify_list";
-    protected static final String START_TIME = "startTime";
-    protected static final String END_TIME = "endTime";
-
-    protected static final Logger logger = 
LoggerFactory.getLogger(AbstractExecutable.class);
-
-    private String name;
-    private String id;
-    private Map<String, String> params = Maps.newHashMap();
-
-    protected static ExecutableManager executableManager = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public AbstractExecutable() {
-        setId(UUID.randomUUID().toString());
-    }
-
-    protected void onExecuteStart(ExecutableContext executableContext) {
-        Map<String, String> info = Maps.newHashMap();
-        info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-        executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, 
info, null);
-    }
-
-    protected void onExecuteFinished(ExecuteResult result, ExecutableContext 
executableContext) {
-        setEndTime(System.currentTimeMillis());
-        if (!isDiscarded()) {
-            if (result.succeed()) {
-                executableManager.updateJobOutput(getId(), 
ExecutableState.SUCCEED, null, result.output());
-            } else {
-                executableManager.updateJobOutput(getId(), 
ExecutableState.ERROR, null, result.output());
-            }
-        } else {
-        }
-    }
-
-    protected void onExecuteError(Throwable exception, ExecutableContext 
executableContext) {
-        if (!isDiscarded()) {
-            executableManager.addJobInfo(getId(), END_TIME, 
Long.toString(System.currentTimeMillis()));
-            String output = null;
-            if (exception != null) {
-                final StringWriter out = new StringWriter();
-                exception.printStackTrace(new PrintWriter(out));
-                output = out.toString();
-            }
-            executableManager.updateJobOutput(getId(), ExecutableState.ERROR, 
null, output);
-        } else {
-        }
-    }
-
-    @Override
-    public final ExecuteResult execute(ExecutableContext executableContext) 
throws ExecuteException {
-
-        //print a eye-catching title in log
-        LogTitlePrinter.printTitle(this.getName());
-
-        Preconditions.checkArgument(executableContext instanceof 
DefaultContext);
-        ExecuteResult result;
-        try {
-            onExecuteStart(executableContext);
-            result = doWork(executableContext);
-        } catch (Throwable e) {
-            logger.error("error running Executable", e);
-            onExecuteError(e, executableContext);
-            throw new ExecuteException(e);
-        }
-        onExecuteFinished(result, executableContext);
-        return result;
-    }
-
-    protected abstract ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException;
-
-    @Override
-    public void cleanup() throws ExecuteException {
-
-    }
-
-    @Override
-    public boolean isRunnable() {
-        return this.getStatus() == ExecutableState.READY;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public final String getId() {
-        return this.id;
-    }
-
-    public final void setId(String id) {
-        this.id = id;
-    }
-
-    @Override
-    public final ExecutableState getStatus() {
-        return executableManager.getOutput(this.getId()).getState();
-    }
-
-    @Override
-    public final Map<String, String> getParams() {
-        return Collections.unmodifiableMap(this.params);
-    }
-
-    public final String getParam(String key) {
-        return this.params.get(key);
-    }
-
-    public final void setParam(String key, String value) {
-        this.params.put(key, value);
-    }
-
-    public final void setParams(Map<String, String> params) {
-        this.params.putAll(params);
-    }
-
-    public final long getLastModified() {
-        return executableManager.getOutput(getId()).getLastModified();
-    }
-
-    public final void setSubmitter(String submitter) {
-        setParam(SUBMITTER, submitter);
-    }
-
-    public final List<String> getNotifyList() {
-        final String str = getParam(NOTIFY_LIST);
-        if (str != null) {
-            return Lists.newArrayList(StringUtils.split(str, ","));
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public final void setNotifyList(String notifications) {
-        setParam(NOTIFY_LIST, notifications);
-    }
-
-    public final void setNotifyList(List<String> notifications) {
-        setNotifyList(StringUtils.join(notifications, ","));
-    }
-
-    protected Pair<String, String> formatNotifications(ExecutableState state) {
-        return null;
-    }
-
-    protected final void notifyUserStatusChange(ExecutableState state) {
-        try {
-            List<String> users = Lists.newArrayList();
-            users.addAll(getNotifyList());
-            final String adminDls = 
KylinConfig.getInstanceFromEnv().getAdminDls();
-            if (null != adminDls) {
-                for (String adminDl : adminDls.split(",")) {
-                    users.add(adminDl);
-                }
-            }
-            if (users.isEmpty()) {
-                return;
-            }
-            final Pair<String, String> email = formatNotifications(state);
-            if (email == null) {
-                return;
-            }
-            logger.info("prepare to send email to:" + users);
-            logger.info("job name:" + getName());
-            logger.info("submitter:" + getSubmitter());
-            logger.info("notify list:" + users);
-            new MailService().sendMail(users, email.getLeft(), 
email.getRight());
-        } catch (Exception e) {
-            logger.error(e.getLocalizedMessage(), e);
-        }
-    }
-
-    public final String getSubmitter() {
-        return getParam(SUBMITTER);
-    }
-
-    @Override
-    public final Output getOutput() {
-        return executableManager.getOutput(getId());
-    }
-
-    protected long getExtraInfoAsLong(String key, long defaultValue) {
-        return getExtraInfoAsLong(executableManager.getOutput(getId()), key, 
defaultValue);
-    }
-
-    public static long getStartTime(Output output) {
-        return getExtraInfoAsLong(output, START_TIME, 0L);
-    }
-
-    public static long getEndTime(Output output) {
-        return getExtraInfoAsLong(output, END_TIME, 0L);
-    }
-
-    public static long getDuration(long startTime, long endTime) {
-        if (startTime == 0) {
-            return 0;
-        }
-        if (endTime == 0) {
-            return System.currentTimeMillis() - startTime;
-        } else {
-            return endTime - startTime;
-        }
-    }
-
-    public static String getExtraInfo(Output output, String key) {
-        return output.getExtra().get(key);
-    }
-
-    public static long getExtraInfoAsLong(Output output, String key, long 
defaultValue) {
-        final String str = output.getExtra().get(key);
-        if (str != null) {
-            return Long.parseLong(str);
-        } else {
-            return defaultValue;
-        }
-    }
-
-    protected final void addExtraInfo(String key, String value) {
-        executableManager.addJobInfo(getId(), key, value);
-    }
-
-    public final void setStartTime(long time) {
-        addExtraInfo(START_TIME, time + "");
-    }
-
-    public final void setEndTime(long time) {
-        addExtraInfo(END_TIME, time + "");
-    }
-
-    public final long getStartTime() {
-        return getExtraInfoAsLong(START_TIME, 0L);
-    }
-
-    public final long getEndTime() {
-        return getExtraInfoAsLong(END_TIME, 0L);
-    }
-
-    public final long getDuration() {
-        return getDuration(getStartTime(), getEndTime());
-    }
-
-    /*
-    * discarded is triggered by JobService, the Scheduler is not awake of that
-    *
-    * */
-    protected final boolean isDiscarded() {
-        final ExecutableState status = 
executableManager.getOutput(getId()).getState();
-        return status == ExecutableState.DISCARDED;
-    }
-
-    @Override
-    public String toString() {
-        return Objects.toStringHelper(this).add("id", getId()).add("name", 
getName()).add("state", getStatus()).toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java 
b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
deleted file mode 100644
index f372f38..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import java.util.List;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public interface ChainedExecutable extends Executable {
-
-    List<? extends AbstractExecutable> getTasks();
-
-    void addTask(AbstractExecutable executable);
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
 
b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
deleted file mode 100644
index 61140b5..0000000
--- 
a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.manager.ExecutableManager;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Created by qianzhou on 12/16/14.
- */
-public class DefaultChainedExecutable extends AbstractExecutable implements 
ChainedExecutable {
-
-    private final List<AbstractExecutable> subTasks = Lists.newArrayList();
-
-    protected final ExecutableManager jobService = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public DefaultChainedExecutable() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
-        List<? extends Executable> executables = getTasks();
-        final int size = executables.size();
-        for (int i = 0; i < size; ++i) {
-            Executable subTask = executables.get(i);
-            if (subTask.isRunnable()) {
-                return subTask.execute(context);
-            }
-        }
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
-    }
-
-    @Override
-    protected void onExecuteStart(ExecutableContext executableContext) {
-        Map<String, String> info = Maps.newHashMap();
-        info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-        final long startTime = getStartTime();
-        if (startTime > 0) {
-            jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, 
null);
-        } else {
-            jobService.updateJobOutput(getId(), ExecutableState.RUNNING, info, 
null);
-        }
-    }
-
-    @Override
-    protected void onExecuteFinished(ExecuteResult result, ExecutableContext 
executableContext) {
-        if (isDiscarded()) {
-            setEndTime(System.currentTimeMillis());
-            notifyUserStatusChange(ExecutableState.DISCARDED);
-        } else if (result.succeed()) {
-            List<? extends Executable> jobs = getTasks();
-            boolean allSucceed = true;
-            boolean hasError = false;
-            for (Executable task : jobs) {
-                final ExecutableState status = task.getStatus();
-                if (status == ExecutableState.ERROR) {
-                    hasError = true;
-                }
-                if (status != ExecutableState.SUCCEED) {
-                    allSucceed = false;
-                }
-            }
-            if (allSucceed) {
-                setEndTime(System.currentTimeMillis());
-                jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, 
null, null);
-                notifyUserStatusChange(ExecutableState.SUCCEED);
-            } else if (hasError) {
-                setEndTime(System.currentTimeMillis());
-                jobService.updateJobOutput(getId(), ExecutableState.ERROR, 
null, null);
-                notifyUserStatusChange(ExecutableState.ERROR);
-            } else {
-                jobService.updateJobOutput(getId(), ExecutableState.READY, 
null, null);
-            }
-        } else {
-            setEndTime(System.currentTimeMillis());
-            jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, 
null);
-            notifyUserStatusChange(ExecutableState.ERROR);
-        }
-    }
-
-    @Override
-    public List<AbstractExecutable> getTasks() {
-        return subTasks;
-    }
-
-    public final AbstractExecutable getTaskByName(String name) {
-        for (AbstractExecutable task : subTasks) {
-            if (task.getName() != null && 
task.getName().equalsIgnoreCase(name)) {
-                return task;
-            }
-        }
-        return null;
-    }
-
-    public void addTask(AbstractExecutable executable) {
-        executable.setId(getId() + "-" + String.format("%02d", 
subTasks.size()));
-        this.subTasks.add(executable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java 
b/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
deleted file mode 100644
index a2ee08e..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * Created by qianzhou on 1/6/15.
- */
-public class DefaultOutput implements Output {
-
-    private ExecutableState state;
-    private Map<String, String> extra;
-    private String verboseMsg;
-    private long lastModified;
-
-    @Override
-    public Map<String, String> getExtra() {
-        return extra;
-    }
-
-    @Override
-    public String getVerboseMsg() {
-        return verboseMsg;
-    }
-
-    @Override
-    public ExecutableState getState() {
-        return state;
-    }
-
-    @Override
-    public long getLastModified() {
-        return lastModified;
-    }
-
-    public void setState(ExecutableState state) {
-        this.state = state;
-    }
-
-    public void setExtra(Map<String, String> extra) {
-        this.extra = extra;
-    }
-
-    public void setVerboseMsg(String verboseMsg) {
-        this.verboseMsg = verboseMsg;
-    }
-
-    public void setLastModified(long lastModified) {
-        this.lastModified = lastModified;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int hashCode = state.hashCode();
-        hashCode = hashCode * prime + extra.hashCode();
-        hashCode = hashCode * prime + verboseMsg.hashCode();
-        hashCode = hashCode * prime + Long.valueOf(lastModified).hashCode();
-        return hashCode;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (!(obj instanceof DefaultOutput)) {
-            return false;
-        }
-        DefaultOutput another = ((DefaultOutput) obj);
-        if (this.state != another.state) {
-            return false;
-        }
-        if (!extra.equals(another.extra)) {
-            return false;
-        }
-        if (this.lastModified != another.lastModified) {
-            return false;
-        }
-        return StringUtils.equals(verboseMsg, another.verboseMsg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/Executable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Executable.java 
b/job/src/main/java/org/apache/kylin/job/execution/Executable.java
deleted file mode 100644
index 2131655..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Executable.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import java.util.Map;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public interface Executable {
-
-    String getId();
-
-    String getName();
-
-    ExecuteResult execute(ExecutableContext executableContext) throws 
ExecuteException;
-
-    ExecutableState getStatus();
-
-    Output getOutput();
-
-    boolean isRunnable();
-
-    Map<String, String> getParams();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java 
b/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
deleted file mode 100644
index 2886893..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.common.KylinConfig;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public interface ExecutableContext {
-
-    Object getSchedulerContext();
-
-    KylinConfig getConfig();
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java 
b/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
deleted file mode 100644
index 95089d2..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-import com.google.common.base.Supplier;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public enum ExecutableState {
-
-    READY, RUNNING, ERROR, STOPPED, DISCARDED, SUCCEED;
-
-    private static Multimap<ExecutableState, ExecutableState> 
VALID_STATE_TRANSFER;
-
-    static {
-        VALID_STATE_TRANSFER = Multimaps.newSetMultimap(Maps.<ExecutableState, 
Collection<ExecutableState>> newEnumMap(ExecutableState.class), new 
Supplier<Set<ExecutableState>>() {
-            @Override
-            public Set<ExecutableState> get() {
-                return new CopyOnWriteArraySet<ExecutableState>();
-            }
-        });
-
-        //scheduler
-        VALID_STATE_TRANSFER.put(ExecutableState.READY, 
ExecutableState.RUNNING);
-        VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.ERROR);
-        //user
-        VALID_STATE_TRANSFER.put(ExecutableState.READY, 
ExecutableState.DISCARDED);
-
-        //job
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, 
ExecutableState.READY);
-        //job
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, 
ExecutableState.SUCCEED);
-        //user
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, 
ExecutableState.DISCARDED);
-        //scheduler,job
-        VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, 
ExecutableState.ERROR);
-
-        VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, 
ExecutableState.DISCARDED);
-        VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, 
ExecutableState.READY);
-
-        VALID_STATE_TRANSFER.put(ExecutableState.ERROR, 
ExecutableState.DISCARDED);
-        VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.READY);
-    }
-
-    public boolean isFinalState() {
-        return this == SUCCEED || this == DISCARDED;
-    }
-
-    public static boolean isValidStateTransfer(ExecutableState from, 
ExecutableState to) {
-        return VALID_STATE_TRANSFER.containsEntry(from, to);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git 
a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java 
b/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
deleted file mode 100644
index 27a2407..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public final class ExecuteResult {
-
-    public static enum State {
-        SUCCEED, FAILED, ERROR, DISCARDED, STOPPED
-    }
-
-    private final State state;
-    private final String output;
-
-    public ExecuteResult(State state) {
-        this(state, "");
-    }
-
-    public ExecuteResult(State state, String output) {
-        Preconditions.checkArgument(state != null, "state cannot be null");
-        this.state = state;
-        this.output = output;
-    }
-
-    public State state() {
-        return state;
-    }
-
-    public boolean succeed() {
-        return state == State.SUCCEED;
-    }
-
-    public String output() {
-        return output;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java 
b/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
deleted file mode 100644
index 6515343..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import org.apache.kylin.job.exception.ExecuteException;
-
-/**
- * Created by qianzhou on 12/15/14.
- */
-public interface Idempotent {
-
-    void cleanup() throws ExecuteException;
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/Output.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/execution/Output.java 
b/job/src/main/java/org/apache/kylin/job/execution/Output.java
deleted file mode 100644
index f77c71a..0000000
--- a/job/src/main/java/org/apache/kylin/job/execution/Output.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.execution;
-
-import java.util.Map;
-
-/**
- * Created by qianzhou on 1/6/15.
- */
-public interface Output {
-
-    Map<String, String> getExtra();
-
-    String getVerboseMsg();
-
-    ExecutableState getState();
-
-    long getLastModified();
-}

Reply via email to