clean code
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5e13bba0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5e13bba0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5e13bba0 Branch: refs/heads/1.5.x-HBase1.1.3 Commit: 5e13bba0822cafeb02817e5e59c08a3a4b5020c9 Parents: 4662ada Author: Hongbin Ma <[email protected]> Authored: Fri Mar 11 14:03:45 2016 +0800 Committer: Hongbin Ma <[email protected]> Committed: Fri Mar 11 15:00:35 2016 +0800 ---------------------------------------------------------------------- .../kylin/common/persistence/ResourceStore.java | 26 ++----- .../java/org/apache/kylin/cube/CubeSegment.java | 31 +++++--- .../org/apache/kylin/cube/cuboid/Cuboid.java | 26 +++---- .../kylin/job/constant/ExecutableConstants.java | 20 ----- .../kylin/job/execution/AbstractExecutable.java | 27 +++---- .../kylin/engine/mr/BatchCubingJobBuilder.java | 25 ++++--- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 48 ++++++------ .../kylin/engine/mr/BatchMergeJobBuilder.java | 13 ++-- .../kylin/engine/mr/BatchMergeJobBuilder2.java | 22 +++--- .../org/apache/kylin/engine/mr/CubingJob.java | 56 ++++---------- .../kylin/engine/mr/JobBuilderSupport.java | 63 +++++++++------- .../engine/mr/common/AbstractHadoopJob.java | 33 ++++----- .../kylin/engine/mr/common/BatchConstants.java | 59 ++++++++++----- .../kylin/engine/mr/common/CubeStatsReader.java | 56 +++++++------- .../kylin/engine/mr/common/CuboidStatsUtil.java | 2 +- .../mr/invertedindex/BatchIIJobBuilder.java | 21 +++--- .../mr/invertedindex/InvertedIndexJob.java | 7 +- .../engine/mr/steps/BaseCuboidMapperBase.java | 2 +- .../engine/mr/steps/CubingExecutableUtil.java | 78 ++++++++++++++++++++ .../kylin/engine/mr/steps/CuboidReducer.java | 2 +- .../engine/mr/steps/FactDistinctColumnsJob.java | 2 +- .../mr/steps/FactDistinctColumnsMapperBase.java | 2 +- .../mr/steps/FactDistinctColumnsReducer.java | 4 +- .../engine/mr/steps/HiveToBaseCuboidMapper.java | 2 +- .../engine/mr/steps/InMemCuboidMapper.java | 2 +- .../engine/mr/steps/InMemCuboidReducer.java | 2 +- .../engine/mr/steps/MergeDictionaryStep.java | 47 +----------- .../engine/mr/steps/MergeStatisticsStep.java | 66 ++--------------- .../kylin/engine/mr/steps/NDCuboidMapper.java | 4 +- .../engine/mr/steps/SaveStatisticsStep.java | 49 ++---------- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 38 ++-------- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 57 ++------------ .../apache/kylin/engine/spark/SparkCubing.java | 3 +- .../engine/spark/SparkCubingJobBuilder.java | 6 +- .../apache/kylin/rest/service/BasicService.java | 40 +--------- .../apache/kylin/rest/service/JobService.java | 51 +++++++------ .../cardinality/ColumnCardinalityMapper.java | 2 +- .../cardinality/HiveColumnCardinalityJob.java | 3 +- .../kylin/storage/hbase/HBaseResourceStore.java | 46 ++++-------- .../kylin/storage/hbase/steps/HBaseMRSteps.java | 78 ++++++++++---------- .../hbase/steps/HBaseStreamingOutput.java | 2 +- .../hbase/steps/RangeKeyDistributionJob.java | 10 +-- .../steps/RangeKeyDistributionReducer.java | 20 ++--- 43 files changed, 483 insertions(+), 670 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index ccae80b..88ee553 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -73,7 +73,7 @@ abstract public class ResourceStore { } return knownImpl; } - + private static ResourceStore createResourceStore(KylinConfig kylinConfig) { List<Throwable> es = new ArrayList<Throwable>(); logger.info("Using metadata url " + kylinConfig.getMetadataUrl() + " for resource store"); @@ -141,7 +141,7 @@ abstract public class ResourceStore { RawResource res = getResourceImpl(resPath); if (res == null) return null; - + DataInputStream din = new DataInputStream(res.inputStream); try { T r = serializer.deserialize(din); @@ -160,25 +160,9 @@ abstract public class ResourceStore { final public long getResourceTimestamp(String resPath) throws IOException { return getResourceTimestampImpl(norm(resPath)); } - + final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, Class<T> clazz, Serializer<T> serializer) throws IOException { - final List<RawResource> allResources = getAllResources(rangeStart, rangeEnd); - if (allResources.isEmpty()) { - return Collections.emptyList(); - } - List<T> result = Lists.newArrayList(); - try { - for (RawResource rawResource : allResources) { - final T element = serializer.deserialize(new DataInputStream(rawResource.inputStream)); - element.setLastModified(rawResource.timestamp); - result.add(element); - } - return result; - } finally { - for (RawResource rawResource : allResources) { - IOUtils.closeQuietly(rawResource.inputStream); - } - } + return getAllResources(rangeStart, rangeEnd, -1L, -1L, clazz, serializer); } final public <T extends RootPersistentEntity> List<T> getAllResources(String rangeStart, String rangeEnd, long timeStartInMillis, long timeEndInMillis, Class<T> clazz, Serializer<T> serializer) throws IOException { @@ -210,7 +194,7 @@ abstract public class ResourceStore { /** returns 0 if not exists */ abstract protected long getResourceTimestampImpl(String resPath) throws IOException; - + /** * overwrite a resource without write conflict check */ http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 67dce73..5b61c10 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -18,12 +18,12 @@ package org.apache.kylin.cube; -import java.text.SimpleDateFormat; -import java.util.Collection; -import java.util.Map; -import java.util.TimeZone; -import java.util.concurrent.ConcurrentHashMap; - +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; +import com.fasterxml.jackson.annotation.JsonBackReference; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ShardingHash; @@ -37,11 +37,12 @@ import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealization; import org.apache.kylin.metadata.realization.IRealizationSegment; -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; -import com.fasterxml.jackson.annotation.JsonBackReference; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Maps; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment { @@ -76,6 +77,9 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I private Map<Long, Short> cuboidShardNums = Maps.newHashMap(); @JsonProperty("total_shards") private int totalShards = 0; + @JsonProperty("blackout_cuboids") + private List<Long> blackoutCuboids = Lists.newArrayList(); + @JsonProperty("binary_signature") private String binarySignature; // a hash of cube schema and dictionary ID, used for sanity check @@ -391,6 +395,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I /** * get the number of shards where each cuboid will distribute + * * @return */ public Short getCuboidShardNum(Long cuboidId) { @@ -423,6 +428,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I return ret; } + public List<Long> getBlackoutCuboids() { + return this.blackoutCuboids; + } + @Override public IRealization getRealization() { return cubeInstance; http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java index 16b0287..89f5204 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java @@ -18,10 +18,14 @@ package org.apache.kylin.cube.cuboid; -import com.google.common.base.Function; -import com.google.common.collect.Collections2; -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping; @@ -31,13 +35,10 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.RowKeyColDesc; import org.apache.kylin.metadata.model.TblColRef; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Lists; public class Cuboid implements Comparable<Cuboid> { @@ -231,8 +232,7 @@ public class Cuboid implements Comparable<Cuboid> { return true; } - hier: - for (HierarchyMask hierarchyMasks : hierarchyMaskList) { + hier: for (HierarchyMask hierarchyMasks : hierarchyMaskList) { long result = cuboidID & hierarchyMasks.fullMask; if (result > 0) { for (long mask : hierarchyMasks.allMasks) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index d370b0d..d3c1003 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -26,21 +26,15 @@ public final class ExecutableConstants { } public static final String YARN_APP_ID = "yarn_application_id"; - public static final String YARN_APP_URL = "yarn_application_tracking_url"; public static final String MR_JOB_ID = "mr_job_id"; public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written"; public static final String SOURCE_RECORDS_COUNT = "source_records_count"; public static final String SOURCE_RECORDS_SIZE = "source_records_size"; - public static final String GLOBAL_LISTENER_NAME = "ChainListener"; public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60; - public static final String CUBE_JOB_GROUP_NAME = "cube_job_group"; - - public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group"; public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; - public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data"; @@ -57,23 +51,9 @@ public final class ExecutableConstants { public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info"; public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection"; public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS"; - public static final String STEP_NAME_BUILD_II = "Build Inverted Index"; public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile"; public static final String STEP_NAME_UPDATE_II_INFO = "Update Inverted Index Info"; - public static final String PROP_ENGINE_CONTEXT = "jobengineConfig"; - public static final String PROP_JOB_FLOW = "jobFlow"; - public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid"; - public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID"; - public static final String PROP_COMMAND = "command"; - // public static final String PROP_STORAGE_LOCATION = - // "storageLocationIdentifier"; - public static final String PROP_JOB_ASYNC = "jobAsync"; - public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor"; - public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput"; - public static final String PROP_JOB_KILLED = "jobKilled"; - public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows"; - public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>"; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 8d5fea5..83c61ae 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -18,10 +18,13 @@ package org.apache.kylin.job.execution; -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; + import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.kylin.common.KylinConfig; @@ -32,12 +35,10 @@ import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** */ @@ -117,12 +118,12 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } retry++; } while (((result != null && result.succeed() == false) || exception != null) && needRetry() == true); - + if (exception != null) { onExecuteError(exception, executableContext); throw new ExecuteException(exception); } - + onExecuteFinished(result, executableContext); return result; } @@ -164,7 +165,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { @Override public final Map<String, String> getParams() { - return Collections.unmodifiableMap(this.params); + return this.params; } public final String getParam(String key) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java index 3c10c09..45d03d1 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java @@ -22,6 +22,7 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.RowKeyDesc; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.BaseCuboidJob; import org.apache.kylin.engine.mr.steps.NDCuboidJob; @@ -92,12 +93,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input - appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]); - appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); - appendExecCmdParameters(cmd, "level", "0"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0"); baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); @@ -113,12 +114,12 @@ public class BatchCubingJobBuilder extends JobBuilderSupport { StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel()); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); - appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); - appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); - appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum)); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + (totalRowkeyColumnCount - dimNum)); ndCuboidStep.setMapReduceParams(cmd.toString()); ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 86439a8..5f4a3ed 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -22,8 +22,10 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.RowKeyDesc; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.BaseCuboidJob; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.mr.steps.InMemCuboidJob; import org.apache.kylin.engine.mr.steps.NDCuboidJob; import org.apache.kylin.engine.mr.steps.SaveStatisticsStep; @@ -89,10 +91,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { private SaveStatisticsStep createSaveStatisticsStep(String jobId) { SaveStatisticsStep result = new SaveStatisticsStep(); result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS); - result.setCubeName(seg.getRealization().getName()); - result.setSegmentId(seg.getUuid()); - result.setStatisticsPath(getStatisticsPath(jobId)); - result.setCubingJobId(jobId); + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setStatisticsPath(getStatisticsPath(jobId), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); return result; } @@ -105,11 +107,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "output", cuboidRootPath); - appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName()); - appendExecCmdParameters(cmd, "cubingJobId", jobId); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); cubeStep.setMapReduceParams(cmd.toString()); cubeStep.setMapReduceJobClass(InMemCuboidJob.class); @@ -126,13 +128,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input - appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]); - appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); - appendExecCmdParameters(cmd, "level", "0"); - appendExecCmdParameters(cmd, "cubingJobId", jobId); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, "FLAT_TABLE"); // marks flat table input + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[0]); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); baseCuboidStep.setMapReduceParams(cmd.toString()); baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class); @@ -148,13 +150,13 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel()); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); - appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); - appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); - appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum)); - appendExecCmdParameters(cmd, "cubingJobId", jobId); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + (totalRowkeyColumnCount - dimNum)); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId); ndCuboidStep.setMapReduceParams(cmd.toString()); ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java index 831aa9d..6f1d445 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.steps.MergeCuboidJob; import org.apache.kylin.job.constant.ExecutableConstants; @@ -48,7 +49,7 @@ public class BatchMergeJobBuilder extends JobBuilderSupport { public CubingJob build() { logger.info("MR_V1 new job to MERGE segment " + seg); - final CubeSegment cubeSegment = (CubeSegment)seg; + final CubeSegment cubeSegment = (CubeSegment) seg; final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config); final String jobId = result.getId(); @@ -84,11 +85,11 @@ public class BatchMergeJobBuilder extends JobBuilderSupport { StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc()); - appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", outputPath); - appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); mergeCuboidDataStep.setMapReduceParams(cmd.toString()); mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java index 008d489..e151674 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java @@ -22,7 +22,9 @@ import java.util.List; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.mr.steps.MergeCuboidJob; import org.apache.kylin.engine.mr.steps.MergeStatisticsStep; import org.apache.kylin.job.constant.ExecutableConstants; @@ -79,10 +81,12 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) { MergeStatisticsStep result = new MergeStatisticsStep(); result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS); - result.setCubeName(seg.getCubeInstance().getName()); - result.setSegmentId(seg.getUuid()); - result.setMergingSegmentIds(mergingSegmentIds); - result.setMergedStatisticsPath(mergedStatisticsFolder); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams()); + CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams()); + return result; } @@ -92,11 +96,11 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport { StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc()); - appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", outputPath); - appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step"); mergeCuboidDataStep.setMapReduceParams(cmd.toString()); mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index 979ff75..0325a09 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -24,17 +24,16 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.List; import java.util.TimeZone; import java.util.regex.Matcher; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.common.MapReduceExecutable; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; @@ -47,8 +46,8 @@ import org.apache.kylin.job.execution.Output; /** */ public class CubingJob extends DefaultChainedExecutable { - - public static enum AlgorithmEnum { + + public enum AlgorithmEnum { LAYER, INMEM } @@ -58,9 +57,6 @@ public class CubingJob extends DefaultChainedExecutable { public static final String CUBE_SIZE_BYTES = "byteSizeBytes"; public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime"; - private static final String CUBE_INSTANCE_NAME = "cubeName"; - private static final String SEGMENT_ID = "segmentId"; - public static CubingJob createBuildJob(CubeSegment seg, String submitter, JobEngineConfig config) { return initCubingJob(seg, "BUILD", submitter, config); } @@ -73,8 +69,8 @@ public class CubingJob extends DefaultChainedExecutable { CubingJob result = new CubingJob(); SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss"); format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone())); - result.setCubeName(seg.getCubeInstance().getName()); - result.setSegmentId(seg.getUuid()); + CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + jobType + " - " + format.format(new Date(System.currentTimeMillis()))); result.setSubmitter(submitter); result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList()); @@ -85,29 +81,9 @@ public class CubingJob extends DefaultChainedExecutable { super(); } - void setCubeName(String name) { - setParam(CUBE_INSTANCE_NAME, name); - } - - public String getCubeName() { - return getParam(CUBE_INSTANCE_NAME); - } - - void setSegmentIds(List<String> segmentIds) { - setParam(SEGMENT_ID, StringUtils.join(segmentIds, ",")); - } - - void setSegmentId(String segmentId) { - setParam(SEGMENT_ID, segmentId); - } - - public String getSegmentIds() { - return getParam(SEGMENT_ID); - } - @Override protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) { - CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName()); + CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(CubingExecutableUtil.getCubeName(this.getParams())); final Output output = jobService.getOutput(getId()); String logMsg; state = output.getState(); @@ -131,7 +107,7 @@ public class CubingJob extends DefaultChainedExecutable { String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE; content = content.replaceAll("\\$\\{job_name\\}", getName()); content = content.replaceAll("\\$\\{result\\}", state.toString()); - content = content.replaceAll("\\$\\{cube_name\\}", getCubeName()); + content = content.replaceAll("\\$\\{cube_name\\}", CubingExecutableUtil.getCubeName(this.getParams())); content = content.replaceAll("\\$\\{source_records_count\\}", String.valueOf(findSourceRecordCount())); content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString()); content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins"); @@ -147,7 +123,7 @@ public class CubingJob extends DefaultChainedExecutable { logger.warn(e.getLocalizedMessage(), e); } - String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName(); + String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + CubingExecutableUtil.getCubeName(this.getParams()); return Pair.of(title, content); } @@ -174,11 +150,11 @@ public class CubingJob extends DefaultChainedExecutable { public void setMapReduceWaitTime(long t) { addExtraInfo(MAP_REDUCE_WAIT_TIME, t + ""); } - + public void setAlgorithm(AlgorithmEnum alg) { addExtraInfo("algorithm", alg.name()); } - + public AlgorithmEnum getAlgorithm() { String alg = getExtraInfo().get("algorithm"); return alg == null ? null : AlgorithmEnum.valueOf(alg); @@ -187,11 +163,11 @@ public class CubingJob extends DefaultChainedExecutable { public boolean isLayerCubing() { return AlgorithmEnum.LAYER == getAlgorithm(); } - + public boolean isInMemCubing() { return AlgorithmEnum.INMEM == getAlgorithm(); } - + public long findSourceRecordCount() { return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0")); } @@ -204,7 +180,7 @@ public class CubingJob extends DefaultChainedExecutable { // look for the info BACKWARD, let the last step that claims the cube size win return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0")); } - + public String findExtraInfo(String key, String dft) { return findExtraInfo(key, dft, false); } @@ -212,14 +188,14 @@ public class CubingJob extends DefaultChainedExecutable { public String findExtraInfoBackward(String key, String dft) { return findExtraInfo(key, dft, true); } - + private String findExtraInfo(String key, String dft, boolean backward) { ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks()); - + if (backward) { Collections.reverse(tasks); } - + for (AbstractExecutable child : tasks) { Output output = executableManager.getOutput(child.getId()); String value = output.getExtra().get(key); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index e3b07d8..a3fef8e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -23,21 +23,23 @@ import java.util.List; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.engine.mr.invertedindex.UpdateIIInfoAfterBuildStep; import org.apache.kylin.engine.mr.steps.CreateDictionaryJob; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob; import org.apache.kylin.engine.mr.steps.MergeDictionaryStep; import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep; import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; - -import com.google.common.base.Preconditions; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.realization.IRealizationSegment; +import com.google.common.base.Preconditions; + /** * Hold reusable steps for builders. */ @@ -67,14 +69,14 @@ public class JobBuilderSupport { result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); result.setMapReduceJobClass(FactDistinctColumnsJob.class); StringBuilder cmd = new StringBuilder(); - appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel()); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId)); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats)); - appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId)); - appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent())); - appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step"); + appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel()); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getFactDistinctColumnsPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats)); + appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_OUTPUT, getStatisticsPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT, String.valueOf(config.getConfig().getCubingInMemSamplingPercent())); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step"); result.setMapReduceParams(cmd.toString()); return result; @@ -85,9 +87,9 @@ public class JobBuilderSupport { HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable(); buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY); StringBuilder cmd = new StringBuilder(); - appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId)); + appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId)); buildDictionaryStep.setJobParams(cmd.toString()); buildDictionaryStep.setJobClass(CreateDictionaryJob.class); @@ -95,34 +97,38 @@ public class JobBuilderSupport { } public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) { - final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep(); - updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); - updateCubeInfoStep.setCubeName(seg.getRealization().getName()); - updateCubeInfoStep.setSegmentId(seg.getUuid()); - updateCubeInfoStep.setCubingJobId(jobId); - return updateCubeInfoStep; + final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep(); + result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); + + return result; } public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) { MergeDictionaryStep result = new MergeDictionaryStep(); result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY); - result.setCubeName(seg.getRealization().getName()); - result.setSegmentId(seg.getUuid()); - result.setMergingSegmentIds(mergingSegmentIds); + + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams()); + return result; } public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) { UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep(); result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); - result.setCubeName(seg.getRealization().getName()); - result.setSegmentId(seg.getUuid()); - result.setMergingSegmentIds(mergingSegmentIds); - result.setCubingJobId(jobId); - return result; - } + CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams()); + CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setCubingJobId(jobId, result.getParams()); + CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams()); + return result; + } public UpdateIIInfoAfterBuildStep createUpdateIIInfoAfterBuildStep(String jobId) { final UpdateIIInfoAfterBuildStep updateIIInfoStep = new UpdateIIInfoAfterBuildStep(); @@ -141,6 +147,7 @@ public class JobBuilderSupport { public String getRealizationRootPath(String jobId) { return getJobWorkingDir(jobId) + "/" + seg.getRealization().getName(); } + public String getCuboidRootPath(String jobId) { return getRealizationRootPath(jobId) + "/cuboid/"; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 61983d5..fe60ca8 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -23,7 +23,7 @@ package org.apache.kylin.engine.mr.common; * */ -import static org.apache.hadoop.util.StringUtils.*; +import static org.apache.hadoop.util.StringUtils.formatTime; import java.io.File; import java.io.IOException; @@ -74,22 +74,21 @@ import org.slf4j.LoggerFactory; public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class); - protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname"); - protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename"); - protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName("cubingJobId").hasArg().isRequired(false).withDescription("ID of cubing job executable").create("cubingJobId"); - protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname"); - protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname"); - protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename"); - protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input"); - protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat"); - protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output"); - protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level"); - protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("partitions"); - protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename"); - - protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName("statisticsenabled").hasArg().isRequired(false).withDescription("Statistics enabled").create("statisticsenabled"); - protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName("statisticsoutput").hasArg().isRequired(false).withDescription("Statistics output").create("statisticsoutput"); - protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName("statisticssamplingpercent").hasArg().isRequired(false).withDescription("Statistics sampling percentage").create("statisticssamplingpercent"); + protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName(BatchConstants.ARG_JOB_NAME).hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create(BatchConstants.ARG_JOB_NAME); + protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create(BatchConstants.ARG_CUBE_NAME); + protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName(BatchConstants.ARG_CUBING_JOB_ID).hasArg().isRequired(false).withDescription("ID of cubing job executable").create(BatchConstants.ARG_CUBING_JOB_ID); + protected static final Option OPTION_II_NAME = OptionBuilder.withArgName(BatchConstants.ARG_II_NAME).hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create(BatchConstants.ARG_II_NAME); + protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName( BatchConstants.ARG_SEGMENT_NAME).hasArg().isRequired(true).withDescription("Cube segment name").create( BatchConstants.ARG_SEGMENT_NAME); + protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg().isRequired(true).withDescription("Input path").create(BatchConstants.ARG_INPUT); + protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName(BatchConstants.ARG_INPUT_FORMAT).hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT); + protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT); + protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL); + protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION).hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION); + protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME).hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME); + + protected static final Option OPTION_STATISTICS_ENABLED = OptionBuilder.withArgName(BatchConstants.ARG_STATS_ENABLED).hasArg().isRequired(false).withDescription("Statistics enabled").create(BatchConstants.ARG_STATS_ENABLED); + protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT).hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT); + protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false).withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT); protected String name; protected boolean isAsync = false; http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 6943f18..a614f4b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -20,8 +20,15 @@ package org.apache.kylin.engine.mr.common; public interface BatchConstants { + /** + * source data config + */ char INTERMEDIATE_TABLE_ROW_DELIMITER = 127; + /** + * ConFiGuration entry names for MR jobs + */ + String CFG_CUBE_NAME = "cube.name"; String CFG_CUBE_SEGMENT_NAME = "cube.segment.name"; String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level"; @@ -29,31 +36,47 @@ public interface BatchConstants { String CFG_II_NAME = "ii.name"; String CFG_II_SEGMENT_NAME = "ii.segment.name"; - String OUTPUT_PATH = "output.path"; - - String TABLE_NAME = "table.name"; - String TABLE_COLUMNS = "table.columns"; - + String CFG_OUTPUT_PATH = "output.path"; + String CFG_TABLE_NAME = "table.name"; String CFG_IS_MERGE = "is.merge"; String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter"; + String CFG_REGION_NUMBER_MIN = "region.number.min"; + String CFG_REGION_NUMBER_MAX = "region.number.max"; + String CFG_REGION_SPLIT_SIZE = "region.split.size"; + String CFG_HFILE_SIZE_GB = "hfile.size.gb"; - String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder"; - - String REGION_NUMBER_MIN = "region.number.min"; - String REGION_NUMBER_MAX = "region.number.max"; - String REGION_SPLIT_SIZE = "region.split.size"; - String HFILE_SIZE_GB = "hfile.size.gb"; - String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/"; String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/"; - + String CFG_STATISTICS_LOCAL_DIR = CFG_KYLIN_LOCAL_TEMP_DIR + "cuboidstatistics/"; String CFG_STATISTICS_ENABLED = "statistics.enabled"; - String CFG_STATISTICS_OUTPUT = "statistics.ouput"; + String CFG_STATISTICS_OUTPUT = "statistics.ouput";//spell error, for compatibility issue better not change it String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent"; - String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt"; - String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq"; + String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt"; + String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq"; - int COUNTER_MAX = 100000; - int ERROR_RECORD_THRESHOLD = 100; + /** + * command line ARGuments + */ + String ARG_INPUT = "input"; + String ARG_OUTPUT = "output"; + String ARG_JOB_NAME = "jobname"; + String ARG_CUBING_JOB_ID = "cubingJobId"; + String ARG_CUBE_NAME = "cubename"; + String ARG_II_NAME = "iiname"; + String ARG_SEGMENT_NAME = "segmentname"; + String ARG_PARTITION = "partitions"; + String ARG_STATS_ENABLED= "statisticsenabled"; + String ARG_STATS_OUTPUT= "statisticsoutput"; + String ARG_STATS_SAMPLING_PERCENT= "statisticssamplingpercent"; + String ARG_HTABLE_NAME= "htablename"; + String ARG_INPUT_FORMAT= "inputformat"; + String ARG_LEVEL= "level"; + + /** + * logger and counter + */ + String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder"; + int NORMAL_RECORD_LOG_THRESHOLD = 100000; + int ERROR_RECORD_LOG_THRESHOLD = 100; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 57e93c3..00b9e32 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -18,19 +18,8 @@ package org.apache.kylin.engine.mr.common; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintWriter; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import javax.annotation.Nullable; - +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; @@ -61,9 +50,16 @@ import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Function; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintWriter; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; /** * This should be in cube module. It's here in engine-mr because currently stats @@ -76,7 +72,7 @@ public class CubeStatsReader { final CubeSegment seg; final int samplingPercentage; final double mapperOverlapRatioOfFirstBuild; // only makes sense for the first build, is meaningless after merge - final Map<Long, HyperLogLogPlusCounter> cuboidRowCountMap; + final Map<Long, HyperLogLogPlusCounter> cuboidRowEstimatesHLL; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { ResourceStore store = ResourceStore.getStore(kylinConfig); @@ -112,7 +108,7 @@ public class CubeStatsReader { this.seg = cubeSegment; this.samplingPercentage = percentage; this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio; - this.cuboidRowCountMap = counterMap; + this.cuboidRowEstimatesHLL = counterMap; } finally { IOUtils.closeStream(reader); @@ -133,13 +129,13 @@ public class CubeStatsReader { return tempFile; } - public Map<Long, Long> getCuboidRowCountMap() { - return getCuboidRowCountMapFromSampling(cuboidRowCountMap, samplingPercentage); + public Map<Long, Long> getCuboidRowEstimatesHLL() { + return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage); } // return map of Cuboid ID => MB public Map<Long, Double> getCuboidSizeMap() { - return getCuboidSizeMapFromRowCount(seg, getCuboidRowCountMap()); + return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL()); } public double getMapperOverlapRatioOfFirstBuild() { @@ -147,15 +143,13 @@ public class CubeStatsReader { } public static Map<Long, Long> getCuboidRowCountMapFromSampling(Map<Long, HyperLogLogPlusCounter> hllcMap, int samplingPercentage) { - return Maps.transformValues(hllcMap, new Function<HyperLogLogPlusCounter, Long>() { - @Nullable - @Override - public Long apply(HyperLogLogPlusCounter input) { - // No need to adjust according sampling percentage. Assumption is that data set is far - // more than cardinality. Even a percentage of the data should already see all cardinalities. - return input.getCountEstimate(); - } - }); + Map<Long, Long> cuboidRowCountMap = Maps.newHashMap(); + for (Map.Entry<Long, HyperLogLogPlusCounter> entry : hllcMap.entrySet()) { + // No need to adjust according sampling percentage. Assumption is that data set is far + // more than cardinality. Even a percentage of the data should already see all cardinalities. + cuboidRowCountMap.put(entry.getKey(), entry.getValue().getCountEstimate()); + } + return cuboidRowCountMap; } public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) { @@ -219,7 +213,7 @@ public class CubeStatsReader { } private void print(PrintWriter out) { - Map<Long, Long> cuboidRows = getCuboidRowCountMap(); + Map<Long, Long> cuboidRows = getCuboidRowEstimatesHLL(); Map<Long, Double> cuboidSizes = getCuboidSizeMap(); List<Long> cuboids = new ArrayList<Long>(cuboidRows.keySet()); Collections.sort(cuboids); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java index 02fe0f0..cb4b1cb 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java @@ -43,7 +43,7 @@ public class CuboidStatsUtil { } public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map<Long, HyperLogLogPlusCounter> cuboidHLLMap, int samplingPercentage, double mapperOverlapRatio) throws IOException { - Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION); + Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); List<Long> allCuboids = new ArrayList<Long>(); allCuboids.addAll(cuboidHLLMap.keySet()); http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java index e7501b8..4841e64 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/BatchIIJobBuilder.java @@ -22,6 +22,7 @@ import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.MRUtil; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.MapReduceExecutable; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.job.constant.ExecutableConstants; @@ -29,9 +30,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class BatchIIJobBuilder extends JobBuilderSupport { - + private static final Logger logger = LoggerFactory.getLogger(BatchIIJobBuilder.class); - + private final IMRBatchCubingInputSide inputSide; private final IMROutput.IMRBatchInvertedIndexingOutputSide outputSide; @@ -44,15 +45,15 @@ public class BatchIIJobBuilder extends JobBuilderSupport { public IIJob build() { logger.info("MR new job to BUILD segment " + seg); - final IIJob result = IIJob.createBuildJob((IISegment)seg, submitter, config); + final IIJob result = IIJob.createBuildJob((IISegment) seg, submitter, config); final String jobId = result.getId(); - + final String iiRootPath = getRealizationRootPath(jobId) + "/"; // Phase 1: Create Flat Table inputSide.addStepPhase1_CreateFlatTable(result); - + // Phase 2: Build Inverted Index - result.addTask(createInvertedIndexStep((IISegment)seg, iiRootPath)); + result.addTask(createInvertedIndexStep((IISegment) seg, iiRootPath)); outputSide.addStepPhase3_BuildII(result, iiRootPath); // Phase 3: Update Metadata & Cleanup @@ -71,13 +72,13 @@ public class BatchIIJobBuilder extends JobBuilderSupport { buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II); - appendExecCmdParameters(cmd, "iiname", seg.getRealization().getName()); - appendExecCmdParameters(cmd, "output", iiOutputTempPath); - appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II); + appendExecCmdParameters(cmd, BatchConstants.ARG_II_NAME, seg.getRealization().getName()); + appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, iiOutputTempPath); + appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, ExecutableConstants.STEP_NAME_BUILD_II); buildIIStep.setMapReduceParams(cmd.toString()); buildIIStep.setMapReduceJobClass(InvertedIndexJob.class); return buildIIStep; } - + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java index 27505e6..9ea2411 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/invertedindex/InvertedIndexJob.java @@ -19,14 +19,12 @@ package org.apache.kylin.engine.mr.invertedindex; import java.io.IOException; -import java.util.ArrayList; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; @@ -39,8 +37,6 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.TableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,7 +102,6 @@ public class InvertedIndexJob extends AbstractHadoopJob { conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName()); } - private void setupMapper(IISegment segment) throws IOException { IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat(); @@ -127,7 +122,7 @@ public class InvertedIndexJob extends AbstractHadoopJob { FileOutputFormat.setOutputPath(job, output); - job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); deletePath(job.getConfiguration(), output); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 3dddece..5fb9098 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -195,7 +195,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL // TODO expose errorRecordCounter as hadoop counter errorRecordCounter++; - if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) { + if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) { if (ex instanceof IOException) throw (IOException) ex; else if (ex instanceof RuntimeException) http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java new file mode 100644 index 0000000..71f27f2 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java @@ -0,0 +1,78 @@ +package org.apache.kylin.engine.mr.steps; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; + +import com.google.common.collect.Lists; + +public class CubingExecutableUtil { + + public static final String CUBE_NAME = "cubeName"; + public static final String SEGMENT_ID = "segmentId"; + public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; + public static final String STATISTICS_PATH = "statisticsPath"; + public static final String CUBING_JOB_ID = "cubingJobId"; + public static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath"; + + public static void setStatisticsPath(String path, Map<String, String> params) { + params.put(STATISTICS_PATH, path); + } + + public static String getStatisticsPath(Map<String, String> params) { + return params.get(STATISTICS_PATH); + } + + public static void setCubeName(String cubeName, Map<String, String> params) { + params.put(CUBE_NAME, cubeName); + } + + public static String getCubeName(Map<String, String> params) { + return params.get(CUBE_NAME); + } + + public static void setSegmentId(String segmentId, Map<String, String> params) { + params.put(SEGMENT_ID, segmentId); + } + + public static String getSegmentId(Map<String, String> params) { + return params.get(SEGMENT_ID); + } + + public static void setMergingSegmentIds(List<String> ids, Map<String, String> params) { + params.put(MERGING_SEGMENT_IDS, StringUtils.join(ids, ",")); + } + + public static List<String> getMergingSegmentIds(Map<String, String> params) { + final String ids = params.get(MERGING_SEGMENT_IDS); + if (ids != null) { + final String[] splitted = StringUtils.split(ids, ","); + ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length); + for (String id : splitted) { + result.add(id); + } + return result; + } else { + return Collections.emptyList(); + } + } + + public static void setCubingJobId(String id, Map<String, String> params) { + params.put(CUBING_JOB_ID, id); + } + + public static String getCubingJobId(Map<String, String> params) { + return params.get(CUBING_JOB_ID); + } + + public static void setMergedStatisticsPath(String path, Map<String, String> params) { + params.put(MERGED_STATISTICS_PATH, path); + } + + public static String getMergedStatisticsPath(Map<String, String> params) { + return params.get(MERGED_STATISTICS_PATH); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java index 4dbb53e..f263d99 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java @@ -108,7 +108,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> { context.write(key, outputValue); counter++; - if (counter % BatchConstants.COUNTER_MAX == 0) { + if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { logger.info("Handled " + counter + " records!"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index 1373e2c..2dabb7a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@ -125,7 +125,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob { job.setNumReduceTasks(numberOfReducers); FileOutputFormat.setOutputPath(job, output); - job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); deletePath(job.getConfiguration(), output); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index 1412dfb..cc7b6df 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -71,7 +71,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K ex.printStackTrace(System.err); errorRecordCounter++; - if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) { + if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) { if (ex instanceof IOException) throw (IOException) ex; else if (ex instanceof RuntimeException) http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index f3e0290..f43834d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -128,7 +128,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException { final Configuration conf = context.getConfiguration(); final FileSystem fs = FileSystem.get(conf); - final String outputPath = conf.get(BatchConstants.OUTPUT_PATH); + final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH); final Path outputFile = new Path(outputPath, col.getName()); FSDataOutputStream out = null; @@ -176,7 +176,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri private void writeMapperAndCuboidStatistics(Context context) throws IOException { Configuration conf = context.getConfiguration(); FileSystem fs = FileSystem.get(conf); - FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION)); + FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME)); try { String msg; http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java index e2a49df..8f5557d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java @@ -41,7 +41,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O @Override public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException { counter++; - if (counter % BatchConstants.COUNTER_MAX == 0) { + if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { logger.info("Handled " + counter + " records!"); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index b094b98..e42a6b5 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -100,7 +100,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr while (!future.isDone()) { if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) { counter++; - if (counter % BatchConstants.COUNTER_MAX == 0) { + if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { logger.info("Handled " + counter + " records!"); } break; http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java index 9beacbb..e72c38b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java @@ -85,7 +85,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra context.write(outputKey, outputValue); counter++; - if (counter % BatchConstants.COUNTER_MAX == 0) { + if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) { logger.info("Handled " + counter + " records!"); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5e13bba0/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java index f247a8f..264ba9b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java @@ -25,7 +25,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -44,10 +43,6 @@ import com.google.common.collect.Lists; public class MergeDictionaryStep extends AbstractExecutable { - private static final String CUBE_NAME = "cubeName"; - private static final String SEGMENT_ID = "segmentId"; - private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; - public MergeDictionaryStep() { super(); } @@ -56,8 +51,8 @@ public class MergeDictionaryStep extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig conf = context.getConfig(); final CubeManager mgr = CubeManager.getInstance(conf); - final CubeInstance cube = mgr.getCube(getCubeName()); - final CubeSegment newSegment = cube.getSegmentById(getSegmentId()); + final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); final List<CubeSegment> mergingSegments = getMergingSegments(cube); Collections.sort(mergingSegments); @@ -79,7 +74,7 @@ public class MergeDictionaryStep extends AbstractExecutable { } private List<CubeSegment> getMergingSegments(CubeInstance cube) { - List<String> mergingSegmentIds = getMergingSegmentIds(); + List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams()); List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size()); for (String id : mergingSegmentIds) { result.add(cube.getSegmentById(id)); @@ -111,7 +106,7 @@ public class MergeDictionaryStep extends AbstractExecutable { CubeDesc cubeDesc = cube.getDescriptor(); for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) { - String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(),true, col).getTable(); + String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), true, col).getTable(); if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) { colsNeedMeringDict.add(col); } else { @@ -165,38 +160,4 @@ public class MergeDictionaryStep extends AbstractExecutable { } } - public void setCubeName(String cubeName) { - this.setParam(CUBE_NAME, cubeName); - } - - private String getCubeName() { - return getParam(CUBE_NAME); - } - - public void setSegmentId(String segmentId) { - this.setParam(SEGMENT_ID, segmentId); - } - - private String getSegmentId() { - return getParam(SEGMENT_ID); - } - - public void setMergingSegmentIds(List<String> ids) { - setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ",")); - } - - private List<String> getMergingSegmentIds() { - final String ids = getParam(MERGING_SEGMENT_IDS); - if (ids != null) { - final String[] splitted = StringUtils.split(ids, ","); - ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length); - for (String id : splitted) { - result.add(id); - } - return result; - } else { - return Collections.emptyList(); - } - } - }
