http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java deleted file mode 100644 index 8f77558..0000000 --- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cube; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.DimensionDesc; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.model.TblColRef; - -import com.google.common.collect.Lists; - -public class MergeDictionaryStep extends AbstractExecutable { - - private static final String CUBE_NAME = "cubeName"; - private static final String SEGMENT_ID = "segmentId"; - private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; - - public MergeDictionaryStep() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - KylinConfig conf = context.getConfig(); - final CubeManager mgr = CubeManager.getInstance(conf); - final CubeInstance cube = mgr.getCube(getCubeName()); - final CubeSegment newSegment = cube.getSegmentById(getSegmentId()); - final List<CubeSegment> mergingSegments = getMergingSegments(cube); - - Collections.sort(mergingSegments); - - try { - checkLookupSnapshotsMustIncremental(mergingSegments); - - makeDictForNewSegment(conf, cube, newSegment, mergingSegments); - makeSnapshotForNewSegment(cube, newSegment, mergingSegments); - - mgr.updateCube(cube); - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (IOException e) { - logger.error("fail to merge dictionary or lookup snapshots", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - - private List<CubeSegment> getMergingSegments(CubeInstance cube) { - List<String> mergingSegmentIds = getMergingSegmentIds(); - List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size()); - for (String id : mergingSegmentIds) { - result.add(cube.getSegmentById(id)); - } - return result; - } - - private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) { - - // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows - } - - /** - * For the new segment, we need to create dictionaries for it, too. For - * those dictionaries on fact table, create it by merging underlying - * dictionaries For those dictionaries on lookup table, just copy it from - * any one of the merging segments, it's guaranteed to be consistent(checked - * in CubeSegmentValidator) - * - * @param cube - * @param newSeg - * @throws IOException - */ - private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException { - HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>(); - HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>(); - DictionaryManager dictMgr = DictionaryManager.getInstance(conf); - - CubeDesc cubeDesc = cube.getDescriptor(); - for (DimensionDesc dim : cubeDesc.getDimensions()) { - for (TblColRef col : dim.getColumnRefs()) { - if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) { - String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; - if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) { - colsNeedMeringDict.add(col); - } else { - colsNeedCopyDict.add(col); - } - } - } - } - - for (TblColRef col : colsNeedMeringDict) { - logger.info("Merging fact table dictionary on : " + col); - List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>(); - for (CubeSegment segment : mergingSegments) { - logger.info("Including fact table dictionary of segment : " + segment); - if (segment.getDictResPath(col) != null) { - DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col)); - dictInfos.add(dictInfo); - } - } - mergeDictionaries(dictMgr, newSeg, dictInfos, col); - } - - for (TblColRef col : colsNeedCopyDict) { - String path = mergingSegments.get(0).getDictResPath(col); - newSeg.putDictResPath(col, path); - } - } - - private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException { - DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts); - if (dictInfo != null) - cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); - return dictInfo; - } - - /** - * make snapshots for the new segment by copying from one of the underlying - * merging segments. it's guaranteed to be consistent(checked in - * CubeSegmentValidator) - * - * @param cube - * @param newSeg - */ - private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) { - CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1); - for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) { - newSeg.putSnapshotResPath(entry.getKey(), entry.getValue()); - } - } - - public void setCubeName(String cubeName) { - this.setParam(CUBE_NAME, cubeName); - } - - private String getCubeName() { - return getParam(CUBE_NAME); - } - - public void setSegmentId(String segmentId) { - this.setParam(SEGMENT_ID, segmentId); - } - - private String getSegmentId() { - return getParam(SEGMENT_ID); - } - - public void setMergingSegmentIds(List<String> ids) { - setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ",")); - } - - private List<String> getMergingSegmentIds() { - final String ids = getParam(MERGING_SEGMENT_IDS); - if (ids != null) { - final String[] splitted = StringUtils.split(ids, ","); - ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length); - for (String id : splitted) { - result.add(id); - } - return result; - } else { - return Collections.emptyList(); - } - } - -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java deleted file mode 100644 index 0c28d85..0000000 --- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cube; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.job.execution.Output; - -import java.io.IOException; - -/** - * Created by qianzhou on 1/4/15. - */ -public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { - - private static final String SEGMENT_ID = "segmentId"; - private static final String CUBE_NAME = "cubeName"; - private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId"; - private static final String BASE_CUBOID_STEP_ID = "baseCuboidStepId"; - private static final String CREATE_FLAT_TABLE_STEP_ID = "createFlatTableStepId"; - private static final String CUBING_JOB_ID = "cubingJobId"; - - public UpdateCubeInfoAfterBuildStep() { - super(); - } - - public void setCubeName(String cubeName) { - this.setParam(CUBE_NAME, cubeName); - } - - private String getCubeName() { - return getParam(CUBE_NAME); - } - - public void setSegmentId(String segmentId) { - this.setParam(SEGMENT_ID, segmentId); - } - - private String getSegmentId() { - return getParam(SEGMENT_ID); - } - - public void setConvertToHFileStepId(String id) { - setParam(CONVERT_TO_HFILE_STEP_ID, id); - } - - private String getConvertToHfileStepId() { - return getParam(CONVERT_TO_HFILE_STEP_ID); - } - - public void setBaseCuboidStepId(String id) { - setParam(BASE_CUBOID_STEP_ID, id); - } - - private String getBaseCuboidStepId() { - return getParam(BASE_CUBOID_STEP_ID); - } - - public void setCreateFlatTableStepId(String id) { - setParam(CREATE_FLAT_TABLE_STEP_ID, id); - } - - public void setCubingJobId(String id) { - setParam(CUBING_JOB_ID, id); - } - - private String getCubingJobId() { - return getParam(CUBING_JOB_ID); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); - final CubeInstance cube = cubeManager.getCube(getCubeName()); - final CubeSegment segment = cube.getSegmentById(getSegmentId()); - - Output baseCuboidOutput = executableManager.getOutput(getBaseCuboidStepId()); - String sourceRecordsCount = baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_COUNT); - long sourceCount = 0l; - if (StringUtils.isNotEmpty(sourceRecordsCount)) { - sourceCount = Long.parseLong(sourceRecordsCount); - } else { - logger.warn("Can not get cube source record count."); - } - - long sourceSize = 0l; - String sourceRecordsSize = baseCuboidOutput.getExtra().get(ExecutableConstants.SOURCE_RECORDS_SIZE); - if (StringUtils.isNotEmpty(sourceRecordsSize)) { - sourceSize = Long.parseLong(sourceRecordsSize); - } else { - logger.warn("Can not get cube source record size."); - } - - long size = 0; - boolean segmentReady = true; - if (!StringUtils.isBlank(getConvertToHfileStepId())) { - String cubeSizeString = executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN); - if (StringUtils.isNotEmpty(cubeSizeString)) { - size = Long.parseLong(cubeSizeString) / 1024; - } else { - logger.warn("Can't get cube segment size."); - } - } else { - // for the increment & merge case, the increment segment is only built to be merged, won't serve query by itself - segmentReady = false; - } - - segment.setLastBuildJobID(getCubingJobId()); - segment.setLastBuildTime(System.currentTimeMillis()); - segment.setSizeKB(size); - segment.setInputRecords(sourceCount); - segment.setInputRecordsSize(sourceSize); - - try { - if (segmentReady) { - cubeManager.promoteNewlyBuiltSegments(cube, segment); - } else { - cubeManager.updateCube(cube); - } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); - } catch (IOException e) { - logger.error("fail to update cube after build", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java deleted file mode 100644 index 660d38d..0000000 --- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cube; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; - -import com.google.common.collect.Lists; - -/** - * Created by qianzhou on 1/7/15. - */ -public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { - - private static final String CUBE_NAME = "cubeName"; - private static final String SEGMENT_ID = "segmentId"; - private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; - private static final String CONVERT_TO_HFILE_STEP_ID = "convertToHFileStepId"; - private static final String CUBING_JOB_ID = "cubingJobId"; - - private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); - - public UpdateCubeInfoAfterMergeStep() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final CubeInstance cube = cubeManager.getCube(getCubeName()); - - CubeSegment mergedSegment = cube.getSegmentById(getSegmentId()); - if (mergedSegment == null) { - return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId()); - } - - long cubeSize = 0l; - String cubeSizeString = executableManager.getOutput(getConvertToHfileStepId()).getExtra().get(ExecutableConstants.HDFS_BYTES_WRITTEN); - if (StringUtils.isNotEmpty(cubeSizeString)) { - cubeSize = Long.parseLong(cubeSizeString) / 1024; - } else { - logger.warn("Can not get cube segment size."); - } - - // collect source statistics - List<String> mergingSegmentIds = getMergingSegmentIds(); - if (mergingSegmentIds.isEmpty()) { - return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments"); - } - long sourceCount = 0L; - long sourceSize = 0L; - for (String id : mergingSegmentIds) { - CubeSegment segment = cube.getSegmentById(id); - sourceCount += segment.getInputRecords(); - sourceSize += segment.getInputRecordsSize(); - } - - // update segment info - mergedSegment.setSizeKB(cubeSize); - mergedSegment.setInputRecords(sourceCount); - mergedSegment.setInputRecordsSize(sourceSize); - mergedSegment.setLastBuildJobID(getCubingJobId()); - mergedSegment.setLastBuildTime(System.currentTimeMillis()); - - try { - cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment); - return new ExecuteResult(ExecuteResult.State.SUCCEED); - } catch (IOException e) { - logger.error("fail to update cube after merge", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - - public void setSegmentId(String segmentId) { - this.setParam(SEGMENT_ID, segmentId); - } - - private String getSegmentId() { - return getParam(SEGMENT_ID); - } - - public void setCubeName(String cubeName) { - this.setParam(CUBE_NAME, cubeName); - } - - private String getCubeName() { - return getParam(CUBE_NAME); - } - - public void setMergingSegmentIds(List<String> ids) { - setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ",")); - } - - private List<String> getMergingSegmentIds() { - final String ids = getParam(MERGING_SEGMENT_IDS); - if (ids != null) { - final String[] splitted = StringUtils.split(ids, ","); - ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length); - for (String id : splitted) { - result.add(id); - } - return result; - } else { - return Collections.emptyList(); - } - } - - public void setConvertToHFileStepId(String id) { - setParam(CONVERT_TO_HFILE_STEP_ID, id); - } - - private String getConvertToHfileStepId() { - return getParam(CONVERT_TO_HFILE_STEP_ID); - } - - public void setCubingJobId(String id) { - setParam(CUBING_JOB_ID, id); - } - - private String getCubingJobId() { - return getParam(CUBING_JOB_ID); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java deleted file mode 100644 index 482f7a0..0000000 --- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.dao; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.job.exception.PersistentException; -import org.apache.kylin.metadata.MetadataManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - */ -public class ExecutableDao { - - private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class); - private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class); - private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class); - private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>(); - - private ResourceStore store; - - public static ExecutableDao getInstance(KylinConfig config) { - ExecutableDao r = CACHE.get(config); - if (r == null) { - r = new ExecutableDao(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - - } - return r; - } - - private ExecutableDao(KylinConfig config) { - logger.info("Using metadata url: " + config); - this.store = MetadataManager.getInstance(config).getStore(); - } - - private String pathOfJob(ExecutablePO job) { - return pathOfJob(job.getUuid()); - } - - private String pathOfJob(String uuid) { - return ResourceStore.EXECUTE_PATH_ROOT + "/" + uuid; - } - - private String pathOfJobOutput(String uuid) { - return ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + uuid; - } - - private ExecutablePO readJobResource(String path) throws IOException { - return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER); - } - - private void writeJobResource(String path, ExecutablePO job) throws IOException { - store.putResource(path, job, JOB_SERIALIZER); - } - - private ExecutableOutputPO readJobOutputResource(String path) throws IOException { - return store.getResource(path, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER); - } - - private long writeJobOutputResource(String path, ExecutableOutputPO output) throws IOException { - return store.putResource(path, output, JOB_OUTPUT_SERIALIZER); - } - - public List<ExecutableOutputPO> getJobOutputs() throws PersistentException { - try { - ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_ROOT); - if (resources == null || resources.isEmpty()) { - return Collections.emptyList(); - } - Collections.sort(resources); - String rangeStart = resources.get(0); - String rangeEnd = resources.get(resources.size() - 1); - return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER); - } catch (IOException e) { - logger.error("error get all Jobs:", e); - throw new PersistentException(e); - } - } - - public List<ExecutablePO> getJobs() throws PersistentException { - try { - final List<String> jobIds = store.listResources(ResourceStore.EXECUTE_PATH_ROOT); - if (jobIds == null || jobIds.isEmpty()) { - return Collections.emptyList(); - } - Collections.sort(jobIds); - String rangeStart = jobIds.get(0); - String rangeEnd = jobIds.get(jobIds.size() - 1); - return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER); - } catch (IOException e) { - logger.error("error get all Jobs:", e); - throw new PersistentException(e); - } - } - - public List<String> getJobIds() throws PersistentException { - try { - ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_PATH_ROOT); - if (resources == null) { - return Collections.emptyList(); - } - ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size()); - for (String path : resources) { - result.add(path.substring(path.lastIndexOf("/") + 1)); - } - return result; - } catch (IOException e) { - logger.error("error get all Jobs:", e); - throw new PersistentException(e); - } - } - - public ExecutablePO getJob(String uuid) throws PersistentException { - try { - return readJobResource(pathOfJob(uuid)); - } catch (IOException e) { - logger.error("error get job:" + uuid, e); - throw new PersistentException(e); - } - } - - public ExecutablePO addJob(ExecutablePO job) throws PersistentException { - try { - if (getJob(job.getUuid()) != null) { - throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists"); - } - writeJobResource(pathOfJob(job), job); - return job; - } catch (IOException e) { - logger.error("error save job:" + job.getUuid(), e); - throw new PersistentException(e); - } - } - - public void deleteJob(String uuid) throws PersistentException { - try { - store.deleteResource(pathOfJob(uuid)); - } catch (IOException e) { - logger.error("error delete job:" + uuid, e); - throw new PersistentException(e); - } - } - - public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException { - try { - ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid)); - if (result == null) { - result = new ExecutableOutputPO(); - result.setUuid(uuid); - return result; - } - return result; - } catch (IOException e) { - logger.error("error get job output id:" + uuid, e); - throw new PersistentException(e); - } - } - - public void addJobOutput(ExecutableOutputPO output) throws PersistentException { - try { - output.setLastModified(0); - writeJobOutputResource(pathOfJobOutput(output.getUuid()), output); - } catch (IOException e) { - logger.error("error update job output id:" + output.getUuid(), e); - throw new PersistentException(e); - } - } - - public void updateJobOutput(ExecutableOutputPO output) throws PersistentException { - try { - final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output); - output.setLastModified(ts); - } catch (IOException e) { - logger.error("error update job output id:" + output.getUuid(), e); - throw new PersistentException(e); - } - } - - public void deleteJobOutput(String uuid) throws PersistentException { - try { - store.deleteResource(pathOfJobOutput(uuid)); - } catch (IOException e) { - logger.error("error delete job:" + uuid, e); - throw new PersistentException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java deleted file mode 100644 index f086558..0000000 --- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.dao; - -import java.util.Map; - -import org.apache.kylin.common.persistence.RootPersistentEntity; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Maps; - -/** - * Created by qianzhou on 12/15/14. - */ -@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class ExecutableOutputPO extends RootPersistentEntity { - - @JsonProperty("content") - private String content; - - @JsonProperty("status") - private String status = "READY"; - - @JsonProperty("info") - private Map<String, String> info = Maps.newHashMap(); - - public String getContent() { - return content; - } - - public void setContent(String content) { - this.content = content; - } - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; - } - - public Map<String, String> getInfo() { - return info; - } - - public void setInfo(Map<String, String> info) { - this.info = info; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java deleted file mode 100644 index 97b8d0b..0000000 --- a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.dao; - -import java.util.List; -import java.util.Map; - -import org.apache.kylin.common.persistence.RootPersistentEntity; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Maps; - -/** - * Created by qianzhou on 12/15/14. - */ -@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class ExecutablePO extends RootPersistentEntity { - - @JsonProperty("name") - private String name; - - @JsonProperty("tasks") - private List<ExecutablePO> tasks; - - @JsonProperty("type") - private String type; - - @JsonProperty("params") - private Map<String, String> params = Maps.newHashMap(); - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public List<ExecutablePO> getTasks() { - return tasks; - } - - public void setTasks(List<ExecutablePO> tasks) { - this.tasks = tasks; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public Map<String, String> getParams() { - return params; - } - - public void setParams(Map<String, String> params) { - this.params = params; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java deleted file mode 100644 index 8edc8a0..0000000 --- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.engine; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.tools.OptionsHelper; -import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author ysong1 - */ -public class JobEngineConfig { - private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class); - public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf"; - - private static File getJobConfig(String fileName) { - String path = System.getProperty(KylinConfig.KYLIN_CONF); - if (StringUtils.isNotEmpty(path)) { - return new File(path, fileName); - } - - path = KylinConfig.getKylinHome(); - if (StringUtils.isNotEmpty(path)) { - return new File(path + File.separator + "conf", fileName); - } - return null; - } - - private String getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException { - String hadoopJobConfFile; - if (capaticy != null && appendSuffix) { - hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml"); - } else { - hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml"); - } - - File jobConfig = getJobConfig(hadoopJobConfFile); - if (jobConfig == null || !jobConfig.exists()) { - logger.warn("fail to locate " + hadoopJobConfFile + ", trying to locate " + HADOOP_JOB_CONF_FILENAME + ".xml"); - jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml"); - if (jobConfig == null || !jobConfig.exists()) { - logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml"); - throw new RuntimeException("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml"); - } - } - return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath()); - } - - public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException { - String path = getHadoopJobConfFilePath(capaticy, true); - if (!StringUtils.isEmpty(path)) { - logger.info("Chosen job conf is : " + path); - return path; - } else { - path = getHadoopJobConfFilePath(capaticy, false); - if (!StringUtils.isEmpty(path)) { - logger.info("Chosen job conf is : " + path); - return path; - } - } - return ""; - } - - private void inputStreamToFile(InputStream ins, File file) throws IOException { - OutputStream os = new FileOutputStream(file); - int bytesRead = 0; - byte[] buffer = new byte[8192]; - while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) { - os.write(buffer, 0, bytesRead); - } - os.close(); - ins.close(); - } - - // there should be no setters - private final KylinConfig config; - - public JobEngineConfig(KylinConfig config) { - this.config = config; - } - - public KylinConfig getConfig() { - return config; - } - - public String getHdfsWorkingDirectory() { - return config.getHdfsWorkingDirectory(); - } - - /** - * @return the maxConcurrentJobLimit - */ - public int getMaxConcurrentJobLimit() { - return config.getMaxConcurrentJobLimit(); - } - - /** - * @return the timeZone - */ - public String getTimeZone() { - return config.getTimeZone(); - } - - /** - * @return the adminDls - */ - public String getAdminDls() { - return config.getAdminDls(); - } - - /** - * @return the jobStepTimeout - */ - public long getJobStepTimeout() { - return config.getJobStepTimeout(); - } - - /** - * @return the asyncJobCheckInterval - */ - public int getAsyncJobCheckInterval() { - return config.getYarnStatusCheckIntervalSeconds(); - } - - /* - * (non-Javadoc) - * - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((config == null) ? 0 : config.hashCode()); - return result; - } - - /* - * (non-Javadoc) - * - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - JobEngineConfig other = (JobEngineConfig) obj; - if (config == null) { - if (other.config != null) - return false; - } else if (!config.equals(other.config)) - return false; - return true; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java b/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java deleted file mode 100644 index 7956fc0..0000000 --- a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.exception; - -/** - * Created by qianzhou on 12/15/14. - */ -public class ExecuteException extends Exception { - - private static final long serialVersionUID = 5677121412192984281L; - - public ExecuteException() { - } - - public ExecuteException(String message) { - super(message); - } - - public ExecuteException(String message, Throwable cause) { - super(message, cause); - } - - public ExecuteException(Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java b/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java deleted file mode 100644 index d57d1b3..0000000 --- a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.exception; - -/** - * Created by qianzhou on 12/26/14. - */ -public class IllegalStateTranferException extends RuntimeException { - - private static final long serialVersionUID = 8466551519300132702L; - - public IllegalStateTranferException() { - } - - public IllegalStateTranferException(String message) { - super(message); - } - - public IllegalStateTranferException(String message, Throwable cause) { - super(message, cause); - } - - public IllegalStateTranferException(Throwable cause) { - super(cause); - } - - public IllegalStateTranferException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/JobException.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/exception/JobException.java b/job/src/main/java/org/apache/kylin/job/exception/JobException.java deleted file mode 100644 index 9b6cef6..0000000 --- a/job/src/main/java/org/apache/kylin/job/exception/JobException.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.exception; - -/** - * @author xduo - * - */ -public class JobException extends Exception { - - private static final long serialVersionUID = 1L; - - /** - * - */ - public JobException() { - super(); - } - - /** - * @param message - * @param cause - */ - public JobException(String message, Throwable cause) { - super(message, cause); - } - - /** - * @param message - */ - public JobException(String message) { - super(message); - } - - /** - * @param cause - */ - public JobException(Throwable cause) { - super(cause); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/LockException.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/exception/LockException.java b/job/src/main/java/org/apache/kylin/job/exception/LockException.java deleted file mode 100644 index 67568fc..0000000 --- a/job/src/main/java/org/apache/kylin/job/exception/LockException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.exception; - -/** - * Created by qianzhou on 12/17/14. - */ -public class LockException extends Exception { - private static final long serialVersionUID = 2072745879281754945L; - - public LockException() { - } - - public LockException(String message) { - super(message); - } - - public LockException(String message, Throwable cause) { - super(message, cause); - } - - public LockException(Throwable cause) { - super(cause); - } - - public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java b/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java deleted file mode 100644 index 60401b5..0000000 --- a/job/src/main/java/org/apache/kylin/job/exception/PersistentException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.exception; - -/** - * Created by qianzhou on 12/15/14. - */ -public class PersistentException extends Exception { - private static final long serialVersionUID = -4239863858506718998L; - - public PersistentException() { - } - - public PersistentException(String message) { - super(message); - } - - public PersistentException(String message, Throwable cause) { - super(message, cause); - } - - public PersistentException(Throwable cause) { - super(cause); - } - - public PersistentException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java b/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java deleted file mode 100644 index 190db6e..0000000 --- a/job/src/main/java/org/apache/kylin/job/exception/SchedulerException.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.exception; - -/** - * Created by qianzhou on 12/15/14. - */ -public class SchedulerException extends Exception { - private static final long serialVersionUID = 349041244824274861L; - - public SchedulerException() { - } - - public SchedulerException(String message) { - super(message); - } - - public SchedulerException(String message, Throwable cause) { - super(message, cause); - } - - public SchedulerException(Throwable cause) { - super(cause); - } - - public SchedulerException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java deleted file mode 100644 index f7e4332..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.LogTitlePrinter; -import org.apache.kylin.common.util.MailService; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.impl.threadpool.DefaultContext; -import org.apache.kylin.job.manager.ExecutableManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - */ -public abstract class AbstractExecutable implements Executable, Idempotent { - - protected static final String SUBMITTER = "submitter"; - protected static final String NOTIFY_LIST = "notify_list"; - protected static final String START_TIME = "startTime"; - protected static final String END_TIME = "endTime"; - - protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); - - private String name; - private String id; - private Map<String, String> params = Maps.newHashMap(); - - protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - - public AbstractExecutable() { - setId(UUID.randomUUID().toString()); - } - - protected void onExecuteStart(ExecutableContext executableContext) { - Map<String, String> info = Maps.newHashMap(); - info.put(START_TIME, Long.toString(System.currentTimeMillis())); - executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, info, null); - } - - protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { - setEndTime(System.currentTimeMillis()); - if (!isDiscarded()) { - if (result.succeed()) { - executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output()); - } else { - executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); - } - } else { - } - } - - protected void onExecuteError(Throwable exception, ExecutableContext executableContext) { - if (!isDiscarded()) { - executableManager.addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis())); - String output = null; - if (exception != null) { - final StringWriter out = new StringWriter(); - exception.printStackTrace(new PrintWriter(out)); - output = out.toString(); - } - executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, output); - } else { - } - } - - @Override - public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException { - - //print a eye-catching title in log - LogTitlePrinter.printTitle(this.getName()); - - Preconditions.checkArgument(executableContext instanceof DefaultContext); - ExecuteResult result; - try { - onExecuteStart(executableContext); - result = doWork(executableContext); - } catch (Throwable e) { - logger.error("error running Executable", e); - onExecuteError(e, executableContext); - throw new ExecuteException(e); - } - onExecuteFinished(result, executableContext); - return result; - } - - protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException; - - @Override - public void cleanup() throws ExecuteException { - - } - - @Override - public boolean isRunnable() { - return this.getStatus() == ExecutableState.READY; - } - - @Override - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - @Override - public final String getId() { - return this.id; - } - - public final void setId(String id) { - this.id = id; - } - - @Override - public final ExecutableState getStatus() { - return executableManager.getOutput(this.getId()).getState(); - } - - @Override - public final Map<String, String> getParams() { - return Collections.unmodifiableMap(this.params); - } - - public final String getParam(String key) { - return this.params.get(key); - } - - public final void setParam(String key, String value) { - this.params.put(key, value); - } - - public final void setParams(Map<String, String> params) { - this.params.putAll(params); - } - - public final long getLastModified() { - return executableManager.getOutput(getId()).getLastModified(); - } - - public final void setSubmitter(String submitter) { - setParam(SUBMITTER, submitter); - } - - public final List<String> getNotifyList() { - final String str = getParam(NOTIFY_LIST); - if (str != null) { - return Lists.newArrayList(StringUtils.split(str, ",")); - } else { - return Collections.emptyList(); - } - } - - public final void setNotifyList(String notifications) { - setParam(NOTIFY_LIST, notifications); - } - - public final void setNotifyList(List<String> notifications) { - setNotifyList(StringUtils.join(notifications, ",")); - } - - protected Pair<String, String> formatNotifications(ExecutableState state) { - return null; - } - - protected final void notifyUserStatusChange(ExecutableState state) { - try { - List<String> users = Lists.newArrayList(); - users.addAll(getNotifyList()); - final String adminDls = KylinConfig.getInstanceFromEnv().getAdminDls(); - if (null != adminDls) { - for (String adminDl : adminDls.split(",")) { - users.add(adminDl); - } - } - if (users.isEmpty()) { - return; - } - final Pair<String, String> email = formatNotifications(state); - if (email == null) { - return; - } - logger.info("prepare to send email to:" + users); - logger.info("job name:" + getName()); - logger.info("submitter:" + getSubmitter()); - logger.info("notify list:" + users); - new MailService().sendMail(users, email.getLeft(), email.getRight()); - } catch (Exception e) { - logger.error(e.getLocalizedMessage(), e); - } - } - - public final String getSubmitter() { - return getParam(SUBMITTER); - } - - @Override - public final Output getOutput() { - return executableManager.getOutput(getId()); - } - - protected long getExtraInfoAsLong(String key, long defaultValue) { - return getExtraInfoAsLong(executableManager.getOutput(getId()), key, defaultValue); - } - - public static long getStartTime(Output output) { - return getExtraInfoAsLong(output, START_TIME, 0L); - } - - public static long getEndTime(Output output) { - return getExtraInfoAsLong(output, END_TIME, 0L); - } - - public static long getDuration(long startTime, long endTime) { - if (startTime == 0) { - return 0; - } - if (endTime == 0) { - return System.currentTimeMillis() - startTime; - } else { - return endTime - startTime; - } - } - - public static String getExtraInfo(Output output, String key) { - return output.getExtra().get(key); - } - - public static long getExtraInfoAsLong(Output output, String key, long defaultValue) { - final String str = output.getExtra().get(key); - if (str != null) { - return Long.parseLong(str); - } else { - return defaultValue; - } - } - - protected final void addExtraInfo(String key, String value) { - executableManager.addJobInfo(getId(), key, value); - } - - public final void setStartTime(long time) { - addExtraInfo(START_TIME, time + ""); - } - - public final void setEndTime(long time) { - addExtraInfo(END_TIME, time + ""); - } - - public final long getStartTime() { - return getExtraInfoAsLong(START_TIME, 0L); - } - - public final long getEndTime() { - return getExtraInfoAsLong(END_TIME, 0L); - } - - public final long getDuration() { - return getDuration(getStartTime(), getEndTime()); - } - - /* - * discarded is triggered by JobService, the Scheduler is not awake of that - * - * */ - protected final boolean isDiscarded() { - final ExecutableState status = executableManager.getOutput(getId()).getState(); - return status == ExecutableState.DISCARDED; - } - - @Override - public String toString() { - return Objects.toStringHelper(this).add("id", getId()).add("name", getName()).add("state", getStatus()).toString(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java deleted file mode 100644 index f372f38..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/ChainedExecutable.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import java.util.List; - -/** - * Created by qianzhou on 12/15/14. - */ -public interface ChainedExecutable extends Executable { - - List<? extends AbstractExecutable> getTasks(); - - void addTask(AbstractExecutable executable); - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java deleted file mode 100644 index 61140b5..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import java.util.List; -import java.util.Map; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.manager.ExecutableManager; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Created by qianzhou on 12/16/14. - */ -public class DefaultChainedExecutable extends AbstractExecutable implements ChainedExecutable { - - private final List<AbstractExecutable> subTasks = Lists.newArrayList(); - - protected final ExecutableManager jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - - public DefaultChainedExecutable() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - List<? extends Executable> executables = getTasks(); - final int size = executables.size(); - for (int i = 0; i < size; ++i) { - Executable subTask = executables.get(i); - if (subTask.isRunnable()) { - return subTask.execute(context); - } - } - return new ExecuteResult(ExecuteResult.State.SUCCEED, null); - } - - @Override - protected void onExecuteStart(ExecutableContext executableContext) { - Map<String, String> info = Maps.newHashMap(); - info.put(START_TIME, Long.toString(System.currentTimeMillis())); - final long startTime = getStartTime(); - if (startTime > 0) { - jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); - } else { - jobService.updateJobOutput(getId(), ExecutableState.RUNNING, info, null); - } - } - - @Override - protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { - if (isDiscarded()) { - setEndTime(System.currentTimeMillis()); - notifyUserStatusChange(ExecutableState.DISCARDED); - } else if (result.succeed()) { - List<? extends Executable> jobs = getTasks(); - boolean allSucceed = true; - boolean hasError = false; - for (Executable task : jobs) { - final ExecutableState status = task.getStatus(); - if (status == ExecutableState.ERROR) { - hasError = true; - } - if (status != ExecutableState.SUCCEED) { - allSucceed = false; - } - } - if (allSucceed) { - setEndTime(System.currentTimeMillis()); - jobService.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null); - notifyUserStatusChange(ExecutableState.SUCCEED); - } else if (hasError) { - setEndTime(System.currentTimeMillis()); - jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null); - notifyUserStatusChange(ExecutableState.ERROR); - } else { - jobService.updateJobOutput(getId(), ExecutableState.READY, null, null); - } - } else { - setEndTime(System.currentTimeMillis()); - jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, null); - notifyUserStatusChange(ExecutableState.ERROR); - } - } - - @Override - public List<AbstractExecutable> getTasks() { - return subTasks; - } - - public final AbstractExecutable getTaskByName(String name) { - for (AbstractExecutable task : subTasks) { - if (task.getName() != null && task.getName().equalsIgnoreCase(name)) { - return task; - } - } - return null; - } - - public void addTask(AbstractExecutable executable) { - executable.setId(getId() + "-" + String.format("%02d", subTasks.size())); - this.subTasks.add(executable); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java b/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java deleted file mode 100644 index a2ee08e..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/DefaultOutput.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; - -/** - * Created by qianzhou on 1/6/15. - */ -public class DefaultOutput implements Output { - - private ExecutableState state; - private Map<String, String> extra; - private String verboseMsg; - private long lastModified; - - @Override - public Map<String, String> getExtra() { - return extra; - } - - @Override - public String getVerboseMsg() { - return verboseMsg; - } - - @Override - public ExecutableState getState() { - return state; - } - - @Override - public long getLastModified() { - return lastModified; - } - - public void setState(ExecutableState state) { - this.state = state; - } - - public void setExtra(Map<String, String> extra) { - this.extra = extra; - } - - public void setVerboseMsg(String verboseMsg) { - this.verboseMsg = verboseMsg; - } - - public void setLastModified(long lastModified) { - this.lastModified = lastModified; - } - - @Override - public int hashCode() { - final int prime = 31; - int hashCode = state.hashCode(); - hashCode = hashCode * prime + extra.hashCode(); - hashCode = hashCode * prime + verboseMsg.hashCode(); - hashCode = hashCode * prime + Long.valueOf(lastModified).hashCode(); - return hashCode; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof DefaultOutput)) { - return false; - } - DefaultOutput another = ((DefaultOutput) obj); - if (this.state != another.state) { - return false; - } - if (!extra.equals(another.extra)) { - return false; - } - if (this.lastModified != another.lastModified) { - return false; - } - return StringUtils.equals(verboseMsg, another.verboseMsg); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/Executable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/Executable.java b/job/src/main/java/org/apache/kylin/job/execution/Executable.java deleted file mode 100644 index 2131655..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/Executable.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import java.util.Map; - -import org.apache.kylin.job.exception.ExecuteException; - -/** - * Created by qianzhou on 12/15/14. - */ -public interface Executable { - - String getId(); - - String getName(); - - ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException; - - ExecutableState getStatus(); - - Output getOutput(); - - boolean isRunnable(); - - Map<String, String> getParams(); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java b/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java deleted file mode 100644 index 2886893..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableContext.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import org.apache.kylin.common.KylinConfig; - -/** - * Created by qianzhou on 12/15/14. - */ -public interface ExecutableContext { - - Object getSchedulerContext(); - - KylinConfig getConfig(); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java b/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java deleted file mode 100644 index 95089d2..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/ExecutableState.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; - -import com.google.common.base.Supplier; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; - -/** - * Created by qianzhou on 12/15/14. - */ -public enum ExecutableState { - - READY, RUNNING, ERROR, STOPPED, DISCARDED, SUCCEED; - - private static Multimap<ExecutableState, ExecutableState> VALID_STATE_TRANSFER; - - static { - VALID_STATE_TRANSFER = Multimaps.newSetMultimap(Maps.<ExecutableState, Collection<ExecutableState>> newEnumMap(ExecutableState.class), new Supplier<Set<ExecutableState>>() { - @Override - public Set<ExecutableState> get() { - return new CopyOnWriteArraySet<ExecutableState>(); - } - }); - - //scheduler - VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.RUNNING); - VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.ERROR); - //user - VALID_STATE_TRANSFER.put(ExecutableState.READY, ExecutableState.DISCARDED); - - //job - VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.READY); - //job - VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.SUCCEED); - //user - VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.DISCARDED); - //scheduler,job - VALID_STATE_TRANSFER.put(ExecutableState.RUNNING, ExecutableState.ERROR); - - VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, ExecutableState.DISCARDED); - VALID_STATE_TRANSFER.put(ExecutableState.STOPPED, ExecutableState.READY); - - VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.DISCARDED); - VALID_STATE_TRANSFER.put(ExecutableState.ERROR, ExecutableState.READY); - } - - public boolean isFinalState() { - return this == SUCCEED || this == DISCARDED; - } - - public static boolean isValidStateTransfer(ExecutableState from, ExecutableState to) { - return VALID_STATE_TRANSFER.containsEntry(from, to); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java deleted file mode 100644 index 27a2407..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import com.google.common.base.Preconditions; - -/** - * Created by qianzhou on 12/15/14. - */ -public final class ExecuteResult { - - public static enum State { - SUCCEED, FAILED, ERROR, DISCARDED, STOPPED - } - - private final State state; - private final String output; - - public ExecuteResult(State state) { - this(state, ""); - } - - public ExecuteResult(State state, String output) { - Preconditions.checkArgument(state != null, "state cannot be null"); - this.state = state; - this.output = output; - } - - public State state() { - return state; - } - - public boolean succeed() { - return state == State.SUCCEED; - } - - public String output() { - return output; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java b/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java deleted file mode 100644 index 6515343..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/Idempotent.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import org.apache.kylin.job.exception.ExecuteException; - -/** - * Created by qianzhou on 12/15/14. - */ -public interface Idempotent { - - void cleanup() throws ExecuteException; -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/execution/Output.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/execution/Output.java b/job/src/main/java/org/apache/kylin/job/execution/Output.java deleted file mode 100644 index f77c71a..0000000 --- a/job/src/main/java/org/apache/kylin/job/execution/Output.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.execution; - -import java.util.Map; - -/** - * Created by qianzhou on 1/6/15. - */ -public interface Output { - - Map<String, String> getExtra(); - - String getVerboseMsg(); - - ExecutableState getState(); - - long getLastModified(); -}
