http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java new file mode 100644 index 0000000..07b636b --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import com.google.common.collect.Sets; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Dictionary; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.ParameterDesc; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + */ +public class BaseCuboidBuilder implements java.io.Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidBuilder.class); + public static final String HIVE_NULL = "\\N"; + protected String cubeName; + protected Cuboid baseCuboid; + protected CubeDesc cubeDesc; + protected CubeSegment cubeSegment; + protected Set<String> nullStrs; + protected CubeJoinedFlatTableEnrich intermediateTableDesc; + protected MeasureIngester<?>[] aggrIngesters; + protected Map<TblColRef, Dictionary<String>> dictionaryMap; + protected AbstractRowKeyEncoder rowKeyEncoder; + protected BufferedMeasureCodec measureCodec; + + protected KylinConfig kylinConfig; + + public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc, + AbstractRowKeyEncoder rowKeyEncoder, MeasureIngester<?>[] aggrIngesters, Map<TblColRef, Dictionary<String>> dictionaryMap) { + this.kylinConfig = kylinConfig; + this.cubeDesc = cubeDesc; + this.cubeSegment = cubeSegment; + this.intermediateTableDesc = intermediateTableDesc; + this.rowKeyEncoder = rowKeyEncoder; + this.aggrIngesters = aggrIngesters; + this.dictionaryMap = dictionaryMap; + + init(); + measureCodec = new 
BufferedMeasureCodec(cubeDesc.getMeasures()); + } + + public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) { + this.kylinConfig = kylinConfig; + this.cubeDesc = cubeDesc; + this.cubeSegment = cubeSegment; + this.intermediateTableDesc = intermediateTableDesc; + + init(); + rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); + measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); + aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); + dictionaryMap = cubeSegment.buildDictionaryMap(); + + } + + private void init() { + long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + initNullBytes(); + } + + private void initNullBytes() { + nullStrs = Sets.newHashSet(); + nullStrs.add(HIVE_NULL); + String[] nullStrings = cubeDesc.getNullStrings(); + if (nullStrings != null) { + for (String s : nullStrings) { + nullStrs.add(s); + } + } + } + + protected boolean isNull(String v) { + return nullStrs.contains(v); + } + + public byte[] buildKey(String[] flatRow) { + int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); + List<TblColRef> columns = baseCuboid.getColumns(); + String[] colValues = new String[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow); + } + return rowKeyEncoder.encode(colValues); + } + + public ByteBuffer buildValue(String[] flatRow) { + return measureCodec.encode(buildValueObjects(flatRow)); + } + + public Object[] buildValueObjects(String[] flatRow) { + Object[] measures = new Object[cubeDesc.getMeasures().size()]; + for (int i = 0; i < measures.length; i++) { + measures[i] = buildValueOf(i, flatRow); + } + + return measures; + } + + public void resetAggrs() { + for (int i = 0; i < cubeDesc.getMeasures().size(); i++) { + aggrIngesters[i].reset(); + } + } + + private Object buildValueOf(int idxOfMeasure, String[] flatRow) { + MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure); + FunctionDesc function = measure.getFunction(); + int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; + + int paramCount = function.getParameterCount(); + String[] inputToMeasure = new String[paramCount]; + + // pick up parameter values + ParameterDesc param = function.getParameter(); + int colParamIdx = 0; // index among parameters of column type + for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { + String value; + if (function.isCount()) { + value = "1"; + } else if (param.isColumnType()) { + value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow); + } else { + value = param.getValue(); + } + inputToMeasure[i] = value; + } + + return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); + } + + private String getCell(int i, String[] flatRow) { + if (isNull(flatRow[i])) + return null; + else + return flatRow[i]; + } + +}
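[Editor's sketch] To orient readers on how the new BaseCuboidBuilder above is meant to be driven, the following minimal sketch mirrors what the reworked BaseCuboidMapperBase (later in this commit) does for one flat-table row: encode the base-cuboid row key, encode the measures, and copy both into Hadoop Text objects. The helper method name, the builder instance and the flatRow contents are illustrative assumptions; only buildKey/buildValue (and the optional resetAggrs used by the Spark path) come from the class above.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;

    public class BaseCuboidBuilderSketch {
        // Given an already-constructed builder (the two-argument constructor wires
        // up the row key encoder, measure codec, ingesters and dictionaries itself)
        // and one row of the intermediate flat table, produce the mapper's key/value.
        static void encodeOneRow(BaseCuboidBuilder builder, String[] flatRow,
                                 Text outputKey, Text outputValue) {
            // Callers that reuse the builder across many rows (e.g. the Spark path)
            // call builder.resetAggrs() before re-ingesting measures.
            byte[] rowKey = builder.buildKey(flatRow);          // base cuboid row key
            outputKey.set(rowKey, 0, rowKey.length);

            ByteBuffer valueBuf = builder.buildValue(flatRow);  // encoded measure values
            outputValue.set(valueBuf.array(), 0, valueBuf.position());
        }
    }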
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index ffba181..4011915 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -262,6 +262,11 @@ public class CubeStatsReader { return ret; } + public List<Long> getCuboidsByLayer(int level) { + List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer(); + return layeredCuboids.get(level); + } + private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) { long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc()); int dimensionCount = Long.bitCount(baseCuboid); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java new file mode 100644 index 0000000..4e98618 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.common; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.common.util.SplittedBytes; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.cube.kv.RowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + */ +public class NDCuboidBuilder implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(NDCuboidBuilder.class); + protected String cubeName; + protected String segmentID; + protected CubeSegment cubeSegment; + private RowKeySplitter rowKeySplitter; + private RowKeyEncoderProvider rowKeyEncoderProvider; + private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; + private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); + + public NDCuboidBuilder(CubeSegment cubeSegment) { + this.cubeSegment = cubeSegment; + this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); + this.rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment); + } + + public NDCuboidBuilder(CubeSegment cubeSegment, RowKeyEncoderProvider rowKeyEncoderProvider) { + this.cubeSegment = cubeSegment; + this.rowKeyEncoderProvider = rowKeyEncoderProvider; + this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); + } + + + public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) { + RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid); + + int offset = 0; + + // rowkey columns + long mask = Long.highestOneBit(parentCuboid.getId()); + long parentCuboidId = parentCuboid.getId(); + long childCuboidId = childCuboid.getId(); + long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId()); + int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId + for (int i = 0; i < parentCuboidIdActualLength; i++) { + if ((mask & parentCuboidId) > 0) {// if the this bit position equals + // 1 + if ((mask & childCuboidId) > 0) {// if the child cuboid has this + // column + System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length); + offset += splitBuffers[index].length; + } + index++; + } + mask = mask >> 1; + } + + int fullKeySize = rowkeyEncoder.getBytesLength(); + while (newKeyBuf.array().length < fullKeySize) { + newKeyBuf.set(new byte[newKeyBuf.length() * 2]); + } + newKeyBuf.set(0, fullKeySize); + + rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf); + + return new Pair<>(Integer.valueOf(fullKeySize), newKeyBuf); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 7b719e0..d08e29a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -18,38 +18,25 @@ package org.apache.kylin.engine.mr.steps; 
-import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.measure.BufferedMeasureCodec; -import org.apache.kylin.measure.MeasureIngester; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.ParameterDesc; -import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; /** */ @@ -59,131 +46,37 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K public static final byte[] ONE = Bytes.toBytes("1"); protected String cubeName; protected String segmentID; - protected Cuboid baseCuboid; protected CubeInstance cube; protected CubeDesc cubeDesc; protected CubeSegment cubeSegment; - protected Set<String> nullStrs; - protected CubeJoinedFlatTableEnrich intermediateTableDesc; - protected String intermediateTableRowDelimiter; - protected byte byteRowDelimiter; protected int counter; - protected MeasureIngester<?>[] aggrIngesters; - protected Map<TblColRef, Dictionary<String>> dictionaryMap; protected Object[] measures; - protected AbstractRowKeyEncoder rowKeyEncoder; - protected BufferedMeasureCodec measureCodec; private int errorRecordCounter; protected Text outputKey = new Text(); protected Text outputValue = new Text(); + private BaseCuboidBuilder baseCuboidBuilder; + @Override protected void setup(Context context) throws IOException { super.bindCurrentConfiguration(context.getConfiguration()); - cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); - intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER)); - if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) { - throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length); - } - - byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0]; - - KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); - - cube = CubeManager.getInstance(config).getCube(cubeName); + final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata(); + cube = CubeManager.getInstance(kylinConfig).getCube(cubeName); cubeDesc = cube.getDescriptor(); cubeSegment = cube.getSegmentById(segmentID); + CubeJoinedFlatTableEnrich intermediateTableDesc = new 
CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc); - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - - intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - - rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); - - measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures()); - measures = new Object[cubeDesc.getMeasures().size()]; - - aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures()); - dictionaryMap = cubeSegment.buildDictionaryMap(); - - initNullBytes(); - } - - private void initNullBytes() { - nullStrs = Sets.newHashSet(); - nullStrs.add(HIVE_NULL); - String[] nullStrings = cubeDesc.getNullStrings(); - if (nullStrings != null) { - for (String s : nullStrings) { - nullStrs.add(s); - } - } - } - - protected boolean isNull(String v) { - return nullStrs.contains(v); - } - - protected byte[] buildKey(String[] flatRow) { - int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); - List<TblColRef> columns = baseCuboid.getColumns(); - String[] colValues = new String[columns.size()]; - for (int i = 0; i < columns.size(); i++) { - colValues[i] = getCell(rowKeyColumnIndexes[i], flatRow); - } - return rowKeyEncoder.encode(colValues); } - private ByteBuffer buildValue(String[] flatRow) { - - for (int i = 0; i < measures.length; i++) { - measures[i] = buildValueOf(i, flatRow); - } - - return measureCodec.encode(measures); - } - - private Object buildValueOf(int idxOfMeasure, String[] flatRow) { - MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure); - FunctionDesc function = measure.getFunction(); - int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; - - int paramCount = function.getParameterCount(); - String[] inputToMeasure = new String[paramCount]; - - // pick up parameter values - ParameterDesc param = function.getParameter(); - int colParamIdx = 0; // index among parameters of column type - for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { - String value; - if (function.isCount()) { - value = "1"; - } else if (param.isColumnType()) { - value = getCell(colIdxOnFlatTable[colParamIdx++], flatRow); - } else { - value = param.getValue(); - } - inputToMeasure[i] = value; - } - - return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap); - } - - private String getCell(int i, String[] flatRow) { - if (isNull(flatRow[i])) - return null; - else - return flatRow[i]; - } protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException { - byte[] rowKey = buildKey(flatRow); + byte[] rowKey = baseCuboidBuilder.buildKey(flatRow); outputKey.set(rowKey, 0, rowKey.length); - ByteBuffer valueBuf = buildValue(flatRow); + ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow); outputValue.set(valueBuf.array(), 0, valueBuf.position()); context.write(outputKey, outputValue); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index 
01cdd4a..b924edc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -18,29 +18,27 @@ package org.apache.kylin.engine.mr.steps; -import java.io.IOException; -import java.util.Collection; - import org.apache.hadoop.io.Text; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.SplittedBytes; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.common.RowKeySplitter; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.kv.RowKeyEncoder; -import org.apache.kylin.cube.kv.RowKeyEncoderProvider; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.NDCuboidBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collection; + /** * @author George Song (ysong1) * @@ -59,10 +57,9 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { private int handleCounter; private int skipCounter; - private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE]; - private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE); private RowKeySplitter rowKeySplitter; - private RowKeyEncoderProvider rowKeyEncoderProvider; + + private NDCuboidBuilder ndCuboidBuilder; @Override protected void setup(Context context) throws IOException { @@ -76,48 +73,13 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); cubeSegment = cube.getSegmentById(segmentID); cubeDesc = cube.getDescriptor(); - + ndCuboidBuilder = new NDCuboidBuilder(cubeSegment); // initialize CubiodScheduler cuboidScheduler = new CuboidScheduler(cubeDesc); - rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); - rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment); } - private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) { - RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid); - - int offset = 0; - - // rowkey columns - long mask = Long.highestOneBit(parentCuboid.getId()); - long parentCuboidId = parentCuboid.getId(); - long childCuboidId = childCuboid.getId(); - long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId()); - int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId - for (int i = 0; i < parentCuboidIdActualLength; i++) { - if ((mask & parentCuboidId) > 0) {// if the this bit position equals - // 1 - if ((mask & childCuboidId) > 0) {// if the child cuboid has this - // column - System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length); - offset += splitBuffers[index].length; - } - index++; - } - mask = mask >> 1; - } - int fullKeySize = rowkeyEncoder.getBytesLength(); - while (newKeyBuf.array().length < fullKeySize) { - newKeyBuf.set(new byte[newKeyBuf.length() * 2]); - } - newKeyBuf.set(0, fullKeySize); - - 
rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf); - - return fullKeySize; - } @Override public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException { @@ -143,8 +105,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { for (Long child : myChildren) { Cuboid childCuboid = Cuboid.findById(cubeDesc, child); - int fullKeySize = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); - outputKey.set(newKeyBuf.array(), 0, fullKeySize); + Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); + outputKey.set(result.getSecond().array(), 0, result.getFirst()); context.write(outputKey, value); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java index 97dd750..29ca9b8 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java @@ -41,7 +41,6 @@ import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.junit.After; @@ -161,7 +160,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase { } private Text newValueText(BufferedMeasureCodec codec, String sum, String min, String max, int count, int item_count) { - Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new LongMutable(count), new LongMutable(item_count) }; + Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new Long(count), new Long(item_count) }; ByteBuffer buf = codec.encode(values); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-spark/pom.xml ---------------------------------------------------------------------- diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml index 504a966..a931fac 100644 --- a/engine-spark/pom.xml +++ b/engine-spark/pom.xml @@ -47,6 +47,11 @@ <artifactId>kylin-core-job</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-engine-mr</artifactId> + </dependency> + <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> @@ -115,6 +120,10 @@ <artifactId>maven-model</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java new file mode 100644 index 0000000..a7a4151 --- /dev/null +++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine2.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.spark; + +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.MRBatchCubingEngine2; +import org.apache.kylin.job.execution.DefaultChainedExecutable; + +/** + */ +public class SparkBatchCubingEngine2 extends MRBatchCubingEngine2 { + @Override + public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { + return new SparkBatchCubingJobBuilder2(newSegment, submitter).build(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java new file mode 100644 index 0000000..9532d31 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.spark; + +import org.apache.hadoop.util.ClassUtil; +import org.apache.kylin.common.util.StringUtil; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 { + + private static final Logger logger = LoggerFactory.getLogger(SparkBatchCubingJobBuilder2.class); + + public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) { + super(newSegment, submitter); + } + + protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { + + } + + @Override + protected AbstractExecutable createInMemCubingStep(String jobId, String cuboidRootPath) { + IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); + final SparkExecutable sparkExecutable = new SparkExecutable(); + sparkExecutable.setClassName(SparkCubingByLayer.class.getName()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_PATH.getOpt(), flatTableDesc.getTableName()); + sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), "/Users/shishaofeng/workspace/kylin-15/examples/test_case_data/sandbox/"); //FIXME + sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath); + + StringBuilder jars = new StringBuilder(); + + StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration")); // htrace-core.jar + StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration")); + StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.client.HConnection")); // hbase-client.jar + StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.HBaseConfiguration")); // hbase-common.jar + StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.util.ByteStringer")); // hbase-protocol.jar + + StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars()); + sparkExecutable.setJars(jars.toString()); + // sparkExecutable.setJars("/Users/shishaofeng/.m2/repository/org/cloudera/htrace/htrace-core/2.01/htrace-core-2.01.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-protocol/0.98.8-hadoop2/hbase-protocol-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2/repository/org/apache/hbase/hbase-common/0.98.8-hadoop2/hbase-common-0.98.8-hadoop2.jar,/Users/shishaofeng/.m2//repository/org/apache/hbase/hbase-client/0.98.8-hadoop2/hbase-client-0.98.8-hadoop2.jar"); + + sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE + " with Spark"); + return sparkExecutable; + + } + + private String findJar(String className) { + try { + return ClassUtil.findContainingJar(Class.forName(className)); + } catch (ClassNotFoundException e) { + logger.error("failed to locate jar for class " + className, e); + } + + return ""; + } + +} 
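[Editor's sketch] SparkBatchCubingJobBuilder2 above turns class names into jar paths for the Spark --jars list via Hadoop's ClassUtil. The standalone fragment below illustrates that lookup under the same fallback behavior as the private findJar helper (return an empty string when the class is not on the classpath); the class name being looked up and the demo class itself are only examples.

    import org.apache.hadoop.util.ClassUtil;

    public class FindJarSketch {
        // Resolve the jar that contains a class on the current classpath,
        // mirroring SparkBatchCubingJobBuilder2.findJar(String).
        static String findJar(String className) {
            try {
                return ClassUtil.findContainingJar(Class.forName(className));
            } catch (ClassNotFoundException e) {
                return ""; // the builder logs and skips jars it cannot locate
            }
        }

        public static void main(String[] args) {
            // Example lookup for the hbase-client jar referenced above.
            System.out.println(findJar("org.apache.hadoop.hbase.client.HConnection"));
        }
    }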
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 6e894dd..f06c338 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -84,6 +84,7 @@ import org.apache.kylin.engine.spark.util.IteratorUtils; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -153,6 +154,20 @@ public class SparkCubing extends AbstractApplication { return options; } + public static KylinConfig loadKylinPropsAndMetadata(String folder) throws IOException { + File metaDir = new File(folder); + if (!metaDir.getAbsolutePath().equals(System.getProperty(KylinConfig.KYLIN_CONF))) { + System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath()); + logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath()); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + System.out.println("setting metadataUrl to " + metaDir.getAbsolutePath()); + kylinConfig.setMetadataUrl(metaDir.getAbsolutePath()); + return kylinConfig; + } else { + return KylinConfig.getInstanceFromEnv(); + } + } + private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception { ClassUtil.addClasspath(confPath); final File[] files = new File(confPath).listFiles(new FileFilter() { @@ -462,7 +477,7 @@ public class SparkCubing extends AbstractApplication { }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf); } - private static void prepare() throws Exception { + public static void prepare() throws Exception { final File file = new File(SparkFiles.get("kylin.properties")); final String confPath = file.getParentFile().getAbsolutePath(); System.out.println("conf directory:" + confPath); @@ -526,12 +541,18 @@ public class SparkCubing extends AbstractApplication { } } - private Collection<String> getKyroClasses() { + public static Collection<String> getKyroClasses() { Set<Class> kyroClasses = Sets.newHashSet(); kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class)); kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class)); kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class)); kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class)); + kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class)); + kyroClasses.addAll(new 
Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class)); + kyroClasses.add(HashMap.class); kyroClasses.add(org.apache.spark.sql.Row[].class); kyroClasses.add(org.apache.spark.sql.Row.class); @@ -541,11 +562,15 @@ public class SparkCubing extends AbstractApplication { kyroClasses.add(org.apache.spark.sql.types.StructField.class); kyroClasses.add(org.apache.spark.sql.types.DateType$.class); kyroClasses.add(org.apache.spark.sql.types.Metadata.class); - kyroClasses.add(Object[].class); kyroClasses.add(org.apache.spark.sql.types.StringType$.class); kyroClasses.add(Hashing.murmur3_128().getClass()); - kyroClasses.add(org.apache.spark.sql.columnar.CachedBatch.class); + kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class); + kyroClasses.add(Object[].class); + kyroClasses.add(int[].class); + kyroClasses.add(byte[].class); kyroClasses.add(byte[][].class); + kyroClasses.add(String[].class); + kyroClasses.add(String[][].class); kyroClasses.add(org.apache.spark.sql.types.Decimal.class); kyroClasses.add(scala.math.BigDecimal.class); kyroClasses.add(java.math.BigDecimal.class); @@ -553,6 +578,61 @@ public class SparkCubing extends AbstractApplication { kyroClasses.add(java.math.RoundingMode.class); kyroClasses.add(java.util.ArrayList.class); kyroClasses.add(java.util.LinkedList.class); + kyroClasses.add(java.util.HashSet.class); + kyroClasses.add(java.util.LinkedHashSet.class); + kyroClasses.add(java.util.LinkedHashMap.class); + kyroClasses.add(java.util.TreeMap.class); + kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class); + + kyroClasses.add(java.util.HashMap.class); + kyroClasses.add(java.util.Properties.class); + kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class); + kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class); + kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class); + kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class); + kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class); + kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class); + kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class); + kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class); + kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class); + kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class); + kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class); + kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class); + kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class); + kyroClasses.add(org.apache.kylin.common.util.Array.class); + kyroClasses.add(org.apache.kylin.metadata.model.Segments.class); + kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class); + kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class); + kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class); + kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class); + kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class); + kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class); + kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class); + kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class); + kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class); + 
kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class); + kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class); + kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class); + kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class); + kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class); + kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class); + kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class); + kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class); + kyroClasses.add(org.apache.kylin.measure.topn.DoubleDeltaSerializer.class); + kyroClasses.add(org.apache.kylin.measure.topn.Counter.class); + + try { + kyroClasses.add(Class.forName("com.google.common.collect.EmptyImmutableList")); + } catch (ClassNotFoundException e) { + logger.error("failed to load class", e); + } ArrayList<String> result = Lists.newArrayList(); for (Class kyroClass : kyroClasses) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java new file mode 100644 index 0000000..53c1f96 --- /dev/null +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.engine.spark; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.AbstractApplication; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.OptionsHelper; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.common.RowKeySplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; +import org.apache.kylin.cube.kv.RowKeyEncoderProvider; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.engine.EngineFactory; +import org.apache.kylin.engine.mr.BatchCubingJobBuilder2; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.common.BaseCuboidBuilder; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.mr.common.NDCuboidBuilder; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.measure.MeasureIngester; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkFiles; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.hive.HiveContext; +import org.apache.spark.storage.StorageLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FileFilter; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses; + +/** + */ +public class SparkCubingByLayer extends AbstractApplication implements Serializable { + + protected static final Logger logger = LoggerFactory.getLogger(SparkCubingByLayer.class); + + public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable"); + public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg().isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME); + public static final Option OPTION_SEGMENT_ID = 
OptionBuilder.withArgName("segment").hasArg().isRequired(true).withDescription("Cube Segment Id").create("segmentId"); + public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true).withDescription("Configuration Path").create("confPath"); + public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg().isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT); + + private Options options; + + public SparkCubingByLayer() { + options = new Options(); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + options.addOption(OPTION_CONF_PATH); + options.addOption(OPTION_OUTPUT_PATH); + } + + @Override + protected Options getOptions() { + return options; + } + + private void setupClasspath(JavaSparkContext sc, String confPath) throws Exception { + ClassUtil.addClasspath(confPath); + final File[] files = new File(confPath).listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + if (pathname.getAbsolutePath().endsWith(".xml")) { + return true; + } + if (pathname.getAbsolutePath().endsWith(".properties")) { + return true; + } + return false; + } + }); + for (File file : files) { + sc.addFile(file.getAbsolutePath()); + } + } + + private static final void prepare() { + final File file = new File(SparkFiles.get("kylin.properties")); + final String confPath = file.getParentFile().getAbsolutePath(); + logger.info("conf directory:" + confPath); + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + ClassUtil.addClasspath(confPath); + } + + @Override + protected void execute(OptionsHelper optionsHelper) throws Exception { + final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH); + final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME); + final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID); + final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH); + final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH); + + SparkConf conf = new SparkConf().setAppName("Cubing Application"); + //serialization conf + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.set("spark.kryo.registrationRequired", "true"); + final Iterable<String> allClasses = Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()), new Predicate<String>() { + @Override + public boolean apply(@Nullable String input) { + return input != null && input.trim().length() > 0; + } + }); + conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ",")); + + JavaSparkContext sc = new JavaSparkContext(conf); + setupClasspath(sc, confPath); + HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath)); + + HiveContext sqlContext = new HiveContext(sc.sc()); + final DataFrame intermediateTable = sqlContext.table(hiveTable); + + System.setProperty(KylinConfig.KYLIN_CONF, confPath); + final KylinConfig envConfig = KylinConfig.getInstanceFromEnv(); + final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName); + final CubeDesc cubeDesc = cubeInstance.getDescriptor(); + final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); + final CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); + + final 
KylinConfig kylinConfig = cubeDesc.getConfig(); + final Broadcast<CubeDesc> vCubeDesc = sc.broadcast(cubeDesc); + final Broadcast<CubeSegment> vCubeSegment = sc.broadcast(cubeSegment); + final NDCuboidBuilder ndCuboidBuilder = new NDCuboidBuilder(vCubeSegment.getValue(), new RowKeyEncoderProvider(vCubeSegment.getValue())); + + final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new CuboidScheduler(vCubeDesc.getValue())); + + final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + final int measureNum = cubeDesc.getMeasures().size(); + final BaseCuboidBuilder baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), cubeSegment.buildDictionaryMap()); + + int countMeasureIndex = 0; + for (MeasureDesc measureDesc : cubeDesc.getMeasures()) { + if (measureDesc.getFunction().isCount() == true) { + break; + } else { + countMeasureIndex++; + } + } + final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig); + boolean[] needAggr = new boolean[cubeDesc.getMeasures().size()]; + boolean allNormalMeasure = true; + for (int i = 0; i < cubeDesc.getMeasures().size(); i++) { + needAggr[i] = !cubeDesc.getMeasures().get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid(); + allNormalMeasure = allNormalMeasure && needAggr[i]; + } + logger.info("All measure are normal (agg on all cuboids) ? : " + allNormalMeasure); + + StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); + + // encode with dimension encoding, transform to <ByteArray, Object[]> RDD + final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, Object[]>() { + transient boolean initialized = false; + + @Override + public Tuple2<ByteArray, Object[]> call(Row row) throws Exception { + if (initialized == false) { + prepare(); + initialized = true; + } + + String[] rowArray = rowToArray(row); + baseCuboidBuilder.resetAggrs(); + byte[] rowKey = baseCuboidBuilder.buildKey(rowArray); + Object[] result = baseCuboidBuilder.buildValueObjects(rowArray); + return new Tuple2<>(new ByteArray(rowKey), result); + } + + private String[] rowToArray(Row row) { + String[] result = new String[row.size()]; + for (int i = 0; i < row.size(); i++) { + final Object o = row.get(i); + if (o != null) { + result[i] = o.toString(); + } else { + result[i] = null; + } + } + return result; + } + + }); + + logger.info("encodedBaseRDD partition number: " + encodedBaseRDD.getNumPartitions()); + Long totalCount = 0L; + if (kylinConfig.isSparkSanityCheckEnabled()) { + totalCount = encodedBaseRDD.count(); + logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count()); + } + + final MeasureAggregators measureAggregators = new MeasureAggregators(cubeDesc.getMeasures()); + final BaseCuboidReducerFunction2 baseCuboidReducerFunction = new BaseCuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators); + BaseCuboidReducerFunction2 reducerFunction2 = baseCuboidReducerFunction; + if (allNormalMeasure == false) { + reducerFunction2 = new CuboidReducerFunction2(measureNum, vCubeDesc.getValue(), measureAggregators, needAggr); + } + + final int totalLevels = cubeDesc.getBuildLevel(); + JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels]; + int level = 0; + int partition = 
estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); + + // aggregate to calculate base cuboid + allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel); + saveToHDFS(allRDDs[0], vCubeDesc.getValue(), outputPath, 0, sc.hadoopConfiguration()); + + // aggregate to ND cuboids + PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> flatMapFunction = new CuboidFlatMap(vCubeSegment.getValue(), vCubeDesc.getValue(), vCuboidScheduler.getValue(), ndCuboidBuilder); + + for (level = 1; level < totalLevels; level++) { + partition = estimateRDDPartitionNum(level, cubeStatsReader, kylinConfig); + logger.info("Level " + level + " partition number: " + partition); + allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition).persist(storageLevel); + if (kylinConfig.isSparkSanityCheckEnabled() == true) { + sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex); + } + saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, level, sc.hadoopConfiguration()); + allRDDs[level - 1].unpersist(); + } + allRDDs[totalLevels - 1].unpersist(); + logger.info("Finished on calculating all level cuboids."); + } + + private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) { + double baseCuboidSize = statsReader.estimateLayerSize(level); + float rddCut = kylinConfig.getSparkRDDPartitionCutMB(); + int partition = (int) (baseCuboidSize / rddCut); + partition = Math.max(kylinConfig.getSparkMinPartition(), partition); + partition = Math.min(kylinConfig.getSparkMaxPartition(), partition); + logger.debug("Estimated level " + level + " partition number: " + partition); + return partition; + } + + private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, Configuration conf) { + final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level); + rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() { + BufferedMeasureCodec codec = new BufferedMeasureCodec(cubeDesc.getMeasures()); + @Override + public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { + ByteBuffer valueBuf = codec.encode(tuple2._2()); + byte[] encodedBytes = new byte[valueBuf.position()]; + System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position()); + return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()), new org.apache.hadoop.io.Text(encodedBytes)); + } + }).saveAsNewAPIHadoopFile(cuboidOutputPath, org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, SequenceFileOutputFormat.class, conf); + logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath); + } + + class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> { + CubeDesc cubeDesc; + int measureNum; + MeasureAggregators aggregators; + + BaseCuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, MeasureAggregators aggregators) { + this.cubeDesc = cubeDesc; + this.measureNum = measureNum; + this.aggregators = aggregators; + } + + @Override + public Object[] call(Object[] input1, Object[] input2) throws Exception { + Object[] result = new Object[measureNum]; + aggregators.aggregate(input1, input2, result); + return result; + } + } + + class CuboidReducerFunction2 
extends BaseCuboidReducerFunction2 { + boolean[] needAggr; + + CuboidReducerFunction2(int measureNum, CubeDesc cubeDesc, MeasureAggregators aggregators, boolean[] needAggr) { + super(measureNum, cubeDesc, aggregators); + this.needAggr = needAggr; + } + + @Override + public Object[] call(Object[] input1, Object[] input2) throws Exception { + Object[] result = new Object[measureNum]; + aggregators.aggregate(input1, input2, result, needAggr); + return result; + } + } + + private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = Lists.newArrayListWithCapacity(0); + + class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> { + + CubeSegment cubeSegment; + CubeDesc cubeDesc; + CuboidScheduler cuboidScheduler; + NDCuboidBuilder ndCuboidBuilder; + RowKeySplitter rowKeySplitter; + transient boolean initialized = false; + + CuboidFlatMap(CubeSegment cubeSegment, CubeDesc cubeDesc, CuboidScheduler cuboidScheduler, NDCuboidBuilder ndCuboidBuilder) { + this.cubeSegment = cubeSegment; + this.cubeDesc = cubeDesc; + this.cuboidScheduler = cuboidScheduler; + this.ndCuboidBuilder = ndCuboidBuilder; + this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256); + } + + @Override + public Iterable<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception { + if (initialized == false) { + prepare(); + initialized = true; + } + + byte[] key = tuple2._1().array(); + long cuboidId = rowKeySplitter.split(key); + Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId); + + Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId); + + // if still empty or null + if (myChildren == null || myChildren.size() == 0) { + return EMTPY_ITERATOR; + } + + List<Tuple2<ByteArray, Object[]>> tuples = Lists.newArrayListWithCapacity(myChildren.size()); + for (Long child : myChildren) { + Cuboid childCuboid = Cuboid.findById(cubeDesc, child); + Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers()); + + byte[] newKey = new byte[result.getFirst()]; + System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst()); + + tuples.add(new Tuple2<>(new ByteArray(newKey), tuple2._2())); + } + + return tuples; + } + } + + //sanity check + + private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel, CubeStatsReader cubeStatsReader, final int countMeasureIndex) { + int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size(); + Long count2 = getRDDCountSum(rdd, countMeasureIndex); + if (count2 != totalCount * thisCuboidNum) { + throw new IllegalStateException(String.format("Sanity check failed, level %s, total count(*) is %s; cuboid number %s", thisLevel, count2, thisCuboidNum)); + } else { + logger.info("sanity check success for level " + thisLevel + ", count(*) is " + (count2 / thisCuboidNum)); + } + } + + private Long getRDDCountSum(JavaPairRDD<ByteArray, Object[]> rdd, final int countMeasureIndex) { + final ByteArray ONE = new ByteArray(); + Long count = rdd.mapValues(new Function<Object[], Long>() { + @Override + public Long call(Object[] objects) throws Exception { + return (Long) objects[countMeasureIndex]; // assume the first measure is COUNT(*) + } + }).reduce(new Function2<Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>, Tuple2<ByteArray, Long>>() { + @Override + public Tuple2<ByteArray, Long> call(Tuple2<ByteArray, Long> longTuple2, Tuple2<ByteArray, Long> longTuple22) throws Exception 
{ + return new Tuple2<>(ONE, longTuple2._2() + longTuple22._2()); + } + })._2(); + return count; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 7c88372..644f73f 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.Logger; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -38,11 +39,16 @@ public class SparkExecutable extends AbstractExecutable { private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SparkExecutable.class); private static final String CLASS_NAME = "className"; + private static final String JARS = "jars"; public void setClassName(String className) { this.setParam(CLASS_NAME, className); } + public void setJars(String jars) { + this.setParam(JARS, jars); + } + private String formatArgs() { StringBuilder stringBuilder = new StringBuilder(); for (Map.Entry<String, String> entry : getParams().entrySet()) { @@ -50,6 +56,9 @@ public class SparkExecutable extends AbstractExecutable { tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" "); if (entry.getKey().equals(CLASS_NAME)) { stringBuilder.insert(0, tmp); + } else if (entry.getKey().equals(JARS)) { + // JARS is for spark-submit, not for app + continue; } else { stringBuilder.append(tmp); } @@ -65,12 +74,22 @@ public class SparkExecutable extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { final KylinConfig config = context.getConfig(); Preconditions.checkNotNull(config.getSparkHome()); - Preconditions.checkNotNull(config.getSparkMaster()); + Preconditions.checkNotNull(config.getKylinJobJarPath()); + String sparkConf = config.getSparkConfFile(); + String jars = this.getParam(JARS); + + String jobJar = config.getKylinJobJarPath(); + + if (StringUtils.isEmpty(jars)) { + jars = jobJar; + } + try { - String cmd = String.format("%s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --master %s %s %s", config.getSparkHome(), config.getSparkMaster(), config.getKylinSparkJobJarPath(), formatArgs()); + String cmd = String.format("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class \"org.apache.kylin.common.util.SparkEntry\" --properties-file %s --jars %s %s %s", config.getSparkHadoopConfDir(), config.getSparkHome(), sparkConf, jars, jobJar, formatArgs()); logger.info("cmd:" + cmd); final StringBuilder output = new StringBuilder(); - config.getCliCommandExecutor().execute(cmd, new Logger() { + CliCommandExecutor exec = new CliCommandExecutor(); + exec.execute(cmd, new Logger() { @Override public void log(String message) { output.append(message); @@ -84,4 +103,5 @@ public class SparkExecutable extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); } } + } 
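[Editor's note] For orientation, a minimal usage sketch of the JARS parameter introduced in SparkExecutable above; this is not part of the patch. setClassName() and setJars() are the public setters shown in the diff; the helper method, the class name passed to setClassName(), and the jar path are illustrative assumptions only.

    // Hypothetical helper, e.g. inside a cubing job builder (assumed, not from the patch):
    static org.apache.kylin.engine.spark.SparkExecutable newSparkStep() {
        org.apache.kylin.engine.spark.SparkExecutable sparkStep = new org.apache.kylin.engine.spark.SparkExecutable();
        // Forwarded to the application as "-className <value>" and placed first by formatArgs()
        sparkStep.setClassName("org.apache.kylin.engine.spark.SparkCubingByLayer"); // example class name
        // Consumed by spark-submit's --jars option; formatArgs() skips this key, and
        // doWork() falls back to the Kylin job jar when it is left empty
        sparkStep.setJars("/path/to/extra-dependency.jar"); // illustrative path
        return sparkStep;
    }

    // With the sandbox kylin.properties shown further below, doWork() would then issue roughly:
    //   export HADOOP_CONF_DIR=../examples/test_case_data/sandbox && \
    //   /usr/local/spark/bin/spark-submit --class "org.apache.kylin.common.util.SparkEntry" \
    //     --properties-file ../examples/test_case_data/sandbox/kylin-spark-conf.properties \
    //     --jars /path/to/extra-dependency.jar <kylin-job-jar> -className <className> <other app args>
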
http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java index 4f407ff..793cd87 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java @@ -38,7 +38,7 @@ import scala.Tuple2; */ public final class DefaultTupleConverter implements TupleConverter { - private final static ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>(); + private final static transient ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>(); private final CubeSegment segment; private final int measureCount; private final Map<TblColRef, Integer> columnLengthMap; http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/examples/test_case_data/sandbox/kylin-spark-conf.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin-spark-conf.properties b/examples/test_case_data/sandbox/kylin-spark-conf.properties new file mode 100644 index 0000000..b4a0c97 --- /dev/null +++ b/examples/test_case_data/sandbox/kylin-spark-conf.properties @@ -0,0 +1,28 @@ +spark.yarn.submit.file.replication=1 +spark.yarn.executor.memoryOverhead=200 +spark.yarn.driver.memoryOverhead=384 +#spark.master=local[4] +#spark.submit.deployMode=client +spark.master=yarn +spark.submit.deployMode=cluster +spark.eventLog.enabled=true +spark.yarn.scheduler.heartbeat.interval-ms=5000 +spark.yarn.preserve.staging.files=true +spark.yarn.queue=default +spark.yarn.containerLauncherMaxThreads=25 +spark.yarn.max.executor.failures=3 +spark.eventLog.dir=hdfs\:///spark-history +spark.history.kerberos.enabled=true +spark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider +spark.history.ui.port=18080 +spark.history.fs.logDirectory=hdfs\:///spark-history +spark.executor.memory=1G +spark.storage.memoryFraction=0.3 +spark.executor.cores=1 +spark.executor.instances=1 +spark.history.kerberos.keytab=none +spark.history.kerberos.principal=none +spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/apps/spark/spark-assembly-1.6.3-hadoop2.6.0.jar +spark.driver.extraJavaOptions=-Dhdp.version=current +spark.yarn.am.extraJavaOptions=-Dhdp.version=current +spark.executor.extraJavaOptions=-Dhdp.version=current http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/examples/test_case_data/sandbox/kylin.properties ---------------------------------------------------------------------- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 1724619..a011911 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -79,6 +79,8 @@ kylin.job.remote-cli-working-dir=/tmp/kylin # Max count of concurrent jobs running kylin.job.max-concurrent-jobs=10 +kylin.source.hive.redistribute-flat-table=false + # Time interval to check hadoop job status kylin.engine.mr.yarn-check-interval-seconds=10 @@ -154,3 +156,11 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 3600 # Env DEV|QA|PROD kylin.env=DEV +kylin.source.hive.keep-flat-table=true + +### Spark as Engine ### 
+#kylin.engine.spark.env.hadoop-conf-dir=/etc/hadoop/conf +kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox +kylin.engine.spark.spark-home=/usr/local/spark +kylin.engine.spark.properties-file=../examples/test_case_data/sandbox/kylin-spark-conf.properties +kylin.engine.spark.sanity-check-enabled=false \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0b323cf..0c9a8cb 100644 --- a/pom.xml +++ b/pom.xml @@ -84,7 +84,7 @@ <commons-math3.version>3.6.1</commons-math3.version> <!-- Spark --> - <spark.version>1.3.0</spark.version> + <spark.version>1.6.3</spark.version> <!-- Utility --> <log4j.version>1.2.17</log4j.version> http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 978f477..f5b9f7e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -327,10 +327,10 @@ public class CubeController extends BasicController { throw new InternalErrorException("Cannot find cube '" + cubeName + "'"); } - if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) { - int num = cube.getBuildingSegments().size(); - throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. Discard the related job first.")); - } +// if (cube.getSegments() != null && cube.getBuildingSegments().size() > 0) { +// int num = cube.getBuildingSegments().size(); +// throw new InternalErrorException("Cannot purge cube '" + cubeName + "' as there is " + num + " building " + (num > 1 ? "segment(s)." : "segment. 
Discard the related job first.")); +// } return cubeService.purgeCube(cube); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/server/pom.xml ---------------------------------------------------------------------- diff --git a/server/pom.xml b/server/pom.xml index 981d64c..1d6bdc9 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -249,6 +249,13 @@ <scope>provided</scope> </dependency> + <!-- Spark dependency --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.10</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-testing-util</artifactId> http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index fc2b982..763b537 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Map; import java.util.Set; +import com.google.common.collect.Sets; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,8 +57,6 @@ import org.apache.kylin.metadata.model.TableDesc; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Sets; - public class HiveMRInput implements IMRInput { public static String getTableNameForHCat(TableDesc table) { @@ -74,7 +73,7 @@ public class HiveMRInput implements IMRInput { public IMRTableInputFormat getTableInputFormat(TableDesc table) { return new HiveTableInputFormat(getTableNameForHCat(table)); } - + @Override public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) { return new IMRBatchMergeInputSide() { http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java index f0ad6e0..9b1a00d 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java @@ -31,7 +31,6 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.HBaseColumnDesc; import org.apache.kylin.measure.BufferedMeasureCodec; import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.datatype.LongMutable; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.junit.After; @@ -60,8 +59,8 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase { BigDecimal sum = new BigDecimal("333.1234567"); BigDecimal min = new BigDecimal("333.1111111"); BigDecimal max = new BigDecimal("333.1999999"); - LongMutable count = new LongMutable(2); - LongMutable item_count = new LongMutable(100); + Long count = new Long(2); + Long item_count = new Long(100); ByteBuffer buf = codec.encode(new Object[] { sum, min, max, 
count, item_count }); buf.flip(); @@ -91,7 +90,7 @@ public class RowValueDecoderTest extends LocalFileMetadataTestCase { BigDecimal min = new BigDecimal("333.1111111"); BigDecimal max = new BigDecimal("333.1999999"); LongWritable count = new LongWritable(2); - LongMutable item_count = new LongMutable(100); + Long item_count = new Long(100); codec.encode(new Object[] { sum, min, max, count, item_count }); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a8bc054/tool/pom.xml ---------------------------------------------------------------------- diff --git a/tool/pom.xml b/tool/pom.xml index 8dba3f7..ccd546a 100644 --- a/tool/pom.xml +++ b/tool/pom.xml @@ -40,6 +40,10 @@ </dependency> <dependency> <groupId>org.apache.kylin</groupId> + <artifactId>kylin-engine-spark</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> <artifactId>kylin-source-kafka</artifactId> </dependency> <dependency>