http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index 2813596..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
-    protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
-    protected StringBuilder output;
-    protected int exitCode;
-    protected JobStepStatusEnum status;
-
-    public ShellCmdOutput() {
-        init();
-    }
-
-    private void init() {
-        output = new StringBuilder();
-        exitCode = -1;
-        status = JobStepStatusEnum.NEW;
-    }
-
-    @Override
-    public JobStepStatusEnum getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setStatus(JobStepStatusEnum s) {
-        this.status = s;
-    }
-
-    @Override
-    public String getOutput() {
-        return output.toString();
-    }
-
-    @Override
-    public void appendOutput(String message) {
-        output.append(message).append(System.getProperty("line.separator"));
-        log.debug(message);
-    }
-
-    @Override
-    public int getExitCode() {
-        return exitCode;
-    }
-
-    @Override
-    public void setExitCode(int code) {
-        exitCode = code;
-    }
-
-    @Override
-    public void reset() {
-        init();
-    }
-
-}
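For context on the removal above: ShellCmdOutput is a plain accumulator that a command executor feeds line by line, then stamps with an exit code and status. A minimal sketch of how such an ICommandOutput collector is driven (assumed usage with a hypothetical demo class; the executor that actually calls it is outside this commit):

import org.apache.kylin.job.cmd.ShellCmdOutput;
import org.apache.kylin.job.constant.JobStepStatusEnum;

public class ShellCmdOutputDemo {
    public static void main(String[] args) {
        ShellCmdOutput out = new ShellCmdOutput();
        out.setStatus(JobStepStatusEnum.RUNNING);

        // An executor would append each line of process output as it arrives.
        out.appendOutput("starting step...");
        out.appendOutput("step finished");

        // On process exit, record the code and derive a terminal status.
        out.setExitCode(0);
        out.setStatus(out.getExitCode() == 0 ? JobStepStatusEnum.FINISHED : JobStepStatusEnum.ERROR);

        System.out.println(out.getStatus());
        System.out.print(out.getOutput());
    }
}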
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java b/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java deleted file mode 100644 index 873607c..0000000 --- a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.common; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.mapreduce.Counters; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.TaskCounter; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author xduo - * - */ -public class HadoopCmdOutput { - - protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class); - - private final StringBuilder output; - private final Job job; - - public HadoopCmdOutput(Job job, StringBuilder output) { - super(); - this.job = job; - this.output = output; - } - - public String getMrJobId() { - return getInfo().get(ExecutableConstants.MR_JOB_ID); - } - - public Map<String, String> getInfo() { - if (job != null) { - Map<String, String> status = new HashMap<String, String>(); - if (null != job.getJobID()) { - status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString()); - } - if (null != job.getTrackingURL()) { - status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString()); - } - return status; - } else { - return Collections.emptyMap(); - } - } - - private String mapInputRecords; - private String hdfsBytesWritten; - private String hdfsBytesRead; - - public String getMapInputRecords() { - return mapInputRecords; - } - - public String getHdfsBytesWritten() { - return hdfsBytesWritten; - } - - public String getHdfsBytesRead() { - return hdfsBytesRead; - } - - public void updateJobCounter() { - try { - Counters counters = job.getCounters(); - if (counters == null) { - String errorMsg = "no counters for job " + getMrJobId(); - log.warn(errorMsg); - output.append(errorMsg); - return; - } - this.output.append(counters.toString()).append("\n"); - log.debug(counters.toString()); - - mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue()); - hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue()); - hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue()); - } catch (Exception e) { - 
log.error(e.getLocalizedMessage(), e); - output.append(e.getLocalizedMessage()); - - mapInputRecords = "0"; - hdfsBytesWritten = "0"; - hdfsBytesRead = "0"; - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java deleted file mode 100644 index dc412ce..0000000 --- a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.common; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.reflect.Constructor; - -import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; - -import com.google.common.base.Preconditions; - -/** - * Created by qianzhou on 12/26/14. - */ -public class HadoopShellExecutable extends AbstractExecutable { - - private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS"; - private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS"; - - public HadoopShellExecutable() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final String mapReduceJobClass = getJobClass(); - String params = getJobParams(); - Preconditions.checkNotNull(mapReduceJobClass); - Preconditions.checkNotNull(params); - try { - final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); - final AbstractHadoopJob job = constructor.newInstance(); - String[] args = params.trim().split("\\s+"); - logger.info("parameters of the HadoopShellExecutable:"); - logger.info(params); - int result; - StringBuilder log = new StringBuilder(); - try { - result = ToolRunner.run(job, args); - } catch (Exception ex) { - logger.error("error execute " + this.toString(), ex); - StringWriter stringWriter = new StringWriter(); - ex.printStackTrace(new PrintWriter(stringWriter)); - log.append(stringWriter.toString()).append("\n"); - result = 2; - } - log.append("result code:").append(result); - return result == 0 ? 
new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString()); - } catch (ReflectiveOperationException e) { - logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } catch (Exception e) { - logger.error("error execute " + this.toString(), e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - - public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) { - setParam(KEY_MR_JOB, clazzName.getName()); - } - - public String getJobClass() throws ExecuteException { - return getParam(KEY_MR_JOB); - } - - public void setJobParams(String param) { - setParam(KEY_PARAMS, param); - } - - public String getJobParams() { - return getParam(KEY_PARAMS); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java deleted file mode 100644 index 75c461b..0000000 --- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.common; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.util.HiveClient; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; -import org.datanucleus.store.types.backed.HashMap; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.collect.Lists; - -/** - * Created by qianzhou on 1/15/15. 
- */ -public class HqlExecutable extends AbstractExecutable { - - private static final String HQL = "hql"; - private static final String HIVE_CONFIG = "hive-config"; - - public HqlExecutable() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - try { - Map<String, String> configMap = getConfiguration(); - HiveClient hiveClient = new HiveClient(configMap); - - for (String hql : getHqls()) { - hiveClient.executeHQL(hql); - } - return new ExecuteResult(ExecuteResult.State.SUCCEED); - } catch (Exception e) { - logger.error("error run hive query:" + getHqls(), e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - - public void setConfiguration(Map<String, String> configMap) { - if (configMap != null) { - String configStr = ""; - try { - configStr = JsonUtil.writeValueAsString(configMap); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - setParam(HIVE_CONFIG, configStr); - } - } - - @SuppressWarnings("unchecked") - private Map<String, String> getConfiguration() { - String configStr = getParam(HIVE_CONFIG); - Map<String, String> result = null; - if (configStr != null) { - try { - result = JsonUtil.readValue(configStr, HashMap.class); - } catch (Exception e) { - e.printStackTrace(); - } - } - - return result; - } - - public void setHqls(List<String> hqls) { - setParam(HQL, StringUtils.join(hqls, ";")); - } - - private List<String> getHqls() { - final String hqls = getParam(HQL); - if (hqls != null) { - return Lists.newArrayList(StringUtils.split(hqls, ";")); - } else { - return Collections.emptyList(); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java deleted file mode 100644 index cb6e76c..0000000 --- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.common; - -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.lang.reflect.Constructor; -import java.util.Collections; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Cluster; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hadoop.yarn.conf.HAUtil; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.RMHAUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.constant.JobStepStatusEnum; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.hadoop.AbstractHadoopJob; -import org.apache.kylin.job.tools.HadoopStatusChecker; - -import com.google.common.base.Preconditions; - -/** - * Created by qianzhou on 12/25/14. - */ -public class MapReduceExecutable extends AbstractExecutable { - - public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime"; - private static final String KEY_MR_JOB = "MR_JOB_CLASS"; - private static final String KEY_PARAMS = "MR_JOB_PARAMS"; - - public MapReduceExecutable() { - super(); - } - - @Override - protected void onExecuteStart(ExecutableContext executableContext) { - final Output output = executableManager.getOutput(getId()); - if (output.getExtra().containsKey(START_TIME)) { - final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID); - if (mrJobId == null) { - executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); - return; - } - try { - Configuration conf = HadoopUtil.getCurrentConfiguration(); - Job job = new Cluster(conf).getJob(JobID.forName(mrJobId)); - if (job.getJobState() == JobStatus.State.FAILED) { - //remove previous mr job info - super.onExecuteStart(executableContext); - } else { - executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); - } - } catch (IOException e) { - logger.warn("error get hadoop status"); - super.onExecuteStart(executableContext); - } catch (InterruptedException e) { - logger.warn("error get hadoop status"); - super.onExecuteStart(executableContext); - } - } else { - super.onExecuteStart(executableContext); - } - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - final String mapReduceJobClass = getMapReduceJobClass(); - String params = getMapReduceParams(); - Preconditions.checkNotNull(mapReduceJobClass); - Preconditions.checkNotNull(params); - try { - Job job; - final Map<String, String> extra = executableManager.getOutput(getId()).getExtra(); - if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) { - Configuration conf = HadoopUtil.getCurrentConfiguration(); - job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID))); - logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed")); - } else { - final 
Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor(); - final AbstractHadoopJob hadoopJob = constructor.newInstance(); - hadoopJob.setConf(HadoopUtil.getCurrentConfiguration()); - hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away - logger.info("parameters of the MapReduceExecutable:"); - logger.info(params); - String[] args = params.trim().split("\\s+"); - try { - //for async mr job, ToolRunner just return 0; - ToolRunner.run(hadoopJob, args); - } catch (Exception ex) { - StringBuilder log = new StringBuilder(); - logger.error("error execute " + this.toString(), ex); - StringWriter stringWriter = new StringWriter(); - ex.printStackTrace(new PrintWriter(stringWriter)); - log.append(stringWriter.toString()).append("\n"); - log.append("result code:").append(2); - return new ExecuteResult(ExecuteResult.State.ERROR, log.toString()); - } - job = hadoopJob.getJob(); - } - final StringBuilder output = new StringBuilder(); - final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output); - - final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); - if (restStatusCheckUrl == null) { - logger.error("restStatusCheckUrl is null"); - return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); - } - String mrJobId = hadoopCmdOutput.getMrJobId(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, context.getConfig()); - JobStepStatusEnum status = JobStepStatusEnum.NEW; - while (!isDiscarded()) { - JobStepStatusEnum newStatus = statusChecker.checkStatus(); - if (status == JobStepStatusEnum.KILLED) { - executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String> emptyMap(), "killed by admin"); - return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin"); - } - if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) { - final long waitTime = System.currentTimeMillis() - getStartTime(); - setMapReduceWaitTime(waitTime); - } - status = newStatus; - executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo()); - if (status.isComplete()) { - hadoopCmdOutput.updateJobCounter(); - final Map<String, String> info = hadoopCmdOutput.getInfo(); - info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords()); - info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead()); - info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten()); - executableManager.addJobInfo(getId(), info); - - if (status == JobStepStatusEnum.FINISHED) { - return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); - } else { - return new ExecuteResult(ExecuteResult.State.FAILED, output.toString()); - } - } - Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000); - } - - // try to kill running map-reduce job to release resources. 
- if (job != null) { - try { - job.killJob(); - } catch (Exception e) { - logger.warn("failed to kill hadoop job: " + job.getJobID(), e); - } - } - return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString()); - - } catch (ReflectiveOperationException e) { - logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } catch (Exception e) { - logger.error("error execute " + this.toString(), e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - - private String getRestStatusCheckUrl(Job job, KylinConfig config) { - final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl(); - if (yarnStatusCheckUrl != null) { - return yarnStatusCheckUrl; - } else { - logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration"); - } - String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, job.getConfiguration()); - if (HAUtil.isHAEnabled(job.getConfiguration())) { - YarnConfiguration conf = new YarnConfiguration(job.getConfiguration()); - String active = RMHAUtils.findActiveRMHAId(conf); - rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf); - } - if (StringUtils.isEmpty(rmWebHost)) { - return null; - } - if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) { - //do nothing - } else { - rmWebHost = "http://" + rmWebHost; - } - logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost); - return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true"; - } - - public long getMapReduceWaitTime() { - return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L); - } - - public void setMapReduceWaitTime(long t) { - addExtraInfo(MAP_REDUCE_WAIT_TIME, t + ""); - } - - public String getMapReduceJobClass() throws ExecuteException { - return getParam(KEY_MR_JOB); - } - - public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) { - setParam(KEY_MR_JOB, clazzName.getName()); - } - - public String getMapReduceParams() { - return getParam(KEY_PARAMS); - } - - public void setMapReduceParams(String param) { - setParam(KEY_PARAMS, param); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java deleted file mode 100644 index 19aa915..0000000 --- a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.common; - -import java.io.IOException; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.kylin.common.util.Logger; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecuteResult; - -import com.google.common.collect.Maps; - -/** - * Created by qianzhou on 12/26/14. - */ -public class ShellExecutable extends AbstractExecutable { - - private static final String CMD = "cmd"; - - public ShellExecutable() { - super(); - } - - @Override - protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { - try { - logger.info("executing:" + getCmd()); - final ShellExecutableLogger logger = new ShellExecutableLogger(); - final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger); - executableManager.addJobInfo(getId(), logger.getInfo()); - return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond()); - } catch (IOException e) { - logger.error("job:" + getId() + " execute finished with exception", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); - } - } - - public void setCmd(String cmd) { - setParam(CMD, cmd); - } - - public String getCmd() { - return getParam(CMD); - } - - private static class ShellExecutableLogger implements Logger { - - private final Map<String, String> info = Maps.newHashMap(); - - private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) 
to ResourceManager"); - private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)"); - private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)"); - private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)"); - private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)"); - private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write"); - - // hive - private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)"); - private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS"); - - @Override - public void log(String message) { - Matcher matcher = PATTERN_APP_ID.matcher(message); - if (matcher.find()) { - String appId = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_ID, appId); - } - - matcher = PATTERN_APP_URL.matcher(message); - if (matcher.find()) { - String appTrackingUrl = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl); - } - - matcher = PATTERN_JOB_ID.matcher(message); - if (matcher.find()) { - String mrJobID = matcher.group(1); - info.put(ExecutableConstants.MR_JOB_ID, mrJobID); - } - - matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message); - if (matcher.find()) { - String hdfsWritten = matcher.group(1); - info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); - } - - matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message); - if (matcher.find()) { - String sourceCount = matcher.group(1); - info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount); - } - - matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message); - if (matcher.find()) { - String sourceSize = matcher.group(1); - info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize); - } - - // hive - matcher = PATTERN_HIVE_APP_ID_URL.matcher(message); - if (matcher.find()) { - String jobId = matcher.group(1); - String trackingUrl = matcher.group(2); - info.put(ExecutableConstants.MR_JOB_ID, jobId); - info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); - } - - matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message); - if (matcher.find()) { - // String hdfsRead = matcher.group(1); - String hdfsWritten = matcher.group(2); - info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); - } - } - - Map<String, String> getInfo() { - return info; - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java deleted file mode 100644 index 38f4a87..0000000 --- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.constant; - -/** - * @author George Song (ysong1) - * - */ -public interface BatchConstants { - - char INTERMEDIATE_TABLE_ROW_DELIMITER = 127; - - String CFG_CUBE_NAME = "cube.name"; - String CFG_CUBE_SEGMENT_NAME = "cube.segment.name"; - String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level"; - - String CFG_II_NAME = "ii.name"; - String CFG_II_SEGMENT_NAME = "ii.segment.name"; - - String OUTPUT_PATH = "output.path"; - - String TABLE_NAME = "table.name"; - String TABLE_COLUMNS = "table.columns"; - - String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter"; - - String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder"; - - String MAPPER_SAMPLE_NUMBER = "mapper.sample.number"; - String REGION_NUMBER = "region.number"; - String REGION_NUMBER_MIN = "region.number.min"; - String REGION_NUMBER_MAX = "region.number.max"; - String REGION_SPLIT_SIZE = "region.split.size"; - String CUBE_CAPACITY = "cube.capacity"; - - String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/"; - String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/"; - - int COUNTER_MAX = 100000; - int ERROR_RECORD_THRESHOLD = 100; -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java deleted file mode 100644 index 3d98b0b..0000000 --- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.constant; - -/** - * Created by qianzhou on 1/5/15. 
- */ -public final class ExecutableConstants { - - private ExecutableConstants() { - } - - public static final String YARN_APP_ID = "yarn_application_id"; - - public static final String YARN_APP_URL = "yarn_application_tracking_url"; - public static final String MR_JOB_ID = "mr_job_id"; - public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written"; - public static final String SOURCE_RECORDS_COUNT = "source_records_count"; - public static final String SOURCE_RECORDS_SIZE = "source_records_size"; - public static final String GLOBAL_LISTENER_NAME = "ChainListener"; - - public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60; - - public static final String CUBE_JOB_GROUP_NAME = "cube_job_group"; - - public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group"; - public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; - - public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; - public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; - public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data"; - public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data"; - public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits"; - public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable"; - public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile"; - public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table"; - public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary"; - public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data"; - public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info"; - public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection"; - - public static final String STEP_NAME_BUILD_II = "Build Inverted Index"; - public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile"; - - public static final String PROP_ENGINE_CONTEXT = "jobengineConfig"; - public static final String PROP_JOB_FLOW = "jobFlow"; - public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid"; - public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID"; - public static final String PROP_COMMAND = "command"; - // public static final String PROP_STORAGE_LOCATION = - // "storageLocationIdentifier"; - public static final String PROP_JOB_ASYNC = "jobAsync"; - public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor"; - public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput"; - public static final String PROP_JOB_KILLED = "jobKilled"; - public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows"; - - public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>"; - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java 
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
deleted file mode 100644
index 9c8f083..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-public enum JobStatusEnum {
-
-    NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
-    private final int code;
-
-    private JobStatusEnum(int statusCode) {
-        this.code = statusCode;
-    }
-
-    public static JobStatusEnum getByCode(int statusCode) {
-        for (JobStatusEnum status : values()) {
-            if (status.getCode() == statusCode) {
-                return status;
-            }
-        }
-
-        return null;
-    }
-
-    public int getCode() {
-        return this.code;
-    }
-
-    public boolean isComplete() {
-        return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index 0e4c18e..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.kylin.job.constant; - -/** - * @author xduo, ysong1 - * - */ -public enum JobStepCmdTypeEnum { - SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java deleted file mode 100644 index fbcfd97..0000000 --- a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.constant; - -public enum JobStepStatusEnum { - NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64); - - private final int code; - - private JobStepStatusEnum(int statusCode) { - this.code = statusCode; - } - - public static JobStepStatusEnum getByCode(int statusCode) { - for (JobStepStatusEnum status : values()) { - if (status.getCode() == statusCode) { - return status; - } - } - - return null; - } - - public int getCode() { - return this.code; - } - - public boolean isComplete() { - return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode(); - } - - public boolean isRunable() { - return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode(); - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java deleted file mode 100644 index 182196c..0000000 --- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.cube; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Date; -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.kylin.common.util.StringUtil; -import org.apache.kylin.job.common.MapReduceExecutable; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.job.execution.Output; - -/** - * Created by qianzhou on 12/25/14. - */ -public class CubingJob extends DefaultChainedExecutable { - - public CubingJob() { - super(); - } - - private static final String CUBE_INSTANCE_NAME = "cubeName"; - private static final String SEGMENT_ID = "segmentId"; - public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime"; - - public void setCubeName(String name) { - setParam(CUBE_INSTANCE_NAME, name); - } - - public String getCubeName() { - return getParam(CUBE_INSTANCE_NAME); - } - - void setSegmentIds(List<String> segmentIds) { - setParam(SEGMENT_ID, StringUtils.join(segmentIds, ",")); - } - - void setSegmentId(String segmentId) { - setParam(SEGMENT_ID, segmentId); - } - - public String getSegmentIds() { - return getParam(SEGMENT_ID); - } - - @Override - protected Pair<String, String> formatNotifications(ExecutableState state) { - final Output output = jobService.getOutput(getId()); - String logMsg = ""; - switch (output.getState()) { - case ERROR: - logMsg = output.getVerboseMsg(); - break; - case DISCARDED: - break; - case SUCCEED: - break; - default: - return null; - } - String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE; - content = content.replaceAll("\\$\\{job_name\\}", getName()); - content = content.replaceAll("\\$\\{result\\}", state.toString()); - content = content.replaceAll("\\$\\{cube_name\\}", getCubeName()); - content = content.replaceAll("\\$\\{source_records_count\\}", StringUtil.noBlank(getSourceRecordCount(), "0")); - content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString()); - content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins"); - content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins"); - content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString()); - content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter")); - content = content.replaceAll("\\$\\{error_log\\}", StringUtil.noBlank(logMsg, "no error log")); - - try { - InetAddress inetAddress = InetAddress.getLocalHost(); - content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName()); - } catch (UnknownHostException e) { - logger.warn(e.getLocalizedMessage(), e); - } - - String title = "[" + state.toString() + "] - [Kylin Cube 
Build Job]-" + getCubeName(); - return Pair.of(title, content); - } - - @Override - protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { - long time = 0L; - for (AbstractExecutable task : getTasks()) { - final ExecutableState status = task.getStatus(); - if (status != ExecutableState.SUCCEED) { - break; - } - if (task instanceof MapReduceExecutable) { - time += ((MapReduceExecutable) task).getMapReduceWaitTime(); - } - } - setMapReduceWaitTime(time); - super.onExecuteFinished(result, executableContext); - } - - public long getMapReduceWaitTime() { - return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L); - } - - public void setMapReduceWaitTime(long t) { - addExtraInfo(MAP_REDUCE_WAIT_TIME, t + ""); - } - - - public final String getSourceRecordCount() { - for (AbstractExecutable task : getTasks()) { - if (ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID.equals(task.getName())) { - return getExtraInfo(task.getOutput(), ExecutableConstants.SOURCE_RECORDS_COUNT); - } - } - return "N/A"; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java deleted file mode 100644 index 80c030f..0000000 --- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java +++ /dev/null @@ -1,476 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.AbstractJobBuilder;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Created by qianzhou on 12/25/14.
- */
-public final class CubingJobBuilder extends AbstractJobBuilder {
-
-    public CubingJobBuilder(JobEngineConfig engineConfig) {
-        super(engineConfig);
-    }
-
-    public CubingJob buildJob(CubeSegment seg) {
-        checkPreconditions(seg);
-
-        final CubingJob result = initialJob(seg, "BUILD");
-        final String jobId = result.getId();
-        final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-        final List<String> toDeletePaths = Lists.newArrayList();
-
-        // cubing
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result, toDeletePaths);
-        String intermediateHiveTableStepId = twoSteps.getFirst().getId();
-        String baseCuboidStepId = twoSteps.getSecond().getId();
-
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, cuboidRootPath, result);
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
-
-        final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-        final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePaths));
-
-        return result;
-    }
-
-    public CubingJob buildAndMergeJob(CubeSegment appendSegment, CubeSegment mergeSegment) {
-        checkPreconditions(appendSegment, mergeSegment);
-
-        CubingJob result = initialJob(mergeSegment, "BUILD");
-        result.setSegmentIds(Lists.newArrayList(new String[] { appendSegment.getUuid(), mergeSegment.getUuid() }));
-        final String jobId = result.getId();
-        final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
-        final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        final List<String> toDeletePaths = Lists.newArrayList();
-
-        // cubing the incremental segment
-        Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result, toDeletePaths);
-        final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
-        final String baseCuboidStepId = twoSteps.getSecond().getId();
-
-        // update the append segment info
-        result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, intermediateHiveTableStepId, baseCuboidStepId, null, jobId));
-
-        List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-            if (merging.equals(appendSegment)) {
-                mergingCuboidPaths.add(appendRootPath + "*");
-            } else {
-                mergingCuboidPaths.add(getPathToMerge(merging));
-            }
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
-        }
-
-        // merge cuboid
-        addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
-
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePaths));
-
-        return result;
-    }
-
-    public CubingJob mergeJob(CubeSegment seg) {
-        checkPreconditions(seg);
-
-        CubingJob result = initialJob(seg, "MERGE");
-        final String jobId = result.getId();
-        final String mergedCuboidPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-
-        List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        final List<String> toDeletePaths = Lists.newArrayList();
-
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingCuboidPaths.add(getPathToMerge(merging));
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-            toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
-        }
-
-        // merge cuboid
-        addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
-
-        // convert htable
-        AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
-        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePaths));
-        return result;
-    }
-
-    void addMergeSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingCuboidPaths, String mergedCuboidPath, CubingJob result) {
-
-        result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
-
-        String formattedPath = StringUtils.join(mergingCuboidPaths, ",");
-        result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
-    }
-
-    Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
-        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-
-        final String jobId = result.getId();
-        final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-        final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId);
-        final String factDistinctColumnsPath = getFactDistinctColumnsPath(seg, jobId);
-        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-
-        final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
-        result.addTask(intermediateHiveTableStep);
-
-        result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId));
-
-        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-
-        // base cuboid step
-        final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath);
-        result.addTask(baseCuboidStep);
-
-        // n dim cuboid steps
-        for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnsCount - i;
-            result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
-        }
-
-        toDeletePaths.add(intermediateHiveTableLocation);
-        toDeletePaths.add(factDistinctColumnsPath);
-
-        return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
-    }
-
-    AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
-        final String jobId = result.getId();
-        final String cuboidPath = cuboidRootPath + "*";
-
-        result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId));
-        // create htable step
-        result.addTask(createCreateHTableStep(seg, jobId));
-        // generate hfiles step
-        final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
-        result.addTask(convertCuboidToHfileStep);
-        // bulk load step
-        result.addTask(createBulkLoadStep(seg, jobId));
-
-        return convertCuboidToHfileStep;
-    }
-
-    private CubingJob initialJob(CubeSegment seg, String type) {
-        CubingJob result = new CubingJob();
-        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-        result.setSubmitter(submitter);
-        result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
-        return result;
-    }
-
-    private void checkPreconditions(CubeSegment... segments) {
-        for (CubeSegment seg : segments) {
-            Preconditions.checkNotNull(seg, "segment cannot be null");
-        }
-        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-    }
-
-    private void appendMapReduceParameters(StringBuilder builder, CubeSegment seg) {
-        try {
-            String jobConf = engineConfig.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
-            if (jobConf != null && jobConf.length() > 0) {
-                builder.append(" -conf ").append(jobConf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
-        String[] paths = new String[groupRowkeyColumnsCount + 1];
-        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnCount - i;
-            if (dimNum == totalRowkeyColumnCount) {
-                paths[i] = cuboidRootPath + "base_cuboid";
-            } else {
-                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
-            }
-        }
-        return paths;
-    }
-
-    private String getPathToMerge(CubeSegment seg) {
-        return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/cuboid/*";
-    }
-
-    private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) {
-        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
-    }
-
-    private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
-        return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
-    }
-
-    private String getHFilePath(CubeSegment seg, String jobId) {
-        return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
-    }
-
-    private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(seg, jobId));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName);
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    private HadoopShellExecutable createBuildDictionaryStep(CubeSegment seg, String factDistinctColumnsPath) {
-        // build dictionary job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableLocation, String[] cuboidOutputTempPath) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
-        return baseCuboidStep;
-    }
-
-    private MapReduceExecutable createNDimensionCuboidStep(CubeSegment seg, String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
-        // ND cuboid job
-        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
-
-        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
-
-        ndCuboidStep.setMapReduceParams(cmd.toString());
-        ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
-        return ndCuboidStep;
-    }
-
-    private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String jobId) {
-        MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
-        rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg, jobId));
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
-
-        rowkeyDistributionStep.setMapReduceParams(cmd.toString());
-        rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
-        return rowkeyDistributionStep;
-    }
-
-    private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String jobId) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(CreateHTableJob.class);
-
-        return createHtableStep;
-    }
-
-    private MapReduceExecutable createConvertCuboidToHfileStep(CubeSegment seg, String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
-
-        return createHFilesStep;
-    }
-
-    private HadoopShellExecutable createBulkLoadStep(CubeSegment seg, String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(BulkLoadJob.class);
-
-        return bulkLoadStep;
-
-    }
-
-    private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId, String jobId) {
-        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
-        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
-        updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCreateFlatTableStepId(createFlatTableStepId);
-        updateCubeInfoStep.setBaseCuboidStepId(baseCuboidStepId);
-        updateCubeInfoStep.setConvertToHFileStepId(convertToHFileStepId);
-        updateCubeInfoStep.setCubingJobId(jobId);
-        return updateCubeInfoStep;
-    }
-
-    private MergeDictionaryStep createMergeDictionaryStep(CubeSegment seg, List<String> mergingSegmentIds) {
-        MergeDictionaryStep result = new MergeDictionaryStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        return result;
-    }
-
-    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
-        return mergeCuboidDataStep;
-    }
-
-    private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String convertToHFileStepId, String jobId) {
-        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
-        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setConvertToHFileStepId(convertToHFileStepId);
-        result.setCubingJobId(jobId);
-        return result;
-    }
-
-    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdfsPaths) {
-        GarbageCollectionStep result = new GarbageCollectionStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(oldHtables);
-        result.setOldHiveTable(hiveIntermediateTable);
-        result.setOldHdfsPaths(oldHdfsPaths);
-        return result;
-    }
-
-}
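For orientation, the class removed above chained every build into a single job in dependency order: intermediate Hive flat table, fact distinct columns, dictionary build, base cuboid, N-dimension cuboids, rowkey-distribution sampling, HTable creation, HFile conversion, bulk load, cube-info update, and a trailing garbage-collection step. A minimal driver sketch of its public entry points follows; the cube lookup, the "sample_cube" name, and the segment choice are illustrative assumptions, not part of this patch:

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.cube.CubingJob;
import org.apache.kylin.job.cube.CubingJobBuilder;
import org.apache.kylin.job.engine.JobEngineConfig;

// Illustrative driver only; not part of this patch.
public class CubingJobBuilderSketch {
    public static void main(String[] args) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        // "sample_cube" is a placeholder name; get(0) just picks some segment.
        CubeSegment segment = CubeManager.getInstance(config).getCube("sample_cube").getSegments().get(0);

        CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(config));
        CubingJob job = builder.buildJob(segment);

        // buildJob() returns the fully chained job; a Kylin job scheduler
        // (omitted here) would run its tasks sequentially, ending with
        // UpdateCubeInfoAfterBuildStep and GarbageCollectionStep.
        System.out.println("assembled job " + job.getId());
    }
}

mergeJob(seg) and buildAndMergeJob(appendSegment, mergeSegment) assemble the merge variants the same way, swapping the cubing steps for createMergeDictionaryStep and createMergeCuboidDataStep.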

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
deleted file mode 100644
index f2f1fc0..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Drop the resources that are no longer needed, including the intermediate Hive table (after cube build) and HBase tables (after cube merge).
- */
-public class GarbageCollectionStep extends AbstractExecutable {
-
-    private static final String OLD_HTABLES = "oldHTables";
-
-    private static final String OLD_HIVE_TABLE = "oldHiveTable";
-
-    private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
-
-    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-    private StringBuffer output;
-
-    public GarbageCollectionStep() {
-        super();
-        output = new StringBuffer();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
-        try {
-            dropHBaseTable(context);
-            dropHiveTable(context);
-            dropHdfsPath(context);
-        } catch (IOException e) {
-            logger.error("job:" + getId() + " execute finished with exception", e);
-            output.append("\n").append(e.getLocalizedMessage());
-            return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
-        }
-
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-    }
-
-    private void dropHiveTable(ExecutableContext context) throws IOException {
-        final String hiveTable = this.getOldHiveTable();
-        if (StringUtils.isNotEmpty(hiveTable)) {
-            final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";";
-            final String dropHiveCMD = "hive -e \"" + dropSQL + "\"";
-            logger.info("executing: " + dropHiveCMD);
-            ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
-            context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
-            logger.debug("Dropped Hive table " + hiveTable + " \n");
-            output.append(shellCmdOutput.getOutput() + " \n");
-            output.append("Dropped Hive table " + hiveTable + " \n");
-        }
-
-    }
-
-    private void dropHBaseTable(ExecutableContext context) throws IOException {
-        List<String> oldTables = getOldHTables();
-        if (oldTables != null && oldTables.size() > 0) {
-            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConfiguration.create();
-            HBaseAdmin admin = null;
-            try {
-                admin = new HBaseAdmin(conf);
-                for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
-                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
-                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
-                            }
-                            admin.deleteTable(table);
-                            logger.debug("Dropped HBase table " + table);
-                            output.append("Dropped HBase table " + table + " \n");
-                        } else {
-                            logger.debug("Skipped HBase table " + table);
-                            output.append("Skipped HBase table " + table + " \n");
-                        }
-                    }
-                }
-
-            } finally {
-                if (admin != null)
-                    try {
-                        admin.close();
-                    } catch (IOException e) {
-                        logger.error(e.getLocalizedMessage());
-                    }
-            }
-        }
-    }
-
-    private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
-        if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) {
-            logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
-            output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
-            for (String path : oldHdfsPaths) {
-                if (path.endsWith("*"))
-                    path = path.substring(0, path.length() - 1);
-
-                Path oldPath = new Path(path);
-                if (fileSystem.exists(oldPath)) {
-                    fileSystem.delete(oldPath, true);
-                    logger.debug("Dropped HDFS path: " + path);
-                    output.append("Dropped HDFS path \"" + path + "\" \n");
-                } else {
-                    logger.debug("HDFS path does not exist: " + path);
-                    output.append("HDFS path does not exist: \"" + path + "\" \n");
-                }
-            }
-        }
-    }
-
-    private void dropHdfsPath(ExecutableContext context) throws IOException {
-        List<String> oldHdfsPaths = this.getOldHdfsPaths();
-        FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
-        dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
-
-        if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
-            fileSystem = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
-            dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
-        }
-
-    }
-
-    public void setOldHTables(List<String> tables) {
-        setArrayParam(OLD_HTABLES, tables);
-    }
-
-    private List<String> getOldHTables() {
-        return getArrayParam(OLD_HTABLES);
-    }
-
-    public void setOldHdfsPaths(List<String> paths) {
-        setArrayParam(OLD_HDFS_PATHS, paths);
-    }
-
-    private List<String> getOldHdfsPaths() {
-        return getArrayParam(OLD_HDFS_PATHS);
-    }
-
-    private void setArrayParam(String paramKey, List<String> paramValues) {
-        setParam(paramKey, StringUtils.join(paramValues, ","));
-    }
-
-    private List<String> getArrayParam(String paramKey) {
-        final String ids = getParam(paramKey);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setOldHiveTable(String hiveTable) {
-        setParam(OLD_HIVE_TABLE, hiveTable);
-    }
-
-    private String getOldHiveTable() {
-        return getParam(OLD_HIVE_TABLE);
-    }
-
-}
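Two conventions in the step removed above are easy to miss: list-valued settings survive only as comma-joined strings in the executable's parameter map (so HTable names and HDFS paths must not contain commas), and cuboid paths arrive with a trailing "*" glob that is trimmed back to the parent directory before deletion. A standalone sketch of both behaviors, mirroring the StringUtils calls in setArrayParam/getArrayParam and the path handling in dropHdfsPathOnCluster; the sample values are made up:

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;

// Illustration only; not part of this patch.
public class GcParamSketch {
    public static void main(String[] args) {
        // setArrayParam: a list is persisted as one comma-joined string.
        List<String> htables = Arrays.asList("KYLIN_ABC123", "KYLIN_DEF456");
        String stored = StringUtils.join(htables, ",");
        System.out.println(stored); // KYLIN_ABC123,KYLIN_DEF456

        // getArrayParam: the stored string is split back into a list on read.
        String[] restored = StringUtils.split(stored, ",");
        System.out.println(Arrays.toString(restored));

        // dropHdfsPathOnCluster: a glob such as ".../cuboid/*" is trimmed to
        // its parent directory, which is then deleted recursively.
        String path = "/tmp/kylin/kylin-job-1/some_cube/cuboid/*";
        if (path.endsWith("*"))
            path = path.substring(0, path.length() - 1);
        System.out.println(path); // /tmp/kylin/kylin-job-1/some_cube/cuboid/
    }
}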
