KYLIN-1966 further refactor, decouple flat table and cube
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/29ba46be Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/29ba46be Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/29ba46be Branch: refs/heads/1.5.x-CDH5.7 Commit: 29ba46beee3c9330c54a23265fdc183333f750d8 Parents: 1b34c38 Author: Yang Li <liy...@apache.org> Authored: Mon Aug 22 08:37:12 2016 +0800 Committer: Yang Li <liy...@apache.org> Committed: Mon Aug 22 08:37:12 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeSegment.java | 8 +- .../inmemcubing/AbstractInMemCubeBuilder.java | 7 +- .../cube/inmemcubing/DoggedCubeBuilder.java | 7 +- .../cube/inmemcubing/InMemCubeBuilder.java | 7 +- .../InMemCubeBuilderInputConverter.java | 15 +- .../org/apache/kylin/cube/model/CubeDesc.java | 3 +- .../cube/model/CubeJoinedFlatTableDesc.java | 42 +----- .../cube/model/CubeJoinedFlatTableEnrich.java | 140 +++++++++++++++++++ .../org/apache/kylin/cube/util/CubingUtils.java | 7 +- .../org/apache/kylin/engine/EngineFactory.java | 12 ++ .../apache/kylin/engine/IBatchCubingEngine.java | 8 ++ .../apache/kylin/job/JoinedFlatTableTest.java | 13 +- .../metadata/model/IJoinedFlatTableDesc.java | 2 + .../kylin/engine/mr/MRBatchCubingEngine.java | 13 ++ .../kylin/engine/mr/MRBatchCubingEngine2.java | 13 ++ .../java/org/apache/kylin/engine/mr/MRUtil.java | 5 +- .../engine/mr/steps/BaseCuboidMapperBase.java | 7 +- .../mr/steps/FactDistinctColumnsMapperBase.java | 7 +- .../engine/mr/steps/InMemCuboidMapper.java | 5 +- .../engine/spark/SparkBatchCubingEngine.java | 12 ++ .../apache/kylin/engine/spark/SparkCubing.java | 29 ++-- .../engine/spark/SparkCubingJobBuilder.java | 5 +- engine-streaming/pom.xml | 4 + .../streaming/cube/StreamingCubeBuilder.java | 9 +- .../ITDoggedCubeBuilderStressTest.java | 5 +- .../inmemcubing/ITDoggedCubeBuilderTest.java | 7 +- .../inmemcubing/ITInMemCubeBuilderTest.java 
| 19 +-- .../kylin/rest/controller/CubeController.java | 5 +- .../kylin/source/kafka/KafkaStreamingInput.java | 4 +- 29 files changed, 310 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index 0797ab3..aaa88f1 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -35,18 +35,16 @@ import org.apache.kylin.common.util.ShardingHash; import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.kv.RowConstants; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.metadata.model.IBuildable; -import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.IRealization; import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.JsonBackReference; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -530,10 +528,6 @@ public class CubeSegment implements Comparable<CubeSegment>, IBuildable { return cubeInstance; } - public IJoinedFlatTableDesc getJoinedFlatTableDesc() { - return new CubeJoinedFlatTableDesc(this); - } - public String getIndexPath() { return 
indexPath; } http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java index d417d11..0bfaab3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java @@ -28,6 +28,7 @@ import org.apache.kylin.gridtable.GTScanRequest; import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.GridTable; import org.apache.kylin.gridtable.IGTScanner; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,18 +40,22 @@ abstract public class AbstractInMemCubeBuilder { private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class); + final protected IJoinedFlatTableDesc flatDesc; final protected CubeDesc cubeDesc; final protected Map<TblColRef, Dictionary<String>> dictionaryMap; protected int taskThreadCount = 4; protected int reserveMemoryMB = 100; - public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + public AbstractInMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + if (flatDesc == null) + throw new NullPointerException(); if (cubeDesc == null) throw new NullPointerException(); if (dictionaryMap == null) throw new IllegalArgumentException("dictionary cannot be null"); + this.flatDesc = flatDesc; this.cubeDesc = cubeDesc; this.dictionaryMap = dictionaryMap; } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java index 15f2241..69f1f82 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java @@ -38,6 +38,7 @@ import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.gridtable.GTScanRequestBuilder; import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.measure.MeasureAggregators; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +55,8 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { private int splitRowThreshold = Integer.MAX_VALUE; private int unitRows = 1000; - public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - super(cubeDesc, dictionaryMap); + public DoggedCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + super(cubeDesc, flatDesc, dictionaryMap); // check memory more often if a single row is big if (cubeDesc.hasMemoryHungryMeasures()) @@ -270,7 +271,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { RuntimeException exception; public SplitThread() { - this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap); + this.builder = new InMemCubeBuilder(cubeDesc, flatDesc, dictionaryMap); this.builder.setConcurrentThreads(taskThreadCount); this.builder.setReserveMemoryMB(reserveMemoryMB); } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java index 36d1296..e4908b8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java @@ -48,6 +48,7 @@ import org.apache.kylin.gridtable.IGTScanner; import org.apache.kylin.measure.topn.Counter; import org.apache.kylin.measure.topn.TopNCounter; import org.apache.kylin.metadata.datatype.DoubleMutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -86,8 +87,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { private Object[] totalSumForSanityCheck; private ICuboidCollector resultCollector; - public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { - super(cubeDesc, dictionaryMap); + public InMemCubeBuilder(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + super(cubeDesc, flatDesc, dictionaryMap); this.cuboidScheduler = new CuboidScheduler(cubeDesc); this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); this.totalCuboidCount = cuboidScheduler.getCuboidCount(); @@ -514,7 +515,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder { this.info = info; this.input = input; this.record = new GTRecord(info); - this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, dictionaryMap, info); + this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, flatDesc, 
dictionaryMap, info); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java index 607f6bb..ab44f63 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java @@ -23,11 +23,12 @@ import java.util.Map; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.gridtable.GTInfo; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.ParameterDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -40,7 +41,7 @@ public class InMemCubeBuilderInputConverter { public static final byte[] HIVE_NULL = Bytes.toBytes("\\N"); - private final CubeJoinedFlatTableDesc intermediateTableDesc; + private final CubeJoinedFlatTableEnrich flatDesc; private final MeasureDesc[] measureDescs; private final MeasureIngester<?>[] measureIngesters; private final int measureCount; @@ -48,9 +49,9 @@ public class InMemCubeBuilderInputConverter { private final GTInfo gtInfo; protected List<byte[]> nullBytes; - public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> 
dictionaryMap, GTInfo gtInfo) { + public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) { this.gtInfo = gtInfo; - this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc); + this.flatDesc = new CubeJoinedFlatTableEnrich(flatDesc, cubeDesc); this.measureCount = cubeDesc.getMeasures().size(); this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]); this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures()); @@ -74,11 +75,11 @@ public class InMemCubeBuilderInputConverter { } private Object[] buildKey(List<String> row) { - int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length; + int keySize = flatDesc.getRowKeyColumnIndexes().length; Object[] key = new Object[keySize]; for (int i = 0; i < keySize; i++) { - key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]); + key[i] = row.get(flatDesc.getRowKeyColumnIndexes()[i]); if (key[i] != null && isNull(Bytes.toBytes((String) key[i]))) { key[i] = null; } @@ -98,7 +99,7 @@ public class InMemCubeBuilderInputConverter { private Object buildValueOf(int idxOfMeasure, List<String> row) { MeasureDesc measure = measureDescs[idxOfMeasure]; FunctionDesc function = measure.getFunction(); - int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure]; + int[] colIdxOnFlatTable = flatDesc.getMeasureColumnIndexes()[idxOfMeasure]; int paramCount = function.getParameterCount(); String[] inputToMeasure = new String[paramCount]; http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 2c83972..30290cd 100644 --- 
a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -78,7 +78,7 @@ import com.google.common.collect.Sets; */ @SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE) -public class CubeDesc extends RootPersistentEntity { +public class CubeDesc extends RootPersistentEntity implements IEngineAware { private static final Logger logger = LoggerFactory.getLogger(CubeDesc.class); public static class CannotFilterExtendedColumnException extends RuntimeException { @@ -1004,6 +1004,7 @@ public class CubeDesc extends RootPersistentEntity { this.storageType = storageType; } + @Override public int getEngineType() { return engineType; } http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index 1f9d772..6aeb617 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.kylin.common.util.BytesSplitter; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; @@ -42,8 +41,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { private final CubeSegment cubeSegment; private int columnCount; - private int[] rowKeyColumnIndexes; // the column 
index on flat table - private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table private List<TblColRef> columnList = Lists.newArrayList(); private Map<TblColRef, Integer> columnIndexMap; @@ -65,9 +62,6 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { // check what columns from hive tables are required, and index them private void parseCubeDesc() { - long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); - Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - if (cubeSegment == null) { this.tableName = "kylin_intermediate_" + cubeDesc.getName(); } else { @@ -81,34 +75,15 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { columnIndex++; } - // build index for rowkey columns - List<TblColRef> cuboidColumns = baseCuboid.getColumns(); - int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length; - rowKeyColumnIndexes = new int[rowkeyColCount]; - for (int i = 0; i < rowkeyColCount; i++) { - TblColRef col = cuboidColumns.get(i); - Integer dimIdx = columnIndexMap.get(col); - if (dimIdx == null) { - throw new RuntimeException("Can't find column " + col); - } - rowKeyColumnIndexes[i] = dimIdx; - } - List<MeasureDesc> measures = cubeDesc.getMeasures(); int measureSize = measures.size(); - measureColumnIndexes = new int[measureSize][]; for (int i = 0; i < measureSize; i++) { FunctionDesc func = measures.get(i).getFunction(); List<TblColRef> colRefs = func.getParameter().getColRefs(); - if (colRefs == null) { - measureColumnIndexes[i] = null; - } else { - measureColumnIndexes[i] = new int[colRefs.size()]; + if (colRefs != null) { for (int j = 0; j < colRefs.size(); j++) { TblColRef c = colRefs.get(j); - measureColumnIndexes[i][j] = columnList.indexOf(c); - if (measureColumnIndexes[i][j] < 0) { - measureColumnIndexes[i][j] = columnIndex; + if (columnList.indexOf(c) < 0) { columnIndexMap.put(c, columnIndex); columnList.add(c); columnIndex++; @@ -148,18 +123,6 @@ public class 
CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { // TODO: check data types here } - public CubeDesc getCubeDesc() { - return cubeDesc; - } - - public int[] getRowKeyColumnIndexes() { - return rowKeyColumnIndexes; - } - - public int[][] getMeasureColumnIndexes() { - return measureColumnIndexes; - } - @Override public String getTableName() { return tableName; @@ -175,6 +138,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { return cubeDesc.getModel(); } + @Override public int getColumnIndex(TblColRef colRef) { Integer index = columnIndexMap.get(colRef); if (index == null) http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java new file mode 100644 index 0000000..5212859 --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.cube.model; + +import java.util.List; + +import org.apache.kylin.common.util.BytesSplitter; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.metadata.model.DataModelDesc; +import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.MeasureDesc; +import org.apache.kylin.metadata.model.TblColRef; + +/** + * An enrich of IJoinedFlatTableDesc for cubes + */ +public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc { + + private CubeDesc cubeDesc; + private IJoinedFlatTableDesc flatDesc; + private int[] rowKeyColumnIndexes; // the column index on flat table + private int[][] measureColumnIndexes; // [i] is the i.th measure related column index on flat table + + public CubeJoinedFlatTableEnrich(IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { + // != works due to object cache + if (cubeDesc.getModel() != flatDesc.getDataModel()) + throw new IllegalArgumentException(); + + this.cubeDesc = cubeDesc; + this.flatDesc = flatDesc; + parseCubeDesc(); + } + + // check what columns from hive tables are required, and index them + private void parseCubeDesc() { + long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + + // build index for rowkey columns + List<TblColRef> cuboidColumns = baseCuboid.getColumns(); + int rowkeyColCount = cubeDesc.getRowkey().getRowKeyColumns().length; + rowKeyColumnIndexes = new int[rowkeyColCount]; + for (int i = 0; i < rowkeyColCount; i++) { + TblColRef col = cuboidColumns.get(i); + rowKeyColumnIndexes[i] = flatDesc.getColumnIndex(col); + } + + List<MeasureDesc> measures = cubeDesc.getMeasures(); + int measureSize = measures.size(); + measureColumnIndexes = new int[measureSize][]; + for (int i = 0; i < measureSize; i++) { + FunctionDesc func = measures.get(i).getFunction(); + List<TblColRef> colRefs = 
func.getParameter().getColRefs(); + if (colRefs == null) { + measureColumnIndexes[i] = null; + } else { + measureColumnIndexes[i] = new int[colRefs.size()]; + for (int j = 0; j < colRefs.size(); j++) { + TblColRef c = colRefs.get(j); + measureColumnIndexes[i][j] = flatDesc.getColumnIndex(c); + } + } + } + } + + // sanity check the input record (in bytes) matches what's expected + public void sanityCheck(BytesSplitter bytesSplitter) { + int columnCount = flatDesc.getAllColumns().size(); + if (columnCount != bytesSplitter.getBufferSize()) { + throw new IllegalArgumentException("Expect " + columnCount + " columns, but see " + bytesSplitter.getBufferSize() + " -- " + bytesSplitter); + } + + // TODO: check data types here + } + + public CubeDesc getCubeDesc() { + return cubeDesc; + } + + public int[] getRowKeyColumnIndexes() { + return rowKeyColumnIndexes; + } + + public int[][] getMeasureColumnIndexes() { + return measureColumnIndexes; + } + + @Override + public String getTableName() { + return flatDesc.getTableName(); + } + + @Override + public List<TblColRef> getAllColumns() { + return flatDesc.getAllColumns(); + } + + @Override + public DataModelDesc getDataModel() { + return flatDesc.getDataModel(); + } + + @Override + public int getColumnIndex(TblColRef colRef) { + return flatDesc.getColumnIndex(colRef); + } + + @Override + public long getSourceOffsetStart() { + return flatDesc.getSourceOffsetStart(); + } + + @Override + public long getSourceOffsetEnd() { + return flatDesc.getSourceOffsetEnd(); + } + + @Override + public TblColRef getDistributedBy() { + return flatDesc.getDistributedBy(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java index b7f79e1..aa4610f 
100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java @@ -48,12 +48,13 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.cuboid.CuboidScheduler; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; import org.slf4j.Logger; @@ -74,8 +75,8 @@ public class CubingUtils { private static Logger logger = LoggerFactory.getLogger(CubingUtils.class); - public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, Iterable<List<String>> streams) { - CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(cubeDesc); + public static Map<Long, HyperLogLogPlusCounter> sampling(CubeDesc cubeDesc, IJoinedFlatTableDesc flatDescIn, Iterable<List<String>> streams) { + final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(flatDescIn, cubeDesc); final int rowkeyLength = cubeDesc.getRowkey().getRowKeyColumns().length; final List<Long> allCuboidIds = new CuboidScheduler(cubeDesc).getAllCuboidIds(); final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java 
b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java index 79d2f81..7044a3e 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java +++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java @@ -23,8 +23,10 @@ import java.util.Map; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.ImplementationSwitch; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IEngineAware; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; public class EngineFactory { @@ -46,6 +48,16 @@ public class EngineFactory { return streamingEngines.get(aware.getEngineType()); } + /** Mark deprecated to indicate for test purpose only */ + @Deprecated + public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) { + return batchEngine(cubeDesc).getJoinedFlatTableDesc(cubeDesc); + } + + public static IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) { + return batchEngine(newSegment).getJoinedFlatTableDesc(newSegment); + } + /** Build a new cube segment, typically its time range appends to the end of current cube. 
*/ public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { return batchEngine(newSegment).createBatchCubingJob(newSegment, submitter); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java index 556893c..754dbde 100644 --- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java +++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java @@ -19,9 +19,17 @@ package org.apache.kylin.engine; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; public interface IBatchCubingEngine { + + /** Mark deprecated to indicate for test purpose only */ + @Deprecated + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc); + + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment); /** Build a new cube segment, typically its time range appends to the end of current cube. 
*/ public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java index 17a7178..0faf22a 100644 --- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java @@ -27,8 +27,9 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -42,7 +43,7 @@ import org.junit.Test; public class JoinedFlatTableTest extends LocalFileMetadataTestCase { CubeInstance cube = null; - CubeJoinedFlatTableDesc intermediateTableDesc = null; + IJoinedFlatTableDesc flatTableDesc = null; String fakeJobUUID = "abc-def"; CubeSegment cubeSegment = null; @@ -51,7 +52,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase { this.createTestMetadata(); cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready"); cubeSegment = cube.getSegments().get(0); - intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); + flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); } @After @@ -61,7 +62,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase { @Test public void testGenCreateTableDDL() { - String ddl = 
JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp"); + String ddl = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, "/tmp"); System.out.println(ddl); System.out.println("The length for the ddl is " + ddl.length()); @@ -69,14 +70,14 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase { @Test public void testGenDropTableDDL() { - String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc); + String ddl = JoinedFlatTable.generateDropTableStatement(flatTableDesc); System.out.println(ddl); assertEquals(101, ddl.length()); } @Test public void testGenerateInsertSql() throws IOException { - String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv())); + String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv())); System.out.println(sqls); int length = sqls.length(); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index 55ea71f..f3a4107 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -29,6 +29,8 @@ public interface IJoinedFlatTableDesc { DataModelDesc getDataModel(); List<TblColRef> getAllColumns(); + + int getColumnIndex(TblColRef colRef); long getSourceOffsetStart(); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java ---------------------------------------------------------------------- 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java index 5198db1..681c545 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java @@ -19,12 +19,25 @@ package org.apache.kylin.engine.mr; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.IBatchCubingEngine; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; public class MRBatchCubingEngine implements IBatchCubingEngine { @Override + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) { + return new CubeJoinedFlatTableDesc(cubeDesc); + } + + @Override + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) { + return new CubeJoinedFlatTableDesc(newSegment); + } + + @Override public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { return new BatchCubingJobBuilder(newSegment, submitter).build(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java index b3af7d7..d9fdcb9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java @@ -19,12 +19,25 @@ package org.apache.kylin.engine.mr; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; +import 
org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.IBatchCubingEngine; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; public class MRBatchCubingEngine2 implements IBatchCubingEngine { @Override + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) { + return new CubeJoinedFlatTableDesc(cubeDesc); + } + + @Override + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) { + return new CubeJoinedFlatTableDesc(newSegment); + } + + @Override public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) { return new BatchCubingJobBuilder2(newSegment, submitter).build(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java index 877358b..2c3b77f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java @@ -23,6 +23,7 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide; @@ -30,6 +31,7 @@ import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2; import org.apache.kylin.metadata.MetadataManager; +import 
org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.SourceFactory; import org.apache.kylin.storage.StorageFactory; @@ -37,7 +39,8 @@ import org.apache.kylin.storage.StorageFactory; public class MRUtil { public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) { - return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg.getJoinedFlatTableDesc()); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg); + return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc); } public static IMRTableInputFormat getTableInputFormat(String tableName) { http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java index 4786505..588b087 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java @@ -37,7 +37,8 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.kv.AbstractRowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; @@ -65,7 +66,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL protected CubeDesc 
cubeDesc; protected CubeSegment cubeSegment; protected List<byte[]> nullBytes; - protected CubeJoinedFlatTableDesc intermediateTableDesc; + protected CubeJoinedFlatTableEnrich intermediateTableDesc; protected String intermediateTableRowDelimiter; protected byte byteRowDelimiter; protected int counter; @@ -102,7 +103,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); - intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); + intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); bytesSplitter = new BytesSplitter(200, 16384); rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java index 20259cb..3fa966d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java @@ -30,7 +30,8 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.MRUtil; @@ -54,7 
+55,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K protected Text outputValue = new Text(); protected int errorRecordCounter = 0; - protected CubeJoinedFlatTableDesc intermediateTableDesc; + protected CubeJoinedFlatTableEnrich intermediateTableDesc; protected int[] dictionaryColumnIndex; @Override @@ -72,7 +73,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat(); - intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc); + intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc); dictionaryColumnIndex = new int[factDictCols.size()]; for (int i = 0; i < factDictCols.size(); i++) { TblColRef colRef = factDictCols.get(i); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java index 24c37ce..1d90d01 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java @@ -39,12 +39,14 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.ByteArrayWritable; import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat; import org.apache.kylin.engine.mr.KylinMapper; import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import 
org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +81,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID); cubeSegment = cube.getSegmentById(segmentID); flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat(); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap(); @@ -92,7 +95,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr dictionaryMap.put(col, cubeSegment.getDictionary(col)); } - DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); + DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap); cubeBuilder.setReserveMemoryMB(calculateReserveMB(context.getConfiguration())); ExecutorService executorService = Executors.newSingleThreadExecutor(); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java index 4eec233..08ed207 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngine.java @@ -18,8 +18,10 @@ package org.apache.kylin.engine.spark; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; import 
org.apache.kylin.engine.IBatchCubingEngine; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; /** */ @@ -52,4 +54,14 @@ public class SparkBatchCubingEngine implements IBatchCubingEngine { public Class<?> getStorageInterface() { return null; } + + @Override + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeDesc cubeDesc) { + throw new UnsupportedOperationException(); + } + + @Override + public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java index 3ccbcc8..5c2def2 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java @@ -69,12 +69,13 @@ import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder; import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder; import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.cube.model.RowKeyDesc; import org.apache.kylin.cube.util.CubingUtils; import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsReader; import 
org.apache.kylin.engine.spark.cube.BufferedCuboidWriter; @@ -84,6 +85,7 @@ import org.apache.kylin.measure.BufferedMeasureEncoder; import org.apache.kylin.measure.MeasureAggregators; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; @@ -175,9 +177,10 @@ public class SparkCubing extends AbstractApplication { final CubeManager cubeManager = CubeManager.getInstance(kylinConfig); final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); final String[] columns = intermediateTable.columns(); + final CubeSegment seg = cubeInstance.getSegmentById(segmentId); final CubeDesc cubeDesc = cubeInstance.getDescriptor(); final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap(); - final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc); + final CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(seg), cubeDesc); final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns(); final long start = System.currentTimeMillis(); final RowKeyDesc rowKey = cubeDesc.getRowkey(); @@ -186,7 +189,7 @@ public class SparkCubing extends AbstractApplication { if (!rowKey.isUseDictionary(col)) { continue; } - final int rowKeyColumnIndex = flatTableDesc.getRowKeyColumnIndexes()[i]; + final int rowKeyColumnIndex = flatDesc.getRowKeyColumnIndexes()[i]; tblColRefMap.put(rowKeyColumnIndex, col); } @@ -228,18 +231,19 @@ public class SparkCubing extends AbstractApplication { }))); } final long end = System.currentTimeMillis(); - CubingUtils.writeDictionary(cubeInstance.getSegmentById(segmentId), dictionaryMap, start, end); + CubingUtils.writeDictionary(seg, dictionaryMap, 
start, end); try { CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance); - cubeBuilder.setToUpdateSegs(cubeInstance.getSegmentById(segmentId)); + cubeBuilder.setToUpdateSegs(seg); cubeManager.updateCube(cubeBuilder); } catch (IOException e) { throw new RuntimeException("Failed to deal with the request: " + e.getLocalizedMessage()); } } - private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName) throws Exception { + private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName, String segmentId) throws Exception { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName); + CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId); CubeDesc cubeDesc = cubeInstance.getDescriptor(); CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc); List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds(); @@ -248,9 +252,9 @@ public class SparkCubing extends AbstractApplication { zeroValue.put(id, new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision())); } - CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc); + CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc); - final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes(); + final int[] rowKeyColumnIndexes = flatDesc.getRowKeyColumnIndexes(); final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size()); @@ -318,9 +322,7 @@ public class SparkCubing extends AbstractApplication { return samplingResult; } - /* - return hfile location - */ + /** return hfile location */ private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String 
segmentId, final byte[][] splitKeys) throws Exception { CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName); CubeDesc cubeDesc = cubeInstance.getDescriptor(); @@ -365,7 +367,8 @@ public class SparkCubing extends AbstractApplication { LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue(); System.out.println("load properties finished"); - AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); + AbstractInMemCubeBuilder inMemCubeBuilder = new DoggedCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap); final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeInstance.getSegmentById(segmentId), columnLengthMap)); Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter)); try { @@ -611,7 +614,7 @@ public class SparkCubing extends AbstractApplication { } }); - final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(rowJavaRDD, cubeName); + final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(rowJavaRDD, cubeName, segmentId); final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult); final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java index 05246f4..edd9460 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java +++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingJobBuilder.java @@ -18,6 +18,7 @@ package org.apache.kylin.engine.spark; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.IMROutput2; @@ -48,11 +49,11 @@ public class SparkCubingJobBuilder extends JobBuilderSupport { } public DefaultChainedExecutable build() { - final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config); + final CubingJob result = CubingJob.createBuildJob(seg, submitter, config); final String jobId = result.getId(); inputSide.addStepPhase1_CreateFlatTable(result); - final IJoinedFlatTableDesc joinedFlatTableDesc = seg.getJoinedFlatTableDesc(); + final IJoinedFlatTableDesc joinedFlatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); final String tableName = joinedFlatTableDesc.getTableName(); logger.info("intermediate table:" + tableName); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-streaming/pom.xml ---------------------------------------------------------------------- diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml index e25a133..095871c 100644 --- a/engine-streaming/pom.xml +++ b/engine-streaming/pom.xml @@ -45,6 +45,10 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-core-storage</artifactId> </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-job</artifactId> + </dependency> <!-- Env & Test --> <dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java ---------------------------------------------------------------------- diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java index 465a983..180f0b8 100644 --- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java +++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java @@ -55,9 +55,11 @@ import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.cube.inmemcubing.ICuboidWriter; import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder; import org.apache.kylin.cube.util.CubingUtils; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.streaming.StreamingBatchBuilder; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.metadata.model.IBuildable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -84,8 +86,10 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder { try { CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); + final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor()); + LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(); - InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap); + InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), flatDesc, dictionaryMap); final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, cuboidWriter)); processedRowCount = streamingBatch.getMessages().size(); for (StreamingMessage streamingMessage : streamingBatch.getMessages()) { @@ -129,9 +133,10 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder { public 
Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch) { final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName); + final IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cubeInstance.getDescriptor()); long start = System.currentTimeMillis(); - final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() { + final Map<Long, HyperLogLogPlusCounter> samplingResult = CubingUtils.sampling(cubeInstance.getDescriptor(), flatDesc, Lists.transform(streamingBatch.getMessages(), new Function<StreamingMessage, List<String>>() { @Nullable @Override public List<String> apply(@Nullable StreamingMessage input) { http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java index ef69d57..4d23979 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderStressTest.java @@ -31,7 +31,9 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.junit.AfterClass; import 
org.junit.BeforeClass; @@ -79,7 +81,8 @@ public class ITDoggedCubeBuilderStressTest extends LocalFileMetadataTestCase { ExecutorService executorService = Executors.newSingleThreadExecutor(); long randSeed = System.currentTimeMillis(); - DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); + DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap); doggedBuilder.setConcurrentThreads(THREADS); { http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java index 8923744..8827dff 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java @@ -40,7 +40,9 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.gridtable.GTRecord; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -87,7 +89,8 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { ExecutorService executorService = Executors.newSingleThreadExecutor(); long randSeed = System.currentTimeMillis(); - DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); + IJoinedFlatTableDesc 
flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); + DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap); doggedBuilder.setConcurrentThreads(THREADS); doggedBuilder.setSplitRowThreshold(SPLIT_ROWS); FileRecordWriter doggedResult = new FileRecordWriter(); @@ -99,7 +102,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { doggedResult.close(); } - InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); + InMemCubeBuilder inmemBuilder = new InMemCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap); inmemBuilder.setConcurrentThreads(THREADS); FileRecordWriter inmemResult = new FileRecordWriter(); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java index 87b222e..6612375 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java @@ -42,11 +42,13 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich; import org.apache.kylin.dict.DictionaryGenerator; import org.apache.kylin.dict.IterableDictionaryValueEnumerator; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.gridtable.GTRecord; import org.apache.kylin.metadata.model.FunctionDesc; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import 
org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; import org.junit.After; @@ -109,7 +111,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { private void testBuildInner() throws Exception { - InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), dictionaryMap); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); + InMemCubeBuilder cubeBuilder = new InMemCubeBuilder(cube.getDescriptor(), flatDesc, dictionaryMap); //DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap); cubeBuilder.setConcurrentThreads(nThreads); @@ -149,8 +152,8 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { } static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<List<String>> queue, int count, long randSeed) throws IOException, InterruptedException { - CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor()); - int nColumns = flatTableDesc.getAllColumns().size(); + IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); + int nColumns = flatDesc.getAllColumns().size(); @SuppressWarnings("unchecked") Set<String>[] distinctSets = new Set[nColumns]; @@ -190,15 +193,15 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException { Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); CubeDesc desc = cube.getDescriptor(); - CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc); - int nColumns = flatTableDesc.getAllColumns().size(); + CubeJoinedFlatTableEnrich flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(desc), desc); + int nColumns = flatDesc.getAllColumns().size(); List<TblColRef> columns = 
Cuboid.getBaseCuboid(desc).getColumns(); for (int c = 0; c < columns.size(); c++) { TblColRef col = columns.get(c); if (desc.getRowkey().isUseDictionary(col)) { logger.info("Building dictionary for " + col); - List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]); + List<byte[]> valueList = readValueList(flatTable, nColumns, flatDesc.getRowKeyColumnIndexes()[c]); Dictionary<String> dict = DictionaryGenerator.buildDictionary(col.getType(), new IterableDictionaryValueEnumerator(valueList)); result.put(col, dict); } @@ -211,7 +214,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { if (dictCols.isEmpty()) continue; - int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx]; + int[] flatTableIdx = flatDesc.getMeasureColumnIndexes()[measureIdx]; List<TblColRef> paramCols = func.getParameter().getColRefs(); for (int i = 0; i < paramCols.size(); i++) { TblColRef col = paramCols.get(i); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 4e56f74..57b0965 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -34,11 +34,12 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.dimension.DimensionEncodingFactory; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.streaming.StreamingConfig; import 
org.apache.kylin.job.JobInstance; import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.rest.exception.BadRequestException; @@ -135,7 +136,7 @@ public class CubeController extends BasicController { public GeneralResponse getSql(@PathVariable String cubeName, @PathVariable String segmentName) { CubeInstance cube = cubeService.getCubeManager().getCube(cubeName); CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.READY); - CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeSegment); + IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(cubeSegment); String sql = JoinedFlatTable.generateSelectDataStatement(flatTableDesc); GeneralResponse repsonse = new GeneralResponse(); http://git-wip-us.apache.org/repos/asf/kylin/blob/29ba46be/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index e055d9e..c3bdb75 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -46,7 +46,7 @@ import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; +import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.streaming.IStreamingInput; import org.apache.kylin.engine.streaming.StreamingConfig; import 
org.apache.kylin.engine.streaming.StreamingManager; @@ -90,7 +90,7 @@ public class KafkaStreamingInput implements IStreamingInput { try { final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming); - List<TblColRef> columns = new CubeJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns(); + List<TblColRef> columns = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()).getAllColumns(); final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); final ExecutorService executorService = Executors.newCachedThreadPool();