http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 4d89e1a..ce5888d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,27 +18,27 @@
 package org.apache.kylin.metadata.model;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.metadata.datatype.DataType;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
 
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class FunctionDesc {
+public class FunctionDesc implements Serializable {
 
     public static FunctionDesc newInstance(String expression, ParameterDesc param, String returnType) {
         FunctionDesc r = new FunctionDesc();
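Making the metadata model classes Serializable is what lets Spark ship them to executors inside task closures; plain Java serialization is the fallback when a class is not handled by Kryo. A minimal sketch (the helper below is illustrative, not part of the patch) of how to verify a descriptor survives a round trip:

    import java.io.*;

    public final class SerializationCheck {
        // Round-trips an object through Java serialization; throws
        // NotSerializableException if any reachable, non-transient field
        // is not serializable.
        @SuppressWarnings("unchecked")
        public static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (T) ois.readObject();
            }
        }
    }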
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
index 6489244..dd1500b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinDesc.java
@@ -18,17 +18,18 @@
 package org.apache.kylin.metadata.model;
 
-import java.util.Arrays;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 
+import java.io.Serializable;
+import java.util.Arrays;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinDesc {
+public class JoinDesc implements Serializable {
 
     // inner, left, right, outer...
     @JsonProperty("type")

http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
index 5d0409a..51e5787 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinTableDesc.java
@@ -25,8 +25,10 @@
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JoinTableDesc {
+public class JoinTableDesc implements Serializable {
 
     @JsonProperty("table")
     private String table;

http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
index a0b267d..c132d0e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/JoinsTree.java
@@ -18,15 +18,16 @@
 package org.apache.kylin.metadata.model;
 
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
-
-public class JoinsTree {
+public class JoinsTree implements Serializable {
 
     final Map<String, Chain> tableChains = new LinkedHashMap<>();
 
@@ -111,7 +112,7 @@ public class JoinsTree {
         return chain.join;
     }
 
-    static class Chain {
+    static class Chain implements java.io.Serializable {
         TableRef table; // pk side
         JoinDesc join;
         Chain fkSide;
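Note that JoinsTree alone being Serializable is not enough: serialization recurses into the tableChains map, so the nested Chain class has to declare Serializable itself, which is why the second hunk above touches it too. The same constraint in isolation, with illustrative names:

    import java.io.Serializable;

    public class Tree implements Serializable {
        // Without "implements Serializable" here, serializing a Tree fails
        // with NotSerializableException on the first Node reached, even
        // though the enclosing class is serializable.
        static class Node implements Serializable {
            Node next;
            String value;
        }

        Node head;
    }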
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 253b06b..c0719d2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -18,18 +18,19 @@
 package org.apache.kylin.metadata.model;
 
-import java.util.Objects;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+import java.util.Objects;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class MeasureDesc {
+public class MeasureDesc implements Serializable {
 
     @JsonProperty("name")
     private String name;

http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
index 6460f71..3c5c5f1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ModelDimensionDesc.java
@@ -18,17 +18,17 @@
 package org.apache.kylin.metadata.model;
 
-import java.util.List;
-
-import org.apache.kylin.common.util.StringUtil;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.common.util.StringUtil;
+
+import java.io.Serializable;
+import java.util.List;
 
 /**
  */
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ModelDimensionDesc {
+public class ModelDimensionDesc implements Serializable {
 
     @JsonProperty("table")
     private String table;
     @JsonProperty("columns")

http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index c14d061..8ad20a8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -18,21 +18,22 @@
 package org.apache.kylin.metadata.model;
 
-import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.List;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ParameterDesc {
+public class ParameterDesc implements Serializable {
 
     public static ParameterDesc newInstance(Object... objs) {
         if (objs.length == 0)

http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
index 9925990..c6e6425 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java
@@ -27,10 +27,12 @@
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+
 /**
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class PartitionDesc {
+public class PartitionDesc implements Serializable {
 
     public static enum PartitionType {
         APPEND, //
 
@@ -175,7 +177,7 @@
         String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive);
     }
 
-    public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder {
+    public static class DefaultPartitionConditionBuilder implements IPartitionConditionBuilder, java.io.Serializable {
 
         @Override
         public String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive) {
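DefaultPartitionConditionBuilder picks up java.io.Serializable for the same reason: it is referenced from PartitionDesc, which now travels to Spark executors. Against the IPartitionConditionBuilder interface shown above, a custom builder is a small serializable strategy object. A sketch, where getPartitionDateColumn() is an assumed accessor on PartitionDesc and the SQL shape is illustrative:

    public class EpochMillisConditionBuilder implements PartitionDesc.IPartitionConditionBuilder, java.io.Serializable {
        @Override
        public String buildDateRangeCondition(PartitionDesc partDesc, long startInclusive, long endExclusive) {
            // Assumes the partition column holds epoch milliseconds.
            String col = partDesc.getPartitionDateColumn();
            return col + " >= " + startInclusive + " AND " + col + " < " + endExclusive;
        }
    }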
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
index 7089eba..0d9b442 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableRef.java
@@ -18,25 +18,28 @@
 package org.apache.kylin.metadata.model;
 
+import com.google.common.collect.Maps;
+
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
+public class TableRef implements Serializable {
 
-public class TableRef {
-
-    final private DataModelDesc model;
+    final transient private DataModelDesc model;
     final private String alias;
     final private TableDesc table;
     final private Map<String, TblColRef> columns;
+    final private String modelName;
 
     TableRef(DataModelDesc model, String alias, TableDesc table) {
         this.model = model;
+        this.modelName = model.getName();
         this.alias = alias;
         this.table = table;
         this.columns = Maps.newLinkedHashMap();
-
+
         for (ColumnDesc col : table.getColumns()) {
             columns.put(col.getName(), new TblColRef(this, col));
         }
@@ -94,7 +97,7 @@ public class TableRef {
 
         TableRef t = (TableRef) o;
 
-        if ((model == null ? t.model == null : model.getName().equals(t.model.getName())) == false)
+        if ((modelName == null ? t.modelName != null : modelName.equals(t.modelName)) == false)
             return false;
         if ((alias == null ? t.alias == null : alias.equals(t.alias)) == false)
             return false;
@@ -107,7 +110,7 @@ public class TableRef {
     @Override
     public int hashCode() {
         int result = 0;
-        result = 31 * result + model.getName().hashCode();
+        result = 31 * result + modelName.hashCode();
         result = 31 * result + alias.hashCode();
         result = 31 * result + table.getIdentity().hashCode();
         return result;
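The TableRef change shows the recurring pattern in this patch: the back-reference to DataModelDesc would drag the whole model graph into every serialized task, so it is marked transient, and the one piece of it that equals()/hashCode() need, the model name, is captured in its own field at construction time. The pattern in isolation, with illustrative names:

    import java.io.Serializable;
    import java.util.Objects;

    public class Ref implements Serializable {
        private final transient Object owner; // heavy back-reference, not shipped
        private final String ownerName;       // stable identity captured up front

        Ref(Object owner, String ownerName) {
            this.owner = owner;
            this.ownerName = ownerName;
        }

        @Override
        public boolean equals(Object o) {
            // Behaves the same before and after deserialization, because it
            // never touches the transient field.
            return o instanceof Ref && Objects.equals(ownerName, ((Ref) o).ownerName);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(ownerName);
        }
    }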
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 6eba3c2..0da2a8e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -32,6 +32,7 @@
 import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +77,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+    protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         RowKeyDesc rowKeyDesc = ((CubeSegment) seg).getCubeDesc().getRowkey();
         final int groupRowkeyColumnsCount = ((CubeSegment) seg).getCubeDesc().getBuildLevel();
         final int totalRowkeyColumnsCount = rowKeyDesc.getRowKeyColumns().length;
@@ -100,7 +101,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
+    protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
         // base cuboid job
         MapReduceExecutable cubeStep = new MapReduceExecutable();
 
@@ -117,7 +118,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(getInMemCuboidJob());
-        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+//        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
         return cubeStep;
     }
 
@@ -144,7 +145,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+//        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return baseCuboidStep;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 5f5814b..47695b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -75,6 +75,7 @@
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Fact_Distinct_Columns_" + seg.getRealization().getName() + "_Step");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
         result.setMapReduceParams(cmd.toString());
+        result.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return result;
     }
 
@@ -192,7 +193,7 @@ public class JobBuilderSupport {
         return buf.append(" -").append(paraName).append(" ").append(paraValue);
     }
 
-    public String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+    public static String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
         String[] paths = new String[groupRowkeyColumnsCount + 1];
         for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
             int dimNum = totalRowkeyColumnCount - i;
@@ -205,4 +206,13 @@
         return paths;
     }
 
+    public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
+        if (level == 0) {
+            return cuboidRootPath + "base_cuboid";
+        } else {
+            return cuboidRootPath + level + "level_cuboid";
+        }
+    }
+
+
 }
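The new static helper maps a cubing level to its output directory: level 0 is the base cuboid, and every other level gets a "<level>level_cuboid" suffix. Usage, with an illustrative root path:

    String base = JobBuilderSupport.getCuboidOutputPathsByLevel("/kylin/cube1/cuboid/", 0);
    // -> "/kylin/cube1/cuboid/base_cuboid"
    String level2 = JobBuilderSupport.getCuboidOutputPathsByLevel("/kylin/cube1/cuboid/", 2);
    // -> "/kylin/cube1/cuboid/2level_cuboid"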
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
new file mode 100644
index 0000000..8d8496c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ */
+public class BaseCuboidBuilder implements java.io.Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class);
+    public static final String HIVE_NULL = "\\N";
+    public static final byte[] ONE = Bytes.toBytes("1");
+    protected String cubeName;
+    protected Cuboid baseCuboid;
+    protected CubeDesc cubeDesc;
+    protected CubeSegment cubeSegment;
+    protected Set<String> nullStrs;
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+    protected MeasureIngester<?>[] aggrIngesters;
+    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
+    protected AbstractRowKeyEncoder rowKeyEncoder;
+    protected BufferedMeasureCodec measureCodec;
+
+    protected KylinConfig kylinConfig;
+
+    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc,
+                             AbstractRowKeyEncoder rowKeyEncoder, BufferedMeasureCodec measureCodec, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        this.kylinConfig = kylinConfig;
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.intermediateTableDesc = intermediateTableDesc;
+        this.rowKeyEncoder = rowKeyEncoder;
+        this.measureCodec = measureCodec;
+        this.aggrIngesters = aggrIngesters;
+        this.dictionaryMap = dictionaryMap;
+
+        init();
+    }
+
+    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) {
+        this.kylinConfig = kylinConfig;
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.intermediateTableDesc = intermediateTableDesc;
+
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+        measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+        dictionaryMap = cubeSegment.buildDictionaryMap();
+
+        init();
+    }
+
+    private void init() {
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+        initNullBytes();
+    }
+
+    private void initNullBytes() {
+        nullStrs = Sets.newHashSet();
+        nullStrs.add(HIVE_NULL);
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullStrs.add(s);
+            }
+        }
+    }
+
+    protected boolean isNull(String v) {
+        return nullStrs.contains(v);
+    }
+
+    public byte[] buildKey(String[] flatRow) {
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        List<TblColRef> columns = baseCuboid.getColumns();
+        String[] colValues = new String[columns.size()];
+        for (int i = 0; i < columns.size(); i++) {
+            colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
+        }
+        return rowKeyEncoder.encode(colValues);
+    }
+
+    public ByteBuffer buildValue(String[] flatRow) {
+        return measureCodec.encode(buildValueObjects(flatRow));
+    }
+
+    public Object[] buildValueObjects(String[] flatRow) {
+        Object[] measures = new Object[cubeDesc.getMeasures().size()];
+        for (int i = 0; i < measures.length; i++) {
+            measures[i] = buildValueOf(i, flatRow);
+        }
+
+        return measures;
+    }
+
+    public void resetAggrs() {
+        for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+            aggrIngesters[i].reset();
+        }
+    }
+
+    private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
+        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+        FunctionDesc function = measure.getFunction();
+        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+
+        int paramCount = function.getParameterCount();
+        String[] inputToMeasure = new String[paramCount];
+
+        // pick up parameter values
+        ParameterDesc param = function.getParameter();
+        int colParamIdx = 0; // index among parameters of column type
+        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+            String value;
+            if (function.isCount()) {
+                value = "1";
+            } else if (param.isColumnType()) {
+                value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
+            } else {
+                value = param.getValue();
+            }
+            inputToMeasure[i] = value;
+        }
+
+        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+    }
+
+    private String getCell(int i, String[] flatRow) {
+        if (isNull(flatRow[i]))
+            return null;
+        else
+            return flatRow[i];
+    }
+
+}
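BaseCuboidBuilder pulls the base-cuboid key/value encoding out of the MR mapper so the same code can run inside Spark functions. A minimal usage sketch, assuming a builder constructed as in the mapper setup further below; names are illustrative:

    import java.nio.ByteBuffer;
    import org.apache.kylin.common.util.Pair;

    // One flat-table row in, one cube key/value out.
    Pair<byte[], ByteBuffer> encodeRow(BaseCuboidBuilder builder, String[] flatRow) {
        byte[] rowKey = builder.buildKey(flatRow);      // dimension cells -> encoded rowkey
        ByteBuffer value = builder.buildValue(flatRow); // measure cells   -> encoded aggregates
        return new Pair<>(rowKey, value);
    }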
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
new file mode 100644
index 0000000..4e98618
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ */
+public class NDCuboidBuilder implements Serializable {
+
+    protected static final Logger logger = LoggerFactory.getLogger(NDCuboidBuilder.class);
+    protected String cubeName;
+    protected String segmentID;
+    protected CubeSegment cubeSegment;
+    private RowKeySplitter rowKeySplitter;
+    private RowKeyEncoderProvider rowKeyEncoderProvider;
+    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+    public NDCuboidBuilder(CubeSegment cubeSegment) {
+        this.cubeSegment = cubeSegment;
+        this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+        this.rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
+    }
+
+    public NDCuboidBuilder(CubeSegment cubeSegment, RowKeyEncoderProvider rowKeyEncoderProvider) {
+        this.cubeSegment = cubeSegment;
+        this.rowKeyEncoderProvider = rowKeyEncoderProvider;
+        this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
+    }
+
+
+    public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
+        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
+
+        int offset = 0;
+
+        // rowkey columns
+        long mask = Long.highestOneBit(parentCuboid.getId());
+        long parentCuboidId = parentCuboid.getId();
+        long childCuboidId = childCuboid.getId();
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
+        int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & parentCuboidId) > 0) {// if the this bit position equals
+                // 1
+                if ((mask & childCuboidId) > 0) {// if the child cuboid has this
+                    // column
+                    System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
+                    offset += splitBuffers[index].length;
+                }
+                index++;
+            }
+            mask = mask >> 1;
+        }
+
+        int fullKeySize = rowkeyEncoder.getBytesLength();
+        while (newKeyBuf.array().length < fullKeySize) {
+            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
+        }
+        newKeyBuf.set(0, fullKeySize);
+
+        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+        return new Pair<>(Integer.valueOf(fullKeySize), newKeyBuf);
+    }
+
+}
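The bit walk in buildKey copies a parent cuboid's column values into the child key: it scans the parent id from its highest bit down, advances the split-buffer index once per parent column, and copies the buffer only when the child id has the same bit set. A worked example of just the mask logic:

    // Parent cuboid 1111 (4 dimensions), child 1011: parent columns 0, 2
    // and 3 are copied into the child key; column 1 is skipped.
    long parent = 0b1111, child = 0b1011;
    int col = 0;
    for (long mask = Long.highestOneBit(parent); mask != 0; mask >>= 1) {
        if ((mask & parent) > 0) {
            System.out.println("parent column " + col + ((mask & child) > 0 ? ": copied" : ": skipped"));
            col++;
        }
    }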
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 7b719e0..d08e29a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -18,38 +18,25 @@
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 /**
  */
@@ -59,131 +46,37 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     public static final byte[] ONE = Bytes.toBytes("1");
     protected String cubeName;
     protected String segmentID;
-    protected Cuboid baseCuboid;
     protected CubeInstance cube;
     protected CubeDesc cubeDesc;
     protected CubeSegment cubeSegment;
-    protected Set<String> nullStrs;
-    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
-    protected String intermediateTableRowDelimiter;
-    protected byte byteRowDelimiter;
     protected int counter;
-    protected MeasureIngester<?>[] aggrIngesters;
-    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
    protected Object[] measures;
-    protected AbstractRowKeyEncoder rowKeyEncoder;
-    protected BufferedMeasureCodec measureCodec;
     private int errorRecordCounter;
     protected Text outputKey = new Text();
     protected Text outputValue = new Text();
 
+    private BaseCuboidBuilder baseCuboidBuilder;
+
     @Override
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
-
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
         segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
-        if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
-            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
-        }
-
-        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cube = CubeManager.getInstance(config).getCube(cubeName);
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         cubeDesc = cube.getDescriptor();
         cubeSegment = cube.getSegmentById(segmentID);
+        CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
+        baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc);
 
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
-        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
-        measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
-        measures = new Object[cubeDesc.getMeasures().size()];
-
-        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-        dictionaryMap = cubeSegment.buildDictionaryMap();
-
-        initNullBytes();
-    }
-
-    private void initNullBytes() {
-        nullStrs = Sets.newHashSet();
-        nullStrs.add(HIVE_NULL);
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullStrs.add(s);
-            }
-        }
-    }
-
-    protected boolean isNull(String v) {
-        return nullStrs.contains(v);
-    }
-
-    protected byte[] buildKey(String[] flatRow) {
-        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-        List<TblColRef> columns = baseCuboid.getColumns();
-        String[] colValues = new String[columns.size()];
-        for (int i = 0; i < columns.size(); i++) {
-            colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow);
-        }
-        return rowKeyEncoder.encode(colValues);
     }
 
-    private ByteBuffer buildValue(String[] flatRow) {
-
-        for (int i = 0; i < measures.length; i++) {
-            measures[i] = buildValueOf(i, flatRow);
-        }
-
-        return measureCodec.encode(measures);
-    }
-
-    private Object buildValueOf(int idxOfMeasure, String[] flatRow) {
-        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
-        FunctionDesc function = measure.getFunction();
-        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
-        int paramCount = function.getParameterCount();
-        String[] inputToMeasure = new String[paramCount];
-
-        // pick up parameter values
-        ParameterDesc param = function.getParameter();
-        int colParamIdx = 0; // index among parameters of column type
-        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
-            String value;
-            if (function.isCount()) {
-                value = "1";
-            } else if (param.isColumnType()) {
-                value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow);
-            } else {
-                value = param.getValue();
-            }
-            inputToMeasure[i] = value;
-        }
-
-        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
-    }
-
-    private String getCell(int i, String[] flatRow) {
-        if (isNull(flatRow[i]))
-            return null;
-        else
-            return flatRow[i];
-    }
 
     protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
-        byte[] rowKey = buildKey(flatRow);
+        byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
         outputKey.set(rowKey, 0, rowKey.length);
 
-        ByteBuffer valueBuf = buildValue(flatRow);
+        ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
         context.write(outputKey, outputValue);
     }
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 01cdd4a..b924edc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -18,29 +18,27 @@
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.util.Collection;
-
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Collection;
+
 /**
  * @author George Song (ysong1)
  *
@@ -59,10 +57,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private int handleCounter;
     private int skipCounter;
 
-    private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
-    private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
     private RowKeySplitter rowKeySplitter;
-    private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+    private NDCuboidBuilder ndCuboidBuilder;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -76,48 +73,13 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         cubeSegment = cube.getSegmentById(segmentID);
         cubeDesc = cube.getDescriptor();
-
+        ndCuboidBuilder = new NDCuboidBuilder(cubeSegment);
         // initialize CubiodScheduler
         cuboidScheduler = new CuboidScheduler(cubeDesc);
         rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
-        rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
     }
 
-    private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
-        RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
-
-        int offset = 0;
-
-        // rowkey columns
-        long mask = Long.highestOneBit(parentCuboid.getId());
-        long parentCuboidId = parentCuboid.getId();
-        long childCuboidId = childCuboid.getId();
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
-        int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parentCuboidId) > 0) {// if the this bit position equals
-                // 1
-                if ((mask & childCuboidId) > 0) {// if the child cuboid has this
-                    // column
-                    System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
-                    offset += splitBuffers[index].length;
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        int fullKeySize = rowkeyEncoder.getBytesLength();
-        while (newKeyBuf.array().length < fullKeySize) {
-            newKeyBuf.set(new byte[newKeyBuf.length() * 2]);
-        }
-        newKeyBuf.set(0, fullKeySize);
-
-        rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
-
-        return fullKeySize;
-    }
 
     @Override
     public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
@@ -143,8 +105,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
 
         for (Long child : myChildren) {
             Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-            int fullKeySize = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
-            outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+            Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+            outputKey.set(result.getSecond().array(), 0, result.getFirst());
             context.write(outputKey, value);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/pom.xml
----------------------------------------------------------------------
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index a7cffdd..264f4c9 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -47,6 +47,11 @@
             <artifactId>kylin-core-job</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-mr</artifactId>
+        </dependency>
+
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
new file mode 100644
index 0000000..a7a4151
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+/**
+ */
+public class SparkBatchCubingEngine2 extends MRBatchCubingEngine2 {
+    @Override
+    public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
+        return new SparkBatchCubingJobBuilder2(newSegment, submitter).build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
new file mode 100644
index 0000000..57b6432
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
+
+    private static final Logger logger = LoggerFactory.getLogger(SparkBatchCubingJobBuilder2.class);
+
+    public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+        super(newSegment, submitter);
+    }
+
+    protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+
+    }
+
+    @Override
+    protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
+        IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        sparkExecutable.setClassName(SparkCubingV3.class.getName());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingV3.OPTION_CONF_PATH.getOpt(), "/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/"); //FIXME
+        sparkExecutable.setParam(SparkCubingV3.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration")); // htrace-core.jar
+        StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration"));
+        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.client.HConnection")); // hbase-client.jar
+        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.HBaseConfiguration")); // hbase-common.jar
+        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.util.ByteStringer")); // hbase-protocol.jar
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+        //        sparkExecutable.setJars("/Users/shishaofeng/.m2/repository/org/cloudera/htrace/htrace-core/2.01/htrace-core-2.01.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-protocol/0.98.8-hadoop2/hbase-protocol-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-common/0.98.8-hadoop2/hbase-common-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2//repository/org/apache/hbase/hbase-client/0.98.8-hadoop2/hbase-client-0.98.8-hadoop2.jar");
+
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE + " with Spark");
+        return sparkExecutable;
+
+    }
+
+    private String findJar(String className) {
+        try {
+            return ClassUtil.findContainingJar(Class.forName(className));
+        } catch (ClassNotFoundException e) {
+            logger.error("failed to locate jar for class " + className, e);
+        }
+
+        return "";
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 6e894dd..4dd2276 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -84,6 +84,7 @@
 import org.apache.kylin.engine.spark.util.IteratorUtils;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -153,6 +154,20 @@
         return options;
     }
 
+    public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException {
+        File metaDir = new File(folder);
+        if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) {
+            System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+            logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath());
+            kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
+            return kylinConfig;
+        } else {
+            return KylinConfig.getInstanceFromEnv();
+        }
+    }
+
     private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception {
         ClassUtil.addClasspath(confPath);
         final File[] files = new File(confPath).listFiles(new FileFilter() {
@@ -462,7 +477,7 @@
         }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
     }
 
-    private static void prepare() throws Exception {
+    public static void prepare() throws Exception {
         final File file = new File(SparkFiles.get("kylin.properties"));
         final String confPath = file.getParentFile().getAbsolutePath();
         System.out.println("conf directory:" + confPath);
@@ -526,12 +541,18 @@
         }
     }
 
-    private Collection<String> getKyroClasses() {
+    public static Collection<String> getKyroClasses() {
         Set<Class> kyroClasses = Sets.newHashSet();
         kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class));
         kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class));
         kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class));
         kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class));
+
         kyroClasses.add(HashMap.class);
         kyroClasses.add(org.apache.spark.sql.Row[].class);
         kyroClasses.add(org.apache.spark.sql.Row.class);
@@ -541,11 +562,15 @@
         kyroClasses.add(org.apache.spark.sql.types.StructField.class);
         kyroClasses.add(org.apache.spark.sql.types.DateType$.class);
         kyroClasses.add(org.apache.spark.sql.types.Metadata.class);
-        kyroClasses.add(Object[].class);
         kyroClasses.add(org.apache.spark.sql.types.StringType$.class);
         kyroClasses.add(Hashing.murmur3_128().getClass());
-        kyroClasses.add(org.apache.spark.sql.columnar.CachedBatch.class);
+        kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class);
+        kyroClasses.add(Object[].class);
+        kyroClasses.add(int[].class);
+        kyroClasses.add(byte[].class);
         kyroClasses.add(byte[][].class);
+        kyroClasses.add(String[].class);
+        kyroClasses.add(String[][].class);
         kyroClasses.add(org.apache.spark.sql.types.Decimal.class);
         kyroClasses.add(scala.math.BigDecimal.class);
         kyroClasses.add(java.math.BigDecimal.class);
@@ -553,6 +578,52 @@
         kyroClasses.add(java.math.RoundingMode.class);
         kyroClasses.add(java.util.ArrayList.class);
         kyroClasses.add(java.util.LinkedList.class);
+        kyroClasses.add(java.util.HashSet.class);
+        kyroClasses.add(java.util.LinkedHashSet.class);
+        kyroClasses.add(java.util.LinkedHashMap.class);
+        kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class);
+
+        kyroClasses.add(java.util.HashMap.class);
+        kyroClasses.add(java.util.Properties.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class);
+        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class);
+        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class);
+        kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
+        kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class);
+        kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
+        kyroClasses.add(org.apache.kylin.common.util.Array.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.Segments.class);
+        kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class);
+        kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class);
+        kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class);
+        kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class);
+        kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class);
+        kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class);
+        kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class);
+        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class);
+        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class);
+        kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class);
+        kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class);
+        kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class);
 
         ArrayList<String> result = Lists.newArrayList();
         for (Class kyroClass : kyroClasses) {
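The much longer class list matters because the new Spark job turns on spark.kryo.registrationRequired, which makes Kryo fail fast on any class that reaches a shuffle without being registered, rather than silently falling back to slower serialization. The wiring, a fragment mirroring what SparkCubingV3 below does:

    SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // Unregistered classes now throw instead of degrading performance.
            .set("spark.kryo.registrationRequired", "true")
            .set("spark.kryo.classesToRegister", StringUtils.join(getKyroClasses(), ","));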
+*/ +package org.apache.kylin.engine.spark; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.NDCuboidBuilder; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkFiles; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.hive.HiveContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses; + +/** + */ +public class SparkCubingV3 extends AbstractApplication implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(SparkCubingV3.class); + + public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); + public static final Option OPTION_CONF_PATH =
OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath"); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); + + private Options options; + + public SparkCubingV3() { + options = new Options(); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_CONF_PATH); + options.addOption(OPTION_OUTPUT_PATH); + } + + @Override + protected Options getOptions() { + return options; + } + + private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception { + ClassUtil.addClasspath(confPath); + final File[] files = new File(confPath).listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + if (pathname.getAbsolutePath().endsWith(".xml")) { + return true; + } + if (pathname.getAbsolutePath().endsWith(".properties")) { + return true; + } + return false; + } + }); + for (File file : files) { + sc.addFile(file.getAbsolutePath()); + } + } + + + private static final void prepare() { + final File file = new File(SparkFiles.get("kylin.properties")); + final String confPath = file.getParentFile().getAbsolutePath(); + System.out.println("conf directory:" + confPath); + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + ClassUtil.addClasspath(confPath); + } + + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); + final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + + SparkConf conf = new SparkConf().setAppName("Cubing Application"); + //serialization conf + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.kryo.registrationRequired", "true"); + final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() { + @Override + public boolean apply(@Nullable String input) { + return input != null && input.trim().length() > 0; + } + }); + conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ",")); + + JavaSparkContext sc = new JavaSparkContext(conf); + setupClasspath(sc, confPath); + HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath)); + + HiveContext sqlContext = new HiveContext(sc.sc()); + final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable); + + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName); + final CubeDesc cubeDesc = cubeInstance.getDescriptor(); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + + final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc); + final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); + final 
Broadcast<BufferedMeasureCodec> vCodec = sc.broadcast(new BufferedMeasureCodec(cubeDesc.getMeasures())); + NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(cubeSegment)); + + final Broadcast<NDCuboidBuilder> vNDCuboidBuilder = sc.broadcast(ndCuboidBuilder); + final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); + + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + final int measureNum = cubeDesc.getMeasures().size(); + final BaseCuboidBuilder baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), intermediateTableDesc, + AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), + vCodec.getValue(), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); + + boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()]; + boolean allNormalMeasure = true; + for (int i = 0; i < cubeDesc.getMeasures().size(); i++) { + needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid(); + allNormalMeasure = allNormalMeasure && needAggr[i]; + } + logger.info("All measures are normal (agg on all cuboids)? " + allNormalMeasure); + + // encode with dimension encoding, transform to <byte[], Object[]> RDD + final JavaPairRDD<byte[], Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, byte[], Object[]>() { + @Override + public Tuple2<byte[], Object[]> call(Row row) throws Exception { + String[] rowArray = rowToArray(row); + baseCuboidBuilder.resetAggrs(); + byte[] rowKey = baseCuboidBuilder.buildKey(rowArray); + Object[] result = baseCuboidBuilder.buildValueObjects(rowArray); + return new Tuple2<>(rowKey, result); + } + + private String[] rowToArray(Row row) { + String[] result = new String[row.size()]; + for (int i = 0; i < row.size(); i++) { + final Object o = row.get(i); + result[i] = (o == null) ? null : o.toString(); + } + return result; + } + + }); + + + final CuboidReducerFunction2 reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue()); + CuboidReducerFunction2 baseCuboidReducerFunction = reducerFunction2; + if (!allNormalMeasure) { + baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), vCodec.getValue(), needAggr); + } + + // aggregate to calculate base cuboid + final JavaPairRDD<byte[], Object[]> baseCuboidRDD = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction); + persistent(baseCuboidRDD, vCodec.getValue(), outputPath, 0, sc.hadoopConfiguration()); + + // aggregate to ND cuboids + final int totalLevels = cubeDesc.getBuildLevel(); + + JavaPairRDD<byte[], Object[]> parentRDD = baseCuboidRDD; + for (int level = 1; level <= totalLevels; level++) { + JavaPairRDD<byte[], Object[]> childRDD = parentRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<byte[], Object[]>, byte[], Object[]>() { + + transient boolean initialized = false; + + RowKeySplitter rowKeySplitter = new RowKeySplitter(vCubeSegment.getValue(), 65, 256); + + @Override + public Iterable<Tuple2<byte[], Object[]>> call(Tuple2<byte[], Object[]> tuple2) throws Exception { + if (!initialized) { + prepare(); + initialized = true; + } + + List<Tuple2<byte[], Object[]>> tuples = Lists.newArrayList(); + byte[] key = tuple2._1(); + long cuboidId = rowKeySplitter.split(key); +
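// split() returns the cuboid id taken from the row key header and fills getSplitBuffers() with the + // per-dimension byte slices, which buildKey() below re-encodes for each child cuboid +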
Cuboid parentCuboid = Cuboid.findById(vCubeDesc.getValue(), cuboidId); + + Collection<Long> myChildren = vCuboidScheduler.getValue().getSpanningCuboid(cuboidId); + + // no child cuboids spawn from this cuboid + if (myChildren == null || myChildren.isEmpty()) { + return tuples; + } + + for (Long child : myChildren) { + Cuboid childCuboid = Cuboid.findById(vCubeDesc.getValue(), child); + Pair<Integer, ByteArray> result = vNDCuboidBuilder.getValue().buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); + + byte[] newKey = new byte[result.getFirst()]; + System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst()); + + tuples.add(new Tuple2<>(newKey, tuple2._2())); + } + + return tuples; + } + }).reduceByKey(reducerFunction2); + + // persist this level's RDD to HDFS as sequence files + persistent(childRDD, vCodec.getValue(), outputPath, level, sc.hadoopConfiguration()); + parentRDD = childRDD; + } + + logger.info("Finished calculating all level cuboids."); + + } + + private void persistent(final JavaPairRDD<byte[], Object[]> rdd, final BufferedMeasureCodec codec, final String hdfsBaseLocation, int level, Configuration conf) { + final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); + final JavaPairRDD<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> serializedRDD = rdd.mapToPair(new PairFunction<Tuple2<byte[], Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { + @Override + public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<byte[], Object[]> tuple2) throws Exception { + ByteBuffer valueBuf = codec.encode(tuple2._2()); + byte[] encodedBytes = new byte[valueBuf.position()]; + System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); + return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1()), new org.apache.hadoop.io.Text(encodedBytes)); + } + }); + logger.debug("Persisting RDD for level " + level + " into " + cuboidOutputPath); + serializedRDD.saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf); + logger.debug("Done: persisting RDD for level " + level); + } + + class CuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> { + BufferedMeasureCodec codec; + CubeDesc cubeDesc; + int measureNum; + transient ThreadLocal<MeasureAggregators> current = new ThreadLocal<>(); + + CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec) { + this.codec = codec; + this.cubeDesc = cubeDesc; + this.measureNum = measureNum; + } + + @Override + public Object[] call(Object[] input1, Object[] input2) throws Exception { + if (current.get() == null) { + current.set(new MeasureAggregators(cubeDesc.getMeasures())); + } + Object[] result = new Object[measureNum]; + current.get().reset(); + current.get().aggregate(input1); + current.get().aggregate(input2); + current.get().collectStates(result); + return result; + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + current = new ThreadLocal<>(); + } + } + + class BaseCuboidReducerFunction2 extends CuboidReducerFunction2 { + boolean[] needAggr; + + BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, BufferedMeasureCodec codec, boolean[] needAggr) { + super(measureNum, cubeDesc, codec); + this.needAggr = needAggr; + } + + @Override + public Object[] call(Object[] input1, Object[] input2) throws Exception { + if (current.get() == null)
{ + current.set(new MeasureAggregators(cubeDesc.getMeasures())); + } + current.get().reset(); + Object[] result = new Object[measureNum]; + current.get().aggregate(input1, needAggr); + current.get().aggregate(input2, needAggr); + current.get().collectStates(result); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 7c88372..644f73f 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.Logger; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -38,11 +39,16 @@ public class SparkExecutable extends AbstractExecutable { private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutable.class); private static final String CLASS_NAME = "className"; + private static final String JARS = "jars"; public void setClassName(String className) { this.setParam(CLASS_NAME, className); } + public void setJars(String jars) { + this.setParam(JARS, jars); + } + private String formatArgs() { StringBuilder stringBuilder = new StringBuilder(); for (Map.Entry<String, String> entry : getParams().entrySet()) { @@ -50,6 +56,9 @@ public class SparkExecutable extends AbstractExecutable { tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" "); if (entry.getKey().equals(CLASS_NAME)) { stringBuilder.insert(0, tmp); + } else if (entry.getKey().equals(JARS)) { + // JARS is for spark-submit, not for app + continue; } else { stringBuilder.append(tmp); } @@ -65,12 +74,22 @@ public class SparkExecutable extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final KylinConfig config = context.getConfig(); Preconditions.checkNotNull(config.getSparkHome()); - Preconditions.checkNotNull(config.getSparkMaster()); + Preconditions.checkNotNull(config.getKylinJobJarPath()); + String sparkConf = config.getSparkConfFile(); + String jars = this.getParam(JARS); + + String jobJar = config.getKylinJobJarPath(); + + if (StringUtils.isEmpty(jars)) { + jars = jobJar; + } + try { - String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --master %s %s %s", config.getSparkHome(), config.getSparkMaster(), config.getKylinSparkJobJarPath(), formatArgs()); + String cmd = String.format("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --properties-file %s --jars %s %s %s", config.getSparkHadoopConfDir(), config.getSparkHome(), sparkConf, jars, jobJar, formatArgs()); logger.info("cmd:" + cmd); final StringBuilder output = new StringBuilder(); - config.getCliCommandExecutor().execute(cmd, new Logger() { + CliCommandExecutor exec = new CliCommandExecutor(); + exec.execute(cmd, new Logger() { @Override public void log(String message) { output.append(message); @@ -84,4 +103,5 @@ public class 
SparkExecutable extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); } } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/examples/test_case_data/sandbox/kylin-spark-conf.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin-spark-conf.properties b/examples/test_case_data/sandbox/kylin-spark-conf.properties new file mode 100644 index 0000000..ca65994 --- /dev/null +++ b/examples/test_case_data/sandbox/kylin-spark-conf.properties @@ -0,0 +1,27 @@ +spark.yarn.submit.file.replication=1 +spark.yarn.executor.memoryOverhead=200 +spark.yarn.driver.memoryOverhead=384 +#spark.master=local[4] +spark.master=yarn +spark.submit.deployMode=cluster +spark.eventLog.enabled=true +spark.yarn.scheduler.heartbeat.interval-ms=5000 +spark.yarn.preserve.staging.files=true +spark.yarn.queue=default +spark.yarn.containerLauncherMaxThreads=25 +spark.yarn.max.executor.failures=3 +spark.eventLog.dir=hdfs\:///spark-history +spark.history.kerberos.enabled=true +spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider +spark.history.ui.port=18080 +spark.history.fs.logDirectory=hdfs\:///spark-history +spark.executor.memory=1G +spark.storage.memoryFraction=0.3 +spark.executor.cores=1 +spark.executor.instances=1 +spark.history.kerberos.keytab=none +spark.history.kerberos.principal=none +spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar +spark.driver.extraJavaOptions=-Dhdp.version=current +spark.yarn.am.extraJavaOptions=-Dhdp.version=current +spark.executor.extraJavaOptions=-Dhdp.version=current http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 20bc427..db8eb7a 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -63,7 +63,7 @@ kylin.job.retry=0 # you will have to specify kylin.job.remote-cli-hostname, kylin.job.remote-cli-username and kylin.job.remote-cli-password # It should not be set to "true" unless you're NOT running Kylin.sh on a hadoop client machine # (Thus kylin instance has to ssh to another real hadoop client machine to execute hbase,hive,hadoop commands) -kylin.job.use-remote-cli=false +kylin.job.use-remote-cli=true # Only necessary when kylin.job.use-remote-cli=true kylin.job.remote-cli-hostname=sandbox @@ -154,3 +154,12 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600 # Env DEV|QA|PROD kylin.env=DEV +kylin.source.hive.keep-flat-table=true + +### Spark ### +#kylin.engine.spark.spark-home=/usr/hdp/2.2.4.2-2/spark +#kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf +kylin.engine.spark.env.hadoop-conf-dir=/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox +kylin.engine.spark.spark-home=/Users/shishaofeng/spark-1.6.3-bin-hadoop2.6 +kylin.engine.spark.properties-file=/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/kylin-spark-conf.properties +kylin.engine.spark.conf.jars= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2ff4c54..73a4b4d 100644 --- a/pom.xml +++ b/pom.xml @@ -84,7 
+84,7 @@ <commons-math3.version>3.6.1</commons-math3.version> <!-- Spark --> - <spark.version>1.3.0</spark.version> + <spark.version>1.6.3</spark.version> <!-- Utility --> <log4j.version>1.2.17</log4j.version> http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 978f477..f5b9f7e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -327,10 +327,10 @@ public class CubeController extends BasicController { throw new InternalErrorException("Cannot find cube '" + cubeName + "'"); } - if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) { - int num = cube.getBuildingSegments().size(); - throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. Discard the related job first.")); - } +// if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) { +// int num = cube.getBuildingSegments().size(); +// throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. Discard the related job first.")); +// } return cubeService.purgeCube(cube); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/78e6cd5b/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index cf92fb1..f3b61ef 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -249,6 +249,13 @@ <scope>provided</scope> </dependency> + <!-- Spark dependency --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-testing-util</artifactId>
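
For reference, the spark-submit invocation that the reworked SparkExecutable.doWork() assembles from the format string above would expand, using the sandbox settings and illustrative placeholder paths, to something like:

    export HADOOP_CONF_DIR=/etc/hadoop/conf && \
    /usr/hdp/2.2.4.2-2/spark/bin/spark-submit \
        --class "org.apache.kylin.common.util.SparkEntry" \
        --properties-file /path/to/kylin-spark-conf.properties \
        --jars /path/to/kylin-job.jar \
        /path/to/kylin-job.jar \
        -className org.apache.kylin.engine.spark.SparkCubingV3 \
        -hiveTable default.kylin_intermediate_test_cube \
        -cubeName test_cube -segmentId <segment-uuid> \
        -confPath /path/to/conf -output hdfs:///kylin/cuboid

Note that when no extra jars are set on the executable, the JARS param falls back to config.getKylinJobJarPath(), so the job jar appears twice: once as the --jars value and once as the application jar that SparkEntry runs from. formatArgs() always moves -className to the front and skips the JARS param, since the latter is consumed by spark-submit rather than by the application.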